Merge branch 'master' into 1702-rest-hijack-delete-by-query

Original commit: elastic/x-pack-elasticsearch@c206f84300
This commit is contained in:
Alexander Reelsen 2017-02-20 12:06:55 +01:00
commit 7e8b43bfb1
63 changed files with 1048 additions and 485 deletions

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.license;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
@ -230,16 +231,20 @@ public class XPackInfoResponse extends ActionResponse {
@Nullable private final String description;
private final boolean available;
private final boolean enabled;
@Nullable private final Map<String, Object> nativeCodeInfo;
public FeatureSet(StreamInput in) throws IOException {
this(in.readString(), in.readOptionalString(), in.readBoolean(), in.readBoolean());
this(in.readString(), in.readOptionalString(), in.readBoolean(), in.readBoolean(),
in.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED) ? in.readMap() : null);
}
public FeatureSet(String name, @Nullable String description, boolean available, boolean enabled) {
public FeatureSet(String name, @Nullable String description, boolean available, boolean enabled,
@Nullable Map<String, Object> nativeCodeInfo) {
this.name = name;
this.description = description;
this.available = available;
this.enabled = enabled;
this.nativeCodeInfo = nativeCodeInfo;
}
public String name() {
@ -259,6 +264,11 @@ public class XPackInfoResponse extends ActionResponse {
return enabled;
}
@Nullable
public Map<String, Object> nativeCodeInfo() {
return nativeCodeInfo;
}
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (description != null) {
@ -266,6 +276,9 @@ public class XPackInfoResponse extends ActionResponse {
}
builder.field("available", available);
builder.field("enabled", enabled);
if (nativeCodeInfo != null) {
builder.field("native_code_info", nativeCodeInfo);
}
return builder.endObject();
}
@ -274,6 +287,9 @@ public class XPackInfoResponse extends ActionResponse {
out.writeOptionalString(description);
out.writeBoolean(available);
out.writeBoolean(enabled);
if (out.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED)) {
out.writeMap(nativeCodeInfo);
}
}
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Map;
public interface XPackFeatureSet {
@ -23,6 +24,8 @@ public interface XPackFeatureSet {
boolean enabled();
Map<String, Object> nativeCodeInfo();
Usage usage();
abstract class Usage implements ToXContentObject, NamedWriteable {

View File

@ -59,7 +59,8 @@ public class TransportXPackInfoAction extends HandledTransportAction<XPackInfoRe
XPackInfoResponse.FeatureSetsInfo featureSetsInfo = null;
if (request.getCategories().contains(XPackInfoRequest.Category.FEATURES)) {
Set<FeatureSet> featureSets = this.featureSets.stream().map(fs ->
new FeatureSet(fs.name(), request.isVerbose() ? fs.description() : null, fs.available(), fs.enabled()))
new FeatureSet(fs.name(), request.isVerbose() ? fs.description() : null, fs.available(), fs.enabled(),
request.isVerbose() ? fs.nativeCodeInfo() : null))
.collect(Collectors.toSet());
featureSetsInfo = new XPackInfoResponse.FeatureSetsInfo(featureSets);
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.graph;
import java.io.IOException;
import java.util.Map;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
@ -47,6 +48,11 @@ public class GraphFeatureSet implements XPackFeatureSet {
return enabled;
}
@Override
public Map<String, Object> nativeCodeInfo() {
return null;
}
@Override
public XPackFeatureSet.Usage usage() {
return new Usage(available(), enabled());

View File

@ -13,7 +13,6 @@ import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Module;
@ -24,7 +23,6 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.ActionPlugin;
@ -81,6 +79,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.NativeController;
import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
import org.elasticsearch.xpack.ml.job.process.ProcessCtrl;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessFactory;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
@ -121,6 +120,7 @@ import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.xpack.persistent.CreatePersistentTaskAction;
import org.elasticsearch.xpack.persistent.PersistentActionCoordinator;
import org.elasticsearch.xpack.persistent.PersistentActionRegistry;
import org.elasticsearch.xpack.persistent.PersistentActionRequest;
@ -128,7 +128,6 @@ import org.elasticsearch.xpack.persistent.PersistentActionService;
import org.elasticsearch.xpack.persistent.PersistentTaskClusterService;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import org.elasticsearch.xpack.persistent.CreatePersistentTaskAction;
import org.elasticsearch.xpack.persistent.StartPersistentTaskAction;
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction;
@ -156,6 +155,8 @@ public class MachineLearning extends Plugin implements ActionPlugin {
public static final String ALLOCATION_ENABLED_ATTR = "xpack.ml.allocation_enabled";
public static final Setting<Boolean> ALLOCATION_ENABLED = Setting.boolSetting("node.attr." + ALLOCATION_ENABLED_ATTR,
XPackSettings.MACHINE_LEARNING_ENABLED, Setting.Property.NodeScope);
public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS =
Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope);
private final Settings settings;
private final Environment env;
@ -178,6 +179,7 @@ public class MachineLearning extends Plugin implements ActionPlugin {
return Collections.unmodifiableList(
Arrays.asList(USE_NATIVE_PROCESS_OPTION,
ALLOCATION_ENABLED,
CONCURRENT_JOB_ALLOCATIONS,
ProcessCtrl.DONT_PERSIST_MODEL_STATE_SETTING,
ProcessCtrl.MAX_ANOMALY_RECORDS_SETTING,
DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING,
@ -256,12 +258,17 @@ public class MachineLearning extends Plugin implements ActionPlugin {
NormalizerProcessFactory normalizerProcessFactory;
if (USE_NATIVE_PROCESS_OPTION.get(settings)) {
try {
NativeController nativeController = new NativeController(env, new NamedPipeHelper());
nativeController.tailLogsInThread();
NativeController nativeController = NativeControllerHolder.getNativeController(settings);
if (nativeController == null) {
// This will only only happen when path.home is not set, which is disallowed in production
throw new ElasticsearchException("Failed to create native process controller for Machine Learning");
}
autodetectProcessFactory = new NativeAutodetectProcessFactory(jobProvider, env, settings, nativeController, client);
normalizerProcessFactory = new NativeNormalizerProcessFactory(env, settings, nativeController);
} catch (IOException e) {
throw new ElasticsearchException("Failed to create native process factories", e);
// This also should not happen in production, as the MachineLearningFeatureSet should have
// hit the same error first and brought down the node with a friendlier error message
throw new ElasticsearchException("Failed to create native process factories for Machine Learning", e);
}
} else {
autodetectProcessFactory = (jobDetails, modelSnapshot, quantiles, filters, ignoreDowntime, executorService) ->

View File

@ -5,26 +5,50 @@
*/
package org.elasticsearch.xpack.ml;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.XPackFeatureSet;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ml.job.process.NativeController;
import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class MachineLearningFeatureSet implements XPackFeatureSet {
private final boolean enabled;
private final XPackLicenseState licenseState;
private final Map<String, Object> nativeCodeInfo;
@Inject
public MachineLearningFeatureSet(Settings settings, @Nullable XPackLicenseState licenseState) {
this.enabled = XPackSettings.MACHINE_LEARNING_ENABLED.get(settings);
this.licenseState = licenseState;
Map<String, Object> nativeCodeInfo = NativeController.UNKNOWN_NATIVE_CODE_INFO;
// Don't try to get the native code version in the transport client - the controller process won't be running
if (XPackPlugin.transportClientMode(settings) == false) {
try {
NativeController nativeController = NativeControllerHolder.getNativeController(settings);
if (nativeController != null) {
nativeCodeInfo = nativeController.getNativeCodeInfo();
}
} catch (IOException | TimeoutException e) {
Loggers.getLogger(MachineLearningFeatureSet.class).error("Cannot get native code info for Machine Learning", e);
if (enabled) {
throw new ElasticsearchException("Cannot communicate with Machine Learning native code "
+ "- please check that you are running on a supported platform");
}
}
}
this.nativeCodeInfo = nativeCodeInfo;
}
@Override
@ -47,6 +71,11 @@ public class MachineLearningFeatureSet implements XPackFeatureSet {
return enabled;
}
@Override
public Map<String, Object> nativeCodeInfo() {
return nativeCodeInfo;
}
@Override
public XPackFeatureSet.Usage usage() {
return new Usage(available(), enabled());

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
@ -46,7 +47,7 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTaskClusterService;
import org.elasticsearch.xpack.ml.utils.JobStateObserver;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
@ -225,22 +226,22 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final JobStateObserver observer;
private final ClusterService clusterService;
private final TransportListTasksAction listTasksAction;
private final TransportCancelTasksAction cancelTasksAction;
private final PersistentTaskClusterService persistentTaskClusterService;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, TransportListTasksAction listTasksAction,
TransportCancelTasksAction cancelTasksAction, PersistentTaskClusterService persistentTaskClusterService) {
TransportCancelTasksAction cancelTasksAction) {
super(settings, CloseJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.observer = new JobStateObserver(threadPool, clusterService);
this.clusterService = clusterService;
this.listTasksAction = listTasksAction;
this.cancelTasksAction = cancelTasksAction;
this.persistentTaskClusterService = persistentTaskClusterService;
}
@Override
@ -255,7 +256,6 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
PersistentTaskInProgress<?> task = validateAndFindTask(request.getJobId(), state);
clusterService.submitStateUpdateTask("closing job [" + request.getJobId() + "]", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
@ -269,6 +269,14 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
@Override
protected void doRun() throws Exception {
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);
listTasksRequest.setActions(OpenJobAction.NAME + "[c]");
@ -279,12 +287,13 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
cancelTasksRequest.setTaskId(taskInfo.getTaskId());
cancelTasksAction.execute(cancelTasksRequest, ActionListener.wrap(cancelTaskResponse -> {
persistentTaskClusterService.completeOrRestartPersistentTask(task.getId(), null,
ActionListener.wrap(
empty -> listener.onResponse(new CloseJobAction.Response(true)),
listener::onFailure
)
);
observer.waitForState(request.getJobId(), request.getTimeout(), JobState.CLOSED, e -> {
if (e == null) {
listener.onResponse(new CloseJobAction.Response(true));
} else {
listener.onFailure(e);
}
});
}, listener::onFailure));
return;
}
@ -294,6 +303,8 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
}
});
}
});
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
@ -333,11 +344,9 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
PersistentTaskInProgress<?> task = validateAndFindTask(jobId, currentState);
PersistentTasksInProgress currentTasks = currentState.getMetaData().custom(PersistentTasksInProgress.TYPE);
Map<Long, PersistentTaskInProgress<?>> updatedTasks = new HashMap<>(currentTasks.taskMap());
for (PersistentTaskInProgress<?> taskInProgress : currentTasks.tasks()) {
if (taskInProgress.getId() == task.getId()) {
updatedTasks.put(taskInProgress.getId(), new PersistentTaskInProgress<>(taskInProgress, JobState.CLOSING));
}
}
PersistentTaskInProgress<?> taskToUpdate = currentTasks.getTask(task.getId());
taskToUpdate = new PersistentTaskInProgress<>(taskToUpdate, JobState.CLOSING);
updatedTasks.put(taskToUpdate.getId(), taskToUpdate);
PersistentTasksInProgress newTasks = new PersistentTasksInProgress(currentTasks.getCurrentId(), updatedTasks);
MlMetadata mlMetadata = currentState.metaData().custom(MlMetadata.TYPE);

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
@ -28,7 +27,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
@ -94,10 +92,6 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
}
public String getJobId() {
return jobId;
}
public boolean getCalcInterim() {
return calcInterim;
}
@ -248,10 +242,9 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutodetectProcessManager processManager, JobManager jobManager, TransportListTasksAction listTasksAction) {
AutodetectProcessManager processManager) {
super(settings, FlushJobAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
FlushJobAction.Request::new, FlushJobAction.Response::new, MachineLearning.THREAD_POOL_NAME, jobManager, processManager,
Request::getJobId, listTasksAction);
FlushJobAction.Request::new, FlushJobAction.Response::new, MachineLearning.THREAD_POOL_NAME, processManager);
}
@Override
@ -262,10 +255,8 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
}
@Override
protected void taskOperation(Request request, OpenJobAction.JobTask task,
protected void innerTaskOperation(Request request, OpenJobAction.JobTask task,
ActionListener<FlushJobAction.Response> listener) {
jobManager.getJobOrThrowIfUnknown(request.getJobId());
InterimResultsParams.Builder paramsBuilder = InterimResultsParams.builder();
paramsBuilder.calcInterim(request.getCalcInterim());
if (request.getAdvanceTime() != null) {

View File

@ -56,6 +56,8 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa
import org.elasticsearch.xpack.persistent.TransportPersistentAction;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
@ -247,6 +249,8 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
private final AutodetectProcessManager autodetectProcessManager;
private XPackLicenseState licenseState;
private volatile int maxConcurrentJobAllocations;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, XPackLicenseState licenseState,
PersistentActionService persistentActionService, PersistentActionRegistry persistentActionRegistry,
@ -258,6 +262,9 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
this.clusterService = clusterService;
this.autodetectProcessManager = autodetectProcessManager;
this.observer = new JobStateObserver(threadPool, clusterService);
this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, this::setMaxConcurrentJobAllocations);
}
@Override
@ -266,7 +273,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
// If we already know that we can't find an ml node because all ml nodes are running at capacity or
// simply because there are no ml nodes in the cluster then we fail quickly here:
ClusterState clusterState = clusterService.state();
if (selectLeastLoadedMlNode(request.getJobId(), clusterState, logger) == null) {
if (selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations, logger) == null) {
throw new ElasticsearchStatusException("no nodes available to open job [" + request.getJobId() + "]",
RestStatus.TOO_MANY_REQUESTS);
}
@ -291,7 +298,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
@Override
public DiscoveryNode executorNode(Request request, ClusterState clusterState) {
return selectLeastLoadedMlNode(request.getJobId(), clusterState, logger);
return selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations, logger);
}
@Override
@ -321,6 +328,11 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
});
}
void setMaxConcurrentJobAllocations(int maxConcurrentJobAllocations) {
logger.info("Changing [{}] from [{}] to [{}]", MachineLearning.CONCURRENT_JOB_ALLOCATIONS.getKey(),
this.maxConcurrentJobAllocations, maxConcurrentJobAllocations);
this.maxConcurrentJobAllocations = maxConcurrentJobAllocations;
}
}
/**
@ -356,36 +368,69 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
}
}
static DiscoveryNode selectLeastLoadedMlNode(String jobId, ClusterState clusterState, Logger logger) {
static DiscoveryNode selectLeastLoadedMlNode(String jobId, ClusterState clusterState, int maxConcurrentJobAllocations,
Logger logger) {
long maxAvailable = Long.MIN_VALUE;
DiscoveryNode leastLoadedNode = null;
List<String> reasons = new LinkedList<>();
DiscoveryNode minLoadedNode = null;
PersistentTasksInProgress persistentTasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
for (DiscoveryNode node : clusterState.getNodes()) {
Map<String, String> nodeAttributes = node.getAttributes();
String allocationEnabled = nodeAttributes.get(MachineLearning.ALLOCATION_ENABLED_ATTR);
if ("true".equals(allocationEnabled) == false) {
logger.debug("Not opening job [{}] on node [{}], because this node isn't a ml node.", jobId, node);
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node isn't a ml node.";
logger.debug(reason);
reasons.add(reason);
continue;
}
long numberOfOpenedJobs;
long numberOfAssignedJobs;
int numberOfAllocatingJobs;
if (persistentTasksInProgress != null) {
numberOfOpenedJobs = persistentTasksInProgress.getNumberOfTasksOnNode(node.getId(), OpenJobAction.NAME);
} else {
numberOfOpenedJobs = 0;
numberOfAssignedJobs = persistentTasksInProgress.getNumberOfTasksOnNode(node.getId(), OpenJobAction.NAME);
numberOfAllocatingJobs = persistentTasksInProgress.findTasks(OpenJobAction.NAME, task -> {
if (node.getId().equals(task.getExecutorNode()) == false) {
return false;
}
long maxNumberOfOpenJobs = Long.parseLong(node.getAttributes().get(MAX_RUNNING_JOBS_PER_NODE.getKey()));
long available = maxNumberOfOpenJobs - numberOfOpenedJobs;
if (available == 0) {
logger.debug("Not opening job [{}] on node [{}], because this node is full. Number of opened jobs [{}], {} [{}]",
jobId, node, numberOfOpenedJobs, MAX_RUNNING_JOBS_PER_NODE.getKey(), maxNumberOfOpenJobs);
JobState jobTaskState = (JobState) task.getStatus();
return jobTaskState == null || // executor node didn't have the chance to set job status to OPENING
jobTaskState == JobState.OPENING || // executor node is busy starting the cpp process
task.isCurrentStatus() == false; // previous executor node failed and
// current executor node didn't have the chance to set job status to OPENING
}).size();
} else {
numberOfAssignedJobs = 0;
numberOfAllocatingJobs = 0;
}
if (numberOfAllocatingJobs >= maxConcurrentJobAllocations) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because node exceeds [" + numberOfAllocatingJobs +
"] the maximum number of jobs [" + maxConcurrentJobAllocations + "] in opening state";
logger.debug(reason);
reasons.add(reason);
continue;
}
long maxNumberOfOpenJobs = Long.parseLong(node.getAttributes().get(MAX_RUNNING_JOBS_PER_NODE.getKey()));
long available = maxNumberOfOpenJobs - numberOfAssignedJobs;
if (available == 0) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node is full. " +
"Number of opened jobs [" + numberOfAssignedJobs + "], " + MAX_RUNNING_JOBS_PER_NODE.getKey() +
" [" + maxNumberOfOpenJobs + "]";
logger.debug(reason);
reasons.add(reason);
continue;
}
if (maxAvailable < available) {
maxAvailable = available;
leastLoadedNode = node;
minLoadedNode = node;
}
}
return leastLoadedNode;
if (minLoadedNode != null) {
logger.info("selected node [{}] for job [{}]", minLoadedNode, jobId);
} else {
logger.info("no node selected for job [{}], reasons [{}]", jobId, String.join(",\n", reasons));
}
return minLoadedNode;
}
}

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
@ -27,7 +26,6 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
@ -223,11 +221,10 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager,
AutodetectProcessManager processManager, TransportListTasksAction listTasksAction) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutodetectProcessManager processManager) {
super(settings, PostDataAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
Request::new, Response::new, MachineLearning.THREAD_POOL_NAME, jobManager, processManager, Request::getJobId,
listTasksAction);
Request::new, Response::new, MachineLearning.THREAD_POOL_NAME, processManager);
}
@Override
@ -238,7 +235,7 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
}
@Override
protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
TimeRange timeRange = TimeRange.builder().startTime(request.getResetStart()).endTime(request.getResetEnd()).build();
DataLoadParams params = new DataLoadParams(timeRange, Optional.ofNullable(request.getDataDescription()));
try {

View File

@ -346,10 +346,6 @@ public class StartDatafeedAction
throw ExceptionsHelper.missingJobException(datafeed.getJobId());
}
DatafeedJobValidator.validate(datafeed, job);
if (tasks == null) {
return;
}
JobState jobState = MlMetadata.getJobState(datafeed.getJobId(), tasks);
if (jobState != JobState.OPENED) {
throw new ElasticsearchStatusException("cannot start datafeed, expected job state [{}], but got [{}]",

View File

@ -7,16 +7,14 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
@ -25,18 +23,18 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
/**
@ -47,55 +45,49 @@ import java.util.function.Supplier;
public abstract class TransportJobTaskAction<OperationTask extends Task, Request extends TransportJobTaskAction.JobTaskRequest<Request>,
Response extends BaseTasksResponse & Writeable> extends TransportTasksAction<OperationTask, Request, Response, Response> {
protected final JobManager jobManager;
protected final AutodetectProcessManager processManager;
private final Function<Request, String> jobIdFromRequest;
private final TransportListTasksAction listTasksAction;
TransportJobTaskAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> requestSupplier,
Supplier<Response> responseSupplier, String nodeExecutor, JobManager jobManager,
AutodetectProcessManager processManager, Function<Request, String> jobIdFromRequest,
TransportListTasksAction listTasksAction) {
Supplier<Response> responseSupplier, String nodeExecutor, AutodetectProcessManager processManager) {
super(settings, actionName, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
requestSupplier, responseSupplier, nodeExecutor);
this.jobManager = jobManager;
this.processManager = processManager;
this.jobIdFromRequest = jobIdFromRequest;
this.listTasksAction = listTasksAction;
}
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
// the same validation that exists in AutodetectProcessManager#processData(...) and flush(...) methods
// is required here too because if the job hasn't been opened yet then no task exist for it yet and then
// #taskOperation(...) method will not be invoked, returning an empty result to the client.
// This ensures that we return an understandable error:
String jobId = jobIdFromRequest.apply(request);
jobManager.getJobOrThrowIfUnknown(jobId);
JobState jobState = jobManager.getJobState(jobId);
if (jobState != JobState.OPENED) {
listener.onFailure( new ElasticsearchStatusException("job [" + jobId + "] state is [" + jobState +
String jobId = request.getJobId();
// We need to check whether there is at least an assigned task here, otherwise we cannot redirect to the
// node running the job task.
ClusterState state = clusterService.state();
JobManager.getJobOrThrowIfUnknown(state, jobId);
PersistentTasksInProgress tasks = clusterService.state().getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress.PersistentTaskInProgress<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
if (jobTask == null || jobTask.getExecutorNode() == null) {
listener.onFailure( new ElasticsearchStatusException("job [" + jobId + "] state is [" + JobState.CLOSED +
"], but must be [" + JobState.OPENED + "] to perform requested action", RestStatus.CONFLICT));
} else {
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);
listTasksRequest.setActions(OpenJobAction.NAME + "[c]");
listTasksAction.execute(listTasksRequest, ActionListener.wrap(listTasksResponse -> {
String expectedDescription = "job-" + jobId;
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
if (expectedDescription.equals(taskInfo.getDescription())) {
request.setTaskId(taskInfo.getTaskId());
request.setNodes(jobTask.getExecutorNode());
super.doExecute(task, request, listener);
return;
}
}
listener.onFailure(new ResourceNotFoundException("task not found for job [" + jobId + "] " + listTasksResponse));
}, listener::onFailure));
@Override
protected final void taskOperation(Request request, OperationTask task, ActionListener<Response> listener) {
PersistentTasksInProgress tasks = clusterService.state().metaData().custom(PersistentTasksInProgress.TYPE);
JobState jobState = MlMetadata.getJobState(request.getJobId(), tasks);
if (jobState == JobState.OPENED) {
innerTaskOperation(request, task, listener);
} else {
listener.onFailure(new ElasticsearchStatusException("job [" + request.getJobId() + "] state is [" + jobState +
"], but must be [" + JobState.OPENED + "] to perform requested action", RestStatus.CONFLICT));
}
}
protected abstract void innerTaskOperation(Request request, OperationTask task, ActionListener<Response> listener);
@Override
protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions) {

View File

@ -24,18 +24,23 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobAction.Response, UpdateJobAction.RequestBuilder> {
public static final UpdateJobAction INSTANCE = new UpdateJobAction();
@ -58,7 +63,7 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
public static class Request extends AcknowledgedRequest<UpdateJobAction.Request> implements ToXContent {
public static UpdateJobAction.Request parseRequest(String jobId, XContentParser parser) {
JobUpdate update = JobUpdate.PARSER.apply(parser, null);
JobUpdate update = JobUpdate.PARSER.apply(parser, null).build();
return new UpdateJobAction.Request(jobId, update);
}
@ -132,6 +137,7 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
public static class TransportAction extends TransportMasterNodeAction<UpdateJobAction.Request, PutJobAction.Response> {
private final ConcurrentMap<String, Semaphore> semaphoreByJob = ConcurrentCollections.newConcurrentMap();
private final JobManager jobManager;
private final Client client;
@ -162,14 +168,33 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
throw new IllegalArgumentException("Job Id " + Job.ALL + " cannot be for update");
}
ActionListener<PutJobAction.Response> wrappedListener = listener;
if (request.getJobUpdate().isAutodetectProcessUpdate()) {
PersistentTasksInProgress tasks = clusterService.state().custom(PersistentTasksInProgress.TYPE);
boolean jobIsOpen = MlMetadata.getJobState(request.getJobId(), tasks) == JobState.OPENED;
semaphoreByJob.computeIfAbsent(request.getJobId(), id -> new Semaphore(1)).acquire();
ActionListener<PutJobAction.Response> wrappedListener;
if (jobIsOpen && request.getJobUpdate().isAutodetectProcessUpdate()) {
wrappedListener = ActionListener.wrap(
response -> updateProcess(request, response, listener),
listener::onFailure);
e -> {
releaseJobSemaphore(request.getJobId());
listener.onFailure(e);
});
}
else {
wrappedListener = ActionListener.wrap(
response -> {
releaseJobSemaphore(request.getJobId());
listener.onResponse(response);
},
e -> {
releaseJobSemaphore(request.getJobId());
listener.onFailure(e);
});
}
jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, wrappedListener);
jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, client, wrappedListener);
}
private void updateProcess(Request request, PutJobAction.Response updateConfigResponse,
@ -177,19 +202,26 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
UpdateProcessAction.Request updateProcessRequest = new UpdateProcessAction.Request(request.getJobId(),
request.getJobUpdate().getModelDebugConfig(), request.getJobUpdate().getDetectorUpdates());
client.execute(UpdateProcessAction.INSTANCE, updateProcessRequest, new ActionListener<UpdateProcessAction.Response>() {
@Override
public void onResponse(UpdateProcessAction.Response response) {
releaseJobSemaphore(request.getJobId());
listener.onResponse(updateConfigResponse);
}
@Override
public void onFailure(Exception e) {
releaseJobSemaphore(request.getJobId());
listener.onFailure(e);
}
});
}
private void releaseJobSemaphore(String jobId) {
semaphoreByJob.remove(jobId).release();
}
@Override
protected ClusterBlockException checkBlock(UpdateJobAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
@ -25,7 +24,6 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
@ -37,7 +35,6 @@ import java.util.Objects;
public class UpdateProcessAction extends
Action<UpdateProcessAction.Request, UpdateProcessAction.Response, UpdateProcessAction.RequestBuilder> {
public static final UpdateProcessAction INSTANCE = new UpdateProcessAction();
public static final String NAME = "cluster:admin/ml/job/update/process";
@ -182,10 +179,9 @@ public class UpdateProcessAction extends
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobManager jobManager, AutodetectProcessManager processManager, TransportListTasksAction listTasksAction) {
AutodetectProcessManager processManager) {
super(settings, NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
Request::new, Response::new, MachineLearning.THREAD_POOL_NAME, jobManager, processManager, Request::getJobId,
listTasksAction);
Request::new, Response::new, MachineLearning.THREAD_POOL_NAME, processManager);
}
@Override
@ -196,18 +192,11 @@ public class UpdateProcessAction extends
}
@Override
protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
threadPool.executor(MachineLearning.THREAD_POOL_NAME).execute(() -> {
try {
if (request.getModelDebugConfig() != null) {
processManager.writeUpdateModelDebugMessage(request.getJobId(), request.getModelDebugConfig());
}
if (request.getDetectorUpdates() != null) {
for (JobUpdate.DetectorUpdate update : request.getDetectorUpdates()) {
processManager.writeUpdateDetectorRulesMessage(request.getJobId(), update.getIndex(), update.getRules());
}
}
processManager.writeUpdateProcessMessage(request.getJobId(), request.getDetectorUpdates(),
request.getModelDebugConfig());
listener.onResponse(new Response());
} catch (Exception e) {
listener.onFailure(e);

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.ml.job;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
@ -150,7 +149,7 @@ public class JobManager extends AbstractComponent {
* @throws org.elasticsearch.ResourceNotFoundException
* if there is no job with matching the given {@code jobId}
*/
Job getJobOrThrowIfUnknown(ClusterState clusterState, String jobId) {
public static Job getJobOrThrowIfUnknown(ClusterState clusterState, String jobId) {
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
Job job = mlMetadata.getJobs().get(jobId);
if (job == null) {
@ -198,7 +197,9 @@ public class JobManager extends AbstractComponent {
});
}
public void updateJob(String jobId, JobUpdate jobUpdate, AckedRequest request, ActionListener<PutJobAction.Response> actionListener) {
public void updateJob(String jobId, JobUpdate jobUpdate, AckedRequest request, Client client,
ActionListener<PutJobAction.Response> actionListener) {
clusterService.submitStateUpdateTask("update-job-" + jobId,
new AckedClusterStateUpdateTask<PutJobAction.Response>(request, actionListener) {
private Job updatedJob;

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.MlStrings;
import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
@ -134,32 +135,11 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
private final String resultsIndexName;
private final boolean deleted;
public Job(String jobId, String description, Date createTime, Date finishedTime, Date lastDataTime,
private Job(String jobId, String description, Date createTime, Date finishedTime, Date lastDataTime,
AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription,
ModelDebugConfig modelDebugConfig, Long renormalizationWindowDays, Long backgroundPersistInterval,
Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map<String, Object> customSettings,
String modelSnapshotId, String resultsIndexName, boolean deleted) {
if (analysisConfig == null) {
throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_MISSING_ANALYSISCONFIG));
}
checkValueNotLessThan(0, "renormalizationWindowDays", renormalizationWindowDays);
checkValueNotLessThan(MIN_BACKGROUND_PERSIST_INTERVAL, "backgroundPersistInterval", backgroundPersistInterval);
checkValueNotLessThan(0, "modelSnapshotRetentionDays", modelSnapshotRetentionDays);
checkValueNotLessThan(0, "resultsRetentionDays", resultsRetentionDays);
if (!MlStrings.isValidId(jobId)) {
throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_ID, ID.getPreferredName(), jobId));
}
if (jobId.length() > MAX_JOB_ID_LENGTH) {
throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_ID_TOO_LONG, MAX_JOB_ID_LENGTH));
}
if (Strings.isNullOrEmpty(resultsIndexName)) {
resultsIndexName = jobId;
} else if (!MlStrings.isValidId(resultsIndexName)) {
throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_ID, RESULTS_INDEX_NAME.getPreferredName()));
}
this.jobId = jobId;
this.description = description;
@ -564,7 +544,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
}
public void setAnalysisConfig(AnalysisConfig.Builder configBuilder) {
analysisConfig = configBuilder.build();
analysisConfig = ExceptionsHelper.requireNonNull(configBuilder, ANALYSIS_CONFIG.getPreferredName()).build();
}
public void setAnalysisLimits(AnalysisLimits analysisLimits) {
@ -597,7 +577,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
}
public void setDataDescription(DataDescription.Builder description) {
dataDescription = description.build();
dataDescription = ExceptionsHelper.requireNonNull(description, DATA_DESCRIPTION.getPreferredName()).build();
}
public void setModelDebugConfig(ModelDebugConfig modelDebugConfig) {
@ -659,6 +639,28 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
modelSnapshotId = this.modelSnapshotId;
}
if (analysisConfig == null) {
throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_MISSING_ANALYSISCONFIG));
}
checkValueNotLessThan(0, "renormalizationWindowDays", renormalizationWindowDays);
checkValueNotLessThan(MIN_BACKGROUND_PERSIST_INTERVAL, "backgroundPersistInterval", backgroundPersistInterval);
checkValueNotLessThan(0, "modelSnapshotRetentionDays", modelSnapshotRetentionDays);
checkValueNotLessThan(0, "resultsRetentionDays", resultsRetentionDays);
if (!MlStrings.isValidId(id)) {
throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_ID, ID.getPreferredName(), id));
}
if (id.length() > MAX_JOB_ID_LENGTH) {
throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_ID_TOO_LONG, MAX_JOB_ID_LENGTH));
}
if (Strings.isNullOrEmpty(resultsIndexName)) {
resultsIndexName = id;
} else if (!MlStrings.isValidId(resultsIndexName)) {
throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_ID, RESULTS_INDEX_NAME.getPreferredName()));
}
return new Job(
id, description, createTime, finishedTime, lastDataTime, analysisConfig, analysisLimits,
dataDescription, modelDebugConfig, renormalizationWindowDays, backgroundPersistInterval,

View File

@ -23,25 +23,19 @@ import java.util.Objects;
public class JobUpdate implements Writeable, ToXContent {
public static final ParseField DETECTORS = new ParseField("detectors");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<JobUpdate, Void> PARSER =
new ConstructingObjectParser<>("job_update", a -> new JobUpdate((String) a[0], (List<DetectorUpdate>) a[1],
(ModelDebugConfig) a[2], (AnalysisLimits) a[3], (Long) a[4], (Long) a[5], (Long) a[6], (Long) a[7],
(List<String>) a[8], (Map<String, Object>) a[9]));
public static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("job_udpate", Builder::new);
static {
PARSER.declareStringOrNull(ConstructingObjectParser.optionalConstructorArg(), Job.DESCRIPTION);
PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), DetectorUpdate.PARSER, DETECTORS);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelDebugConfig.PARSER, Job.MODEL_DEBUG_CONFIG);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), AnalysisLimits.PARSER, Job.ANALYSIS_LIMITS);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), Job.BACKGROUND_PERSIST_INTERVAL);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), Job.RENORMALIZATION_WINDOW_DAYS);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), Job.RESULTS_RETENTION_DAYS);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), Job.MODEL_SNAPSHOT_RETENTION_DAYS);
PARSER.declareStringArray(ConstructingObjectParser.optionalConstructorArg(), AnalysisConfig.CATEGORIZATION_FILTERS);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), Job.CUSTOM_SETTINGS,
ObjectParser.ValueType.OBJECT);
PARSER.declareStringOrNull(Builder::setDescription, Job.DESCRIPTION);
PARSER.declareObjectArray(Builder::setDetectorUpdates, DetectorUpdate.PARSER, DETECTORS);
PARSER.declareObject(Builder::setModelDebugConfig, ModelDebugConfig.PARSER, Job.MODEL_DEBUG_CONFIG);
PARSER.declareObject(Builder::setAnalysisLimits, AnalysisLimits.PARSER, Job.ANALYSIS_LIMITS);
PARSER.declareLong(Builder::setBackgroundPersistInterval, Job.BACKGROUND_PERSIST_INTERVAL);
PARSER.declareLong(Builder::setRenormalizationWindowDays, Job.RENORMALIZATION_WINDOW_DAYS);
PARSER.declareLong(Builder::setResultsRetentionDays, Job.RESULTS_RETENTION_DAYS);
PARSER.declareLong(Builder::setModelSnapshotRetentionDays, Job.MODEL_SNAPSHOT_RETENTION_DAYS);
PARSER.declareStringArray(Builder::setCategorizationFilters, AnalysisConfig.CATEGORIZATION_FILTERS);
PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT);
}
private final String description;
@ -55,7 +49,7 @@ public class JobUpdate implements Writeable, ToXContent {
private final List<String> categorizationFilters;
private final Map<String, Object> customSettings;
public JobUpdate(@Nullable String description, @Nullable List<DetectorUpdate> detectorUpdates,
private JobUpdate(@Nullable String description, @Nullable List<DetectorUpdate> detectorUpdates,
@Nullable ModelDebugConfig modelDebugConfig, @Nullable AnalysisLimits analysisLimits,
@Nullable Long backgroundPersistInterval, @Nullable Long renormalizationWindowDays,
@Nullable Long resultsRetentionDays, @Nullable Long modelSnapshotRetentionDays,
@ -377,4 +371,74 @@ public class JobUpdate implements Writeable, ToXContent {
&& Objects.equals(this.rules, that.rules);
}
}
public static class Builder {
private String description;
private List<DetectorUpdate> detectorUpdates;
private ModelDebugConfig modelDebugConfig;
private AnalysisLimits analysisLimits;
private Long renormalizationWindowDays;
private Long backgroundPersistInterval;
private Long modelSnapshotRetentionDays;
private Long resultsRetentionDays;
private List<String> categorizationFilters;
private Map<String, Object> customSettings;
public Builder() {}
public Builder setDescription(String description) {
this.description = description;
return this;
}
public Builder setDetectorUpdates(List<DetectorUpdate> detectorUpdates) {
this.detectorUpdates = detectorUpdates;
return this;
}
public Builder setModelDebugConfig(ModelDebugConfig modelDebugConfig) {
this.modelDebugConfig = modelDebugConfig;
return this;
}
public Builder setAnalysisLimits(AnalysisLimits analysisLimits) {
this.analysisLimits = analysisLimits;
return this;
}
public Builder setRenormalizationWindowDays(Long renormalizationWindowDays) {
this.renormalizationWindowDays = renormalizationWindowDays;
return this;
}
public Builder setBackgroundPersistInterval(Long backgroundPersistInterval) {
this.backgroundPersistInterval = backgroundPersistInterval;
return this;
}
public Builder setModelSnapshotRetentionDays(Long modelSnapshotRetentionDays) {
this.modelSnapshotRetentionDays = modelSnapshotRetentionDays;
return this;
}
public Builder setResultsRetentionDays(Long resultsRetentionDays) {
this.resultsRetentionDays = resultsRetentionDays;
return this;
}
public Builder setCategorizationFilters(List<String> categorizationFilters) {
this.categorizationFilters = categorizationFilters;
return this;
}
public Builder setCustomSettings(Map<String, Object> customSettings) {
this.customSettings = customSettings;
return this;
}
public JobUpdate build() {
return new JobUpdate(description, detectorUpdates, modelDebugConfig, analysisLimits, backgroundPersistInterval,
renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings);
}
}
}

View File

@ -403,12 +403,14 @@ public class MlMetadata implements MetaData.Custom {
public static JobState getJobState(String jobId, @Nullable PersistentTasksInProgress tasks) {
PersistentTasksInProgress.PersistentTaskInProgress<?> task = getJobTask(jobId, tasks);
if (task != null && task.getStatus() != null) {
return (JobState) task.getStatus();
} else {
JobState jobTaskState = (JobState) task.getStatus();
if (jobTaskState != null) {
return jobTaskState;
}
}
// If we haven't opened a job than there will be no persistent task, which is the same as if the job was closed
return JobState.CLOSED;
}
}
public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksInProgress tasks) {
PersistentTasksInProgress.PersistentTaskInProgress<?> task = getDatafeedTask(datafeedId, tasks);

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.process;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessageHandler;
@ -15,8 +16,13 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
@ -30,6 +36,15 @@ public class NativeController {
private static final String START_COMMAND = "start";
public static final Map<String, Object> UNKNOWN_NATIVE_CODE_INFO;
static {
Map<String, Object> unknownInfo = new HashMap<>(2);
unknownInfo.put("version", "N/A");
unknownInfo.put("build_hash", "N/A");
UNKNOWN_NATIVE_CODE_INFO = Collections.unmodifiableMap(unknownInfo);
}
private final CppLogMessageHandler cppLogHandler;
private final OutputStream commandStream;
private Thread logTailThread;
@ -59,6 +74,23 @@ public class NativeController {
return cppLogHandler.getPid(CONTROLLER_CONNECT_TIMEOUT);
}
public Map<String, Object> getNativeCodeInfo() throws TimeoutException {
String copyrightMessage = cppLogHandler.getCppCopyright(CONTROLLER_CONNECT_TIMEOUT);
Matcher matcher = Pattern.compile("Version (.+) \\(Build ([0-9a-f]+)\\) Copyright ").matcher(copyrightMessage);
if (matcher.find()) {
Map<String, Object> info = new HashMap<>(2);
info.put("version", matcher.group(1));
info.put("build_hash", matcher.group(2));
return info;
} else {
// If this happens it probably means someone has changed the format in lib/ver/CBuildInfo.cc
// in the machine-learning-cpp repo without changing the pattern above to match
String msg = "Unexpected native controller process copyright format: " + copyrightMessage;
LOGGER.error(msg);
throw new ElasticsearchException(msg);
}
}
public void startProcess(List<String> command) throws IOException {
// Sanity check to avoid hard-to-debug errors - tabs and newlines will confuse the controller process
for (String arg : command) {

View File

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
import java.io.IOException;
/**
* Manages a singleton NativeController so that both the MachineLearningFeatureSet and MachineLearning classes can
* get access to the same one.
*/
public class NativeControllerHolder {
private static final Object lock = new Object();
private static NativeController nativeController;
private NativeControllerHolder() {
}
/**
* Get a reference to the singleton native process controller.
*
* The NativeController is created lazily to allow time for the C++ process to be started before connection is attempted.
*
* null is returned to tests that haven't bothered to set up path.home and all runs where useNativeProcess=false.
*
* Calls may throw an exception if initial connection to the C++ process fails.
*/
public static NativeController getNativeController(Settings settings) throws IOException {
if (Environment.PATH_HOME_SETTING.exists(settings) && MachineLearning.USE_NATIVE_PROCESS_OPTION.get(settings)) {
synchronized (lock) {
if (nativeController == null) {
nativeController = new NativeController(new Environment(settings), new NamedPipeHelper());
nativeController.tailLogsInThread();
}
}
return nativeController;
}
return null;
}
}

View File

@ -35,7 +35,8 @@ import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -51,7 +52,7 @@ public class AutodetectCommunicator implements Closeable {
private final AutoDetectResultProcessor autoDetectResultProcessor;
private final Consumer<Exception> handler;
final AtomicBoolean inUse = new AtomicBoolean(false);
final AtomicReference<CountDownLatch> inUse = new AtomicReference<>();
public AutodetectCommunicator(long taskId, Job job, AutodetectProcess process, DataCountsReporter dataCountsReporter,
AutoDetectResultProcessor autoDetectResultProcessor, Consumer<Exception> handler) {
@ -83,7 +84,7 @@ public class AutodetectCommunicator implements Closeable {
DataCounts results = autoDetectWriter.write(countingStream);
autoDetectWriter.flush();
return results;
});
}, false);
}
@Override
@ -98,21 +99,22 @@ public class AutodetectCommunicator implements Closeable {
autoDetectResultProcessor.awaitCompletion();
handler.accept(errorReason != null ? new ElasticsearchException(errorReason) : null);
return null;
});
}, true);
}
public void writeUpdateModelDebugMessage(ModelDebugConfig config) throws IOException {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> {
autodetectProcess.writeUpdateModelDebugMessage(config);
return null;
});
}, false);
}
public void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rules) throws IOException {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> {
autodetectProcess.writeUpdateDetectorRulesMessage(detectorIndex, rules);
return null;
});
}, false);
}
public void flushJob(InterimResultsParams params) throws IOException {
@ -120,7 +122,7 @@ public class AutodetectCommunicator implements Closeable {
String flushId = autodetectProcess.flushJob(params);
waitFlushToCompletion(flushId);
return null;
});
}, false);
}
private void waitFlushToCompletion(String flushId) throws IOException {
@ -171,17 +173,33 @@ public class AutodetectCommunicator implements Closeable {
return taskId;
}
private <T> T checkAndRun(Supplier<String> errorMessage, CheckedSupplier<T, IOException> callback) throws IOException {
if (inUse.compareAndSet(false, true)) {
private <T> T checkAndRun(Supplier<String> errorMessage, CheckedSupplier<T, IOException> callback, boolean wait) throws IOException {
CountDownLatch latch = new CountDownLatch(1);
if (inUse.compareAndSet(null, latch)) {
try {
checkProcessIsAlive();
return callback.get();
} finally {
inUse.set(false);
latch.countDown();
inUse.set(null);
}
} else {
if (wait) {
latch = inUse.get();
if (latch != null) {
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS);
}
}
checkProcessIsAlive();
return callback.get();
} else {
throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS);
}
}
}
}

