[ML] Auto job close should use the current node as coordinating node.

Original commit: elastic/x-pack-elasticsearch@4f3f8f9915
This commit is contained in:
Martijn van Groningen 2017-04-19 20:20:27 +02:00
parent 1f75dec642
commit 7ee48846ec
3 changed files with 68 additions and 2 deletions

View File

@ -117,7 +117,9 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
// changes here should be reflected there too.
private TimeValue timeout = MachineLearning.STATE_PERSIST_RESTORE_TIMEOUT;
String[] resolvedJobIds;
private String[] resolvedJobIds;
private boolean local;
Request() {}
@ -152,6 +154,10 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
this.force = force;
}
/**
 * Sets the {@code local} flag of this request.
 *
 * When the flag is {@code true}, the transport action's {@code doExecute} skips the branch that
 * delegates the close job call to the elected master node, even if the local node is not the
 * elected master.
 *
 * @param local {@code true} to skip the delegation to the elected master node
 */
public void setLocal(boolean local) {
this.local = local;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -159,6 +165,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
timeout = new TimeValue(in);
force = in.readBoolean();
resolvedJobIds = in.readStringArray();
local = in.readBoolean();
}
@Override
@ -168,6 +175,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
timeout.writeTo(out);
out.writeBoolean(force);
out.writeStringArray(resolvedJobIds);
out.writeBoolean(local);
}
@Override
@ -296,7 +304,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
final ClusterState state = clusterService.state();
final DiscoveryNodes nodes = state.nodes();
if (nodes.isLocalNodeElectedMaster() == false) {
if (request.local == false && nodes.isLocalNodeElectedMaster() == false) {
// Delegates close job to elected master node, so it becomes the coordinating node.
// See comment in OpenJobAction.Transport class for more information.
if (nodes.getMasterNode() == null) {

View File

@ -379,6 +379,20 @@ public class DatafeedManager extends AbstractComponent {
@Override
public void onResponse(PersistentTask<StartDatafeedAction.DatafeedParams> PersistentTask) {
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(datafeed.getJobId());
/*
Enforces that for the close job api call the current node is the coordinating node.
If we are in this callback then the local node's cluster state doesn't contain a persistent task
for the datafeed and therefore the datafeed is stopped, so there is no need for the master node to
be the coordinating node.
Normally close job and stop datafeed are both executed via master node and both apis use master
node's local cluster state for validation purposes. In case of auto close this isn't the case and
if the job runs on a regular node then it may see the update before the close job api does in
the master node's local cluster state. This can cause the close job api to fail with a validation
error that the datafeed isn't stopped. To avoid this we use the current node as coordinating node
for the close job api call.
*/
closeJobRequest.setLocal(true);
client.execute(CloseJobAction.INSTANCE, closeJobRequest, new ActionListener<CloseJobAction.Response>() {
@Override

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray;
@ -142,6 +143,49 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
});
}
/**
 * Starts a datafeed with an end time over three indexed documents and waits until the job
 * reports three processed records and the {@code CLOSED} state. The test itself never sends
 * a close job request.
 */
public void testJobAutoClose() throws Exception {
    // Two-node cluster: the first node has ML disabled, the second has ML enabled.
    internalCluster().ensureAtMostNumDataNodes(0);
    internalCluster().startNode(Settings.builder().put(MachineLearning.ML_ENABLED.getKey(), false));
    internalCluster().startNode(Settings.builder().put(MachineLearning.ML_ENABLED.getKey(), true));

    client().admin().indices().prepareCreate("data")
            .addMapping("type", "time", "type=date")
            .get();

    // Index one document per timestamp, then make them searchable.
    for (long timestamp : new long[] {1407081600L, 1407082600L, 1407083600L}) {
        IndexRequest docRequest = new IndexRequest("data", "type");
        docRequest.source("time", timestamp);
        client().index(docRequest).get();
    }
    refresh();

    // Register the job and a datafeed that reads from the "data" index.
    Job.Builder job = createScheduledJob("job_id");
    PutJobAction.Response putJobResponse =
            client().execute(PutJobAction.INSTANCE, new PutJobAction.Request(job)).actionGet();
    assertTrue(putJobResponse.isAcknowledged());

    DatafeedConfig config = createDatafeed("data_feed_id", job.getId(), Collections.singletonList("data"));
    PutDatafeedAction.Response putDatafeedResponse =
            client().execute(PutDatafeedAction.INSTANCE, new PutDatafeedAction.Request(config)).actionGet();
    assertTrue(putDatafeedResponse.isAcknowledged());

    // Open the job and start the datafeed with an explicit end time.
    client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get();
    StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(config.getId(), 0L);
    startDatafeedRequest.getParams().setEndTime(1492616844L);
    client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get();

    // Wait until all three records are processed and the job reports CLOSED.
    assertBusy(() -> {
        GetJobsStatsAction.Response.JobStats stats = getJobStats(job.getId());
        assertEquals(3L, stats.getDataCounts().getProcessedRecordCount());
        assertEquals(JobState.CLOSED, stats.getState());
    });
}
@TestLogging("org.elasticsearch.xpack.persistent:TRACE,org.elasticsearch.cluster.service:DEBUG,org.elasticsearch.xpack.ml.action:DEBUG")
public void testDedicatedMlNode() throws Exception {
internalCluster().ensureAtMostNumDataNodes(0);