Fix closing/stopping unassigned jobs/datafeeds (elastic/x-pack-elasticsearch#1672)

Original commit: elastic/x-pack-elasticsearch@9f032ac98f
David Kyle, 2017-06-09 09:33:36 +01:00, committed by GitHub
parent e29ab36849
commit d6e92c19da
3 changed files with 112 additions and 25 deletions

CloseJobAction.java

@@ -342,21 +342,23 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
             return;
         }
-        Set<String> executorNodes = new HashSet<>();
-        PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
-        for (String resolvedJobId : request.openJobIds) {
-            PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(resolvedJobId, tasks);
-            if (jobTask == null || jobTask.isAssigned() == false) {
-                String message = "Cannot perform requested action because job [" + resolvedJobId
-                        + "] is not open";
-                listener.onFailure(ExceptionsHelper.conflictStatusException(message));
-                return;
-            } else {
-                executorNodes.add(jobTask.getExecutorNode());
-            }
-        }
-        request.setNodes(executorNodes.toArray(new String[executorNodes.size()]));
+        if (request.isForce() == false) {
+            Set<String> executorNodes = new HashSet<>();
+            PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
+            for (String resolvedJobId : request.openJobIds) {
+                PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(resolvedJobId, tasks);
+                if (jobTask == null || jobTask.isAssigned() == false) {
+                    String message = "Cannot close job [" + resolvedJobId + "] because the job does not have an assigned node." +
+                            " Use force close to close the job";
+                    listener.onFailure(ExceptionsHelper.conflictStatusException(message));
+                    return;
+                } else {
+                    executorNodes.add(jobTask.getExecutorNode());
+                }
+            }
+            request.setNodes(executorNodes.toArray(new String[executorNodes.size()]));
+        }

         if (request.isForce()) {
             List<String> jobIdsToForceClose = new ArrayList<>(openJobIds);
             jobIdsToForceClose.addAll(closingJobIds);
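
For reviewers reading the hunk above: a non-force close of a job whose node has gone away now fails fast with a conflict-status error that tells the caller what to do next, while a force request skips the assignment check (and the executor-node routing) entirely, so it can proceed. A minimal sketch of the resulting calling pattern, not part of this commit; the job id is illustrative and `client` is assumed to be a connected `Client`, using the same action classes the integration test below exercises:

    // Assumed imports: org.elasticsearch.ElasticsearchStatusException,
    // org.elasticsearch.client.Client, org.elasticsearch.xpack.ml.action.CloseJobAction
    CloseJobAction.Request closeJobRequest = new CloseJobAction.Request("my-job");
    try {
        client.execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet();
    } catch (ElasticsearchStatusException e) {
        // New message: "Cannot close job [my-job] because the job does not have
        // an assigned node. Use force close to close the job"
        closeJobRequest.setForce(true); // force close skips the node-assignment check
        client.execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet();
    }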

StopDatafeedAction.java

@@ -305,8 +305,16 @@ public class StopDatafeedAction
         Set<String> executorNodes = new HashSet<>();
         for (String datafeedId : startedDatafeeds) {
             PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks);
-            executorNodes.add(datafeedTask.getExecutorNode());
+            if (datafeedTask == null || datafeedTask.isAssigned() == false) {
+                String message = "Cannot stop datafeed [" + datafeedId + "] because the datafeed does not have an assigned node." +
+                        " Use force stop to stop the datafeed";
+                listener.onFailure(ExceptionsHelper.conflictStatusException(message));
+                return;
+            } else {
+                executorNodes.add(datafeedTask.getExecutorNode());
+            }
         }
         request.setNodes(executorNodes.toArray(new String[executorNodes.size()]));

         // wait for started and stopping datafeeds
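
The datafeed side gets the same contract plus a genuine bug fix: the removed line called `datafeedTask.getExecutorNode()` without a null check, so stopping a datafeed whose persistent task had no node would have thrown a NullPointerException rather than a useful error. A matching sketch under the same assumptions as the close example above, with an illustrative datafeed id; force stop succeeding against an unassigned datafeed is what the integration test below verifies:

    StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request("my-datafeed");
    try {
        client.execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet();
    } catch (ElasticsearchStatusException e) {
        // New message: "Cannot stop datafeed [my-datafeed] because the datafeed does not
        // have an assigned node. Use force stop to stop the datafeed"
        stopDatafeedRequest.setForce(true); // force stop is the documented escape hatch
        client.execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet();
    }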

MlDistributedFailureIT.java

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.ml.integration;

+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;

@@ -17,6 +18,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.elasticsearch.xpack.ml.action.CloseJobAction;
 import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction;
 import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction.Response.DatafeedStats;
 import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;

@@ -25,6 +27,7 @@ import org.elasticsearch.xpack.ml.action.OpenJobAction;
 import org.elasticsearch.xpack.ml.action.PutDatafeedAction;
 import org.elasticsearch.xpack.ml.action.PutJobAction;
 import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
+import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
 import org.elasticsearch.xpack.ml.job.config.Job;
@@ -95,7 +98,21 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
         });
     }

-    private void run(String jobId, CheckedRunnable<Exception> disrupt) throws Exception {
+    public void testCloseUnassignedJobAndDatafeed() throws Exception {
+        internalCluster().ensureAtMostNumDataNodes(0);
+        logger.info("Starting dedicated master node...");
+        internalCluster().startNode(Settings.builder()
+                .put("node.master", true)
+                .put("node.data", true)
+                .put("node.ml", false)
+                .build());
+        logger.info("Starting ml and data node...");
+        internalCluster().startNode(Settings.builder()
+                .put("node.master", false)
+                .build());
+        ensureStableClusterOnAllNodes(2);
+
+        // index some datafeed data
         client().admin().indices().prepareCreate("data")
                 .addMapping("type", "time", "type=date")
                 .get();
@@ -105,12 +122,58 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
         long twoWeeksAgo = weekAgo - 604800000;
         indexDocs(logger, "data", numDocs1, twoWeeksAgo, weekAgo);

+        String jobId = "test-lose-ml-node";
+        String datafeedId = jobId + "-datafeed";
+        setupJobAndDatafeed(jobId, datafeedId);
+        waitForDatafeed(jobId, numDocs1);
+
+        // stop the only ML node
+        internalCluster().stopRandomNonMasterNode();
+
+        // Job state is opened but the job is not assigned to a node (because we just killed the only ML node)
+        GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(jobId);
+        GetJobsStatsAction.Response jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet();
+        assertEquals(jobStatsResponse.getResponse().results().get(0).getState(), JobState.OPENED);
+
+        GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId);
+        GetDatafeedsStatsAction.Response datafeedStatsResponse =
+                client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet();
+        assertEquals(datafeedStatsResponse.getResponse().results().get(0).getDatafeedState(), DatafeedState.STARTED);
+
+        // Can't normal stop an unassigned datafeed
+        StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId);
+        ElasticsearchStatusException statusException = expectThrows(ElasticsearchStatusException.class,
+                () -> client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet());
+        assertEquals("Cannot stop datafeed [" + datafeedId +
+                "] because the datafeed does not have an assigned node. Use force stop to stop the datafeed",
+                statusException.getMessage());
+
+        // Can only force stop an unassigned datafeed
+        stopDatafeedRequest.setForce(true);
+        StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet();
+        assertTrue(stopDatafeedResponse.isStopped());
+
+        // Can't normal close an unassigned job
+        CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId);
+        statusException = expectThrows(ElasticsearchStatusException.class,
+                () -> client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet());
+        assertEquals("Cannot close job [" + jobId +
+                "] because the job does not have an assigned node. Use force close to close the job",
+                statusException.getMessage());
+
+        // Can only force close an unassigned job
+        closeJobRequest.setForce(true);
+        CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet();
+        assertTrue(closeJobResponse.isClosed());
+    }
+
+    private void setupJobAndDatafeed(String jobId, String datafeedId) throws Exception {
         Job.Builder job = createScheduledJob(jobId);
         PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
         PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
         assertTrue(putJobResponse.isAcknowledged());

-        DatafeedConfig config = createDatafeed("data_feed_id", job.getId(), Collections.singletonList("data"));
+        DatafeedConfig config = createDatafeed(datafeedId, job.getId(), Collections.singletonList("data"));
         PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config);
         PutDatafeedAction.Response putDatadeedResponse = client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest)
                 .actionGet();
@@ -125,11 +188,21 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
         StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(config.getId(), 0L);
         client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get();
-        assertBusy(() -> {
-            DataCounts dataCounts = getDataCountsFromIndex(job.getId());
-            assertEquals(numDocs1, dataCounts.getProcessedRecordCount());
-            assertEquals(0L, dataCounts.getOutOfOrderTimeStampCount());
-        });
+    }
+
+    private void run(String jobId, CheckedRunnable<Exception> disrupt) throws Exception {
+        client().admin().indices().prepareCreate("data")
+                .addMapping("type", "time", "type=date")
+                .get();
+
+        long numDocs1 = randomIntBetween(32, 2048);
+        long now = System.currentTimeMillis();
+        long weekAgo = now - 604800000;
+        long twoWeeksAgo = weekAgo - 604800000;
+        indexDocs(logger, "data", numDocs1, twoWeeksAgo, weekAgo);
+
+        setupJobAndDatafeed(jobId, "data_feed_id");
+        waitForDatafeed(jobId, numDocs1);

         client().admin().indices().prepareSyncedFlush().get();

         disrupt.run();
@@ -158,11 +231,7 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
         long numDocs2 = randomIntBetween(2, 64);
         long now2 = System.currentTimeMillis();
         indexDocs(logger, "data", numDocs2, now2 + 5000, now2 + 6000);
-        assertBusy(() -> {
-            DataCounts dataCounts = getDataCounts(job.getId());
-            assertEquals(numDocs1 + numDocs2, dataCounts.getProcessedRecordCount());
-            assertEquals(0L, dataCounts.getOutOfOrderTimeStampCount());
-        }, 30, TimeUnit.SECONDS);
+        waitForDatafeed(jobId, numDocs1 + numDocs2);
     }

     // Get datacounts from index instead of via job stats api,
@@ -185,6 +254,14 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
         }
     }

+    private void waitForDatafeed(String jobId, long numDocs) throws Exception {
+        assertBusy(() -> {
+            DataCounts dataCounts = getDataCountsFromIndex(jobId);
+            assertEquals(numDocs, dataCounts.getProcessedRecordCount());
+            assertEquals(0L, dataCounts.getOutOfOrderTimeStampCount());
+        }, 30, TimeUnit.SECONDS);
+    }
+
+    private void ensureStableClusterOnAllNodes(int nodeCount) {
+        for (String nodeName : internalCluster().getNodeNames()) {
+            ensureStableCluster(nodeCount, nodeName);