View File

@ -19,9 +19,9 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
@ -116,15 +116,9 @@ public class AutodetectProcessManager extends AbstractComponent {
* @return Count of records, fields, bytes, etc written
*/
public DataCounts processData(String jobId, InputStream input, DataLoadParams params) {
JobState jobState = jobManager.getJobState(jobId);
if (jobState != JobState.OPENED) {
throw new IllegalArgumentException("job [" + jobId + "] state is [" + jobState + "], but must be ["
+ JobState.OPENED + "] for processing data");
}
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
throw new IllegalStateException("job [" + jobId + "] with state [" + jobState + "] hasn't been started");
throw new IllegalStateException("[" + jobId + "] Cannot process data: no active autodetect process for job");
}
try {
return communicator.writeToJob(input, params);
@ -168,23 +162,25 @@ public class AutodetectProcessManager extends AbstractComponent {
}
}
public void writeUpdateModelDebugMessage(String jobId, ModelDebugConfig config) throws IOException {
public void writeUpdateProcessMessage(String jobId, List<JobUpdate.DetectorUpdate> updates, ModelDebugConfig config)
throws IOException {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
logger.debug("Cannot update model debug config: no active autodetect process for job {}", jobId);
return;
}
communicator.writeUpdateModelDebugMessage(config);
// TODO check for errors from autodetects
}
public void writeUpdateDetectorRulesMessage(String jobId, int detectorIndex, List<DetectionRule> rules) throws IOException {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
logger.debug("Cannot update model debug config: no active autodetect process for job {}", jobId);
return;
if (config != null) {
communicator.writeUpdateModelDebugMessage(config);
}
if (updates != null) {
for (JobUpdate.DetectorUpdate update : updates) {
if (update.getRules() != null) {
communicator.writeUpdateDetectorRulesMessage(update.getIndex(), update.getRules());
}
}
}
communicator.writeUpdateDetectorRulesMessage(detectorIndex, rules);
// TODO check for errors from autodetects
}
@ -204,7 +200,10 @@ public class AutodetectProcessManager extends AbstractComponent {
}
setJobState(taskId, JobState.FAILED, e2 -> handler.accept(e1));
}
}, handler);
}, e1 -> {
logger.warn("Failed to gather information required to open job [" + jobId + "]", e1);
setJobState(taskId, JobState.FAILED, e2 -> handler.accept(e1));
});
}
// TODO: add a method on JobProvider that fetches all required info via 1 msearch call, so that we have a single lambda

