From e2a30331ba7528adb17260a7ce46f69ca0133db9 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Thu, 6 Apr 2017 18:14:51 +0000 Subject: [PATCH] Refactor StartDataFeed to more intelligently wait on shard status (elastic/x-pack-elasticsearch#974) Rather than using an async call, this leverages the Assignment logic while selecting nodes. Now with 300% more tests! Original commit: elastic/x-pack-elasticsearch@300d628f72228a322d78749cf150aa5874406c81 --- .../xpack/ml/action/StartDatafeedAction.java | 73 +++--- .../xpack/ml/action/DatafeedJobsIT.java | 8 +- .../ml/action/StartDatafeedActionTests.java | 235 ++++++++++++++++-- 3 files changed, 268 insertions(+), 48 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index 7d547810f99..c2741a646ae 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -12,16 +12,16 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ValidateActions; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.Inject; @@ -62,7 +62,6 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Persiste import org.elasticsearch.xpack.persistent.PersistentTasksExecutor; import org.elasticsearch.xpack.persistent.PersistentTasksService; import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; -import org.elasticsearch.xpack.security.InternalClient; import java.io.IOException; import java.util.List; @@ -343,19 +342,14 @@ public class StartDatafeedAction private final XPackLicenseState licenseState; private final PersistentTasksService persistentTasksService; - private final InternalClient client; - private final ClusterService clusterService; @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, XPackLicenseState licenseState, PersistentTasksService persistentTasksService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, InternalClient client, - ClusterService clusterService) { + IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); this.licenseState = licenseState; this.persistentTasksService = persistentTasksService; - this.client = client; - this.clusterService = clusterService; } @Override @@ -364,7 +358,7 @@ public class StartDatafeedAction ActionListener> finalListener = new ActionListener>() { @Override public void onResponse(PersistentTask persistentTask) { - waitForYellow(persistentTask.getId(), request, listener); + waitForDatafeedStarted(persistentTask.getId(), request, listener); } @Override @@ -378,22 +372,6 @@ public class StartDatafeedAction } } - void waitForYellow(long taskId, Request request, ActionListener listener) { - ClusterState state = clusterService.state(); - MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); - DatafeedConfig config = mlMetadata.getDatafeed(request.getDatafeedId()); - List indices = config.getIndexes(); - if (!indices.isEmpty()) { - ClusterHealthRequest healthRequest = new ClusterHealthRequest(indices.toArray(new String[]{})); - healthRequest.waitForYellowStatus(); - client.admin().cluster().health(healthRequest, ActionListener.wrap(clusterHealthResponse -> { - waitForDatafeedStarted(taskId, request, listener); - }, listener::onFailure)); - } else { - waitForDatafeedStarted(taskId, request, listener); - } - } - void waitForDatafeedStarted(long taskId, Request request, ActionListener listener) { Predicate> predicate = persistentTask -> { if (persistentTask == null) { @@ -422,6 +400,7 @@ public class StartDatafeedAction private final XPackLicenseState licenseState; private final Auditor auditor; private final ThreadPool threadPool; + private final IndexNameExpressionResolver resolver; public StartDatafeedPersistentTasksExecutor(Settings settings, ThreadPool threadPool, XPackLicenseState licenseState, DatafeedManager datafeedManager, Auditor auditor) { @@ -430,11 +409,12 @@ public class StartDatafeedAction this.datafeedManager = datafeedManager; this.auditor = auditor; this.threadPool = threadPool; + this.resolver = new IndexNameExpressionResolver(settings); } @Override public Assignment getAssignment(Request request, ClusterState clusterState) { - Assignment assignment = selectNode(logger, request.getDatafeedId(), clusterState); + Assignment assignment = selectNode(logger, request.getDatafeedId(), clusterState, resolver); writeAssignmentNotification(request.getDatafeedId(), assignment, clusterState); return assignment; } @@ -514,7 +494,8 @@ public class StartDatafeedAction } } - static Assignment selectNode(Logger logger, String datafeedId, ClusterState clusterState) { + static Assignment selectNode(Logger logger, String datafeedId, ClusterState clusterState, + IndexNameExpressionResolver resolver) { MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE); PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); @@ -539,7 +520,43 @@ public class StartDatafeedAction logger.debug(reason); return new Assignment(null, reason); } + String reason = verifyIndicesActive(logger, datafeed, clusterState, resolver); + if (reason != null) { + return new Assignment(null, reason); + } return new Assignment(jobTask.getExecutorNode(), ""); } + private static String verifyIndicesActive(Logger logger, DatafeedConfig datafeed, ClusterState clusterState, + IndexNameExpressionResolver resolver) { + List indices = datafeed.getIndexes(); + for (String index : indices) { + String[] concreteIndices; + String reason = "cannot start datafeed [" + datafeed.getId() + "] because index [" + + index + "] does not exist, is closed, or is still initializing."; + + try { + concreteIndices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), index); + if (concreteIndices.length == 0) { + logger.debug(reason); + return reason; + } + } catch (Exception e) { + logger.debug(reason); + return reason; + } + + for (String concreteIndex : concreteIndices) { + IndexRoutingTable routingTable = clusterState.getRoutingTable().index(concreteIndex); + if (routingTable == null || !routingTable.allPrimaryShardsActive()) { + reason = "cannot start datafeed [" + datafeed.getId() + "] because index [" + + concreteIndex + "] does not have all primary shards active yet."; + logger.debug(reason); + return reason; + } + } + } + return null; + } + } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java index a92e919c69d..1fdc322fca4 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.action; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads; import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse; import org.elasticsearch.client.Client; @@ -128,6 +130,7 @@ public class DatafeedJobsIT extends SecurityIntegTestCase { client().admin().indices().prepareCreate("data-2") .addMapping("type", "time", "type=date") .get(); + ClusterHealthResponse r = client().admin().cluster().prepareHealth("data-1", "data-2").setWaitForYellowStatus().get(); long numDocs2 = randomIntBetween(32, 2048); indexDocs(logger, "data-2", numDocs2, oneWeekAgo, now); @@ -142,7 +145,10 @@ public class DatafeedJobsIT extends SecurityIntegTestCase { assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED); }); - DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList("data-*")); + List t = new ArrayList<>(2); + t.add("data-1"); + t.add("data-2"); + DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), t); PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(datafeedConfig); PutDatafeedAction.Response putDatafeedResponse = client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).get(); assertTrue(putDatafeedResponse.isAcknowledged()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java index 7ba9c05896a..f7a12765e51 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java @@ -10,10 +10,23 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.MlMetadata; @@ -28,9 +41,11 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignme import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.net.InetAddress; +import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; @@ -42,10 +57,17 @@ import static org.hamcrest.Matchers.equalTo; public class StartDatafeedActionTests extends ESTestCase { public void testSelectNode() throws Exception { + IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY); + IndexMetaData indexMetaData = IndexMetaData.builder("foo") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadata.putJob(job, false); - mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*"))); + mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED); PersistentTask task = createJobTask(0L, job.getId(), "node_id", jobState); @@ -57,11 +79,14 @@ public class StartDatafeedActionTests extends ESTestCase { .build(); ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasks)) - .nodes(nodes); + .metaData(new MetaData.Builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) + .put(indexMetaData, false)) + .nodes(nodes) + .routingTable(generateRoutingTable(indexMetaData)); - Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build()); + Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver); assertNull(result.getExecutorNode()); assertEquals("cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState + "] while state [opened] is required", result.getExplanation()); @@ -69,18 +94,144 @@ public class StartDatafeedActionTests extends ESTestCase { task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); cs = ClusterState.builder(new ClusterName("cluster_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasks)) - .nodes(nodes); - result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build()); + .metaData(new MetaData.Builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) + .put(indexMetaData, false)) + .nodes(nodes) + .routingTable(generateRoutingTable(indexMetaData)); + result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver); assertEquals("node_id", result.getExecutorNode()); } - public void testSelectNode_jobTaskStale() { + public void testShardUnassigned() throws Exception { + IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY); + IndexMetaData indexMetaData = IndexMetaData.builder("foo") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadata.putJob(job, false); - mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*"))); + + // Using wildcard index name to test for index resolving as well + mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*"))); + + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) + .build(); + + PersistentTask task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); + PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); + + List> states = new ArrayList<>(2); + states.add(new Tuple<>(0, ShardRoutingState.UNASSIGNED)); + + ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) + .metaData(new MetaData.Builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) + .put(indexMetaData, false)) + .nodes(nodes) + .routingTable(generateRoutingTable(indexMetaData, states)); + + Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver); + assertNull(result.getExecutorNode()); + assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + + "does not have all primary shards active yet.")); + } + + public void testShardNotAllActive() throws Exception { + IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY); + IndexMetaData indexMetaData = IndexMetaData.builder("foo") + .settings(settings(Version.CURRENT)) + .numberOfShards(2) + .numberOfReplicas(0) + .build(); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + Job job = createScheduledJob("job_id").build(new Date()); + mlMetadata.putJob(job, false); + + // Using wildcard index name to test for index resolving as well + mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*"))); + + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) + .build(); + + PersistentTask task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); + PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); + + List> states = new ArrayList<>(2); + states.add(new Tuple<>(0, ShardRoutingState.STARTED)); + states.add(new Tuple<>(1, ShardRoutingState.INITIALIZING)); + + ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) + .metaData(new MetaData.Builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) + .put(indexMetaData, false)) + .nodes(nodes) + .routingTable(generateRoutingTable(indexMetaData, states)); + + Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver); + assertNull(result.getExecutorNode()); + assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + + "does not have all primary shards active yet.")); + } + + public void testIndexDoesntExist() throws Exception { + IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY); + IndexMetaData indexMetaData = IndexMetaData.builder("foo") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + Job job = createScheduledJob("job_id").build(new Date()); + mlMetadata.putJob(job, false); + mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo"))); + + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) + .build(); + + PersistentTask task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); + PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); + + ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) + .metaData(new MetaData.Builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) + .put(indexMetaData, false)) + .nodes(nodes) + .routingTable(generateRoutingTable(indexMetaData)); + + Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver); + assertNull(result.getExecutorNode()); + assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [not_foo] " + + "does not exist, is closed, or is still initializing.")); + } + + public void testSelectNode_jobTaskStale() { + IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY); + IndexMetaData indexMetaData = IndexMetaData.builder("foo") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + Job job = createScheduledJob("job_id").build(new Date()); + mlMetadata.putJob(job, false); + mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); String nodeId = randomBoolean() ? "node_id2" : null; PersistentTask task = createJobTask(0L, job.getId(), nodeId, JobState.OPENED); @@ -92,11 +243,14 @@ public class StartDatafeedActionTests extends ESTestCase { .build(); ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasks)) - .nodes(nodes); + .metaData(new MetaData.Builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) + .put(indexMetaData, false)) + .nodes(nodes) + .routingTable(generateRoutingTable(indexMetaData)); - Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build()); + Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver); assertNull(result.getExecutorNode()); assertEquals("cannot start datafeed [datafeed_id], job [job_id] is unassigned or unassigned to a non existing node", result.getExplanation()); @@ -104,10 +258,13 @@ public class StartDatafeedActionTests extends ESTestCase { task = createJobTask(0L, job.getId(), "node_id1", JobState.OPENED); tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); cs = ClusterState.builder(new ClusterName("cluster_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasks)) - .nodes(nodes); - result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build()); + .metaData(new MetaData.Builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) + .put(indexMetaData, false)) + .nodes(nodes) + .routingTable(generateRoutingTable(indexMetaData)); + result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver); assertEquals("node_id1", result.getExecutorNode()); } @@ -170,4 +327,44 @@ public class StartDatafeedActionTests extends ESTestCase { return task; } + private RoutingTable generateRoutingTable(IndexMetaData indexMetaData) { + List> states = new ArrayList<>(1); + states.add(new Tuple<>(0, ShardRoutingState.STARTED)); + return generateRoutingTable(indexMetaData, states); + } + + private RoutingTable generateRoutingTable(IndexMetaData indexMetaData, List> states) { + IndexRoutingTable.Builder rtBuilder = IndexRoutingTable.builder(indexMetaData.getIndex()); + + final String index = indexMetaData.getIndex().getName(); + int counter = 0; + for (Tuple state : states) { + ShardId shardId = new ShardId(index, "_na_", counter); + IndexShardRoutingTable.Builder shardRTBuilder = new IndexShardRoutingTable.Builder(shardId); + ShardRouting shardRouting; + + if (state.v2().equals(ShardRoutingState.STARTED)) { + shardRouting = TestShardRouting.newShardRouting(index, shardId.getId(), + "node_" + Integer.toString(state.v1()), null, true, ShardRoutingState.STARTED); + } else if (state.v2().equals(ShardRoutingState.INITIALIZING)) { + shardRouting = TestShardRouting.newShardRouting(index, shardId.getId(), + "node_" + Integer.toString(state.v1()), null, true, ShardRoutingState.INITIALIZING); + } else if (state.v2().equals(ShardRoutingState.RELOCATING)) { + shardRouting = TestShardRouting.newShardRouting(index, shardId.getId(), + "node_" + Integer.toString(state.v1()), "node_" + Integer.toString((state.v1() + 1) % 3), + true, ShardRoutingState.RELOCATING); + } else { + shardRouting = ShardRouting.newUnassigned(shardId, true, + RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); + } + + shardRTBuilder.addShard(shardRouting); + rtBuilder.addIndexShard(shardRTBuilder.build()); + counter += 1; + } + + return new RoutingTable.Builder().add(rtBuilder).build(); + } + }