Refactor StartDataFeed to more intelligently wait on shard status (elastic/x-pack-elasticsearch#974)
Rather than using an async call, this leverages the Assignment logic while selecting nodes. Now with 300% more tests! Original commit: elastic/x-pack-elasticsearch@300d628f72
This commit is contained in:
parent
9a9ae5edc7
commit
e2a30331ba
|
@ -12,16 +12,16 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.ValidateActions;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
|
@ -62,7 +62,6 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Persiste
|
|||
import org.elasticsearch.xpack.persistent.PersistentTasksExecutor;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
|
||||
import org.elasticsearch.xpack.security.InternalClient;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
@ -343,19 +342,14 @@ public class StartDatafeedAction
|
|||
|
||||
private final XPackLicenseState licenseState;
|
||||
private final PersistentTasksService persistentTasksService;
|
||||
private final InternalClient client;
|
||||
private final ClusterService clusterService;
|
||||
|
||||
@Inject
|
||||
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, XPackLicenseState licenseState,
|
||||
PersistentTasksService persistentTasksService, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, InternalClient client,
|
||||
ClusterService clusterService) {
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
|
||||
this.licenseState = licenseState;
|
||||
this.persistentTasksService = persistentTasksService;
|
||||
this.client = client;
|
||||
this.clusterService = clusterService;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -364,7 +358,7 @@ public class StartDatafeedAction
|
|||
ActionListener<PersistentTask<Request>> finalListener = new ActionListener<PersistentTask<Request>>() {
|
||||
@Override
|
||||
public void onResponse(PersistentTask<Request> persistentTask) {
|
||||
waitForYellow(persistentTask.getId(), request, listener);
|
||||
waitForDatafeedStarted(persistentTask.getId(), request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -378,22 +372,6 @@ public class StartDatafeedAction
|
|||
}
|
||||
}
|
||||
|
||||
void waitForYellow(long taskId, Request request, ActionListener<Response> listener) {
|
||||
ClusterState state = clusterService.state();
|
||||
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
|
||||
DatafeedConfig config = mlMetadata.getDatafeed(request.getDatafeedId());
|
||||
List<String> indices = config.getIndexes();
|
||||
if (!indices.isEmpty()) {
|
||||
ClusterHealthRequest healthRequest = new ClusterHealthRequest(indices.toArray(new String[]{}));
|
||||
healthRequest.waitForYellowStatus();
|
||||
client.admin().cluster().health(healthRequest, ActionListener.wrap(clusterHealthResponse -> {
|
||||
waitForDatafeedStarted(taskId, request, listener);
|
||||
}, listener::onFailure));
|
||||
} else {
|
||||
waitForDatafeedStarted(taskId, request, listener);
|
||||
}
|
||||
}
|
||||
|
||||
void waitForDatafeedStarted(long taskId, Request request, ActionListener<Response> listener) {
|
||||
Predicate<PersistentTask<?>> predicate = persistentTask -> {
|
||||
if (persistentTask == null) {
|
||||
|
@ -422,6 +400,7 @@ public class StartDatafeedAction
|
|||
private final XPackLicenseState licenseState;
|
||||
private final Auditor auditor;
|
||||
private final ThreadPool threadPool;
|
||||
private final IndexNameExpressionResolver resolver;
|
||||
|
||||
public StartDatafeedPersistentTasksExecutor(Settings settings, ThreadPool threadPool, XPackLicenseState licenseState,
|
||||
DatafeedManager datafeedManager, Auditor auditor) {
|
||||
|
@ -430,11 +409,12 @@ public class StartDatafeedAction
|
|||
this.datafeedManager = datafeedManager;
|
||||
this.auditor = auditor;
|
||||
this.threadPool = threadPool;
|
||||
this.resolver = new IndexNameExpressionResolver(settings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Assignment getAssignment(Request request, ClusterState clusterState) {
|
||||
Assignment assignment = selectNode(logger, request.getDatafeedId(), clusterState);
|
||||
Assignment assignment = selectNode(logger, request.getDatafeedId(), clusterState, resolver);
|
||||
writeAssignmentNotification(request.getDatafeedId(), assignment, clusterState);
|
||||
return assignment;
|
||||
}
|
||||
|
@ -514,7 +494,8 @@ public class StartDatafeedAction
|
|||
}
|
||||
}
|
||||
|
||||
static Assignment selectNode(Logger logger, String datafeedId, ClusterState clusterState) {
|
||||
static Assignment selectNode(Logger logger, String datafeedId, ClusterState clusterState,
|
||||
IndexNameExpressionResolver resolver) {
|
||||
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
|
||||
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
|
||||
|
@ -539,7 +520,43 @@ public class StartDatafeedAction
|
|||
logger.debug(reason);
|
||||
return new Assignment(null, reason);
|
||||
}
|
||||
String reason = verifyIndicesActive(logger, datafeed, clusterState, resolver);
|
||||
if (reason != null) {
|
||||
return new Assignment(null, reason);
|
||||
}
|
||||
return new Assignment(jobTask.getExecutorNode(), "");
|
||||
}
|
||||
|
||||
private static String verifyIndicesActive(Logger logger, DatafeedConfig datafeed, ClusterState clusterState,
|
||||
IndexNameExpressionResolver resolver) {
|
||||
List<String> indices = datafeed.getIndexes();
|
||||
for (String index : indices) {
|
||||
String[] concreteIndices;
|
||||
String reason = "cannot start datafeed [" + datafeed.getId() + "] because index ["
|
||||
+ index + "] does not exist, is closed, or is still initializing.";
|
||||
|
||||
try {
|
||||
concreteIndices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), index);
|
||||
if (concreteIndices.length == 0) {
|
||||
logger.debug(reason);
|
||||
return reason;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.debug(reason);
|
||||
return reason;
|
||||
}
|
||||
|
||||
for (String concreteIndex : concreteIndices) {
|
||||
IndexRoutingTable routingTable = clusterState.getRoutingTable().index(concreteIndex);
|
||||
if (routingTable == null || !routingTable.allPrimaryShardsActive()) {
|
||||
reason = "cannot start datafeed [" + datafeed.getId() + "] because index ["
|
||||
+ concreteIndex + "] does not have all primary shards active yet.";
|
||||
logger.debug(reason);
|
||||
return reason;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -5,6 +5,8 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ml.action;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
|
||||
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
|
@ -128,6 +130,7 @@ public class DatafeedJobsIT extends SecurityIntegTestCase {
|
|||
client().admin().indices().prepareCreate("data-2")
|
||||
.addMapping("type", "time", "type=date")
|
||||
.get();
|
||||
ClusterHealthResponse r = client().admin().cluster().prepareHealth("data-1", "data-2").setWaitForYellowStatus().get();
|
||||
long numDocs2 = randomIntBetween(32, 2048);
|
||||
indexDocs(logger, "data-2", numDocs2, oneWeekAgo, now);
|
||||
|
||||
|
@ -142,7 +145,10 @@ public class DatafeedJobsIT extends SecurityIntegTestCase {
|
|||
assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED);
|
||||
});
|
||||
|
||||
DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList("data-*"));
|
||||
List<String> t = new ArrayList<>(2);
|
||||
t.add("data-1");
|
||||
t.add("data-2");
|
||||
DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), t);
|
||||
PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(datafeedConfig);
|
||||
PutDatafeedAction.Response putDatafeedResponse = client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).get();
|
||||
assertTrue(putDatafeedResponse.isAcknowledged());
|
||||
|
|
|
@ -10,10 +10,23 @@ import org.elasticsearch.ResourceNotFoundException;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RecoverySource;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
|
@ -28,9 +41,11 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignme
|
|||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask;
|
||||
|
@ -42,10 +57,17 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
public class StartDatafeedActionTests extends ESTestCase {
|
||||
|
||||
public void testSelectNode() throws Exception {
|
||||
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("foo")
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
|
||||
Job job = createScheduledJob("job_id").build(new Date());
|
||||
mlMetadata.putJob(job, false);
|
||||
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*")));
|
||||
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")));
|
||||
|
||||
JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED);
|
||||
PersistentTask<OpenJobAction.Request> task = createJobTask(0L, job.getId(), "node_id", jobState);
|
||||
|
@ -57,11 +79,14 @@ public class StartDatafeedActionTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
|
||||
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
|
||||
.putCustom(PersistentTasksCustomMetaData.TYPE, tasks))
|
||||
.nodes(nodes);
|
||||
.metaData(new MetaData.Builder()
|
||||
.putCustom(MlMetadata.TYPE, mlMetadata.build())
|
||||
.putCustom(PersistentTasksCustomMetaData.TYPE, tasks)
|
||||
.put(indexMetaData, false))
|
||||
.nodes(nodes)
|
||||
.routingTable(generateRoutingTable(indexMetaData));
|
||||
|
||||
Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
|
||||
Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver);
|
||||
assertNull(result.getExecutorNode());
|
||||
assertEquals("cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState +
|
||||
"] while state [opened] is required", result.getExplanation());
|
||||
|
@ -69,18 +94,144 @@ public class StartDatafeedActionTests extends ESTestCase {
|
|||
task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED);
|
||||
tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task));
|
||||
cs = ClusterState.builder(new ClusterName("cluster_name"))
|
||||
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
|
||||
.putCustom(PersistentTasksCustomMetaData.TYPE, tasks))
|
||||
.nodes(nodes);
|
||||
result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
|
||||
.metaData(new MetaData.Builder()
|
||||
.putCustom(MlMetadata.TYPE, mlMetadata.build())
|
||||
.putCustom(PersistentTasksCustomMetaData.TYPE, tasks)
|
||||
.put(indexMetaData, false))
|
||||
.nodes(nodes)
|
||||
.routingTable(generateRoutingTable(indexMetaData));
|
||||
result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver);
|
||||
assertEquals("node_id", result.getExecutorNode());
|
||||
}
|
||||
|
||||
public void testSelectNode_jobTaskStale() {
|
||||
public void testShardUnassigned() throws Exception {
|
||||
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("foo")
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
|
||||
Job job = createScheduledJob("job_id").build(new Date());
|
||||
mlMetadata.putJob(job, false);
|
||||
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*")));
|
||||
|
||||
// Using wildcard index name to test for index resolving as well
|
||||
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")));
|
||||
|
||||
DiscoveryNodes nodes = DiscoveryNodes.builder()
|
||||
.add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
|
||||
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
|
||||
.build();
|
||||
|
||||
PersistentTask<OpenJobAction.Request> task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED);
|
||||
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task));
|
||||
|
||||
List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(2);
|
||||
states.add(new Tuple<>(0, ShardRoutingState.UNASSIGNED));
|
||||
|
||||
ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
|
||||
.metaData(new MetaData.Builder()
|
||||
.putCustom(MlMetadata.TYPE, mlMetadata.build())
|
||||
.putCustom(PersistentTasksCustomMetaData.TYPE, tasks)
|
||||
.put(indexMetaData, false))
|
||||
.nodes(nodes)
|
||||
.routingTable(generateRoutingTable(indexMetaData, states));
|
||||
|
||||
Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver);
|
||||
assertNull(result.getExecutorNode());
|
||||
assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " +
|
||||
"does not have all primary shards active yet."));
|
||||
}
|
||||
|
||||
public void testShardNotAllActive() throws Exception {
|
||||
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("foo")
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(2)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
|
||||
Job job = createScheduledJob("job_id").build(new Date());
|
||||
mlMetadata.putJob(job, false);
|
||||
|
||||
// Using wildcard index name to test for index resolving as well
|
||||
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")));
|
||||
|
||||
DiscoveryNodes nodes = DiscoveryNodes.builder()
|
||||
.add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
|
||||
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
|
||||
.build();
|
||||
|
||||
PersistentTask<OpenJobAction.Request> task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED);
|
||||
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task));
|
||||
|
||||
List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(2);
|
||||
states.add(new Tuple<>(0, ShardRoutingState.STARTED));
|
||||
states.add(new Tuple<>(1, ShardRoutingState.INITIALIZING));
|
||||
|
||||
ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
|
||||
.metaData(new MetaData.Builder()
|
||||
.putCustom(MlMetadata.TYPE, mlMetadata.build())
|
||||
.putCustom(PersistentTasksCustomMetaData.TYPE, tasks)
|
||||
.put(indexMetaData, false))
|
||||
.nodes(nodes)
|
||||
.routingTable(generateRoutingTable(indexMetaData, states));
|
||||
|
||||
Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver);
|
||||
assertNull(result.getExecutorNode());
|
||||
assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " +
|
||||
"does not have all primary shards active yet."));
|
||||
}
|
||||
|
||||
public void testIndexDoesntExist() throws Exception {
|
||||
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("foo")
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
|
||||
Job job = createScheduledJob("job_id").build(new Date());
|
||||
mlMetadata.putJob(job, false);
|
||||
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")));
|
||||
|
||||
DiscoveryNodes nodes = DiscoveryNodes.builder()
|
||||
.add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
|
||||
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
|
||||
.build();
|
||||
|
||||
PersistentTask<OpenJobAction.Request> task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED);
|
||||
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task));
|
||||
|
||||
ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
|
||||
.metaData(new MetaData.Builder()
|
||||
.putCustom(MlMetadata.TYPE, mlMetadata.build())
|
||||
.putCustom(PersistentTasksCustomMetaData.TYPE, tasks)
|
||||
.put(indexMetaData, false))
|
||||
.nodes(nodes)
|
||||
.routingTable(generateRoutingTable(indexMetaData));
|
||||
|
||||
Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver);
|
||||
assertNull(result.getExecutorNode());
|
||||
assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [not_foo] " +
|
||||
"does not exist, is closed, or is still initializing."));
|
||||
}
|
||||
|
||||
public void testSelectNode_jobTaskStale() {
|
||||
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("foo")
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
|
||||
Job job = createScheduledJob("job_id").build(new Date());
|
||||
mlMetadata.putJob(job, false);
|
||||
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")));
|
||||
|
||||
String nodeId = randomBoolean() ? "node_id2" : null;
|
||||
PersistentTask<OpenJobAction.Request> task = createJobTask(0L, job.getId(), nodeId, JobState.OPENED);
|
||||
|
@ -92,11 +243,14 @@ public class StartDatafeedActionTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
|
||||
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
|
||||
.putCustom(PersistentTasksCustomMetaData.TYPE, tasks))
|
||||
.nodes(nodes);
|
||||
.metaData(new MetaData.Builder()
|
||||
.putCustom(MlMetadata.TYPE, mlMetadata.build())
|
||||
.putCustom(PersistentTasksCustomMetaData.TYPE, tasks)
|
||||
.put(indexMetaData, false))
|
||||
.nodes(nodes)
|
||||
.routingTable(generateRoutingTable(indexMetaData));
|
||||
|
||||
Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
|
||||
Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver);
|
||||
assertNull(result.getExecutorNode());
|
||||
assertEquals("cannot start datafeed [datafeed_id], job [job_id] is unassigned or unassigned to a non existing node",
|
||||
result.getExplanation());
|
||||
|
@ -104,10 +258,13 @@ public class StartDatafeedActionTests extends ESTestCase {
|
|||
task = createJobTask(0L, job.getId(), "node_id1", JobState.OPENED);
|
||||
tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task));
|
||||
cs = ClusterState.builder(new ClusterName("cluster_name"))
|
||||
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
|
||||
.putCustom(PersistentTasksCustomMetaData.TYPE, tasks))
|
||||
.nodes(nodes);
|
||||
result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
|
||||
.metaData(new MetaData.Builder()
|
||||
.putCustom(MlMetadata.TYPE, mlMetadata.build())
|
||||
.putCustom(PersistentTasksCustomMetaData.TYPE, tasks)
|
||||
.put(indexMetaData, false))
|
||||
.nodes(nodes)
|
||||
.routingTable(generateRoutingTable(indexMetaData));
|
||||
result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver);
|
||||
assertEquals("node_id1", result.getExecutorNode());
|
||||
}
|
||||
|
||||
|
@ -170,4 +327,44 @@ public class StartDatafeedActionTests extends ESTestCase {
|
|||
return task;
|
||||
}
|
||||
|
||||
private RoutingTable generateRoutingTable(IndexMetaData indexMetaData) {
|
||||
List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(1);
|
||||
states.add(new Tuple<>(0, ShardRoutingState.STARTED));
|
||||
return generateRoutingTable(indexMetaData, states);
|
||||
}
|
||||
|
||||
private RoutingTable generateRoutingTable(IndexMetaData indexMetaData, List<Tuple<Integer, ShardRoutingState>> states) {
|
||||
IndexRoutingTable.Builder rtBuilder = IndexRoutingTable.builder(indexMetaData.getIndex());
|
||||
|
||||
final String index = indexMetaData.getIndex().getName();
|
||||
int counter = 0;
|
||||
for (Tuple<Integer, ShardRoutingState> state : states) {
|
||||
ShardId shardId = new ShardId(index, "_na_", counter);
|
||||
IndexShardRoutingTable.Builder shardRTBuilder = new IndexShardRoutingTable.Builder(shardId);
|
||||
ShardRouting shardRouting;
|
||||
|
||||
if (state.v2().equals(ShardRoutingState.STARTED)) {
|
||||
shardRouting = TestShardRouting.newShardRouting(index, shardId.getId(),
|
||||
"node_" + Integer.toString(state.v1()), null, true, ShardRoutingState.STARTED);
|
||||
} else if (state.v2().equals(ShardRoutingState.INITIALIZING)) {
|
||||
shardRouting = TestShardRouting.newShardRouting(index, shardId.getId(),
|
||||
"node_" + Integer.toString(state.v1()), null, true, ShardRoutingState.INITIALIZING);
|
||||
} else if (state.v2().equals(ShardRoutingState.RELOCATING)) {
|
||||
shardRouting = TestShardRouting.newShardRouting(index, shardId.getId(),
|
||||
"node_" + Integer.toString(state.v1()), "node_" + Integer.toString((state.v1() + 1) % 3),
|
||||
true, ShardRoutingState.RELOCATING);
|
||||
} else {
|
||||
shardRouting = ShardRouting.newUnassigned(shardId, true,
|
||||
RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE,
|
||||
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""));
|
||||
}
|
||||
|
||||
shardRTBuilder.addShard(shardRouting);
|
||||
rtBuilder.addIndexShard(shardRTBuilder.build());
|
||||
counter += 1;
|
||||
}
|
||||
|
||||
return new RoutingTable.Builder().add(rtBuilder).build();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue