diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index 2d755d9fec1..3a91091f4b0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ValidateActions; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -20,6 +21,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.Inject; @@ -61,8 +63,10 @@ import org.elasticsearch.xpack.persistent.PersistentTasksExecutor; import org.elasticsearch.xpack.persistent.PersistentTasksService; import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener; import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; +import org.elasticsearch.xpack.security.InternalClient; import java.io.IOException; +import java.util.List; import java.util.Objects; import java.util.function.LongSupplier; import java.util.function.Predicate; @@ -340,14 +344,19 @@ public class StartDatafeedAction private final XPackLicenseState licenseState; private final PersistentTasksService persistentTasksService; + private final InternalClient client; + private final ClusterService clusterService; @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, XPackLicenseState licenseState, PersistentTasksService persistentTasksService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver) { + IndexNameExpressionResolver indexNameExpressionResolver, InternalClient client, + ClusterService clusterService) { super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); this.licenseState = licenseState; this.persistentTasksService = persistentTasksService; + this.client = client; + this.clusterService = clusterService; } @Override @@ -356,7 +365,7 @@ public class StartDatafeedAction PersistentTaskOperationListener finalListener = new PersistentTaskOperationListener() { @Override public void onResponse(long taskId) { - waitForDatafeedStarted(taskId, request, listener); + waitForYellow(taskId, request, listener); } @Override @@ -370,6 +379,22 @@ public class StartDatafeedAction } } + void waitForYellow(long taskId, Request request, ActionListener listener) { + ClusterState state = clusterService.state(); + MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); + DatafeedConfig config = mlMetadata.getDatafeed(request.getDatafeedId()); + List indices = config.getIndexes(); + if (!indices.isEmpty()) { + ClusterHealthRequest healthRequest = new ClusterHealthRequest(indices.toArray(new String[]{})); + healthRequest.waitForYellowStatus(); + client.admin().cluster().health(healthRequest, ActionListener.wrap(clusterHealthResponse -> { + waitForDatafeedStarted(taskId, request, listener); + }, listener::onFailure)); + } else { + waitForDatafeedStarted(taskId, request, listener); + } + } + void waitForDatafeedStarted(long taskId, Request request, ActionListener listener) { Predicate> predicate = persistentTask -> { if (persistentTask == null) { diff --git a/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java b/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java index fee82eb0a69..89b8b5ace3c 100644 --- a/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java +++ b/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java @@ -179,6 +179,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { public void testAutoCloseJobWithDatafeed() throws Exception { assertMLAllowed(true); + createIndex("foo"); try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); // put job @@ -276,6 +277,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { public void testMachineLearningStartDatafeedActionRestricted() throws Exception { assertMLAllowed(true); + createIndex("foo"); // test that license restricted apis do now work try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); @@ -345,6 +347,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { public void testMachineLearningStopDatafeedActionNotRestricted() throws Exception { assertMLAllowed(true); + createIndex("foo"); // test that license restricted apis do now work try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yaml index b591896b710..d33c449e6d3 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yaml @@ -31,6 +31,22 @@ setup: } } + - do: + indices.create: + index: index-1 + body: + settings: + index: + number_of_replicas: 1 + + - do: + indices.create: + index: index-2 + body: + settings: + index: + number_of_replicas: 1 + - do: xpack.ml.put_datafeed: datafeed_id: datafeed-1