diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index afe27ee7bbd..91431d67203 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -127,9 +127,11 @@ public class TransportOpenJobAction extends TransportMasterNodeAction verifyIndicesPrimaryShardsAreActive(String resultsWriteIndex, ClusterState clusterState) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index 915e39abcf3..afa16e3e9be 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; @@ -23,6 +24,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; @@ -48,6 +50,7 @@ import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager; @@ -59,6 +62,8 @@ import org.elasticsearch.xpack.ml.job.JobNodeSelector; import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; @@ -374,6 +379,19 @@ public class TransportStartDataFrameAnalyticsAction } } + static List verifyIndicesPrimaryShardsAreActive(ClusterState clusterState, String... indexNames) { + IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(); + String[] concreteIndices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), indexNames); + List unavailableIndices = new ArrayList<>(concreteIndices.length); + for (String index : concreteIndices) { + IndexRoutingTable routingTable = clusterState.getRoutingTable().index(index); + if (routingTable == null || routingTable.allPrimaryShardsActive() == false) { + unavailableIndices.add(index); + } + } + return unavailableIndices; + } + public static class TaskExecutor extends PersistentTasksExecutor { private final Client client; @@ -421,11 +439,20 @@ public class TransportStartDataFrameAnalyticsAction String id = params.getId(); + List unavailableIndices = verifyIndicesPrimaryShardsAreActive(clusterState, AnomalyDetectorsIndex.configIndexName()); + if (unavailableIndices.size() != 0) { + String reason = "Not opening data frame analytics job [" + id + + "], because not all primary shards are active for the following indices [" + String.join(",", unavailableIndices) + "]"; + LOGGER.debug(reason); + return new PersistentTasksCustomMetaData.Assignment(null, reason); + } + boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed(); if (isMemoryTrackerRecentlyRefreshed == false) { boolean scheduledRefresh = memoryTracker.asyncRefresh(); if (scheduledRefresh) { - String reason = "Not opening job [" + id + "] because job memory requirements are stale - refresh requested"; + String reason = "Not opening data frame analytics job [" + id + + "] because job memory requirements are stale - refresh requested"; LOGGER.debug(reason); return new PersistentTasksCustomMetaData.Assignment(null, reason); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java index 11835cdb8db..1aaaf1a65d2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java @@ -435,6 +435,10 @@ public class MlMemoryTracker implements LocalNodeMasterListener { }, e -> { if (e instanceof ResourceNotFoundException) { // TODO: does this also happen if the .ml-config index exists but is unavailable? + // However, note that we wait for the .ml-config index to be available earlier on in the + // job assignment process, so that scenario should be very rare, i.e. somebody has closed + // the .ml-config index (which would be unexpected and unsupported for an internal index) + // during the memory refresh. logger.trace("[{}] anomaly detector job deleted during ML memory update", jobId); } else { logger.error("[" + jobId + "] failed to get anomaly detector job during ML memory update", e); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index cc9a0ba0181..b4bad2e8949 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -202,6 +202,7 @@ public class TransportOpenJobActionTests extends ESTestCase { private void addIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable) { List indices = new ArrayList<>(); + indices.add(AnomalyDetectorsIndex.configIndexName()); indices.add(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX); indices.add(MlMetaIndex.INDEX_NAME); indices.add(AuditorField.NOTIFICATIONS_INDEX); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java new file mode 100644 index 00000000000..a322b92deaa --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; + +import java.util.List; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; + +public class TransportStartDataFrameAnalyticsActionTests extends ESTestCase { + + public void testVerifyIndicesPrimaryShardsAreActive() { + + // At present the only critical index is the config index + String indexName = AnomalyDetectorsIndex.configIndexName(); + + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + + IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName); + indexMetaData.settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + ); + if (indexName.equals(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX)) { + indexMetaData.putAlias(new AliasMetaData.Builder(AnomalyDetectorsIndex.jobStateIndexWriteAlias())); + } + metaData.put(indexMetaData); + Index index = new Index(indexName, "_uuid"); + ShardId shardId = new ShardId(index, 0); + ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); + shardRouting = shardRouting.initialize("node_id", null, 0L); + shardRouting = shardRouting.moveToStarted(); + routingTable.add(IndexRoutingTable.builder(index) + .addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build())); + + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + csBuilder.routingTable(routingTable.build()); + csBuilder.metaData(metaData); + + ClusterState cs = csBuilder.build(); + assertThat(TransportStartDataFrameAnalyticsAction.verifyIndicesPrimaryShardsAreActive(cs, indexName), empty()); + + metaData = new MetaData.Builder(cs.metaData()); + routingTable = new RoutingTable.Builder(cs.routingTable()); + if (randomBoolean()) { + routingTable.remove(indexName); + } else { + index = new Index(indexName, "_uuid"); + shardId = new ShardId(index, 0); + shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); + shardRouting = shardRouting.initialize("node_id", null, 0L); + routingTable.add(IndexRoutingTable.builder(index) + .addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build())); + } + + csBuilder.routingTable(routingTable.build()); + csBuilder.metaData(metaData); + List result = TransportStartDataFrameAnalyticsAction.verifyIndicesPrimaryShardsAreActive(csBuilder.build(), indexName); + assertThat(result, contains(indexName)); + } +}