From b40d5dde9b3f9193fa5435911c52c9e9e2a45b1b Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 21 Feb 2017 21:05:57 +0100 Subject: [PATCH] [ML] Before selecting an executor node for running job task, make sure that all required indices exist and are accessible. Original commit: elastic/x-pack-elasticsearch@5459a0d5f237d5d2d468a25d617280ed4e102835 --- .../xpack/ml/action/OpenJobAction.java | 31 +++- .../xpack/ml/action/OpenJobActionTests.java | 145 +++++++++++++++--- .../integration/BasicDistributedJobsIT.java | 101 +++++++++--- .../xpack/ml/support/BaseMlIntegTestCase.java | 20 +++ 4 files changed, 258 insertions(+), 39 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index f128b010a91..3f01f6aa3b0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; @@ -43,7 +44,10 @@ import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; +import org.elasticsearch.xpack.ml.notifications.Auditor; import 
org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.JobStateObserver; import org.elasticsearch.xpack.persistent.PersistentActionRegistry; @@ -277,6 +281,7 @@ public class OpenJobAction extends Action reasons = new LinkedList<>(); DiscoveryNode minLoadedNode = null; @@ -433,8 +442,28 @@ public class OpenJobAction extends Action lastTask = createJobTask(5L, "job_id6", "_node_id3", JobState.OPENING); taskMap.put(5L, lastTask); tasks = new PersistentTasksInProgress(6L, taskMap); - cs = ClusterState.builder(new ClusterName("_name")); - cs.nodes(nodes); - cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); - result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs.build(), 2, logger); + csBuilder = ClusterState.builder(cs); + csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks)); + cs = csBuilder.build(); + result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger); assertNull("no node selected, because OPENING state", result); taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, false, "_node_id3")); tasks = new PersistentTasksInProgress(6L, taskMap); - cs = ClusterState.builder(new ClusterName("_name")); - cs.nodes(nodes); - cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); - result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs.build(), 2, logger); + csBuilder = ClusterState.builder(cs); + csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks)); + cs = csBuilder.build(); + result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger); assertNull("no node selected, because stale task", result); taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, null)); tasks = new PersistentTasksInProgress(6L, taskMap); - cs = ClusterState.builder(new ClusterName("_name")); - cs.nodes(nodes); - 
cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); - result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs.build(), 2, logger); + csBuilder = ClusterState.builder(cs); + csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks)); + cs = csBuilder.build(); + result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger); assertNull("no node selected, because null state", result); } + public void testVerifyIndicesExistAndPrimaryShardsAreActive() { + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addJobAndIndices(metaData, routingTable, "job_id"); + + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + csBuilder.routingTable(routingTable.build()); + csBuilder.metaData(metaData); + + ClusterState cs = csBuilder.build(); + assertTrue(OpenJobAction.verifyIndicesExistAndPrimaryShardsAreActive(logger, "job_id", cs)); + + metaData = new MetaData.Builder(cs.metaData()); + routingTable = new RoutingTable.Builder(cs.routingTable()); + String indexToRemove = randomFrom(cs.metaData().getConcreteAllIndices()); + if (randomBoolean()) { + routingTable.remove(indexToRemove); + } else if (randomBoolean()) { + Index index = new Index(indexToRemove, "_uuid"); + ShardId shardId = new ShardId(index, 0); + ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); + shardRouting = shardRouting.initialize("node_id", null, 0L); + routingTable.add(IndexRoutingTable.builder(index) + .addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build())); + } else { + metaData.remove(indexToRemove); + } + csBuilder.routingTable(routingTable.build()); + csBuilder.metaData(metaData); + assertFalse(OpenJobAction.verifyIndicesExistAndPrimaryShardsAreActive(logger, 
"job_id", csBuilder.build())); + } + public static PersistentTaskInProgress createJobTask(long id, String jobId, String nodeId, JobState jobState) { PersistentTaskInProgress task = new PersistentTaskInProgress<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), false, true, nodeId); @@ -229,4 +300,38 @@ public class OpenJobActionTests extends ESTestCase { return task; } + private void addJobAndIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable, String... jobIds) { + List indices = new ArrayList<>(); + indices.add(AnomalyDetectorsIndex.jobStateIndexName()); + indices.add(JobProvider.ML_META_INDEX); + indices.add(Auditor.NOTIFICATIONS_INDEX); + for (String jobId : jobIds) { + indices.add(AnomalyDetectorsIndex.jobResultsIndexName(jobId)); + } + for (String indexName : indices) { + IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName); + indexMetaData.settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + ); + metaData.put(indexMetaData); + Index index = new Index(indexName, "_uuid"); + ShardId shardId = new ShardId(index, 0); + ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); + shardRouting = shardRouting.initialize("node_id", null, 0L); + shardRouting = shardRouting.moveToStarted(); + routingTable.add(IndexRoutingTable.builder(index) + .addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build())); + } + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + for (String jobId : jobIds) { + Job job = BaseMlIntegTestCase.createFareQuoteJob(jobId).build(); + mlMetadata.putJob(job, false); + } + metaData.putCustom(MlMetadata.TYPE, mlMetadata.build()); + } + } diff --git 
a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index 37734179e98..7896dbe2591 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -5,14 +5,18 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.action.CloseJobAction; import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.ml.action.OpenJobAction; +import org.elasticsearch.xpack.ml.action.PostDataAction; import org.elasticsearch.xpack.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.ml.action.PutJobAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction; @@ -172,14 +176,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { internalCluster().stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ML_ENABLED.getKey(), true)); ensureStableCluster(2); assertBusy(() -> { - // job should get and remain in a failed state: - ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); - PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); - - assertNull(task.getExecutorNode()); - // The status remains to be opened as from ml we didn't had the chance to 
set the status to failed: - assertEquals(JobState.OPENED, task.getStatus()); + // job should get and remain in a failed state and + // the status remains opened, as ml didn't have the chance to set the status to failed: + assertJobTask("job_id", JobState.OPENED, false); }); logger.info("start ml node"); @@ -187,16 +186,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { ensureStableCluster(3); assertBusy(() -> { // job should be re-opened: - ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); - PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); - - assertNotNull(task.getExecutorNode()); - DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); - Map expectedNodeAttr = new HashMap<>(); - expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); - assertEquals(expectedNodeAttr, node.getAttributes()); - assertEquals(JobState.OPENED, task.getStatus()); + assertJobTask("job_id", JobState.OPENED, true); }); } @@ -305,5 +295,80 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { assertEquals("Expected no violations, but got [" + violations + "]", 0, violations.size()); } + public void testMlIndicesNotAvailable() throws Exception { + internalCluster().ensureAtMostNumDataNodes(0); + // start non ml node, but that will hold the indices + logger.info("Start non ml node:"); + internalCluster().startNode(Settings.builder() + .put("node.data", true) + .put(MachineLearning.ML_ENABLED.getKey(), false)); + ensureStableCluster(1); + logger.info("Starting ml node"); + internalCluster().startNode(Settings.builder() + .put("node.data", false) + .put(MachineLearning.ML_ENABLED.getKey(), true)); + ensureStableCluster(2); + + Job.Builder job = createFareQuoteJob("job_id"); + PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build()); +
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get(); + assertTrue(putJobResponse.isAcknowledged()); + + OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId()); + client().execute(OpenJobAction.INSTANCE, openJobRequest).get(); + + PostDataAction.Request postDataRequest = new PostDataAction.Request("job_id"); + postDataRequest.setContent(new BytesArray( + "{\"airline\":\"AAL\",\"responsetime\":\"132.2046\",\"sourcetype\":\"farequote\",\"time\":\"1403481600\"}\n" + + "{\"airline\":\"JZA\",\"responsetime\":\"990.4628\",\"sourcetype\":\"farequote\",\"time\":\"1403481700\"}" + )); + PostDataAction.Response response = client().execute(PostDataAction.INSTANCE, postDataRequest).actionGet(); + assertEquals(2, response.getDataCounts().getProcessedRecordCount()); + + CloseJobAction.Request closeJobRequest = new CloseJobAction.Request("job_id"); + client().execute(CloseJobAction.INSTANCE, closeJobRequest); + assertBusy(() -> { + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + assertEquals(0, tasks.taskMap().size()); + }); + logger.info("Stop data node"); + internalCluster().stopRandomNode(settings -> settings.getAsBoolean("node.data", true)); + ensureStableCluster(1); + + Exception e = expectThrows(ElasticsearchStatusException.class, + () -> client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet()); + assertEquals("no nodes available to open job [job_id]", e.getMessage()); + + logger.info("Start data node"); + internalCluster().startNode(Settings.builder() + .put("node.data", true) + .put(MachineLearning.ML_ENABLED.getKey(), false)); + ensureStableCluster(2); + client().execute(OpenJobAction.INSTANCE, openJobRequest).get(); + assertBusy(() -> assertJobTask("job_id", JobState.OPENED, true)); + } + + private void assertJobTask(String jobId, JobState 
expectedState, boolean hasExecutorNode) { + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + assertEquals(1, tasks.taskMap().size()); + PersistentTaskInProgress task = tasks.findTasks(OpenJobAction.NAME, p -> { + return p.getRequest() instanceof OpenJobAction.Request && + jobId.equals(((OpenJobAction.Request) p.getRequest()).getJobId()); + }).iterator().next(); + assertNotNull(task); + + if (hasExecutorNode) { + assertNotNull(task.getExecutorNode()); + DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); + Map expectedNodeAttr = new HashMap<>(); + expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); + assertEquals(expectedNodeAttr, node.getAttributes()); + } else { + assertNull(task.getExecutorNode()); + } + assertEquals(expectedState, task.getStatus()); + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index 38c9f7751d3..51ed91426a1 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -93,6 +93,26 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase { return builder; } + public static Job.Builder createFareQuoteJob(String id) { + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setFormat(DataDescription.DataFormat.JSON); + dataDescription.setTimeFormat(DataDescription.EPOCH); + dataDescription.setTimeField("time"); + + Detector.Builder d = new Detector.Builder("metric", "responsetime"); + d.setByFieldName("by_field_name"); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); + 
analysisConfig.setBucketSpan(3600L); + + Job.Builder builder = new Job.Builder(); + builder.setId(id); + builder.setCreateTime(new Date()); + + builder.setAnalysisConfig(analysisConfig); + builder.setDataDescription(dataDescription); + return builder; + } + public static Job.Builder createScheduledJob(String jobId) { DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setFormat(DataDescription.DataFormat.JSON);