View File

@ -13,6 +13,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import java.io.IOException;
@ -80,7 +81,7 @@ public class StateProcessor extends AbstractComponent {
try {
logger.trace("[{}] ES API CALL: bulk index", jobId);
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(bytes, null, null);
bulkRequest.add(bytes, null, null, XContentType.JSON);
client.bulk(bulkRequest).actionGet();
} catch (Exception e) {
logger.error(new ParameterizedMessage("[{}] Error persisting bulk state", jobId), e);

View File

@ -47,6 +47,7 @@ public class CppLogMessageHandler implements Closeable {
private final int errorStoreSize;
private final Deque<String> errorStore;
private final CountDownLatch pidLatch;
private final CountDownLatch cppCopyrightLatch;
private volatile boolean hasLogStreamEnded;
private volatile boolean seenFatalError;
private volatile long pid;
@ -70,6 +71,7 @@ public class CppLogMessageHandler implements Closeable {
this.errorStoreSize = errorStoreSize;
errorStore = ConcurrentCollections.newDeque();
pidLatch = new CountDownLatch(1);
cppCopyrightLatch = new CountDownLatch(1);
hasLogStreamEnded = false;
}
@ -133,7 +135,23 @@ public class CppLogMessageHandler implements Closeable {
return pid;
}
public String getCppCopyright() {
/**
* Get the process ID of the C++ process whose log messages are being read. This will
* arrive in the first log message logged by the C++ process. They all log a copyright
* message immediately on startup so it should not take long to arrive, but will not be
* available instantly after the process starts.
*/
public String getCppCopyright(Duration timeout) throws TimeoutException {
if (cppCopyright == null) {
try {
cppCopyrightLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (cppCopyright == null) {
throw new TimeoutException("Timed out waiting for C++ process copyright");
}
}
return cppCopyright;
}
@ -193,6 +211,7 @@ public class CppLogMessageHandler implements Closeable {
String latestMessage = msg.getMessage();
if (cppCopyright == null && latestMessage.contains("Copyright")) {
cppCopyright = latestMessage;
cppCopyrightLatch.countDown();
}
// TODO: Is there a way to preserve the original timestamp when re-logging?
if (jobId != null) {

View File

@ -37,4 +37,9 @@ public class RestPostDataAction extends BaseRestHandler {
return channel -> client.execute(PostDataAction.INSTANCE, request, new RestStatusToXContentListener<>(channel));
}
@Override
public boolean supportsContentStream() {
return true;
}
}

View File

@ -54,6 +54,11 @@ public class MonitoringFeatureSet implements XPackFeatureSet {
return enabled;
}
@Override
public Map<String, Object> nativeCodeInfo() {
return null;
}
@Override
public XPackFeatureSet.Usage usage() {
return new Usage(available(), enabled(), exportersUsage(exporters));

View File

@ -57,4 +57,5 @@ public class PersistentTask extends CancellableTask {
public void setPersistentTaskId(long persistentTaskId) {
this.persistentTaskId = persistentTaskId;
}
}

View File

@ -93,6 +93,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
taskBuilder.setStatus(builder.status);
}, ACTION_PARSER, new ParseField("action"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareStringOrNull(TaskBuilder::setExecutorNode, new ParseField("executor_node"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setAllocationIdOnLastStatusUpdate,
new ParseField("allocation_id_on_last_status_update"));
}
/**
@ -194,24 +196,28 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
private final Status status;
@Nullable
private final String executorNode;
@Nullable
private final Long allocationIdOnLastStatusUpdate;
public PersistentTaskInProgress(long id, String action, Request request, boolean stopped, boolean removeOnCompletion,
String executorNode) {
this(id, 0L, action, request, stopped, removeOnCompletion, null, executorNode);
this(id, 0L, action, request, stopped, removeOnCompletion, null, executorNode, null);
}
public PersistentTaskInProgress(PersistentTaskInProgress<Request> task, boolean stopped, String newExecutorNode) {
this(task.id, task.allocationId + 1L, task.action, task.request, stopped, task.removeOnCompletion, task.status,
newExecutorNode);
newExecutorNode, task.allocationId);
}
public PersistentTaskInProgress(PersistentTaskInProgress<Request> task, Status status) {
this(task.id, task.allocationId, task.action, task.request, task.stopped, task.removeOnCompletion, status, task.executorNode);
this(task.id, task.allocationId, task.action, task.request, task.stopped, task.removeOnCompletion, status,
task.executorNode, task.allocationId);
}
private PersistentTaskInProgress(long id, long allocationId, String action, Request request,
boolean stopped, boolean removeOnCompletion, Status status, String executorNode) {
boolean stopped, boolean removeOnCompletion, Status status,
String executorNode, Long allocationIdOnLastStatusUpdate) {
this.id = id;
this.allocationId = allocationId;
this.action = action;
@ -220,6 +226,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
this.stopped = stopped;
this.removeOnCompletion = removeOnCompletion;
this.executorNode = executorNode;
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
// Update parent request for starting tasks with correct parent task ID
request.setParentTask("cluster", id);
}
@ -234,6 +241,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
removeOnCompletion = in.readBoolean();
status = in.readOptionalNamedWriteable(Task.Status.class);
executorNode = in.readOptionalString();
allocationIdOnLastStatusUpdate = in.readOptionalLong();
}
@Override
@ -246,6 +254,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
out.writeBoolean(removeOnCompletion);
out.writeOptionalNamedWriteable(status);
out.writeOptionalString(executorNode);
out.writeOptionalLong(allocationIdOnLastStatusUpdate);
}
@Override
@ -260,12 +269,14 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
stopped == that.stopped &&
removeOnCompletion == that.removeOnCompletion &&
Objects.equals(status, that.status) &&
Objects.equals(executorNode, that.executorNode);
Objects.equals(executorNode, that.executorNode) &&
Objects.equals(allocationIdOnLastStatusUpdate, that.allocationIdOnLastStatusUpdate);
}
@Override
public int hashCode() {
return Objects.hash(id, allocationId, action, request, stopped, removeOnCompletion, status, executorNode);
return Objects.hash(id, allocationId, action, request, stopped, removeOnCompletion, status, executorNode,
allocationIdOnLastStatusUpdate);
}
@Override
@ -307,6 +318,14 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
return removeOnCompletion;
}
/**
* @return Whether the task status isn't stale. When a task gets unassigned from the executor node or assigned
* to a new executor node and the status hasn't been updated then the task status is stale.
*/
public boolean isCurrentStatus() {
return allocationIdOnLastStatusUpdate == allocationId;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -329,6 +348,9 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
// These are transient values that shouldn't be persisted to gateway cluster state or snapshot
builder.field("allocation_id", allocationId);
builder.field("executor_node", executorNode);
if (allocationIdOnLastStatusUpdate != null) {
builder.field("allocation_id_on_last_status_update", allocationIdOnLastStatusUpdate);
}
}
builder.field("stopped", stopped);
builder.field("remove_on_completion", removeOnCompletion);
@ -352,6 +374,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
private boolean removeOnCompletion;
private Status status;
private String executorNode;
private Long allocationIdOnLastStatusUpdate;
public TaskBuilder<Request> setId(long id) {
this.id = id;
@ -394,8 +417,14 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
return this;
}
public TaskBuilder<Request> setAllocationIdOnLastStatusUpdate(Long allocationIdOnLastStatusUpdate) {
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
return this;
}
public PersistentTaskInProgress<Request> build() {
return new PersistentTaskInProgress<>(id, allocationId, action, request, stopped, removeOnCompletion, status, executorNode);
return new PersistentTaskInProgress<>(id, allocationId, action, request, stopped, removeOnCompletion, status,
executorNode, allocationIdOnLastStatusUpdate);
}
}

View File

@ -83,6 +83,11 @@ public class SecurityFeatureSet implements XPackFeatureSet {
return enabled;
}
@Override
public Map<String, Object> nativeCodeInfo() {
return null;
}
@Override
public XPackFeatureSet.Usage usage() {
Map<String, Object> realmsUsage = buildRealmsUsage(realms);

View File

@ -10,7 +10,6 @@ import org.elasticsearch.action.support.WriteRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.security.authz.RoleDescriptor;
@ -31,15 +30,6 @@ public class PutRoleRequestBuilder extends ActionRequestBuilder<PutRoleRequest,
super(client, action, new PutRoleRequest());
}
/**
* Populate the put role request from the source and the role's name
* @deprecated use {@link #source(String, BytesReference, XContentType)} to avoid content type auto-detection
*/
@Deprecated
public PutRoleRequestBuilder source(String name, BytesReference source) throws IOException {
return source(name, source, XContentFactory.xContentType(source));
}
/**
* Populate the put role request from the source and the role's name
*/

View File

@ -12,7 +12,6 @@ import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.security.authc.support.Hasher;
@ -60,15 +59,6 @@ public class ChangePasswordRequestBuilder
return this;
}
/**
* Populate the change password request from the source
* @deprecated use {@link #source(BytesReference, XContentType)} to avoid content type auto-detection
*/
@Deprecated
public ChangePasswordRequestBuilder source(BytesReference source) throws IOException {
return source(source, XContentFactory.xContentType(source));
}
/**
* Populate the change password request from the source in the provided content type
*/

View File

@ -14,7 +14,6 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.common.xcontent.XContentType;
@ -92,15 +91,6 @@ public class PutUserRequestBuilder extends ActionRequestBuilder<PutUserRequest,
return this;
}
/**
* Populate the put user request using the given source and username
* @deprecated use {@link #source(String, BytesReference, XContentType)} to avoid content type auto-detection
*/
@Deprecated
public PutUserRequestBuilder source(String username, BytesReference source) throws IOException {
return source(username, source, XContentFactory.xContentType(source));
}
/**
* Populate the put user request using the given source and username
*/

View File

@ -53,6 +53,11 @@ public class WatcherFeatureSet implements XPackFeatureSet {
return enabled;
}
@Override
public Map<String, Object> nativeCodeInfo() {
return null;
}
@Override
public XPackFeatureSet.Usage usage() {
return new Usage(available(), enabled(), watcherService != null ? watcherService.usageStats() : Collections.emptyMap());

View File

@ -141,16 +141,6 @@ public class ExecuteWatchRequest extends MasterNodeReadRequest<ExecuteWatchReque
return xContentType;
}
/**
* @param watchSource instead of using an existing watch use this non persisted watch
* @deprecated use {@link #setWatchSource(BytesReference, XContentType)}
*/
@Deprecated
public void setWatchSource(BytesReference watchSource) {
this.watchSource = watchSource;
this.xContentType = XContentFactory.xContentType(watchSource);
}
/**
* @param watchSource instead of using an existing watch use this non persisted watch
*/

View File

@ -78,16 +78,6 @@ public class ExecuteWatchRequestBuilder extends MasterNodeOperationRequestBuilde
return this;
}
/**
* @param watchSource instead of using an existing watch use this non persisted watch
* @deprecated use {@link #setWatchSource(BytesReference, XContentType)}
*/
@Deprecated
public ExecuteWatchRequestBuilder setWatchSource(BytesReference watchSource) {
request.setWatchSource(watchSource);
return this;
}
/**
* @param watchSource instead of using an existing watch use this non persisted watch
*/

View File

@ -41,11 +41,6 @@ public class PutWatchRequest extends MasterNodeRequest<PutWatchRequest> {
this(id, source.buildAsBytes(XContentType.JSON), XContentType.JSON);
}
@Deprecated
public PutWatchRequest(String id, BytesReference source) {
this(id, source, source != null ? XContentFactory.xContentType(source) : null);
}
public PutWatchRequest(String id, BytesReference source, XContentType xContentType) {
this.id = id;
this.source = source;
@ -81,16 +76,6 @@ public class PutWatchRequest extends MasterNodeRequest<PutWatchRequest> {
setSource(source.buildAsBytes(XContentType.JSON), XContentType.JSON);
}
/**
* Set the source of the watch
* @deprecated use {@link #setSource(BytesReference, XContentType)}
*/
@Deprecated
public void setSource(BytesReference source) {
this.source = source;
this.xContentType = XContentFactory.xContentType(source);
}
/**
* Set the source of the watch
*/

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.transport.actions.put;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.watcher.client.WatchSourceBuilder;
public class PutWatchRequestBuilder extends MasterNodeOperationRequestBuilder<PutWatchRequest, PutWatchResponse, PutWatchRequestBuilder> {
@ -31,9 +32,10 @@ public class PutWatchRequestBuilder extends MasterNodeOperationRequestBuilder<Pu
/**
* @param source the source of the watch to be created
* @param xContentType the content type of the source
*/
public PutWatchRequestBuilder setSource(BytesReference source) {
request.setSource(source);
public PutWatchRequestBuilder setSource(BytesReference source, XContentType xContentType) {
request.setSource(source, xContentType);
return this;
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.license;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.license.License.OperationMode;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.Transport;
@ -279,7 +280,9 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
PlainListenableActionFuture<CloseJobAction.Response> listener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).closeJob(new CloseJobAction.Request("foo"), listener);
CloseJobAction.Request request = new CloseJobAction.Request("foo");
request.setTimeout(TimeValue.timeValueSeconds(30));
new MachineLearningClient(client).closeJob(request, listener);
listener.actionGet();
}
}

View File

@ -18,6 +18,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa
import java.util.Collections;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
public class CloseJobActionTests extends ESTestCase {
@ -26,9 +27,7 @@ public class CloseJobActionTests extends ESTestCase {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(buildJobBuilder("job_id").build(), false);
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, null);
task = new PersistentTaskInProgress<>(task, randomFrom(JobState.OPENED, JobState.FAILED));
createJobTask(1L, "job_id", null, randomFrom(JobState.OPENED, JobState.FAILED));
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task))));
@ -52,10 +51,7 @@ public class CloseJobActionTests extends ESTestCase {
public void testMoveJobToClosingState_unexpectedJobState() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(buildJobBuilder("job_id").build(), false);
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, null);
task = new PersistentTaskInProgress<>(task, JobState.OPENING);
PersistentTaskInProgress<OpenJobAction.Request> task = createJobTask(1L, "job_id", null, JobState.OPENING);
ClusterState.Builder csBuilder1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task))));

