[ML] Add compatibility checks while opening a job (elastic/x-pack-elasticsearch#1458)

This commit adds compatibility checks while opening a job:

- Checks that jobs without versions (< 5.5) are not opened
- Checks that jobs with incompatible types are not opened

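For orientation, here is a minimal, self-contained sketch of the two checks described above. It is illustrative only: the class and method names (JobOpenChecks, checkCanOpen) are hypothetical, and the real implementations live in OpenJobAction.validate(), OpenJobAction.selectLeastLoadedMlNode() and Job.getCompatibleJobTypes() in the diff below.

import java.util.Set;

// Hypothetical, condensed restatement of the compatibility checks added by this commit.
final class JobOpenChecks {

    static void checkCanOpen(String jobId, String jobVersion, String jobType,
                             Set<String> typesSupportedByTargetNode) {
        if (jobVersion == null) {
            // Jobs created before 5.5 carry no version field and are rejected up front.
            throw new IllegalArgumentException("Cannot open job [" + jobId
                    + "] because jobs created prior to version 5.5 are not supported");
        }
        if (typesSupportedByTargetNode.contains(jobType) == false) {
            // The job's type must be one the candidate node understands; in the commit,
            // Job.getCompatibleJobTypes(nodeVersion) supplies this set during node selection.
            throw new IllegalArgumentException("Cannot open job [" + jobId
                    + "] because the node does not support jobs of type [" + jobType + "]");
        }
    }
}

In the commit itself the version check surfaces as a 400 BAD_REQUEST via the new ExceptionsHelper.badRequestException, while the node-side type and version checks are recorded as assignment explanations rather than thrown.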
Original commit: elastic/x-pack-elasticsearch@a3adab733e
Dimitris Athanasiou 2017-05-17 18:10:36 +01:00 committed by GitHub
parent 6d6c776cd4
commit f0cb7b816d
7 changed files with 196 additions and 5 deletions

View File

@@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
@@ -59,6 +60,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Persiste
import org.elasticsearch.xpack.persistent.PersistentTasksExecutor;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
import org.elasticsearch.xpack.security.support.Exceptions;
import java.io.IOException;
import java.util.ArrayList;
@@ -66,6 +68,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE;
@@ -538,8 +541,12 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}
/**
* Fail fast before trying to update the job state on master node if the job doesn't exist or its state
* is not what it should be.
* Validations to fail fast before trying to update the job state on master node:
* <ul>
* <li>check job exists</li>
* <li>check job is not marked as deleted</li>
* <li>check job's version is supported</li>
* </ul>
*/
static void validate(String jobId, MlMetadata mlMetadata) {
Job job = mlMetadata.getJobs().get(jobId);
@@ -549,6 +556,10 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
if (job.isDeleted()) {
throw ExceptionsHelper.conflictStatusException("Cannot open job [" + jobId + "] because it has been marked as deleted");
}
if (job.getJobVersion() == null) {
throw ExceptionsHelper.badRequestException("Cannot open job [" + jobId
+ "] because jobs created prior to version 5.5 are not supported");
}
}
static Assignment selectLeastLoadedMlNode(String jobId, ClusterState clusterState, int maxConcurrentJobAllocations,
@@ -575,6 +586,25 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
continue;
}
MlMetadata mlMetadata = clusterState.getMetaData().custom(MlMetadata.TYPE);
Job job = mlMetadata.getJobs().get(jobId);
Set<String> compatibleJobTypes = Job.getCompatibleJobTypes(node.getVersion());
if (compatibleJobTypes.contains(job.getJobType()) == false) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node does not support jobs of type ["
+ job.getJobType() + "]";
logger.trace(reason);
reasons.add(reason);
continue;
}
if (nodeSupportsJobVersion(node.getVersion(), job.getJobVersion()) == false) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node does not support jobs of version ["
+ job.getJobVersion() + "]";
logger.trace(reason);
reasons.add(reason);
continue;
}
long numberOfAssignedJobs;
int numberOfAllocatingJobs;
if (persistentTasks != null) {
@@ -646,4 +676,8 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}
return unavailableIndices;
}
static boolean nodeSupportsJobVersion(Version nodeVersion, Version jobVersion) {
return nodeVersion.onOrAfter(Version.V_5_5_0_UNRELEASED);
}
}

View File

@@ -27,7 +27,9 @@ import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -217,7 +219,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
return jobId;
}
String getJobType() {
public String getJobType() {
return jobType;
}
@@ -544,6 +546,19 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
}
}
/**
* Returns the job types that are compatible with a node running on {@code nodeVersion}
* @param nodeVersion the version of the node
* @return the compatible job types
*/
public static Set<String> getCompatibleJobTypes(Version nodeVersion) {
Set<String> compatibleTypes = new HashSet<>();
if (nodeVersion.onOrAfter(Version.V_5_4_0_UNRELEASED)) {
compatibleTypes.add(ANOMALY_DETECTOR_JOB_TYPE);
}
return compatibleTypes;
}
public static class Builder implements Writeable, ToXContent {
private String id;

View File

@@ -210,8 +210,15 @@ public class AutodetectProcessManager extends AbstractComponent {
public void openJob(JobTask jobTask, boolean ignoreDowntime, Consumer<Exception> handler) {
String jobId = jobTask.getJobId();
logger.info("Opening job [{}]", jobId);
Job job = jobManager.getJobOrThrowIfUnknown(jobId);
if (job.getJobVersion() == null) {
handler.accept(ExceptionsHelper.badRequestException("Cannot open job [" + jobId
+ "] because jobs created prior to version 5.5 are not supported"));
return;
}
logger.info("Opening job [{}]", jobId);
jobProvider.getAutodetectParams(job, params -> {
// We need to fork, otherwise we restore model state from a network thread (several GET api calls):
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {

View File

@@ -42,6 +42,10 @@ public class ExceptionsHelper {
return new ElasticsearchStatusException(msg, RestStatus.CONFLICT, args);
}
public static ElasticsearchStatusException badRequestException(String msg, Object... args) {
return new ElasticsearchStatusException(msg, RestStatus.BAD_REQUEST, args);
}
/**
* Creates an error message that explains there are shard failures, displays info
* for the first failure (shard/reason) and kindly asks to see more info in the logs

View File

@@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
@@ -24,6 +25,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata;
@@ -43,8 +45,12 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
import static org.hamcrest.Matchers.containsString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class OpenJobActionTests extends ESTestCase {
@@ -64,6 +70,23 @@ public class OpenJobActionTests extends ESTestCase {
assertEquals("Cannot open job [job_id] because it has been marked as deleted", e.getMessage());
}
public void testValidate_jobWithoutVersion() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
Job.Builder jobBuilder = buildJobBuilder("job_id");
mlBuilder.putJob(jobBuilder.build(), false);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> OpenJobAction.validate("job_id", mlBuilder.build()));
assertEquals("Cannot open job [job_id] because jobs created prior to version 5.5 are not supported", e.getMessage());
assertEquals(RestStatus.BAD_REQUEST, e.status());
}
public void testValidate_givenValidJob() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
Job.Builder jobBuilder = buildJobBuilder("job_id");
mlBuilder.putJob(jobBuilder.build(new Date()), false);
OpenJobAction.validate("job_id", mlBuilder.build());
}
public void testSelectLeastLoadedMlNode() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
@@ -219,6 +242,68 @@ public class OpenJobActionTests extends ESTestCase {
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
}
public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.build();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("incompatible_type_job", "_node_id1", null, tasksBuilder);
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
Function<String, Job> incompatibleJobCreator = jobId -> {
Job job = mock(Job.class);
when(job.getId()).thenReturn(jobId);
when(job.getJobVersion()).thenReturn(Version.CURRENT);
when(job.getJobType()).thenReturn("incompatible_type");
when(job.getResultsIndexName()).thenReturn("shared");
return job;
};
addJobAndIndices(metaData, routingTable, incompatibleJobCreator, "incompatible_type_job");
cs.nodes(nodes);
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
Assignment result = OpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", cs.build(), 2, 10, logger);
assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]"));
assertNull(result.getExecutorNode());
}
public void testSelectLeastLoadedMlNode_noNodesPriorTo_V_5_5() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.V_5_4_0_UNRELEASED))
.add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
nodeAttr, Collections.emptySet(), Version.V_5_4_0_UNRELEASED))
.build();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("incompatible_type_job", "_node_id1", null, tasksBuilder);
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addJobAndIndices(metaData, routingTable, "incompatible_type_job");
cs.nodes(nodes);
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
Assignment result = OpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", cs.build(), 2, 10, logger);
assertThat(result.getExplanation(), containsString("because this node does not support jobs of version [" + Version.CURRENT + "]"));
assertNull(result.getExecutorNode());
}
public void testVerifyIndicesPrimaryShardsAreActive() {
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
@@ -263,6 +348,11 @@ public class OpenJobActionTests extends ESTestCase {
}
private void addJobAndIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable, String... jobIds) {
addJobAndIndices(metaData, routingTable, jobId -> BaseMlIntegTestCase.createFareQuoteJob(jobId).build(new Date()), jobIds);
}
private void addJobAndIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable, Function<String, Job> jobCreator,
String... jobIds) {
List<String> indices = new ArrayList<>();
indices.add(AnomalyDetectorsIndex.jobStateIndexName());
indices.add(AnomalyDetectorsIndex.ML_META_INDEX);
@@ -288,7 +378,7 @@ public class OpenJobActionTests extends ESTestCase {
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
for (String jobId : jobIds) {
Job job = BaseMlIntegTestCase.createFareQuoteJob(jobId).build(new Date());
Job job = jobCreator.apply(jobId);
mlMetadata.putJob(job, false);
}
metaData.putCustom(MlMetadata.TYPE, mlMetadata.build());

View File

@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
@@ -389,6 +390,19 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
assertThat(e.getMessage(), equalTo("A data_description must be set"));
}
public void testGetCompatibleJobTypes_givenVersionBefore_V_5_4() {
assertThat(Job.getCompatibleJobTypes(Version.V_5_0_0).isEmpty(), is(true));
assertThat(Job.getCompatibleJobTypes(Version.V_5_3_0_UNRELEASED).isEmpty(), is(true));
assertThat(Job.getCompatibleJobTypes(Version.V_5_3_2_UNRELEASED).isEmpty(), is(true));
}
public void testGetCompatibleJobTypes_givenVersionAfter_V_5_4() {
assertThat(Job.getCompatibleJobTypes(Version.V_5_4_0_UNRELEASED), contains(Job.ANOMALY_DETECTOR_JOB_TYPE));
assertThat(Job.getCompatibleJobTypes(Version.V_5_4_0_UNRELEASED).size(), equalTo(1));
assertThat(Job.getCompatibleJobTypes(Version.V_5_5_0_UNRELEASED), contains(Job.ANOMALY_DETECTOR_JOB_TYPE));
assertThat(Job.getCompatibleJobTypes(Version.V_5_5_0_UNRELEASED).size(), equalTo(1));
}
public static Job.Builder buildJobBuilder(String id, Date date) {
Job.Builder builder = new Job.Builder(id);
builder.setCreateTime(date);

View File

@@ -47,6 +47,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
@@ -56,6 +57,7 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -66,6 +68,9 @@ import static org.elasticsearch.mock.orig.Mockito.times;
import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions;
import static org.elasticsearch.mock.orig.Mockito.when;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
@@ -114,6 +119,28 @@ public class AutodetectProcessManagerTests extends ESTestCase {
}).when(jobProvider).getAutodetectParams(any(), any(), any());
}
public void testOpenJob_withoutVersion() {
Client client = mock(Client.class);
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
Job.Builder jobBuilder = new Job.Builder(createJobDetails("no_version"));
jobBuilder.setJobVersion(null);
Job job = jobBuilder.build();
assertThat(job.getJobVersion(), is(nullValue()));
when(jobManager.getJobOrThrowIfUnknown(job.getId())).thenReturn(job);
AutodetectProcessManager manager = createManager(communicator, client);
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn(job.getId());
AtomicReference<Exception> errorHolder = new AtomicReference<>();
manager.openJob(jobTask, false, e -> errorHolder.set(e));
Exception error = errorHolder.get();
assertThat(error, is(notNullValue()));
assertThat(error.getMessage(), equalTo("Cannot open job [no_version] because jobs created prior to version 5.5 are not supported"));
}
public void testOpenJob() {
Client client = mock(Client.class);
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);