From 07ab0beea0429ba989dcce57e0d029aef1ff7a02 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 2 Sep 2020 16:32:45 +0300 Subject: [PATCH] [7.x][ML] Improve handling of exception while starting DFA process (#61838) (#61847) While starting the data frame analytics process it is possible to get an exception before the process crash handler is in place. In addition, right after starting the process, we check the process is alive to ensure we capture a failed process. However, those exceptions are unhandled. This commit catches any exception thrown while starting the process and sets the task to failed with the root cause error message. I have also taken the chance to remove some unused parameters in `NativeAnalyticsProcessFactory`. Relates #61704 Backport of #61838 --- .../process/AnalyticsProcessManager.java | 12 +++++++++++- .../process/NativeAnalyticsProcessFactory.java | 10 +++++----- .../process/AnalyticsProcessManagerTests.java | 15 +++++++++++++++ 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index 97cd053b13f..793f5c4ea88 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -141,7 +141,17 @@ public class AnalyticsProcessManager { // Fetch existing model state (if any) BytesReference state = getModelState(config); - if (processContext.startProcess(dataExtractorFactory, task, state)) { + boolean isProcessStarted; + try { + isProcessStarted = processContext.startProcess(dataExtractorFactory, task, state); + } catch (Exception e) { + processContext.stop(); + task.setFailed(processContext.getFailureReason() == null ? 
+ e : ExceptionsHelper.serverError(processContext.getFailureReason())); + return; + } + + if (isProcessStarted) { executorServiceForProcess.execute(() -> processContext.resultProcessor.get().process(processContext.process.get())); executorServiceForProcess.execute(() -> processData(task, processContext, state)); } else { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java index 2d1e4fc01ce..115516e3030 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java @@ -80,14 +80,14 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory filesToDelete, - ProcessPipes processPipes, ExecutorService executorService) { + ProcessPipes processPipes) { AnalyticsBuilder analyticsBuilder = new AnalyticsBuilder(env::tmpFile, nativeController, processPipes, analyticsProcessConfig, filesToDelete); try { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java index 7a720dd8b5f..f5964ec0260 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java @@ -45,6 +45,7 @@ import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static 
org.mockito.Mockito.when; @@ -190,6 +191,20 @@ public class AnalyticsProcessManagerTests extends ESTestCase { verifyNoMoreInteractions(dataExtractor, executorServiceForProcess, process, task); } + public void testRunJob_ProcessNotAliveAfterStart() { + when(process.isProcessAlive()).thenReturn(false); + when(task.getParams()).thenReturn( + new StartDataFrameAnalyticsAction.TaskParams("data_frame_id", Version.CURRENT, Collections.emptyList(), false)); + + processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory); + assertThat(processManager.getProcessContextCount(), equalTo(1)); + + ArgumentCaptor<Exception> errorCaptor = ArgumentCaptor.forClass(Exception.class); + verify(task).setFailed(errorCaptor.capture()); + + assertThat(errorCaptor.getValue().getMessage(), equalTo("Failed to start data frame analytics process")); + } + public void testProcessContext_GetSetFailureReason() { AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(dataFrameAnalyticsConfig); assertThat(processContext.getFailureReason(), is(nullValue()));