View File

@ -36,10 +36,10 @@ public class GetJobsActionResponseTests extends AbstractStreamableTestCase<GetJo
Date finishedTime = randomBoolean() ? new Date(randomNonNegativeLong()) : null;
Date lastDataTime = randomBoolean() ? new Date(randomNonNegativeLong()) : null;
long timeout = randomNonNegativeLong();
AnalysisConfig analysisConfig = new AnalysisConfig.Builder(
Collections.singletonList(new Detector.Builder("metric", "some_field").build())).build();
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(
Collections.singletonList(new Detector.Builder("metric", "some_field").build()));
AnalysisLimits analysisLimits = new AnalysisLimits(randomNonNegativeLong(), randomNonNegativeLong());
DataDescription dataDescription = randomBoolean() ? new DataDescription.Builder().build() : null;
DataDescription.Builder dataDescription = new DataDescription.Builder();
ModelDebugConfig modelDebugConfig = randomBoolean() ? new ModelDebugConfig(randomDouble(), randomAsciiOfLength(10)) : null;
Long normalizationWindowDays = randomBoolean() ? Long.valueOf(randomIntBetween(0, 365)) : null;
Long backgroundPersistInterval = randomBoolean() ? Long.valueOf(randomIntBetween(3600, 86400)) : null;
@ -48,11 +48,26 @@ public class GetJobsActionResponseTests extends AbstractStreamableTestCase<GetJo
Map<String, Object> customConfig = randomBoolean() ? Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10))
: null;
String modelSnapshotId = randomBoolean() ? randomAsciiOfLength(10) : null;
String indexName = randomBoolean() ? "index" + j : null;
Job job = new Job(jobId, description, createTime, finishedTime, lastDataTime,
analysisConfig, analysisLimits, dataDescription,
modelDebugConfig, normalizationWindowDays, backgroundPersistInterval,
modelSnapshotRetentionDays, resultsRetentionDays, customConfig, modelSnapshotId, indexName, randomBoolean());
String indexName = "index" + j;
Job.Builder builder = new Job.Builder();
builder.setId(jobId);
builder.setDescription(description);
builder.setCreateTime(createTime);
builder.setFinishedTime(finishedTime);
builder.setLastDataTime(lastDataTime);
builder.setAnalysisConfig(analysisConfig);
builder.setAnalysisLimits(analysisLimits);
builder.setDataDescription(dataDescription);
builder.setModelDebugConfig(modelDebugConfig);
builder.setRenormalizationWindowDays(normalizationWindowDays);
builder.setBackgroundPersistInterval(backgroundPersistInterval);
builder.setModelSnapshotRetentionDays(modelSnapshotRetentionDays);
builder.setResultsRetentionDays(resultsRetentionDays);
builder.setCustomSettings(customConfig);
builder.setModelSnapshotId(modelSnapshotId);
builder.setResultsIndexName(indexName);
builder.setDeleted(randomBoolean());
Job job = builder.build();
jobList.add(job);
}

