From 3ccb48e516b1b61cf7dcf83037543bed84bad71d Mon Sep 17 00:00:00 2001
From: Benjamin Trent
Date: Fri, 26 Apr 2019 10:23:43 -0500
Subject: [PATCH] [ML] data frame, verify primary shards are active for configs index before task start (#41551) (#41580)

Override getAssignment in DataFrameTransformPersistentTasksExecutor so
that, before delegating to the superclass, it checks every concrete
index matching DataFrameInternalIndex.INDEX_TEMPLATE_PATTERN + "*".
If any such index has no routing table, or not all of its primary
shards are active, the method logs the reason at debug level and
returns an Assignment built from null and that reason instead of
delegating.

The check lives in the static, package-private helper
verifyIndicesPrimaryShardsAreActive, which returns the names of the
unavailable indices. A new unit test exercises the helper with all
primaries started, with one index's routing table removed, and with
one primary initialized but not started.

---
 ...FrameTransformPersistentTasksExecutor.java | 35 ++++++++
 ...TransformPersistentTasksExecutorTests.java | 88 +++++++++++++++++++
 2 files changed, 123 insertions(+)
 create mode 100644 x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java

diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java
index 708585a8dc3..01999eff64c 100644
--- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java
+++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java
@@ -11,7 +11,11 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.LatchedActionListener;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.persistent.AllocatedPersistentTask;
 import org.elasticsearch.persistent.PersistentTaskState;
@@ -32,9 +36,12 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
 import org.elasticsearch.xpack.dataframe.DataFrame;
 import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
 import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
+import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 import org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -67,6 +74,34 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
         this.threadPool = threadPool;
     }
 
+    @Override
+    public PersistentTasksCustomMetaData.Assignment getAssignment(DataFrameTransform params, ClusterState clusterState) {
+        List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(clusterState);
+        if (unavailableIndices.size() != 0) {
+            String reason = "Not starting data frame transform [" + params.getId() + "], " +
+                "because not all primary shards are active for the following indices [" +
+                String.join(",", unavailableIndices) + "]";
+            logger.debug(reason);
+            return new PersistentTasksCustomMetaData.Assignment(null, reason);
+        }
+        return super.getAssignment(params, clusterState);
+    }
+
+    static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterState) {
+        IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
+        String[] indices = resolver.concreteIndexNames(clusterState,
+            IndicesOptions.lenientExpandOpen(),
+            DataFrameInternalIndex.INDEX_TEMPLATE_PATTERN + "*");
+        List<String> unavailableIndices = new ArrayList<>(indices.length);
+        for (String index : indices) {
+            IndexRoutingTable routingTable = clusterState.getRoutingTable().index(index);
+            if (routingTable == null || routingTable.allPrimaryShardsActive() == false) {
+                unavailableIndices.add(index);
+            }
+        }
+        return unavailableIndices;
+    }
+
     @Override
     protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTransform params, PersistentTaskState state) {
         final String transformId = params.getId();
diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java
new file mode 100644
index 00000000000..9e29fef1b5a
--- /dev/null
+++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.dataframe.transforms;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DataFrameTransformPersistentTasksExecutorTests extends ESTestCase {
+
+    public void testVerifyIndicesPrimaryShardsAreActive() {
+        MetaData.Builder metaData = MetaData.builder();
+        RoutingTable.Builder routingTable = RoutingTable.builder();
+        addIndices(metaData, routingTable);
+
+        ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
+        csBuilder.routingTable(routingTable.build());
+        csBuilder.metaData(metaData);
+
+        ClusterState cs = csBuilder.build();
+        assertEquals(0, DataFrameTransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(cs).size());
+
+        metaData = new MetaData.Builder(cs.metaData());
+        routingTable = new RoutingTable.Builder(cs.routingTable());
+        String indexToRemove = DataFrameInternalIndex.INDEX_NAME;
+        if (randomBoolean()) {
+            routingTable.remove(indexToRemove);
+        } else {
+            Index index = new Index(indexToRemove, "_uuid");
+            ShardId shardId = new ShardId(index, 0);
+            ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE,
+                new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""));
+            shardRouting = shardRouting.initialize("node_id", null, 0L);
+            routingTable.add(IndexRoutingTable.builder(index)
+                .addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build()));
+        }
+
+        csBuilder.routingTable(routingTable.build());
+        csBuilder.metaData(metaData);
+        List<String> result = DataFrameTransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(csBuilder.build());
+        assertEquals(1, result.size());
+        assertEquals(indexToRemove, result.get(0));
+    }
+
+    private void addIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable) {
+        List<String> indices = new ArrayList<>();
+        indices.add(DataFrameInternalIndex.AUDIT_INDEX);
+        indices.add(DataFrameInternalIndex.INDEX_NAME);
+        for (String indexName : indices) {
+            IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName);
+            indexMetaData.settings(Settings.builder()
+                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+            );
+            metaData.put(indexMetaData);
+            Index index = new Index(indexName, "_uuid");
+            ShardId shardId = new ShardId(index, 0);
+            ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE,
+                new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""));
+            shardRouting = shardRouting.initialize("node_id", null, 0L);
+            shardRouting = shardRouting.moveToStarted();
+            routingTable.add(IndexRoutingTable.builder(index)
+                .addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build()));
+        }
+    }
+
+}