From a054e62bc4b549edd258f0761e24e1a760877ce7 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Mon, 28 Sep 2020 12:58:07 -0400 Subject: [PATCH] [ML] allow datafeeds to run if there are any concrete indices (#62827) (#62965) This commit allows a datafeed to be assigned to a node as long as at least one of its index patterns resolves to concrete indices. --- .../xpack/ml/integration/DatafeedJobsIT.java | 8 ++- .../ml/datafeed/DatafeedNodeSelector.java | 54 +++++++++---------- .../datafeed/DatafeedNodeSelectorTests.java | 26 +++++++++ 3 files changed, 55 insertions(+), 33 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java index 126bb2b7d80..b4ccb1551a9 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java @@ -33,7 +33,6 @@ import org.junit.After; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -80,10 +79,9 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase { openJob(job.getId()); assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED)); - List t = new ArrayList<>(2); - t.add("data-1"); - t.add("data-2"); - DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), t); + // Having a pattern with missing indices is acceptable + List indices = Arrays.asList("data-*", "missing-*"); + DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), indices); registerDatafeed(datafeedConfig); putDatafeed(datafeedConfig); diff --git 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index a050154ee9f..2937ecaf782 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -14,6 +14,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.rest.RestStatus; @@ -120,37 +121,34 @@ public class DatafeedNodeSelector { @Nullable private AssignmentFailure verifyIndicesActive() { - for (String index : datafeedIndices) { + String[] index = datafeedIndices.stream() + // We cannot verify remote indices + .filter(i -> RemoteClusterLicenseChecker.isRemoteIndex(i) == false) + .toArray(String[]::new); - if (RemoteClusterLicenseChecker.isRemoteIndex(index)) { - // We cannot verify remote indices - continue; + final String[] concreteIndices; + + try { + concreteIndices = resolver.concreteIndexNames(clusterState, indicesOptions, true, index); + if (concreteIndices.length == 0) { + return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index [" + + Strings.arrayToCommaDelimitedString(index) + "] does not exist, is closed, or is still initializing.", true); } + } catch (Exception e) { + String msg = new ParameterizedMessage("failed resolving indices given [{}] and indices_options [{}]", + Strings.arrayToCommaDelimitedString(index), + indicesOptions).getFormattedMessage(); + LOGGER.debug("[" + datafeedId + "] " + msg, e); + return new 
AssignmentFailure( + "cannot start datafeed [" + datafeedId + "] because it " + msg + " with exception [" + e.getMessage() + "]", + true); + } - String[] concreteIndices; - - try { - concreteIndices = resolver.concreteIndexNames(clusterState, indicesOptions, true, index); - if (concreteIndices.length == 0) { - return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index [" - + index + "] does not exist, is closed, or is still initializing.", true); - } - } catch (Exception e) { - String msg = new ParameterizedMessage("failed resolving indices given [{}] and indices_options [{}]", - index, - indicesOptions).getFormattedMessage(); - LOGGER.debug("[" + datafeedId + "] " + msg, e); - return new AssignmentFailure( - "cannot start datafeed [" + datafeedId + "] because it " + msg + " with exception [" + e.getMessage() + "]", - true); - } - - for (String concreteIndex : concreteIndices) { - IndexRoutingTable routingTable = clusterState.getRoutingTable().index(concreteIndex); - if (routingTable == null || !routingTable.allPrimaryShardsActive()) { - return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index [" - + concreteIndex + "] does not have all primary shards active yet.", false); - } + for (String concreteIndex : concreteIndices) { + IndexRoutingTable routingTable = clusterState.getRoutingTable().index(concreteIndex); + if (routingTable == null || !routingTable.allPrimaryShardsActive()) { + return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index [" + + concreteIndex + "] does not have all primary shards active yet.", false); } } return null; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java index cdddb03d095..183f0ad6eb3 100644 --- 
a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -40,6 +40,7 @@ import org.junit.Before; import java.net.InetAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.List; @@ -319,6 +320,31 @@ public class DatafeedNodeSelectorTests extends ESTestCase { "]] with exception [no such index [not_foo]]]")); } + public void testIndexPatternDoesntExist() { + Job job = createScheduledJob("job_id").build(new Date()); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Arrays.asList("missing-*", "foo*")); + + PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder(); + addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); + tasks = tasksBuilder.build(); + + givenClusterState("foo", 1, 0); + + PersistentTasksCustomMetadata.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); + assertEquals("node_id", result.getExecutorNode()); + new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated(); + } + public void testRemoteIndex() { Job job = createScheduledJob("job_id").build(new Date()); DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("remote:foo"));