[ML] Wait for .ml-config primary before assigning persistent tasks (#44170)
Now that ML job configs are stored in an index rather than cluster state, availability of the .ml-config index is very important to the operation of ML. When a cluster starts up the ML persistent tasks will be considered for node assignment very early on. It is best in this case if assignment is deferred until after the .ml-config index is available. The introduction of data frame analytics jobs has made this problem worse, because anomaly detection jobs already waited for the primary shards of the .ml-state, .ml-anomalies-shared and .ml-meta indices to be available before doing node assignment, and by coincidence this would probably lead to the primary shards of .ml-config also being searchable. But data frame analytics jobs had no other index checks prior to this change. This fixes problem 2 of #44156
This commit is contained in:
parent
c0ed64bb92
commit
5886aefeed
|
@ -127,9 +127,11 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
|
|||
|
||||
static String[] indicesOfInterest(String resultsIndex) {
|
||||
if (resultsIndex == null) {
|
||||
return new String[]{AnomalyDetectorsIndex.jobStateIndexPattern(), MlMetaIndex.INDEX_NAME};
|
||||
return new String[]{AnomalyDetectorsIndex.jobStateIndexPattern(), MlMetaIndex.INDEX_NAME,
|
||||
AnomalyDetectorsIndex.configIndexName()};
|
||||
}
|
||||
return new String[]{AnomalyDetectorsIndex.jobStateIndexPattern(), resultsIndex, MlMetaIndex.INDEX_NAME};
|
||||
return new String[]{AnomalyDetectorsIndex.jobStateIndexPattern(), resultsIndex, MlMetaIndex.INDEX_NAME,
|
||||
AnomalyDetectorsIndex.configIndexName()};
|
||||
}
|
||||
|
||||
static List<String> verifyIndicesPrimaryShardsAreActive(String resultsWriteIndex, ClusterState clusterState) {
|
||||
|
|
|
@ -15,6 +15,7 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.client.Client;
|
||||
|
@ -23,6 +24,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
|
|||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
|
@ -48,6 +50,7 @@ import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
|
|||
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
|
||||
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
|
||||
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
|
||||
|
@ -59,6 +62,8 @@ import org.elasticsearch.xpack.ml.job.JobNodeSelector;
|
|||
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
@ -374,6 +379,19 @@ public class TransportStartDataFrameAnalyticsAction
|
|||
}
|
||||
}
|
||||
|
||||
static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterState, String... indexNames) {
|
||||
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
|
||||
String[] concreteIndices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), indexNames);
|
||||
List<String> unavailableIndices = new ArrayList<>(concreteIndices.length);
|
||||
for (String index : concreteIndices) {
|
||||
IndexRoutingTable routingTable = clusterState.getRoutingTable().index(index);
|
||||
if (routingTable == null || routingTable.allPrimaryShardsActive() == false) {
|
||||
unavailableIndices.add(index);
|
||||
}
|
||||
}
|
||||
return unavailableIndices;
|
||||
}
|
||||
|
||||
public static class TaskExecutor extends PersistentTasksExecutor<StartDataFrameAnalyticsAction.TaskParams> {
|
||||
|
||||
private final Client client;
|
||||
|
@ -421,11 +439,20 @@ public class TransportStartDataFrameAnalyticsAction
|
|||
|
||||
String id = params.getId();
|
||||
|
||||
List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(clusterState, AnomalyDetectorsIndex.configIndexName());
|
||||
if (unavailableIndices.size() != 0) {
|
||||
String reason = "Not opening data frame analytics job [" + id +
|
||||
"], because not all primary shards are active for the following indices [" + String.join(",", unavailableIndices) + "]";
|
||||
LOGGER.debug(reason);
|
||||
return new PersistentTasksCustomMetaData.Assignment(null, reason);
|
||||
}
|
||||
|
||||
boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
|
||||
if (isMemoryTrackerRecentlyRefreshed == false) {
|
||||
boolean scheduledRefresh = memoryTracker.asyncRefresh();
|
||||
if (scheduledRefresh) {
|
||||
String reason = "Not opening job [" + id + "] because job memory requirements are stale - refresh requested";
|
||||
String reason = "Not opening data frame analytics job [" + id +
|
||||
"] because job memory requirements are stale - refresh requested";
|
||||
LOGGER.debug(reason);
|
||||
return new PersistentTasksCustomMetaData.Assignment(null, reason);
|
||||
}
|
||||
|
|
|
@ -435,6 +435,10 @@ public class MlMemoryTracker implements LocalNodeMasterListener {
|
|||
}, e -> {
|
||||
if (e instanceof ResourceNotFoundException) {
|
||||
// TODO: does this also happen if the .ml-config index exists but is unavailable?
|
||||
// However, note that we wait for the .ml-config index to be available earlier on in the
|
||||
// job assignment process, so that scenario should be very rare, i.e. somebody has closed
|
||||
// the .ml-config index (which would be unexpected and unsupported for an internal index)
|
||||
// during the memory refresh.
|
||||
logger.trace("[{}] anomaly detector job deleted during ML memory update", jobId);
|
||||
} else {
|
||||
logger.error("[" + jobId + "] failed to get anomaly detector job during ML memory update", e);
|
||||
|
|
|
@ -202,6 +202,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|||
|
||||
private void addIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable) {
|
||||
List<String> indices = new ArrayList<>();
|
||||
indices.add(AnomalyDetectorsIndex.configIndexName());
|
||||
indices.add(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX);
|
||||
indices.add(MlMetaIndex.INDEX_NAME);
|
||||
indices.add(AuditorField.NOTIFICATIONS_INDEX);
|
||||
|
|
|
@ -0,0 +1,87 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.action;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RecoverySource;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
|
||||
public class TransportStartDataFrameAnalyticsActionTests extends ESTestCase {
|
||||
|
||||
public void testVerifyIndicesPrimaryShardsAreActive() {
|
||||
|
||||
// At present the only critical index is the config index
|
||||
String indexName = AnomalyDetectorsIndex.configIndexName();
|
||||
|
||||
MetaData.Builder metaData = MetaData.builder();
|
||||
RoutingTable.Builder routingTable = RoutingTable.builder();
|
||||
|
||||
IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName);
|
||||
indexMetaData.settings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
);
|
||||
if (indexName.equals(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX)) {
|
||||
indexMetaData.putAlias(new AliasMetaData.Builder(AnomalyDetectorsIndex.jobStateIndexWriteAlias()));
|
||||
}
|
||||
metaData.put(indexMetaData);
|
||||
Index index = new Index(indexName, "_uuid");
|
||||
ShardId shardId = new ShardId(index, 0);
|
||||
ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE,
|
||||
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""));
|
||||
shardRouting = shardRouting.initialize("node_id", null, 0L);
|
||||
shardRouting = shardRouting.moveToStarted();
|
||||
routingTable.add(IndexRoutingTable.builder(index)
|
||||
.addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build()));
|
||||
|
||||
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
|
||||
csBuilder.routingTable(routingTable.build());
|
||||
csBuilder.metaData(metaData);
|
||||
|
||||
ClusterState cs = csBuilder.build();
|
||||
assertThat(TransportStartDataFrameAnalyticsAction.verifyIndicesPrimaryShardsAreActive(cs, indexName), empty());
|
||||
|
||||
metaData = new MetaData.Builder(cs.metaData());
|
||||
routingTable = new RoutingTable.Builder(cs.routingTable());
|
||||
if (randomBoolean()) {
|
||||
routingTable.remove(indexName);
|
||||
} else {
|
||||
index = new Index(indexName, "_uuid");
|
||||
shardId = new ShardId(index, 0);
|
||||
shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE,
|
||||
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""));
|
||||
shardRouting = shardRouting.initialize("node_id", null, 0L);
|
||||
routingTable.add(IndexRoutingTable.builder(index)
|
||||
.addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build()));
|
||||
}
|
||||
|
||||
csBuilder.routingTable(routingTable.build());
|
||||
csBuilder.metaData(metaData);
|
||||
List<String> result = TransportStartDataFrameAnalyticsAction.verifyIndicesPrimaryShardsAreActive(csBuilder.build(), indexName);
|
||||
assertThat(result, contains(indexName));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue