diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java
index c4f9fc425c5..a56d074499b 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java
@@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceAlreadyExistsException;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.Action;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequestBuilder;
@@ -59,6 +60,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Persiste
 import org.elasticsearch.xpack.persistent.PersistentTasksExecutor;
 import org.elasticsearch.xpack.persistent.PersistentTasksService;
 import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
+import org.elasticsearch.xpack.security.support.Exceptions;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -66,6 +68,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.Predicate;
 
 import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE;
@@ -538,8 +541,12 @@ public class OpenJobAction extends Action
     /**
      * <ul>
+     *     <li>check job exists</li>
+     *     <li>check job is not marked as deleted</li>
+     *     <li>check job's version is supported</li>
+     * </ul>
      */
     static void validate(String jobId, MlMetadata mlMetadata) {
         Job job = mlMetadata.getJobs().get(jobId);
@@ -549,6 +556,10 @@ public class OpenJobAction extends Action
         if (job.isDeleted()) {
             throw ExceptionsHelper.conflictStatusException("Cannot open job [" + jobId + "] because it has been marked as deleted");
         }
+        if (job.getJobVersion() == null) {
+            throw ExceptionsHelper.badRequestException("Cannot open job [" + jobId
+                    + "] because jobs created prior to version 5.5 are not supported");
+        }
     }
