[ML] data frame, verify primary shards are active for configs index before task start (#41551) (#41580)

This commit is contained in:
Benjamin Trent 2019-04-26 10:23:43 -05:00 committed by GitHub
parent f8081e8558
commit 3ccb48e516
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 123 additions and 0 deletions

View File

@ -11,7 +11,11 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTaskState;
@ -32,9 +36,12 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.dataframe.DataFrame;
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -67,6 +74,34 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
this.threadPool = threadPool;
}
@Override
public PersistentTasksCustomMetaData.Assignment getAssignment(DataFrameTransform params, ClusterState clusterState) {
List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(clusterState);
if (unavailableIndices.size() != 0) {
String reason = "Not starting data frame transform [" + params.getId() + "], " +
"because not all primary shards are active for the following indices [" +
String.join(",", unavailableIndices) + "]";
logger.debug(reason);
return new PersistentTasksCustomMetaData.Assignment(null, reason);
}
return super.getAssignment(params, clusterState);
}
static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterState) {
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
String[] indices = resolver.concreteIndexNames(clusterState,
IndicesOptions.lenientExpandOpen(),
DataFrameInternalIndex.INDEX_TEMPLATE_PATTERN + "*");
List<String> unavailableIndices = new ArrayList<>(indices.length);
for (String index : indices) {
IndexRoutingTable routingTable = clusterState.getRoutingTable().index(index);
if (routingTable == null || routingTable.allPrimaryShardsActive() == false) {
unavailableIndices.add(index);
}
}
return unavailableIndices;
}
@Override
protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTransform params, PersistentTaskState state) {
final String transformId = params.getId();

View File

@ -0,0 +1,88 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.transforms;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import java.util.ArrayList;
import java.util.List;
public class DataFrameTransformPersistentTasksExecutorTests extends ESTestCase {
public void testVerifyIndicesPrimaryShardsAreActive() {
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addIndices(metaData, routingTable);
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
csBuilder.routingTable(routingTable.build());
csBuilder.metaData(metaData);
ClusterState cs = csBuilder.build();
assertEquals(0, DataFrameTransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(cs).size());
metaData = new MetaData.Builder(cs.metaData());
routingTable = new RoutingTable.Builder(cs.routingTable());
String indexToRemove = DataFrameInternalIndex.INDEX_NAME;
if (randomBoolean()) {
routingTable.remove(indexToRemove);
} else {
Index index = new Index(indexToRemove, "_uuid");
ShardId shardId = new ShardId(index, 0);
ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""));
shardRouting = shardRouting.initialize("node_id", null, 0L);
routingTable.add(IndexRoutingTable.builder(index)
.addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build()));
}
csBuilder.routingTable(routingTable.build());
csBuilder.metaData(metaData);
List<String> result = DataFrameTransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(csBuilder.build());
assertEquals(1, result.size());
assertEquals(indexToRemove, result.get(0));
}
private void addIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable) {
List<String> indices = new ArrayList<>();
indices.add(DataFrameInternalIndex.AUDIT_INDEX);
indices.add(DataFrameInternalIndex.INDEX_NAME);
for (String indexName : indices) {
IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName);
indexMetaData.settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
);
metaData.put(indexMetaData);
Index index = new Index(indexName, "_uuid");
ShardId shardId = new ShardId(index, 0);
ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""));
shardRouting = shardRouting.initialize("node_id", null, 0L);
shardRouting = shardRouting.moveToStarted();
routingTable.add(IndexRoutingTable.builder(index)
.addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build()));
}
}
}