[ML] Write header to autodetect before it is visible to other calls (#41085)

This commit is contained in:
David Kyle 2019-04-16 13:43:00 +01:00
parent 8577bbd73b
commit 116167df55
2 changed files with 16 additions and 5 deletions

View File

@ -90,9 +90,8 @@ public class AutodetectCommunicator implements Closeable {
&& job.getAnalysisConfig().getCategorizationFieldName() != null;
}
public void init(ModelSnapshot modelSnapshot) throws IOException {
public void restoreState(ModelSnapshot modelSnapshot) {
autodetectProcess.restoreState(stateStreamer, modelSnapshot);
createProcessWriter(Optional.empty()).writeHeader();
}
private DataToProcessWriter createProcessWriter(Optional<DataDescription> dataDescription) {
@ -101,6 +100,17 @@ public class AutodetectCommunicator implements Closeable {
dataCountsReporter, xContentRegistry);
}
/**
* This must be called once before {@link #writeToJob(InputStream, AnalysisRegistry, XContentType, DataLoadParams, BiConsumer)}
* can be used
*/
public void writeHeader() throws IOException {
createProcessWriter(Optional.empty()).writeHeader();
}
/**
* Call {@link #writeHeader()} exactly once before using this method
*/
public void writeToJob(InputStream inputStream, AnalysisRegistry analysisRegistry, XContentType xContentType,
DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
submitOperation(() -> {

View File

@ -458,7 +458,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
try {
createProcessAndSetRunning(processContext, job, params, closeHandler);
processContext.getAutodetectCommunicator().init(params.modelSnapshot());
processContext.getAutodetectCommunicator().restoreState(params.modelSnapshot());
setJobState(jobTask, JobState.OPENED);
} catch (Exception e1) {
// No need to log here as the persistent task framework will log it
@ -499,7 +499,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
private void createProcessAndSetRunning(ProcessContext processContext,
Job job,
AutodetectParams params,
BiConsumer<Exception, Boolean> handler) {
BiConsumer<Exception, Boolean> handler) throws IOException {
// At this point we lock the process context until the process has been started.
// The reason behind this is to ensure closing the job does not happen before
// the process is started as that can result to the job getting seemingly closed
@ -507,6 +507,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
processContext.tryLock();
try {
AutodetectCommunicator communicator = create(processContext.getJobTask(), job, params, handler);
communicator.writeHeader();
processContext.setRunning(communicator);
} finally {
// Now that the process is running and we have updated its state we can unlock.
@ -639,7 +640,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
processContext.tryLock();
try {
if (processContext.setDying() == false) {
logger.debug("Cannot close job [{}] as it has already been closed", jobId);
logger.debug("Cannot close job [{}] as it has been marked as dying", jobId);
return;
}