View File

@ -41,16 +41,14 @@ public class OpenJobActionTests extends ESTestCase {
.build();
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_node_id");
task = new PersistentTaskInProgress<>(task, randomFrom(JobState.CLOSED, JobState.FAILED));
createJobTask(1L, "job_id", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED));
PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes);
OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksInProgress(1L, Collections.emptyMap()), nodes);
OpenJobAction.validate("job_id", mlBuilder.build(), null, nodes);
task = new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_other_node_id");
task = new PersistentTaskInProgress<>(task, JobState.OPENED);
task = createJobTask(1L, "job_id", "_other_node_id", JobState.OPENED);
tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes);
}
@ -79,19 +77,16 @@ public class OpenJobActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_node_id");
JobState jobState = randomFrom(JobState.OPENING, JobState.OPENED, JobState.CLOSING);
task = new PersistentTaskInProgress<>(task, jobState);
PersistentTaskInProgress<OpenJobAction.Request> task = createJobTask(1L, "job_id", "_node_id", jobState);
PersistentTasksInProgress tasks1 = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks1, nodes));
assertEquals("[job_id] expected state [closed] or [failed], but got [" + jobState +"]", e.getMessage());
task = new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_other_node_id");
jobState = randomFrom(JobState.OPENING, JobState.CLOSING);
task = new PersistentTaskInProgress<>(task, jobState);
task = createJobTask(1L, "job_id", "_other_node_id", jobState);
PersistentTasksInProgress tasks2 = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
e = expectThrows(ElasticsearchStatusException.class,
@ -124,7 +119,7 @@ public class OpenJobActionTests extends ESTestCase {
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks));
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), logger);
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, logger);
assertEquals("_node_id3", result.getId());
}
@ -152,7 +147,7 @@ public class OpenJobActionTests extends ESTestCase {
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks));
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), logger);
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger);
assertNull(result);
}
@ -174,8 +169,71 @@ public class OpenJobActionTests extends ESTestCase {
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks));
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), logger);
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger);
assertNull(result);
}
public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true");
nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.build();
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
taskMap.put(0L, createJobTask(0L, "job_id1", "_node_id1", JobState.OPENING));
taskMap.put(1L, createJobTask(1L, "job_id2", "_node_id1", JobState.OPENING));
taskMap.put(2L, createJobTask(2L, "job_id3", "_node_id2", JobState.OPENING));
taskMap.put(3L, createJobTask(3L, "job_id4", "_node_id2", JobState.OPENING));
taskMap.put(4L, createJobTask(4L, "job_id5", "_node_id3", JobState.OPENING));
PersistentTasksInProgress tasks = new PersistentTasksInProgress(5L, taskMap);
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks));
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs.build(), 2, logger);
assertEquals("_node_id3", result.getId());
PersistentTaskInProgress<OpenJobAction.Request> lastTask = createJobTask(5L, "job_id6", "_node_id3", JobState.OPENING);
taskMap.put(5L, lastTask);
tasks = new PersistentTasksInProgress(6L, taskMap);
cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks));
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs.build(), 2, logger);
assertNull("no node selected, because OPENING state", result);
taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, false, "_node_id3"));
tasks = new PersistentTasksInProgress(6L, taskMap);
cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks));
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs.build(), 2, logger);
assertNull("no node selected, because stale task", result);
taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, null));
tasks = new PersistentTasksInProgress(6L, taskMap);
cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks));
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs.build(), 2, logger);
assertNull("no node selected, because null state", result);
}
public static PersistentTaskInProgress<OpenJobAction.Request> createJobTask(long id, String jobId, String nodeId, JobState jobState) {
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), false, true, nodeId);
task = new PersistentTaskInProgress<>(task, jobState);
return task;
}
}

View File

@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
import static org.hamcrest.Matchers.equalTo;
@ -41,10 +42,8 @@ public class StartDatafeedActionTests extends ESTestCase {
mlMetadata.putJob(job, false);
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*")));
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request(job.getId()), false, true, "node_id");
task = new PersistentTaskInProgress<>(task, randomFrom(JobState.FAILED, JobState.CLOSED,
JobState.CLOSING, JobState.OPENING));
JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED, JobState.CLOSING, JobState.OPENING);
PersistentTaskInProgress<OpenJobAction.Request> task = createJobTask(0L, job.getId(), "node_id", jobState);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
DiscoveryNodes nodes = DiscoveryNodes.builder()
@ -61,7 +60,7 @@ public class StartDatafeedActionTests extends ESTestCase {
DiscoveryNode node = StartDatafeedAction.selectNode(logger, request, cs.build());
assertNull(node);
task = new PersistentTaskInProgress<>(task, JobState.OPENED);
task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED);
tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
@ -110,10 +109,7 @@ public class StartDatafeedActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
PersistentTaskInProgress<OpenJobAction.Request> jobTask =
new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"),
false, true, "node_id");
jobTask = new PersistentTaskInProgress<>(jobTask, JobState.OPENED);
PersistentTaskInProgress<OpenJobAction.Request> jobTask = createJobTask(0L, "job_id", "node_id", JobState.OPENED);
PersistentTaskInProgress<StartDatafeedAction.Request> datafeedTask =
new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
false, true, "node_id");
@ -140,10 +136,7 @@ public class StartDatafeedActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
PersistentTaskInProgress<OpenJobAction.Request> jobTask =
new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"),
false, true, "node_id2");
jobTask = new PersistentTaskInProgress<>(jobTask, JobState.OPENED);
PersistentTaskInProgress<OpenJobAction.Request> jobTask = createJobTask(0L, "job_id", "node_id2", JobState.OPENED);
PersistentTaskInProgress<StartDatafeedAction.Request> datafeedTask =
new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
false, true, "node_id1");

View File

