Wait for datafeed index to hit yellow before proceeding (elastic/x-pack-elasticsearch#940)

relates elastic/x-pack-elasticsearch#783

Original commit: elastic/x-pack-elasticsearch@838482e645
This commit is contained in:
Zachary Tong 2017-04-04 16:14:02 +00:00 committed by GitHub
parent 4fdcedb9f9
commit 2153c71e8f
3 changed files with 46 additions and 2 deletions

View File

@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions; import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse;
@ -20,6 +21,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -61,8 +63,10 @@ import org.elasticsearch.xpack.persistent.PersistentTasksExecutor;
import org.elasticsearch.xpack.persistent.PersistentTasksService; import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener; import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
import org.elasticsearch.xpack.security.InternalClient;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -340,14 +344,19 @@ public class StartDatafeedAction
private final XPackLicenseState licenseState; private final XPackLicenseState licenseState;
private final PersistentTasksService persistentTasksService; private final PersistentTasksService persistentTasksService;
private final InternalClient client;
private final ClusterService clusterService;
@Inject @Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, XPackLicenseState licenseState, public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, XPackLicenseState licenseState,
PersistentTasksService persistentTasksService, ActionFilters actionFilters, PersistentTasksService persistentTasksService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) { IndexNameExpressionResolver indexNameExpressionResolver, InternalClient client,
ClusterService clusterService) {
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
this.licenseState = licenseState; this.licenseState = licenseState;
this.persistentTasksService = persistentTasksService; this.persistentTasksService = persistentTasksService;
this.client = client;
this.clusterService = clusterService;
} }
@Override @Override
@ -356,7 +365,7 @@ public class StartDatafeedAction
PersistentTaskOperationListener finalListener = new PersistentTaskOperationListener() { PersistentTaskOperationListener finalListener = new PersistentTaskOperationListener() {
@Override @Override
public void onResponse(long taskId) { public void onResponse(long taskId) {
waitForDatafeedStarted(taskId, request, listener); waitForYellow(taskId, request, listener);
} }
@Override @Override
@ -370,6 +379,22 @@ public class StartDatafeedAction
} }
} }
void waitForYellow(long taskId, Request request, ActionListener<Response> listener) {
ClusterState state = clusterService.state();
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
DatafeedConfig config = mlMetadata.getDatafeed(request.getDatafeedId());
List<String> indices = config.getIndexes();
if (!indices.isEmpty()) {
ClusterHealthRequest healthRequest = new ClusterHealthRequest(indices.toArray(new String[]{}));
healthRequest.waitForYellowStatus();
client.admin().cluster().health(healthRequest, ActionListener.wrap(clusterHealthResponse -> {
waitForDatafeedStarted(taskId, request, listener);
}, listener::onFailure));
} else {
waitForDatafeedStarted(taskId, request, listener);
}
}
void waitForDatafeedStarted(long taskId, Request request, ActionListener<Response> listener) { void waitForDatafeedStarted(long taskId, Request request, ActionListener<Response> listener) {
Predicate<PersistentTask<?>> predicate = persistentTask -> { Predicate<PersistentTask<?>> predicate = persistentTask -> {
if (persistentTask == null) { if (persistentTask == null) {

View File

@ -179,6 +179,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
public void testAutoCloseJobWithDatafeed() throws Exception { public void testAutoCloseJobWithDatafeed() throws Exception {
assertMLAllowed(true); assertMLAllowed(true);
createIndex("foo");
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
// put job // put job
@ -276,6 +277,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
public void testMachineLearningStartDatafeedActionRestricted() throws Exception { public void testMachineLearningStartDatafeedActionRestricted() throws Exception {
assertMLAllowed(true); assertMLAllowed(true);
createIndex("foo");
// test that license restricted apis do now work // test that license restricted apis do now work
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
@ -345,6 +347,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
public void testMachineLearningStopDatafeedActionNotRestricted() throws Exception { public void testMachineLearningStopDatafeedActionNotRestricted() throws Exception {
assertMLAllowed(true); assertMLAllowed(true);
createIndex("foo");
// test that license restricted apis do now work // test that license restricted apis do now work
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());

View File

@ -31,6 +31,22 @@ setup:
} }
} }
- do:
indices.create:
index: index-1
body:
settings:
index:
number_of_replicas: 1
- do:
indices.create:
index: index-2
body:
settings:
index:
number_of_replicas: 1
- do: - do:
xpack.ml.put_datafeed: xpack.ml.put_datafeed:
datafeed_id: datafeed-1 datafeed_id: datafeed-1