+            Set<String> compatibleJobTypes = Job.getCompatibleJobTypes(node.getVersion());
+            if (compatibleJobTypes.contains(job.getJobType()) == false) {
+                String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node does not support jobs of type ["
+                        + job.getJobType() + "]";
+                logger.trace(reason);
+                reasons.add(reason);
+                continue;
+            }
+
+            if (nodeSupportsJobVersion(node.getVersion(), job.getJobVersion()) == false) {
+                String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node does not support jobs of version ["
+                        + job.getJobVersion() + "]";
+                logger.trace(reason);
+                reasons.add(reason);
+                continue;
+            }
+
             long numberOfAssignedJobs;
             int numberOfAllocatingJobs;
             if (persistentTasks != null) {
@@ -646,4 +676,8 @@ public class OpenJobAction extends Action
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java
         return jobId;
     }
 
-    String getJobType() {
+    public String getJobType() {
         return jobType;
     }
@@ -544,6 +546,19 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent
         }
     }
 
+    /**
+     * Returns the job types that are compatible with a node running on {@code nodeVersion}
+     * @param nodeVersion the version of the node
+     * @return the compatible job types
+     */
+    public static Set<String> getCompatibleJobTypes(Version nodeVersion) {
+        Set<String> compatibleTypes = new HashSet<>();
+        if (nodeVersion.onOrAfter(Version.V_5_4_0_UNRELEASED)) {
+            compatibleTypes.add(ANOMALY_DETECTOR_JOB_TYPE);
+        }
+        return compatibleTypes;
+    }
+
     public static class Builder implements Writeable, ToXContent {
 
         private String id;
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
index 3c6b281e6ca..70f9ddb5a3c 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
@@ -210,8 +210,15 @@ public class AutodetectProcessManager extends AbstractComponent {
 
     public void openJob(JobTask jobTask, boolean ignoreDowntime, Consumer<Exception> handler) {
         String jobId = jobTask.getJobId();
-        logger.info("Opening job [{}]", jobId);
         Job job = jobManager.getJobOrThrowIfUnknown(jobId);
+
+        if (job.getJobVersion() == null) {
+            handler.accept(ExceptionsHelper.badRequestException("Cannot open job [" + jobId
+                    + "] because jobs created prior to version 5.5 are not supported"));
+            return;
+        }
+
+        logger.info("Opening job [{}]", jobId);
         jobProvider.getAutodetectParams(job, params -> {
             // We need to fork, otherwise we restore model state from a network thread (several GET api calls):
             threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/ExceptionsHelper.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/ExceptionsHelper.java
index 9d4e7c7c048..34bf730585a 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/ExceptionsHelper.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/ExceptionsHelper.java
@@ -42,6 +42,10 @@ public class ExceptionsHelper {
         return new ElasticsearchStatusException(msg, RestStatus.CONFLICT, args);
     }
 
+    public static ElasticsearchStatusException badRequestException(String msg, Object... args) {
+        return new ElasticsearchStatusException(msg, RestStatus.BAD_REQUEST, args);
+    }
+
     /**
      * Creates an error message that explains there are shard failures, displays info
      * for the first failure (shard/reason) and kindly asks to see more info in the logs
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java
index b8c4adf2a62..e89f8ae3404 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java
@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.ml.action;
 
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.Version;
@@ -24,6 +25,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.MlMetadata;
@@ -43,8 +45,12 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
+import static org.hamcrest.Matchers.containsString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class OpenJobActionTests extends ESTestCase {
 
@@ -64,6 +70,23 @@ public class OpenJobActionTests extends ESTestCase {
         assertEquals("Cannot open job [job_id] because it has been marked as deleted", e.getMessage());
     }
 
+    public void testValidate_jobWithoutVersion() {
+        MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
+        Job.Builder jobBuilder = buildJobBuilder("job_id");
+        mlBuilder.putJob(jobBuilder.build(), false);
+        ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
+                () -> OpenJobAction.validate("job_id", mlBuilder.build()));
+        assertEquals("Cannot open job [job_id] because jobs created prior to version 5.5 are not supported", e.getMessage());
+        assertEquals(RestStatus.BAD_REQUEST, e.status());
+    }
+
+    public void testValidate_givenValidJob() {
+        MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
+        Job.Builder jobBuilder = buildJobBuilder("job_id");
+        mlBuilder.putJob(jobBuilder.build(new Date()), false);
+        OpenJobAction.validate("job_id", mlBuilder.build());
+    }
+
     public void testSelectLeastLoadedMlNode() {
         Map<String, String> nodeAttr = new HashMap<>();
         nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
@@ -219,6 +242,68 @@ public class OpenJobActionTests extends ESTestCase {
         assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
     }
 
+    public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() {
+        Map<String, String> nodeAttr = new HashMap<>();
+        nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+                .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
+                        nodeAttr, Collections.emptySet(), Version.CURRENT))
+                .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
+                        nodeAttr, Collections.emptySet(), Version.CURRENT))
+                .build();
+
+        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
+        addJobTask("incompatible_type_job", "_node_id1", null, tasksBuilder);
+        PersistentTasksCustomMetaData tasks = tasksBuilder.build();
+
+        ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
+        MetaData.Builder metaData = MetaData.builder();
+        RoutingTable.Builder routingTable = RoutingTable.builder();
+        Function<String, Job> incompatibleJobCreator = jobId -> {
+            Job job = mock(Job.class);
+            when(job.getId()).thenReturn(jobId);
+            when(job.getJobVersion()).thenReturn(Version.CURRENT);
+            when(job.getJobType()).thenReturn("incompatible_type");
+            when(job.getResultsIndexName()).thenReturn("shared");
+            return job;
+        };
+        addJobAndIndices(metaData, routingTable, incompatibleJobCreator, "incompatible_type_job");
+        cs.nodes(nodes);
+        metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
+        cs.metaData(metaData);
+        cs.routingTable(routingTable.build());
+        Assignment result = OpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", cs.build(), 2, 10, logger);
+        assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]"));
+        assertNull(result.getExecutorNode());
+    }
+
+    public void testSelectLeastLoadedMlNode_noNodesPriorTo_V_5_5() {
+        Map<String, String> nodeAttr = new HashMap<>();
+        nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+                .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
+                        nodeAttr, Collections.emptySet(), Version.V_5_4_0_UNRELEASED))
+                .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
+                        nodeAttr, Collections.emptySet(), Version.V_5_4_0_UNRELEASED))
+                .build();
+
+        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
+        addJobTask("incompatible_type_job", "_node_id1", null, tasksBuilder);
+        PersistentTasksCustomMetaData tasks = tasksBuilder.build();
+
+        ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
+        MetaData.Builder metaData = MetaData.builder();
+        RoutingTable.Builder routingTable = RoutingTable.builder();
+        addJobAndIndices(metaData, routingTable, "incompatible_type_job");
+        cs.nodes(nodes);
+        metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
+        cs.metaData(metaData);
+        cs.routingTable(routingTable.build());
+        Assignment result = OpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", cs.build(), 2, 10, logger);
+        assertThat(result.getExplanation(), containsString("because this node does not support jobs of version [" + Version.CURRENT + "]"));
+        assertNull(result.getExecutorNode());
+    }
+
     public void testVerifyIndicesPrimaryShardsAreActive() {
         MetaData.Builder metaData = MetaData.builder();
         RoutingTable.Builder routingTable = RoutingTable.builder();
@@ -263,6 +348,11 @@ public class OpenJobActionTests extends ESTestCase {
     }
 
     private void addJobAndIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable, String... jobIds) {
+        addJobAndIndices(metaData, routingTable, jobId -> BaseMlIntegTestCase.createFareQuoteJob(jobId).build(new Date()), jobIds);
+    }
+
+    private void addJobAndIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable, Function<String, Job> jobCreator,
+                                  String... jobIds) {
         List<String> indices = new ArrayList<>();
         indices.add(AnomalyDetectorsIndex.jobStateIndexName());
         indices.add(AnomalyDetectorsIndex.ML_META_INDEX);
@@ -288,7 +378,7 @@ public class OpenJobActionTests extends ESTestCase {
         MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
         for (String jobId : jobIds) {
-            Job job = BaseMlIntegTestCase.createFareQuoteJob(jobId).build(new Date());
+            Job job = jobCreator.apply(jobId);
             mlMetadata.putJob(job, false);
         }
         metaData.putCustom(MlMetadata.TYPE, mlMetadata.build());
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java
index d83669c07da..33000cf20b1 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
@@ -389,6 +390,19 @@ public class JobTests extends AbstractSerializingTestCase
         assertThat(e.getMessage(), equalTo("A data_description must be set"));
     }
 
+    public void testGetCompatibleJobTypes_givenVersionBefore_V_5_4() {
+        assertThat(Job.getCompatibleJobTypes(Version.V_5_0_0).isEmpty(), is(true));
+        assertThat(Job.getCompatibleJobTypes(Version.V_5_3_0_UNRELEASED).isEmpty(), is(true));
+        assertThat(Job.getCompatibleJobTypes(Version.V_5_3_2_UNRELEASED).isEmpty(), is(true));
+    }
+
+    public void testGetCompatibleJobTypes_givenVersionAfter_V_5_4() {
+        assertThat(Job.getCompatibleJobTypes(Version.V_5_4_0_UNRELEASED), contains(Job.ANOMALY_DETECTOR_JOB_TYPE));
+        assertThat(Job.getCompatibleJobTypes(Version.V_5_4_0_UNRELEASED).size(), equalTo(1));
+        assertThat(Job.getCompatibleJobTypes(Version.V_5_5_0_UNRELEASED), contains(Job.ANOMALY_DETECTOR_JOB_TYPE));
+        assertThat(Job.getCompatibleJobTypes(Version.V_5_5_0_UNRELEASED).size(), equalTo(1));
+    }
+
     public static Job.Builder buildJobBuilder(String id, Date date) {
         Job.Builder builder = new Job.Builder(id);
         builder.setCreateTime(date);
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java
index 9b41f81d9ef..ccf82fe8c40 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java
@@ -47,6 +47,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashSet;
@@ -56,6 +57,7 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
@@ -66,6 +68,9 @@ import static org.elasticsearch.mock.orig.Mockito.times;
 import static org.elasticsearch.mock.orig.Mockito.verify;
 import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions;
 import static org.elasticsearch.mock.orig.Mockito.when;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
@@ -114,6 +119,28 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         }).when(jobProvider).getAutodetectParams(any(), any(), any());
     }
 
+    public void testOpenJob_withoutVersion() {
+        Client client = mock(Client.class);
+        AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
+        Job.Builder jobBuilder = new Job.Builder(createJobDetails("no_version"));
+        jobBuilder.setJobVersion(null);
+        Job job = jobBuilder.build();
+        assertThat(job.getJobVersion(), is(nullValue()));
+
+        when(jobManager.getJobOrThrowIfUnknown(job.getId())).thenReturn(job);
+        AutodetectProcessManager manager = createManager(communicator, client);
+
+        JobTask jobTask = mock(JobTask.class);
+        when(jobTask.getJobId()).thenReturn(job.getId());
+
+        AtomicReference<Exception> errorHolder = new AtomicReference<>();
+        manager.openJob(jobTask, false, e -> errorHolder.set(e));
+
+        Exception error = errorHolder.get();
+        assertThat(error, is(notNullValue()));
+        assertThat(error.getMessage(), equalTo("Cannot open job [no_version] because jobs created prior to version 5.5 are not supported"));
+    }
+
     public void testOpenJob() {
         Client client = mock(Client.class);
         AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);