@ -22,10 +22,16 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE;
@ -155,7 +161,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertBusy(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
Map<String, String> expectedNodeAttr = new HashMap<>();
@ -165,29 +171,28 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertEquals(JobState.OPENED, task.getStatus());
});
// stop the only running ml node
logger.info("!!!!");
logger.info("stop the only running ml node");
internalCluster().stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ALLOCATION_ENABLED.getKey(), true));
ensureStableCluster(2);
assertBusy(() -> {
// job should get and remain in a failed state:
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
assertNull(task.getExecutorNode());
// The status remains to be opened as from ml we didn't had the chance to set the status to failed:
assertEquals(JobState.OPENED, task.getStatus());
});
// start ml node
logger.info("start ml node");
internalCluster().startNode(Settings.builder().put(MachineLearning.ALLOCATION_ENABLED.getKey(), true));
ensureStableCluster(3);
assertBusy(() -> {
// job should be re-opened:
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
assertNotNull(task.getExecutorNode());
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
@ -197,6 +202,114 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertEquals(expectedNodeAttr, node.getAttributes());
assertEquals(JobState.OPENED, task.getStatus());
});
cleanupWorkaround(3);
}
public void testMaxConcurrentJobAllocations() throws Exception {
int numMlNodes = 2;
internalCluster().ensureAtMostNumDataNodes(0);
// start non ml node, but that will hold the indices
logger.info("Start non ml node:");
String nonMlNode = internalCluster().startNode(Settings.builder()
.put(MachineLearning.ALLOCATION_ENABLED.getKey(), false));
logger.info("Starting ml nodes");
internalCluster().startNodes(numMlNodes, Settings.builder()
.put("node.data", false)
.put("node.master", false)
.put(MachineLearning.ALLOCATION_ENABLED.getKey(), true).build());
ensureStableCluster(numMlNodes + 1);
int maxConcurrentJobAllocations = randomIntBetween(1, 4);
client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.put(MachineLearning.CONCURRENT_JOB_ALLOCATIONS.getKey(), maxConcurrentJobAllocations))
.get();
// Sample each cs update and keep track each time a node holds more than `maxConcurrentJobAllocations` opening jobs.
List<String> violations = new CopyOnWriteArrayList<>();
internalCluster().clusterService(nonMlNode).addListener(event -> {
PersistentTasksInProgress tasks = event.state().metaData().custom(PersistentTasksInProgress.TYPE);
if (tasks == null) {
return;
}
for (DiscoveryNode node : event.state().nodes()) {
Collection<PersistentTaskInProgress<?>> foundTasks = tasks.findTasks(OpenJobAction.NAME, task -> {
return node.getId().equals(task.getExecutorNode()) &&
(task.getStatus() == null || task.getStatus() == JobState.OPENING || task.isCurrentStatus() == false);
});
int count = foundTasks.size();
if (count > maxConcurrentJobAllocations) {
violations.add("Observed node [" + node.getName() + "] with [" + count + "] opening jobs on cluster state version [" +
event.state().version() + "]");
}
}
});
int numJobs = numMlNodes * 10;
for (int i = 0; i < numJobs; i++) {
Job.Builder job = createJob(Integer.toString(i));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true, job.getId()));
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).get();
}
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = state.metaData().custom(PersistentTasksInProgress.TYPE);
assertEquals(numJobs, tasks.taskMap().size());
for (PersistentTaskInProgress<?> task : tasks.taskMap().values()) {
assertNotNull(task.getExecutorNode());
assertEquals(JobState.OPENED, task.getStatus());
}
});
logger.info("stopping ml nodes");
for (int i = 0; i < numMlNodes; i++) {
// fork so stopping all ml nodes proceeds quicker:
Runnable r = () -> {
try {
internalCluster()
.stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ALLOCATION_ENABLED.getKey(), false));
} catch (IOException e) {
logger.error("error stopping node", e);
}
};
new Thread(r).start();
}
ensureStableCluster(1, nonMlNode);
assertBusy(() -> {
ClusterState state = client(nonMlNode).admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = state.metaData().custom(PersistentTasksInProgress.TYPE);
assertEquals(numJobs, tasks.taskMap().size());
for (PersistentTaskInProgress<?> task : tasks.taskMap().values()) {
assertNull(task.getExecutorNode());
}
});
logger.info("re-starting ml nodes");
internalCluster().startNodes(numMlNodes, Settings.builder()
.put("node.data", false)
.put("node.master", false)
.put(MachineLearning.ALLOCATION_ENABLED.getKey(), true).build());
ensureStableCluster(1 + numMlNodes);
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = state.metaData().custom(PersistentTasksInProgress.TYPE);
assertEquals(numJobs, tasks.taskMap().size());
for (PersistentTaskInProgress<?> task : tasks.taskMap().values()) {
assertNotNull(task.getExecutorNode());
assertEquals(JobState.OPENED, task.getStatus());
}
}, 30, TimeUnit.SECONDS);
assertEquals("Expected no violations, but got [" + violations + "]", 0, violations.size());
cleanupWorkaround(numMlNodes + 1);
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
@ -52,7 +53,8 @@ public class DatafeedJobIT extends ESRestTestCase {
+ " }"
+ " }"
+ "}";
client().performRequest("put", "airline-data-empty", Collections.emptyMap(), new StringEntity(mappings));
client().performRequest("put", "airline-data-empty", Collections.emptyMap(),
new StringEntity(mappings, ContentType.APPLICATION_JSON));
// Create index with source = enabled, doc_values = enabled, stored = false
mappings = "{"
@ -66,12 +68,14 @@ public class DatafeedJobIT extends ESRestTestCase {
+ " }"
+ " }"
+ "}";
client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(mappings));
client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(mappings, ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data/response/1", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}"));
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data/response/2", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}"));
new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}",
ContentType.APPLICATION_JSON));
// Create index with source = enabled, doc_values = disabled (except time), stored = false
mappings = "{"
@ -85,12 +89,15 @@ public class DatafeedJobIT extends ESRestTestCase {
+ " }"
+ " }"
+ "}";
client().performRequest("put", "airline-data-disabled-doc-values", Collections.emptyMap(), new StringEntity(mappings));
client().performRequest("put", "airline-data-disabled-doc-values", Collections.emptyMap(),
new StringEntity(mappings, ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-disabled-doc-values/response/1", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}"));
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-disabled-doc-values/response/2", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}"));
new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}",
ContentType.APPLICATION_JSON));
// Create index with source = disabled, doc_values = enabled (except time), stored = true
mappings = "{"
@ -105,12 +112,15 @@ public class DatafeedJobIT extends ESRestTestCase {
+ " }"
+ " }"
+ "}";
client().performRequest("put", "airline-data-disabled-source", Collections.emptyMap(), new StringEntity(mappings));
client().performRequest("put", "airline-data-disabled-source", Collections.emptyMap(),
new StringEntity(mappings, ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-disabled-source/response/1", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}"));
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-disabled-source/response/2", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}"));
new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}",
ContentType.APPLICATION_JSON));
// Create index with nested documents
mappings = "{"
@ -122,12 +132,14 @@ public class DatafeedJobIT extends ESRestTestCase {
+ " }"
+ " }"
+ "}";
client().performRequest("put", "nested-data", Collections.emptyMap(), new StringEntity(mappings));
client().performRequest("put", "nested-data", Collections.emptyMap(), new StringEntity(mappings, ContentType.APPLICATION_JSON));
client().performRequest("put", "nested-data/response/1", Collections.emptyMap(),
new StringEntity("{\"time\":\"2016-06-01T00:00:00Z\", \"responsetime\":{\"millis\":135.22}}"));
new StringEntity("{\"time\":\"2016-06-01T00:00:00Z\", \"responsetime\":{\"millis\":135.22}}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "nested-data/response/2", Collections.emptyMap(),
new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"responsetime\":{\"millis\":222.0}}"));
new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"responsetime\":{\"millis\":222.0}}",
ContentType.APPLICATION_JSON));
// Create index with multiple docs per time interval for aggregation testing
mappings = "{"
@ -141,24 +153,33 @@ public class DatafeedJobIT extends ESRestTestCase {
+ " }"
+ " }"
+ "}";
client().performRequest("put", "airline-data-aggs", Collections.emptyMap(), new StringEntity(mappings));
client().performRequest("put", "airline-data-aggs", Collections.emptyMap(),
new StringEntity(mappings, ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-aggs/response/1", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":100.0}"));
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":100.0}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-aggs/response/2", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:01:00Z\",\"airline\":\"AAA\",\"responsetime\":200.0}"));
new StringEntity("{\"time stamp\":\"2016-06-01T00:01:00Z\",\"airline\":\"AAA\",\"responsetime\":200.0}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-aggs/response/3", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"BBB\",\"responsetime\":1000.0}"));
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"BBB\",\"responsetime\":1000.0}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-aggs/response/4", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:01:00Z\",\"airline\":\"BBB\",\"responsetime\":2000.0}"));
new StringEntity("{\"time stamp\":\"2016-06-01T00:01:00Z\",\"airline\":\"BBB\",\"responsetime\":2000.0}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-aggs/response/5", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:00:00Z\",\"airline\":\"AAA\",\"responsetime\":300.0}"));
new StringEntity("{\"time stamp\":\"2016-06-01T01:00:00Z\",\"airline\":\"AAA\",\"responsetime\":300.0}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-aggs/response/6", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:01:00Z\",\"airline\":\"AAA\",\"responsetime\":400.0}"));
new StringEntity("{\"time stamp\":\"2016-06-01T01:01:00Z\",\"airline\":\"AAA\",\"responsetime\":400.0}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-aggs/response/7", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:00:00Z\",\"airline\":\"BBB\",\"responsetime\":3000.0}"));
new StringEntity("{\"time stamp\":\"2016-06-01T01:00:00Z\",\"airline\":\"BBB\",\"responsetime\":3000.0}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-aggs/response/8", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:01:00Z\",\"airline\":\"BBB\",\"responsetime\":4000.0}"));
new StringEntity("{\"time stamp\":\"2016-06-01T01:01:00Z\",\"airline\":\"BBB\",\"responsetime\":4000.0}",
ContentType.APPLICATION_JSON));
// Ensure all data is searchable
client().performRequest("post", "_refresh");
@ -210,7 +231,7 @@ public class DatafeedJobIT extends ESRestTestCase {
+ "\"data_description\" : {\"time_field\":\"time stamp\"}"
+ "}";
client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(),
new StringEntity(job));
new StringEntity(job, ContentType.APPLICATION_JSON));
String datafeedId = "datafeed-" + jobId;
String aggregations = "{\"time stamp\":{\"histogram\":{\"field\":\"time stamp\",\"interval\":3600000},"
@ -235,7 +256,7 @@ public class DatafeedJobIT extends ESRestTestCase {
+ "\"data_description\" : {\"time_field\":\"time stamp\"}"
+ "}";
client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(),
new StringEntity(job));
new StringEntity(job, ContentType.APPLICATION_JSON));
String datafeedId = "datafeed-" + jobId;
String aggregations = "{\"time stamp\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":\"1h\"},"
@ -398,9 +419,8 @@ public class DatafeedJobIT extends ESRestTestCase {
+ " },\n" + " \"data_description\" : {\n" + " \"format\":\"JSON\",\n"
+ " \"time_field\":\"time stamp\",\n" + " \"time_format\":\"yyyy-MM-dd'T'HH:mm:ssX\"\n" + " }\n"
+ "}";
return client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + id,
Collections.emptyMap(), new StringEntity(job));
Collections.emptyMap(), new StringEntity(job, ContentType.APPLICATION_JSON));
}
private static String responseEntityToString(Response response) throws Exception {
@ -419,7 +439,7 @@ public class DatafeedJobIT extends ESRestTestCase {
+ "[{\"function\":\"mean\",\"field_name\":\"responsetime.millis\"}]}, \"data_description\" : {\"time_field\":\"time\"}"
+ "}";
client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(),
new StringEntity(job));
new StringEntity(job, ContentType.APPLICATION_JSON));
String datafeedId = jobId + "-datafeed";
new DatafeedBuilder(datafeedId, jobId, "nested-data", "response").setSource(source).build();
@ -478,7 +498,7 @@ public class DatafeedJobIT extends ESRestTestCase {
+ (aggregations == null ? "" : ",\"aggs\":" + aggregations)
+ "}";
return client().performRequest("put", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId, Collections.emptyMap(),
new StringEntity(datafeedConfig));
new StringEntity(datafeedConfig, ContentType.APPLICATION_JSON));
}
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
@ -139,7 +140,7 @@ public class MlJobIT extends ESRestTestCase {
+ " \"time_format\":\"yyyy-MM-dd HH:mm:ssX\"\n" + " }\n" + "}";
return client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId,
Collections.emptyMap(), new StringEntity(job));
Collections.emptyMap(), new StringEntity(job, ContentType.APPLICATION_JSON));
}
public void testGetBucketResults() throws Exception {
@ -216,13 +217,15 @@ public class MlJobIT extends ESRestTestCase {
String jobConfig = String.format(Locale.ROOT, jobTemplate, "index-1");
Response response = client().performRequest("put", MachineLearning.BASE_PATH
+ "anomaly_detectors/repeated-id" , Collections.emptyMap(), new StringEntity(jobConfig));
+ "anomaly_detectors/repeated-id" , Collections.emptyMap(),
new StringEntity(jobConfig, ContentType.APPLICATION_JSON));
assertEquals(200, response.getStatusLine().getStatusCode());
final String jobConfig2 = String.format(Locale.ROOT, jobTemplate, "index-2");
ResponseException e = expectThrows(ResponseException.class,
() ->client().performRequest("put", MachineLearning.BASE_PATH
+ "anomaly_detectors/repeated-id" , Collections.emptyMap(), new StringEntity(jobConfig2)));
+ "anomaly_detectors/repeated-id" , Collections.emptyMap(),
new StringEntity(jobConfig2, ContentType.APPLICATION_JSON)));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
assertThat(e.getMessage(), containsString("The job cannot be created with the Id 'repeated-id'. The Id is already used."));
@ -240,12 +243,12 @@ public class MlJobIT extends ESRestTestCase {
String jobConfig = String.format(Locale.ROOT, jobTemplate, indexName);
Response response = client().performRequest("put", MachineLearning.BASE_PATH
+ "anomaly_detectors/" + jobId1, Collections.emptyMap(), new StringEntity(jobConfig));
+ "anomaly_detectors/" + jobId1, Collections.emptyMap(), new StringEntity(jobConfig, ContentType.APPLICATION_JSON));
assertEquals(200, response.getStatusLine().getStatusCode());
String jobId2 = "aliased-job-2";
response = client().performRequest("put", MachineLearning.BASE_PATH
+ "anomaly_detectors/" + jobId2, Collections.emptyMap(), new StringEntity(jobConfig));
+ "anomaly_detectors/" + jobId2, Collections.emptyMap(), new StringEntity(jobConfig, ContentType.APPLICATION_JSON));
assertEquals(200, response.getStatusLine().getStatusCode());
response = client().performRequest("get", "_aliases");
@ -307,7 +310,7 @@ public class MlJobIT extends ESRestTestCase {
String jobConfig = String.format(Locale.ROOT, jobTemplate, byFieldName1);
Response response = client().performRequest("put", MachineLearning.BASE_PATH
+ "anomaly_detectors/" + jobId1, Collections.emptyMap(), new StringEntity(jobConfig));
+ "anomaly_detectors/" + jobId1, Collections.emptyMap(), new StringEntity(jobConfig, ContentType.APPLICATION_JSON));
assertEquals(200, response.getStatusLine().getStatusCode());
// Check the index mapping contains the first by_field_name
@ -319,7 +322,7 @@ public class MlJobIT extends ESRestTestCase {
jobConfig = String.format(Locale.ROOT, jobTemplate, byFieldName2);
response = client().performRequest("put", MachineLearning.BASE_PATH
+ "anomaly_detectors/" + jobId2, Collections.emptyMap(), new StringEntity(jobConfig));
+ "anomaly_detectors/" + jobId2, Collections.emptyMap(), new StringEntity(jobConfig, ContentType.APPLICATION_JSON));
assertEquals(200, response.getStatusLine().getStatusCode());
// Check the index mapping now contains both fields
@ -406,11 +409,11 @@ public class MlJobIT extends ESRestTestCase {
"{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"bucket_span\":%d, \"sequence_num\": %d, \"result_type\":\"record\"}",
jobId, 123, 1, 1);
client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "/result/" + 123,
Collections.singletonMap("refresh", "true"), new StringEntity(recordResult));
Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON));
client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-001/result/" + 123,
Collections.singletonMap("refresh", "true"), new StringEntity(recordResult));
Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON));
client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-002/result/" + 123,
Collections.singletonMap("refresh", "true"), new StringEntity(recordResult));
Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON));
client().performRequest("post", "_refresh");
@ -461,7 +464,7 @@ public class MlJobIT extends ESRestTestCase {
private Response addBucketResult(String jobId, String timestamp, long bucketSpan) throws Exception {
try {
client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId),
Collections.emptyMap(), new StringEntity(RESULT_MAPPING));
Collections.emptyMap(), new StringEntity(RESULT_MAPPING, ContentType.APPLICATION_JSON));
} catch (ResponseException e) {
// it is ok: the index already exists
assertThat(e.getMessage(), containsString("resource_already_exists_exception"));
@ -474,13 +477,13 @@ public class MlJobIT extends ESRestTestCase {
String id = String.format(Locale.ROOT,
"%s_%s_%s", jobId, timestamp, bucketSpan);
return client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "/result/" + id,
Collections.singletonMap("refresh", "true"), new StringEntity(bucketResult));
Collections.singletonMap("refresh", "true"), new StringEntity(bucketResult, ContentType.APPLICATION_JSON));
}
private Response addRecordResult(String jobId, String timestamp, long bucketSpan, int sequenceNum) throws Exception {
try {
client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId), Collections.emptyMap(),
new StringEntity(RESULT_MAPPING));
new StringEntity(RESULT_MAPPING, ContentType.APPLICATION_JSON));
} catch (ResponseException e) {
// it is ok: the index already exists
assertThat(e.getMessage(), containsString("resource_already_exists_exception"));
@ -492,7 +495,7 @@ public class MlJobIT extends ESRestTestCase {
"{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"bucket_span\":%d, \"sequence_num\": %d, \"result_type\":\"record\"}",
jobId, timestamp, bucketSpan, sequenceNum);
return client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "/result/" + timestamp,
Collections.singletonMap("refresh", "true"), new StringEntity(recordResult));
Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON));
}
private static String responseEntityToString(Response response) throws Exception {

View File

@ -72,7 +72,6 @@ public class MlRestTestStateCleaner {
try {
client.performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close");
} catch (Exception e) {
logger.info("Test clean up close");
if (e.getMessage().contains("cannot close job, expected job state [opened], but got [closed]")
|| e.getMessage().contains("cannot close job, expected job state [opened], but got [closing]")) {
logger.debug("job [" + jobId + "] has already been closed", e);

View File

@ -66,19 +66,17 @@ public class JobManagerTests extends ESTestCase {
}
public void testGetJobOrThrowIfUnknown_GivenUnknownJob() {
JobManager jobManager = createJobManager();
ClusterState cs = createClusterState();
ESTestCase.expectThrows(ResourceNotFoundException.class, () -> jobManager.getJobOrThrowIfUnknown(cs, "foo"));
ESTestCase.expectThrows(ResourceNotFoundException.class, () -> JobManager.getJobOrThrowIfUnknown(cs, "foo"));
}
public void testGetJobOrThrowIfUnknown_GivenKnownJob() {
JobManager jobManager = createJobManager();
Job job = buildJobBuilder("foo").build();
MlMetadata mlMetadata = new MlMetadata.Builder().putJob(job, false).build();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata)).build();
assertEquals(job, jobManager.getJobOrThrowIfUnknown(cs, "foo"));
assertEquals(job, JobManager.getJobOrThrowIfUnknown(cs, "foo"));
}
public void testGetJob_GivenJobIdIsAll() {

View File

@ -12,7 +12,6 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;

View File

@ -22,12 +22,12 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
@Override
protected JobUpdate createTestInstance() {
String description = null;
JobUpdate.Builder update = new JobUpdate.Builder();
if (randomBoolean()) {
description = randomAsciiOfLength(20);
update.setDescription(randomAsciiOfLength(20));
}
List<JobUpdate.DetectorUpdate> detectorUpdates = null;
if (randomBoolean()) {
List<JobUpdate.DetectorUpdate> detectorUpdates = null;
int size = randomInt(10);
detectorUpdates = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
@ -44,42 +44,34 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
}
detectorUpdates.add(new JobUpdate.DetectorUpdate(i, detectorDescription, detectionRules));
}
update.setDetectorUpdates(detectorUpdates);
}
ModelDebugConfig modelDebugConfig = null;
if (randomBoolean()) {
modelDebugConfig = new ModelDebugConfig(randomDouble(), randomAsciiOfLength(10));
update.setModelDebugConfig(new ModelDebugConfig(randomDouble(), randomAsciiOfLength(10)));
}
AnalysisLimits analysisLimits = null;
if (randomBoolean()) {
analysisLimits = new AnalysisLimits(randomNonNegativeLong(), randomNonNegativeLong());
update.setAnalysisLimits(new AnalysisLimits(randomNonNegativeLong(), randomNonNegativeLong()));
}
Long renormalizationWindowDays = null;
if (randomBoolean()) {
renormalizationWindowDays = randomNonNegativeLong();
update.setRenormalizationWindowDays(randomNonNegativeLong());
}
Long backgroundPersistInterval = null;
if (randomBoolean()) {
backgroundPersistInterval = randomNonNegativeLong();
update.setBackgroundPersistInterval(randomNonNegativeLong());
}
Long modelSnapshotRetentionDays = null;
if (randomBoolean()) {
modelSnapshotRetentionDays = randomNonNegativeLong();
update.setModelSnapshotRetentionDays(randomNonNegativeLong());
}
Long resultsRetentionDays = null;
if (randomBoolean()) {
resultsRetentionDays = randomNonNegativeLong();
update.setResultsRetentionDays(randomNonNegativeLong());
}
List<String> categorizationFilters = null;
if (randomBoolean()) {
categorizationFilters = Arrays.asList(generateRandomStringArray(10, 10, false));
update.setCategorizationFilters(Arrays.asList(generateRandomStringArray(10, 10, false)));
}
Map<String, Object> customSettings = null;
if (randomBoolean()) {
customSettings = Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10));
update.setCustomSettings(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
return new JobUpdate(description, detectorUpdates, modelDebugConfig, analysisLimits, backgroundPersistInterval,
renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings);
return update.build();
}
@Override
@ -89,7 +81,7 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
@Override
protected JobUpdate parseInstance(XContentParser parser) {
return JobUpdate.PARSER.apply(parser, null);
return JobUpdate.PARSER.apply(parser, null).build();
}
public void testMergeWithJob() {
@ -108,9 +100,18 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
List<String> categorizationFilters = Arrays.asList(generateRandomStringArray(10, 10, false));
Map<String, Object> customSettings = Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10));
JobUpdate update = new JobUpdate("updated_description", detectorUpdates, modelDebugConfig,
analysisLimits, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
categorizationFilters, customSettings);
JobUpdate.Builder updateBuilder = new JobUpdate.Builder();
updateBuilder.setDescription("updated_description");
updateBuilder.setDetectorUpdates(detectorUpdates);
updateBuilder.setModelDebugConfig(modelDebugConfig);
updateBuilder.setAnalysisLimits(analysisLimits);
updateBuilder.setBackgroundPersistInterval(randomNonNegativeLong());
updateBuilder.setResultsRetentionDays(randomNonNegativeLong());
updateBuilder.setModelSnapshotRetentionDays(randomNonNegativeLong());
updateBuilder.setRenormalizationWindowDays(randomNonNegativeLong());
updateBuilder.setCategorizationFilters(categorizationFilters);
updateBuilder.setCustomSettings(customSettings);
JobUpdate update = updateBuilder.build();
Job.Builder jobBuilder = new Job.Builder("foo");
Detector.Builder d1 = new Detector.Builder("info_content", "domain");
@ -143,11 +144,11 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
}
public void testIsAutodetectProcessUpdate() {
JobUpdate update = new JobUpdate(null, null, null, null, null, null, null, null, null, null);
JobUpdate update = new JobUpdate.Builder().build();
assertFalse(update.isAutodetectProcessUpdate());
update = new JobUpdate(null, null, new ModelDebugConfig(1.0, "ff"), null, null, null, null, null, null, null);
update = new JobUpdate.Builder().setModelDebugConfig(new ModelDebugConfig(1.0, "ff")).build();
assertTrue(update.isAutodetectProcessUpdate());
update = new JobUpdate(null, Arrays.asList(mock(JobUpdate.DetectorUpdate.class)), null, null, null, null, null, null, null, null);
update = new JobUpdate.Builder().setDetectorUpdates(Arrays.asList(mock(JobUpdate.DetectorUpdate.class))).build();
assertTrue(update.isAutodetectProcessUpdate());
}
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa
import java.io.IOException;
import java.util.Collections;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
@ -146,11 +147,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
assertThat(result.getJobs().get("1"), sameInstance(job1));
assertThat(result.getDatafeeds().get("1"), nullValue());
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(
new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("1"), false, true, null),
JobState.CLOSED
);
PersistentTaskInProgress<OpenJobAction.Request> task = createJobTask(0L, "1", null, JobState.CLOSED);
MlMetadata.Builder builder2 = new MlMetadata.Builder(result);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> builder2.deleteJob("1", new PersistentTasksInProgress(0L, Collections.singletonMap(0L, task))));

