[ML] expand data_streams when assigning datafeed to node (#58175) (#58242)

This commit is contained in:
Benjamin Trent 2020-06-17 08:34:34 -04:00 committed by GitHub
parent 2d3d7ab387
commit 69338b03d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 55 additions and 1 deletions

View File

@ -130,7 +130,7 @@ public class DatafeedNodeSelector {
String[] concreteIndices; String[] concreteIndices;
try { try {
concreteIndices = resolver.concreteIndexNames(clusterState, indicesOptions, index); concreteIndices = resolver.concreteIndexNames(clusterState, indicesOptions, true, index);
if (concreteIndices.length == 0) { if (concreteIndices.length == 0) {
return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index [" return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index ["
+ index + "] does not exist, is closed, or is still initializing.", true); + index + "] does not exist, is closed, or is still initializing.", true);

View File

@ -10,6 +10,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Metadata;
@ -25,6 +26,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -42,6 +44,8 @@ import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE;
import static org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests.addJobTask; import static org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests.addJobTask;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
@ -91,6 +95,34 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated(); SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated();
} }
public void testSelectNode_GivenJobIsOpenedAndDataStream() {
Job job = createScheduledJob("job_id").build(new Date());
DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"));
PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
tasks = tasksBuilder.build();
givenClusterStateWithDatastream("foo",
1,
0,
Collections.singletonList(new Tuple<>(0, ShardRoutingState.STARTED)));
PersistentTasksCustomMetadata.Assignment result = new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
assertEquals("node_id", result.getExecutorNode());
new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated();
}
public void testSelectNode_GivenJobIsOpening() { public void testSelectNode_GivenJobIsOpening() {
Job job = createScheduledJob("job_id").build(new Date()); Job job = createScheduledJob("job_id").build(new Date());
DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")); DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"));
@ -449,6 +481,28 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
.build(); .build();
} }
private void givenClusterStateWithDatastream(String dataStreamName,
int numberOfShards,
int numberOfReplicas,
List<Tuple<Integer, ShardRoutingState>> states) {
Index index = new Index(getDefaultBackingIndexName(dataStreamName, 1), INDEX_UUID_NA_VALUE);
IndexMetadata indexMetadata = IndexMetadata.builder(index.getName())
.settings(settings(Version.CURRENT))
.numberOfShards(numberOfShards)
.numberOfReplicas(numberOfReplicas)
.build();
clusterState = ClusterState.builder(new ClusterName("cluster_name"))
.metadata(new Metadata.Builder()
.put(new DataStream(dataStreamName, "@timestamp", Collections.singletonList(index), 1L))
.putCustom(PersistentTasksCustomMetadata.TYPE, tasks)
.putCustom(MlMetadata.TYPE, mlMetadata)
.put(indexMetadata, false))
.nodes(nodes)
.routingTable(generateRoutingTable(indexMetadata, states))
.build();
}
private static RoutingTable generateRoutingTable(IndexMetadata indexMetadata, List<Tuple<Integer, ShardRoutingState>> states) { private static RoutingTable generateRoutingTable(IndexMetadata indexMetadata, List<Tuple<Integer, ShardRoutingState>> states) {
IndexRoutingTable.Builder rtBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); IndexRoutingTable.Builder rtBuilder = IndexRoutingTable.builder(indexMetadata.getIndex());