[ML] Check datafeed can be assigned before creating task (elastic/x-pack-elasticsearch#1027)
When a datafeed task is created but it cannot be assigned the task has a null status. This means _stats report it as stopped, however deleting it fails. In addition, it's a better experience to error the start datafeed request all together and give the user the chance to fix his data indices. This change fails a datafeed-start if it cannot be assigned. relates elastic/x-pack-elasticsearch#1018 Original commit: elastic/x-pack-elasticsearch@532288fda0
This commit is contained in:
parent
105ba131e0
commit
0b6ac175da
|
@ -6,6 +6,7 @@
|
||||||
package org.elasticsearch.xpack.ml.action;
|
package org.elasticsearch.xpack.ml.action;
|
||||||
|
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.ElasticsearchStatusException;
|
import org.elasticsearch.ElasticsearchStatusException;
|
||||||
import org.elasticsearch.action.Action;
|
import org.elasticsearch.action.Action;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
@ -336,6 +337,12 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
|
||||||
public void onFailure(Exception e) {
|
public void onFailure(Exception e) {
|
||||||
listener.onFailure(e);
|
listener.onFailure(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTimeout(TimeValue timeout) {
|
||||||
|
listener.onFailure(new ElasticsearchException("Opening job ["
|
||||||
|
+ request.getJobId() + "] timed out after [" + timeout + "]"));
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
package org.elasticsearch.xpack.ml.action;
|
package org.elasticsearch.xpack.ml.action;
|
||||||
|
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.ElasticsearchParseException;
|
import org.elasticsearch.ElasticsearchParseException;
|
||||||
import org.elasticsearch.action.Action;
|
import org.elasticsearch.action.Action;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
@ -392,6 +393,12 @@ public class StartDatafeedAction
|
||||||
public void onFailure(Exception e) {
|
public void onFailure(Exception e) {
|
||||||
listener.onFailure(e);
|
listener.onFailure(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTimeout(TimeValue timeout) {
|
||||||
|
listener.onFailure(new ElasticsearchException("Starting datafeed ["
|
||||||
|
+ request.getDatafeedId() + "] timed out after [" + timeout + "]"));
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -426,6 +433,14 @@ public class StartDatafeedAction
|
||||||
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
|
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
|
||||||
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||||
StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata, tasks);
|
StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata, tasks);
|
||||||
|
Assignment assignment = selectNode(logger, request.getDatafeedId(), clusterState, resolver);
|
||||||
|
if (assignment.getExecutorNode() == null) {
|
||||||
|
DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId());
|
||||||
|
String msg = "No node found to start datafeed [" + request.getDatafeedId()
|
||||||
|
+ "], allocation explanation [" + assignment.getExplanation() + "]";
|
||||||
|
logger.warn("[{}] {}", datafeed.getJobId(), msg);
|
||||||
|
throw new ElasticsearchException(msg);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
throw LicenseUtils.newComplianceException(XPackPlugin.MACHINE_LEARNING);
|
throw LicenseUtils.newComplianceException(XPackPlugin.MACHINE_LEARNING);
|
||||||
}
|
}
|
||||||
|
@ -463,7 +478,7 @@ public class StartDatafeedAction
|
||||||
if (assignment.getExecutorNode() == null) {
|
if (assignment.getExecutorNode() == null) {
|
||||||
String msg = "No node found to start datafeed [" + datafeedId +"]. Reasons [" +
|
String msg = "No node found to start datafeed [" + datafeedId +"]. Reasons [" +
|
||||||
assignment.getExplanation() + "]";
|
assignment.getExplanation() + "]";
|
||||||
logger.warn(msg);
|
logger.warn("[{}] {}", datafeed.getJobId(), msg);
|
||||||
auditor.warning(jobId, msg);
|
auditor.warning(jobId, msg);
|
||||||
} else {
|
} else {
|
||||||
DiscoveryNode node = state.nodes().get(assignment.getExecutorNode());
|
DiscoveryNode node = state.nodes().get(assignment.getExecutorNode());
|
||||||
|
|
|
@ -191,3 +191,22 @@ setup:
|
||||||
datafeed_id: "datafeed-1"
|
datafeed_id: "datafeed-1"
|
||||||
start: "2017-02-01T01:00:00Z"
|
start: "2017-02-01T01:00:00Z"
|
||||||
end: "2017-02-01T01:00:00Z"
|
end: "2017-02-01T01:00:00Z"
|
||||||
|
|
||||||
|
---
|
||||||
|
"Test start given datafeed index does not exist":
|
||||||
|
- do:
|
||||||
|
xpack.ml.update_datafeed:
|
||||||
|
datafeed_id: datafeed-1
|
||||||
|
body: >
|
||||||
|
{
|
||||||
|
"indexes":["utopia"]
|
||||||
|
}
|
||||||
|
|
||||||
|
- do:
|
||||||
|
xpack.ml.open_job:
|
||||||
|
job_id: "datafeed-job"
|
||||||
|
|
||||||
|
- do:
|
||||||
|
catch: /No node found to start datafeed \[datafeed-1\].*\[utopia\] does not exist.*/
|
||||||
|
xpack.ml.start_datafeed:
|
||||||
|
datafeed_id: "datafeed-1"
|
||||||
|
|
Loading…
Reference in New Issue