View File

@ -18,6 +18,8 @@ import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.contains;
@ -25,9 +27,9 @@ import static org.mockito.Mockito.when;
public class NativeControllerTests extends ESTestCase {
public void testNativeController() throws IOException {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = new Environment(settings);
private Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
public void testStartProcessCommand() throws IOException {
NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class);
ByteArrayInputStream logStream = new ByteArrayInputStream(new byte[1]);
@ -43,10 +45,34 @@ public class NativeControllerTests extends ESTestCase {
command.add("--arg2=42");
command.add("--arg3=something with spaces");
NativeController nativeController = new NativeController(env, namedPipeHelper);
NativeController nativeController = new NativeController(new Environment(settings), namedPipeHelper);
nativeController.startProcess(command);
assertEquals("start\tmy_process\t--arg1\t--arg2=42\t--arg3=something with spaces\n",
commandStream.toString(StandardCharsets.UTF_8.name()));
}
public void testGetNativeCodeInfo() throws IOException, TimeoutException {
String testMessage = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\",\"pid\":10211,"
+ "\"thread\":\"0x7fff7d2a8000\",\"message\":\"controller (64 bit): Version 6.0.0-alpha1-SNAPSHOT (Build a0d6ef8819418c) "
+ "Copyright (c) 2017 Elasticsearch BV\",\"method\":\"main\",\"file\":\"Main.cc\",\"line\":123}\n";
NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class);
ByteArrayInputStream logStream = new ByteArrayInputStream(testMessage.getBytes(StandardCharsets.UTF_8));
when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class)))
.thenReturn(logStream);
ByteArrayOutputStream commandStream = new ByteArrayOutputStream();
when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class)))
.thenReturn(commandStream);
NativeController nativeController = new NativeController(new Environment(settings), namedPipeHelper);
nativeController.tailLogsInThread();
Map<String, Object> nativeCodeInfo = nativeController.getNativeCodeInfo();
assertNotNull(nativeCodeInfo);
assertEquals(2, nativeCodeInfo.size());
assertEquals("6.0.0-alpha1-SNAPSHOT", nativeCodeInfo.get("version"));
assertEquals("a0d6ef8819418c", nativeCodeInfo.get("build_hash"));
}
}

View File

@ -28,6 +28,7 @@ import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
@ -155,11 +156,11 @@ public class AutodetectCommunicatorTests extends ESTestCase {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class));
communicator.inUse.set(true);
communicator.inUse.set(new CountDownLatch(1));
expectThrows(ElasticsearchStatusException.class,
() -> communicator.writeToJob(in, mock(DataLoadParams.class)));
communicator.inUse.set(false);
communicator.inUse.set(null);
communicator.writeToJob(in, new DataLoadParams(TimeRange.builder().build(), Optional.empty()));
}
@ -169,11 +170,11 @@ public class AutodetectCommunicatorTests extends ESTestCase {
when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
communicator.inUse.set(true);
communicator.inUse.set(new CountDownLatch(1));
InterimResultsParams params = mock(InterimResultsParams.class);
expectThrows(ElasticsearchStatusException.class, () -> communicator.flushJob(params));
communicator.inUse.set(false);
communicator.inUse.set(null);
communicator.flushJob(params);
}
@ -183,10 +184,12 @@ public class AutodetectCommunicatorTests extends ESTestCase {
when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
communicator.inUse.set(true);
expectThrows(ElasticsearchStatusException.class, () -> communicator.close());
CountDownLatch latch = mock(CountDownLatch.class);
communicator.inUse.set(latch);
communicator.close();
verify(latch, times(1)).await();
communicator.inUse.set(false);
communicator.inUse.set(null);
communicator.close();
}
@ -195,10 +198,10 @@ public class AutodetectCommunicatorTests extends ESTestCase {
AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
communicator.inUse.set(true);
communicator.inUse.set(new CountDownLatch(1));
expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateModelDebugMessage(mock(ModelDebugConfig.class)));
communicator.inUse.set(false);
communicator.inUse.set(null);
communicator.writeUpdateModelDebugMessage(mock(ModelDebugConfig.class));
}
@ -208,10 +211,10 @@ public class AutodetectCommunicatorTests extends ESTestCase {
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
List<DetectionRule> rules = Collections.singletonList(mock(DetectionRule.class));
communicator.inUse.set(true);
communicator.inUse.set(new CountDownLatch(1));
expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateDetectorRulesMessage(0, rules));
communicator.inUse.set(false);
communicator.inUse.set(null);
communicator.writeUpdateDetectorRulesMessage(0, rules);
}
}

View File

