This commit allows a datafeed to be assigned to a node if only one index pattern has concrete indices.
parent 2247ab3295
commit a054e62bc4
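Background: DatafeedNodeSelector.verifyIndicesActive() previously resolved each index pattern on its own and vetoed assignment as soon as any one pattern matched no concrete indices. The change below filters out remote index expressions and then resolves all remaining patterns in a single concreteIndexNames call, so assignment now fails only when the patterns collectively match nothing. A minimal standalone sketch of the new rule (the resolve helper and the pattern map are hypothetical stand-ins for IndexNameExpressionResolver and the cluster state, not the real API):

import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical, self-contained illustration of the assignment rule change:
// a datafeed's index patterns are now resolved together, so only the
// combined result has to be non-empty.
public class PatternResolutionSketch {

    // Stand-in for IndexNameExpressionResolver.concreteIndexNames(...):
    // expands each pattern against an assumed set of concrete indices.
    static String[] resolve(Map<String, List<String>> concreteByPattern, String... patterns) {
        return Arrays.stream(patterns)
            .flatMap(p -> concreteByPattern.getOrDefault(p, List.of()).stream())
            .toArray(String[]::new);
    }

    public static void main(String[] args) {
        // "data-*" matches two concrete indices; "missing-*" matches none.
        Map<String, List<String>> cluster = Map.of("data-*", List.of("data-1", "data-2"));

        // One combined check over all local patterns.
        String[] combined = resolve(cluster, "data-*", "missing-*");
        System.out.println(combined.length > 0 ? "datafeed can be assigned" : "assignment blocked");
    }
}

Under the old per-pattern rule this example would have been blocked by "missing-*"; under the combined rule the two matches from "data-*" are enough.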
DatafeedJobsIT.java

@@ -33,7 +33,6 @@ import org.junit.After;
 
 import java.time.Duration;
 import java.time.Instant;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -80,10 +79,9 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
         openJob(job.getId());
         assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
 
-        List<String> t = new ArrayList<>(2);
-        t.add("data-1");
-        t.add("data-2");
-        DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), t);
+        // Having a pattern with missing indices is acceptable
+        List<String> indices = Arrays.asList("data-*", "missing-*");
+        DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), indices);
         registerDatafeed(datafeedConfig);
         putDatafeed(datafeedConfig);
 
DatafeedNodeSelector.java

@@ -14,6 +14,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.license.RemoteClusterLicenseChecker;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.rest.RestStatus;
@@ -120,37 +121,34 @@ public class DatafeedNodeSelector {
 
     @Nullable
     private AssignmentFailure verifyIndicesActive() {
-        for (String index : datafeedIndices) {
-
-            if (RemoteClusterLicenseChecker.isRemoteIndex(index)) {
-                // We cannot verify remote indices
-                continue;
-            }
-
-            String[] concreteIndices;
-
-            try {
-                concreteIndices = resolver.concreteIndexNames(clusterState, indicesOptions, true, index);
-                if (concreteIndices.length == 0) {
-                    return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index ["
-                        + index + "] does not exist, is closed, or is still initializing.", true);
-                }
-            } catch (Exception e) {
-                String msg = new ParameterizedMessage("failed resolving indices given [{}] and indices_options [{}]",
-                    index,
-                    indicesOptions).getFormattedMessage();
-                LOGGER.debug("[" + datafeedId + "] " + msg, e);
-                return new AssignmentFailure(
-                    "cannot start datafeed [" + datafeedId + "] because it " + msg + " with exception [" + e.getMessage() + "]",
-                    true);
-            }
-
-            for (String concreteIndex : concreteIndices) {
-                IndexRoutingTable routingTable = clusterState.getRoutingTable().index(concreteIndex);
-                if (routingTable == null || !routingTable.allPrimaryShardsActive()) {
-                    return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index ["
-                        + concreteIndex + "] does not have all primary shards active yet.", false);
-                }
+        String[] index = datafeedIndices.stream()
+            // We cannot verify remote indices
+            .filter(i -> RemoteClusterLicenseChecker.isRemoteIndex(i) == false)
+            .toArray(String[]::new);
+
+        final String[] concreteIndices;
+
+        try {
+            concreteIndices = resolver.concreteIndexNames(clusterState, indicesOptions, true, index);
+            if (concreteIndices.length == 0) {
+                return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index ["
+                    + Strings.arrayToCommaDelimitedString(index) + "] does not exist, is closed, or is still initializing.", true);
+            }
+        } catch (Exception e) {
+            String msg = new ParameterizedMessage("failed resolving indices given [{}] and indices_options [{}]",
+                Strings.arrayToCommaDelimitedString(index),
+                indicesOptions).getFormattedMessage();
+            LOGGER.debug("[" + datafeedId + "] " + msg, e);
+            return new AssignmentFailure(
+                "cannot start datafeed [" + datafeedId + "] because it " + msg + " with exception [" + e.getMessage() + "]",
+                true);
+        }
+
+        for (String concreteIndex : concreteIndices) {
+            IndexRoutingTable routingTable = clusterState.getRoutingTable().index(concreteIndex);
+            if (routingTable == null || !routingTable.allPrimaryShardsActive()) {
+                return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index ["
+                    + concreteIndex + "] does not have all primary shards active yet.", false);
             }
         }
         return null;
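Design note on the filter at the top of verifyIndicesActive(): expressions that address a remote cluster cannot be verified by the local node, so they are dropped before resolution, and a datafeed that reads only remote indices skips the concrete-index check entirely. A tiny sketch of that predicate (isRemoteIndex here is a simplified, hypothetical stand-in for RemoteClusterLicenseChecker.isRemoteIndex, assuming the usual cluster:index convention for remote expressions):

import java.util.stream.Stream;

// Hypothetical stand-in for RemoteClusterLicenseChecker.isRemoteIndex(...):
// a remote index expression carries a cluster alias before ':'.
public class RemoteFilterSketch {

    static boolean isRemoteIndex(String expression) {
        return expression.indexOf(':') != -1;
    }

    public static void main(String[] args) {
        // Only local expressions survive; these are what get resolved and verified.
        String[] local = Stream.of("data-*", "remote:foo", "missing-*")
            .filter(i -> isRemoteIndex(i) == false)
            .toArray(String[]::new);
        System.out.println(String.join(",", local)); // prints: data-*,missing-*
    }
}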
DatafeedNodeSelectorTests.java

@@ -40,6 +40,7 @@ import org.junit.Before;
 
 import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
@@ -319,6 +320,31 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
             "]] with exception [no such index [not_foo]]]"));
     }
 
+    public void testIndexPatternDoesntExist() {
+        Job job = createScheduledJob("job_id").build(new Date());
+        DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Arrays.asList("missing-*", "foo*"));
+
+        PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
+        addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
+        tasks = tasksBuilder.build();
+
+        givenClusterState("foo", 1, 0);
+
+        PersistentTasksCustomMetadata.Assignment result = new DatafeedNodeSelector(clusterState,
+            resolver,
+            df.getId(),
+            df.getJobId(),
+            df.getIndices(),
+            SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
+        assertEquals("node_id", result.getExecutorNode());
+        new DatafeedNodeSelector(clusterState,
+            resolver,
+            df.getId(),
+            df.getJobId(),
+            df.getIndices(),
+            SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated();
+    }
+
     public void testRemoteIndex() {
         Job job = createScheduledJob("job_id").build(new Date());
         DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("remote:foo"));
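The new testIndexPatternDoesntExist test encodes the behavior change directly: givenClusterState("foo", 1, 0) appears to build a cluster state whose only concrete index is "foo" (one primary shard, zero replicas), so "missing-*" resolves to nothing while "foo*" resolves to "foo", and both selectNode() and checkDatafeedTaskCanBeCreated() must still succeed with an assignment to "node_id".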