[7.x][ML] Improve handling of exception while starting DFA process (#61838) (#61847)

While starting the data frame analytics process it is possible
to get an exception before the process crash handler is in place.
In addition, right after starting the process, we check the process
is alive to ensure we capture a failed process. However, those exceptions
are unhandled.

This commit catches any exception thrown while starting the process
and sets the task to failed with the root cause error message.

I have also taken the chance to remove some unused parameters
in `NativeAnalyticsProcessFactory`.

Relates #61704

Backport of #61838
This commit is contained in:
Dimitris Athanasiou 2020-09-02 16:32:45 +03:00 committed by GitHub
parent e6dc8054a5
commit 07ab0beea0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 6 deletions

View File

@ -141,7 +141,17 @@ public class AnalyticsProcessManager {
// Fetch existing model state (if any) // Fetch existing model state (if any)
BytesReference state = getModelState(config); BytesReference state = getModelState(config);
if (processContext.startProcess(dataExtractorFactory, task, state)) { boolean isProcessStarted;
try {
isProcessStarted = processContext.startProcess(dataExtractorFactory, task, state);
} catch (Exception e) {
processContext.stop();
task.setFailed(processContext.getFailureReason() == null ?
e : ExceptionsHelper.serverError(processContext.getFailureReason()));
return;
}
if (isProcessStarted) {
executorServiceForProcess.execute(() -> processContext.resultProcessor.get().process(processContext.process.get())); executorServiceForProcess.execute(() -> processContext.resultProcessor.get().process(processContext.process.get()));
executorServiceForProcess.execute(() -> processData(task, processContext, state)); executorServiceForProcess.execute(() -> processData(task, processContext, state));
} else { } else {

View File

@ -80,14 +80,14 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<An
// The extra 2 are for the checksum and the control field // The extra 2 are for the checksum and the control field
int numberOfFields = analyticsProcessConfig.cols() + 2; int numberOfFields = analyticsProcessConfig.cols() + 2;
createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes, executorService); createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes);
NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, processPipes, NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, processPipes,
numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout, numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout,
analyticsProcessConfig, namedXContentRegistry); analyticsProcessConfig, namedXContentRegistry);
try { try {
startProcess(config, executorService, processPipes, analyticsProcess); startProcess(config, executorService, analyticsProcess);
return analyticsProcess; return analyticsProcess;
} catch (IOException | EsRejectedExecutionException e) { } catch (IOException | EsRejectedExecutionException e) {
String msg = "Failed to connect to data frame analytics process for job " + jobId; String msg = "Failed to connect to data frame analytics process for job " + jobId;
@ -101,7 +101,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<An
} }
} }
private void startProcess(DataFrameAnalyticsConfig config, ExecutorService executorService, ProcessPipes processPipes, private void startProcess(DataFrameAnalyticsConfig config, ExecutorService executorService,
NativeAnalyticsProcess process) throws IOException { NativeAnalyticsProcess process) throws IOException {
if (config.getAnalysis().persistsState()) { if (config.getAnalysis().persistsState()) {
IndexingStateProcessor stateProcessor = new IndexingStateProcessor(config.getId(), resultsPersisterService, auditor); IndexingStateProcessor stateProcessor = new IndexingStateProcessor(config.getId(), resultsPersisterService, auditor);
@ -112,7 +112,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<An
} }
private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, List<Path> filesToDelete, private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, List<Path> filesToDelete,
ProcessPipes processPipes, ExecutorService executorService) { ProcessPipes processPipes) {
AnalyticsBuilder analyticsBuilder = AnalyticsBuilder analyticsBuilder =
new AnalyticsBuilder(env::tmpFile, nativeController, processPipes, analyticsProcessConfig, filesToDelete); new AnalyticsBuilder(env::tmpFile, nativeController, processPipes, analyticsProcessConfig, filesToDelete);
try { try {

View File

@ -45,6 +45,7 @@ import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -190,6 +191,20 @@ public class AnalyticsProcessManagerTests extends ESTestCase {
verifyNoMoreInteractions(dataExtractor, executorServiceForProcess, process, task); verifyNoMoreInteractions(dataExtractor, executorServiceForProcess, process, task);
} }
public void testRunJob_ProcessNotAliveAfterStart() {
when(process.isProcessAlive()).thenReturn(false);
when(task.getParams()).thenReturn(
new StartDataFrameAnalyticsAction.TaskParams("data_frame_id", Version.CURRENT, Collections.emptyList(), false));
processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory);
assertThat(processManager.getProcessContextCount(), equalTo(1));
ArgumentCaptor<Exception> errorCaptor = ArgumentCaptor.forClass(Exception.class);
verify(task).setFailed(errorCaptor.capture());
assertThat(errorCaptor.getValue().getMessage(), equalTo("Failed to start data frame analytics process"));
}
public void testProcessContext_GetSetFailureReason() { public void testProcessContext_GetSetFailureReason() {
AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(dataFrameAnalyticsConfig); AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(dataFrameAnalyticsConfig);
assertThat(processContext.getFailureReason(), is(nullValue())); assertThat(processContext.getFailureReason(), is(nullValue()));