@ -22,6 +22,7 @@ import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
@ -93,7 +94,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobResultsPersister.bulkPersisterBuilder(any())).thenReturn(mock(JobResultsPersister.Builder.class));
jobDataCountsPersister = mock(JobDataCountsPersister.class);
normalizerFactory = mock(NormalizerFactory.class);
givenAllocationWithState(JobState.OPENED);
when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo"));
doAnswer(invocationOnMock -> {
@ -131,7 +131,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
manager.openJob("foo", 1L, false, e -> {});
assertEquals(1, manager.numberOfOpenJobs());
assertTrue(manager.jobHasActiveAutodetectProcess("foo"));
UpdatePersistentTaskStatusAction.Request expectedRequest = new UpdatePersistentTaskStatusAction.Request(1L, JobState.OPENED);
UpdatePersistentTaskStatusAction.Request expectedRequest =
new UpdatePersistentTaskStatusAction.Request(1L, JobState.OPENED);
verify(client).execute(eq(UpdatePersistentTaskStatusAction.INSTANCE), eq(expectedRequest), any());
}
@ -270,19 +271,14 @@ public class AutodetectProcessManagerTests extends ESTestCase {
assertEquals("[foo] exception while flushing job", e.getMessage());
}
public void testWriteUpdateModelDebugMessage() throws IOException {
public void testwriteUpdateProcessMessage() throws IOException {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo");
ModelDebugConfig debugConfig = mock(ModelDebugConfig.class);
manager.writeUpdateModelDebugMessage("foo", debugConfig);
verify(communicator).writeUpdateModelDebugMessage(debugConfig);
}
public void testWriteUpdateDetectorRulesMessage() throws IOException {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo");
List<DetectionRule> rules = Collections.singletonList(mock(DetectionRule.class));
manager.writeUpdateDetectorRulesMessage("foo", 2, rules);
List<JobUpdate.DetectorUpdate> detectorUpdates = Collections.singletonList(new JobUpdate.DetectorUpdate(2, null, rules));
manager.writeUpdateProcessMessage("foo", detectorUpdates, debugConfig);
verify(communicator).writeUpdateModelDebugMessage(debugConfig);
verify(communicator).writeUpdateDetectorRulesMessage(2, rules);
}
@ -302,7 +298,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
when(communicator.writeToJob(any(), any())).thenReturn(new DataCounts("foo"));
AutodetectProcessManager manager = createManager(communicator);
givenAllocationWithState(JobState.OPENED);
InputStream inputStream = createInputStream("");
manager.openJob("foo", 1L, false, e -> {});
@ -337,10 +332,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
verify(autodetectProcess, times(1)).close();
}
private void givenAllocationWithState(JobState state) {
when(jobManager.getJobState("foo")).thenReturn(state);
}
private AutodetectProcessManager createManager(AutodetectCommunicator communicator) {
Client client = mock(Client.class);
return createManager(communicator, client);

View File

@ -42,9 +42,11 @@ public class CppLogMessageHandlerTests extends ESTestCase {
handler.tailStream();
assertTrue(handler.hasLogStreamEnded());
assertEquals(10211L, handler.getPid(Duration.ofMillis(1)));
// Since this is all being done in one thread and we know the stream has
// been completely consumed at this point the wait duration can be zero
assertEquals(10211L, handler.getPid(Duration.ZERO));
assertEquals("controller (64 bit): Version based on 6.0.0-alpha1 (Build b0d6ef8819418c) "
+ "Copyright (c) 2017 Elasticsearch BV", handler.getCppCopyright());
+ "Copyright (c) 2017 Elasticsearch BV", handler.getCppCopyright(Duration.ZERO));
assertEquals("Did not understand verb 'a'\n", handler.getErrors());
assertFalse(handler.seenFatalError());
}

View File

@ -115,7 +115,7 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase {
deleteAllDatafeeds(client());
deleteAllJobs(client());
for (int i = 0; i < numNodes; i++) {
internalCluster().stopRandomDataNode();
internalCluster().stopRandomNode(settings -> true);
}
internalCluster().startNode(Settings.builder().put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false));
ensureStableCluster(1);

View File

@ -569,7 +569,7 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
when(response.getStatusLine()).thenReturn(statusLine);
when(statusLine.getStatusCode()).thenReturn(RestStatus.OK.getStatus());
when(response.getEntity()).thenReturn(new StringEntity("{}"));
when(response.getEntity()).thenReturn(new StringEntity("{}", ContentType.APPLICATION_JSON));
when(client.performRequest(eq("GET"),
startsWith("/.marvel-es-1-*"),

View File

@ -60,9 +60,9 @@ public class LocalExporterTemplateTests extends MonitoringIntegTestCase {
request.settings(Settings.builder().put("index.mapper.dynamic", false).build());
// notably absent are: kibana, logstash, and beats
request.mapping("cluster_info", "{\"enabled\": false}");
request.mapping("node", "{\"enabled\": false}");
request.mapping("fake", "{\"enabled\": false}");
request.mapping("cluster_info", "{\"enabled\": false}", XContentType.JSON);
request.mapping("node", "{\"enabled\": false}", XContentType.JSON);
request.mapping("fake", "{\"enabled\": false}", XContentType.JSON);
client().admin().indices().create(request).actionGet();

View File

@ -162,7 +162,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
watchSource.endObject();
try {
watcherClient.preparePutWatch("_name")
.setSource(watchSource.bytes())
.setSource(watchSource.bytes(), watchSource.contentType())
.get();
fail();
} catch (ElasticsearchParseException e) {

View File

@ -69,19 +69,19 @@ public class WatchRequestValidationTests extends ESTestCase {
}
public void testPutWatchInvalidWatchId() {
ActionRequestValidationException e = new PutWatchRequest("id with whitespaces", BytesArray.EMPTY).validate();
ActionRequestValidationException e = new PutWatchRequest("id with whitespaces", BytesArray.EMPTY, XContentType.JSON).validate();
assertThat(e, is(notNullValue()));
assertThat(e.validationErrors(), hasItem("watch id contains whitespace"));
}
public void testPutWatchNullId() {
ActionRequestValidationException e = new PutWatchRequest(null, BytesArray.EMPTY).validate();
ActionRequestValidationException e = new PutWatchRequest(null, BytesArray.EMPTY, XContentType.JSON).validate();
assertThat(e, is(notNullValue()));
assertThat(e.validationErrors(), hasItem("watch id is missing"));
}
public void testPutWatchSourceNull() {
ActionRequestValidationException e = new PutWatchRequest("foo", (BytesReference) null).validate();
ActionRequestValidationException e = new PutWatchRequest("foo", (BytesReference) null, XContentType.JSON).validate();
assertThat(e, is(notNullValue()));
assertThat(e.validationErrors(), hasItem("watch source is missing"));
}

View File

@ -214,12 +214,23 @@
}
- match: { job_id: "to-update" }
- do:
xpack.ml.open_job:
job_id: to-update
- do:
xpack.ml.update_job:
job_id: to-update
body: >
{
"description":"Post update description",
"detectors": [{"index": 0, "rules": {"target_field_name": "airline",
"rule_conditions": [ { "condition_type": "numerical_actual",
"condition": {"operator": "gt", "value": "10" } } ] } },
{"index": 1, "description": "updated description"}],
"model_debug_config": {
"bounds_percentile": 99.0
},
"analysis_limits": {
"model_memory_limit": 20
},
@ -234,8 +245,11 @@
}
- match: { job_id: "to-update" }
- match: { description: "Post update description" }
- match: { model_debug_config.bounds_percentile: 99.0 }
- match: { analysis_limits.model_memory_limit: 20 }
- match: { analysis_config.categorization_filters: ["cat3.*"] }
- match: { analysis_config.detectors.0.detector_rules.0.target_field_name: "airline" }
- match: { analysis_config.detectors.1.detector_description: "updated description" }
- match: { renormalization_window_days: 10 }
- match: { background_persist_interval: 10800 }
- match: { model_snapshot_retention_days: 30 }

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -14,27 +16,42 @@ import org.elasticsearch.xpack.ml.MachineLearning;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.common.xcontent.XContentType.JSON;
public class MlBasicMultiNodeIT extends ESRestTestCase {
@SuppressWarnings("unchecked")
public void testMachineLearningInstalled() throws Exception {
Response response = client().performRequest("get", "/_xpack");
assertEquals(200, response.getStatusLine().getStatusCode());
Map<String, Object> features = (Map<String, Object>) responseEntityToMap(response).get("features");
Map<String, Object> ml = (Map<String, Object>) features.get("ml");
assertNotNull(ml);
assertTrue((Boolean) ml.get("available"));
assertTrue((Boolean) ml.get("enabled"));
}
public void testMiniFarequote() throws Exception {
String jobId = "foo";
String jobId = "foo1";
createFarequoteJob(jobId);
Response response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open");
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(Collections.singletonMap("opened", true), responseEntityToMap(response));
assertBusy(this::assertSameClusterStateOnAllNodes);
String postData =
"{\"airline\":\"AAL\",\"responsetime\":\"132.2046\",\"sourcetype\":\"farequote\",\"time\":\"1403481600\"}\n" +
"{\"airline\":\"JZA\",\"responsetime\":\"990.4628\",\"sourcetype\":\"farequote\",\"time\":\"1403481700\"}";
response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data",
Collections.emptyMap(), new StringEntity(postData));
Collections.emptyMap(),
new StringEntity(postData, randomFrom(ContentType.APPLICATION_JSON, ContentType.create("application/x-ndjson"))));
assertEquals(202, response.getStatusLine().getStatusCode());
Map<String, Object> responseBody = responseEntityToMap(response);
assertEquals(2, responseBody.get("processed_record_count"));
@ -86,16 +103,18 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
+ " }"
+ " }"
+ "}";
client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(mappings));
client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(mappings, ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data/response/1", Collections.emptyMap(),
new StringEntity("{\"time\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}"));
new StringEntity("{\"time\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data/response/2", Collections.emptyMap(),
new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}"));
new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}",
ContentType.APPLICATION_JSON));
// Ensure all data is searchable
client().performRequest("post", "_refresh");
String jobId = "foo";
String jobId = "foo2";
createFarequoteJob(jobId);
String datafeedId = "bar";
createDatafeed(datafeedId, jobId);
@ -148,7 +167,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
xContentBuilder.field("_source", true);
xContentBuilder.endObject();
return client().performRequest("put", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId,
Collections.emptyMap(), new StringEntity(xContentBuilder.string()));
Collections.emptyMap(), new StringEntity(xContentBuilder.string(), ContentType.APPLICATION_JSON));
}
private Response createFarequoteJob(String jobId) throws Exception {
@ -176,11 +195,35 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
xContentBuilder.endObject();
return client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId,
Collections.emptyMap(), new StringEntity(xContentBuilder.string()));
Collections.emptyMap(), new StringEntity(xContentBuilder.string(), ContentType.APPLICATION_JSON));
}
private static Map<String, Object> responseEntityToMap(Response response) throws IOException {
return XContentHelper.convertToMap(JSON.xContent(), response.getEntity().getContent(), false);
}
// When open job api returns the cluster state on nodes other than master node or node that acted as coordinating node,
// may not have had the latest update with job state set to opened. This may fail subsequent post data, flush, or
// close calls until that node that is running the job task has applied the cluster state where job state has been set to opened.
// this method waits until all nodes in the cluster have the same cluster state version, so that such failures can be
// avoided in tests. Note that the job has been started on the node running the job task (autodetect process is running),
// this is just a workaround for inconsistency in cluster states that may happen for a small amount of time.
private void assertSameClusterStateOnAllNodes(){
assert getClusterHosts().size() > 1;
Set<Integer> versions = new HashSet<>();
for (HttpHost host : getClusterHosts()) {
try {
// Client round robins between cluster hosts:
Response response = client().performRequest("get", "/_cluster/state/version", Collections.singletonMap("local", "true"));
assertEquals(200, response.getStatusLine().getStatusCode());
int version = (Integer) responseEntityToMap(response).get("version");
logger.info("Sampled version [{}]", version);
versions.add(version);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
assertEquals(1, versions.size());
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -47,7 +48,8 @@ public class MlPluginDisabledIT extends ESRestTestCase {
xContentBuilder.endObject();
ResponseException exception = expectThrows(ResponseException.class, () -> client().performRequest("put",
MachineLearning.BASE_PATH + "anomaly_detectors/foo", Collections.emptyMap(), new StringEntity(xContentBuilder.string())));
MachineLearning.BASE_PATH + "anomaly_detectors/foo", Collections.emptyMap(),
new StringEntity(xContentBuilder.string(), ContentType.APPLICATION_JSON)));
assertThat(exception.getMessage(), containsString("No handler found for uri [/_xpack/ml/anomaly_detectors/foo] and method [PUT]"));
}
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.transforms;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Response;
@ -188,13 +189,13 @@ public class PainlessDomainSplitIT extends ESRestTestCase {
private void createIndex(String name, Settings settings) throws IOException {
assertOK(client().performRequest("PUT", name, Collections.emptyMap(),
new StringEntity("{ \"settings\": " + Strings.toString(settings) + " }")));
new StringEntity("{ \"settings\": " + Strings.toString(settings) + " }", ContentType.APPLICATION_JSON)));
}
private void createIndex(String name, Settings settings, String mapping) throws IOException {
assertOK(client().performRequest("PUT", name, Collections.emptyMap(),
new StringEntity("{ \"settings\": " + Strings.toString(settings)
+ ", \"mappings\" : {" + mapping + "} }")));
+ ", \"mappings\" : {" + mapping + "} }", ContentType.APPLICATION_JSON)));
}
public void testIsolated() throws Exception {
@ -205,7 +206,7 @@ public class PainlessDomainSplitIT extends ESRestTestCase {
createIndex("painless", settings.build());
client().performRequest("PUT", "painless/test/1", Collections.emptyMap(),
new StringEntity("{\"test\": \"test\"}"));
new StringEntity("{\"test\": \"test\"}", ContentType.APPLICATION_JSON));
client().performRequest("POST", "painless/_refresh");
Pattern pattern = Pattern.compile("domain_split\":\\[(.*?),(.*?)\\]");
@ -230,7 +231,7 @@ public class PainlessDomainSplitIT extends ESRestTestCase {
" }\n" +
" }\n" +
" }\n" +
"}");
"}", ContentType.APPLICATION_JSON);
Response response = client().performRequest("GET", "painless/test/_search", Collections.emptyMap(), body);
String responseBody = EntityUtils.toString(response.getEntity());
@ -275,7 +276,7 @@ public class PainlessDomainSplitIT extends ESRestTestCase {
" }";
client().performRequest("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/painless", Collections.emptyMap(),
new StringEntity(job));
new StringEntity(job, ContentType.APPLICATION_JSON));
client().performRequest("POST", MachineLearning.BASE_PATH + "anomaly_detectors/painless/_open");
// Create index to hold data
@ -304,14 +305,14 @@ public class PainlessDomainSplitIT extends ESRestTestCase {
client().performRequest("PUT", "painless/test/" + time.toDateTimeISO() + "_" + j,
Collections.emptyMap(),
new StringEntity("{\"domain\": \"" + "bar.bar.com\", \"time\": \"" + time.toDateTimeISO()
+ "\"}"));
+ "\"}", ContentType.APPLICATION_JSON));
}
} else {
// Non-anomalous values will be what's seen when the anomaly is reported
client().performRequest("PUT", "painless/test/" + time.toDateTimeISO(),
Collections.emptyMap(),
new StringEntity("{\"domain\": \"" + test.hostName + "\", \"time\": \"" + time.toDateTimeISO()
+ "\"}"));
+ "\"}", ContentType.APPLICATION_JSON));
}
}
@ -330,10 +331,9 @@ public class PainlessDomainSplitIT extends ESRestTestCase {
" }";
client().performRequest("PUT", MachineLearning.BASE_PATH + "datafeeds/painless", Collections.emptyMap(),
new StringEntity(body));
new StringEntity(body, ContentType.APPLICATION_JSON));
client().performRequest("POST", MachineLearning.BASE_PATH + "datafeeds/painless/_start");
boolean passed = awaitBusy(() -> {
try {
client().performRequest("POST", "/_refresh");