From fbe9e8a53062d250b1f66d29c7d18ebfafe9a153 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Witek?= Date: Thu, 29 Aug 2019 10:47:17 +0200 Subject: [PATCH] Do not throw an exception if the process finished quickly but without any error. (#46073) (#46113) --- .../MemoryUsageEstimationProcessManager.java | 40 +++-------- ...oryUsageEstimationProcessManagerTests.java | 67 ++++++++++++------- 2 files changed, 53 insertions(+), 54 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java index 00d8c15e418..6512dc075d7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java @@ -17,12 +17,10 @@ import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor; import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory; import org.elasticsearch.xpack.ml.dataframe.process.results.MemoryUsageEstimationResult; -import java.io.IOException; import java.util.Iterator; import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.function.Consumer; public class MemoryUsageEstimationProcessManager { @@ -74,24 +72,21 @@ public class MemoryUsageEstimationProcessManager { "", categoricalFields, config.getAnalysis()); - ProcessHolder processHolder = new ProcessHolder(); AnalyticsProcess process = processFactory.createAnalyticsProcess( jobId, processConfig, executorServiceForProcess, - onProcessCrash(jobId, processHolder)); - processHolder.process = process; - if (process.isProcessAlive() == false) { - String errorMsg = - new ParameterizedMessage("[{}] Error while starting process: {}", jobId, process.readError()).getFormattedMessage(); - throw ExceptionsHelper.serverError(errorMsg); - } + // The handler passed here will never be called as AbstractNativeProcess.detectCrash method returns early when + // (processInStream == null) which is the case for MemoryUsageEstimationProcess. + reason -> {}); try { return readResult(jobId, process); } catch (Exception e) { String errorMsg = - new ParameterizedMessage("[{}] Error while processing result [{}]", jobId, e.getMessage()).getFormattedMessage(); + new ParameterizedMessage( + "[{}] Error while processing process output [{}], process errors: [{}]", + jobId, e.getMessage(), process.readError()).getFormattedMessage(); throw ExceptionsHelper.serverError(errorMsg, e); } finally { process.consumeAndCloseOutputStream(); @@ -101,31 +96,14 @@ public class MemoryUsageEstimationProcessManager { LOGGER.info("[{}] Closed process", jobId); } catch (Exception e) { String errorMsg = - new ParameterizedMessage("[{}] Error while closing process [{}]", jobId, e.getMessage()).getFormattedMessage(); + new ParameterizedMessage( + "[{}] Error while closing process [{}], process errors: [{}]", + jobId, e.getMessage(), process.readError()).getFormattedMessage(); throw ExceptionsHelper.serverError(errorMsg, e); } } } - private static class ProcessHolder { - volatile AnalyticsProcess process; - } - - private static Consumer onProcessCrash(String jobId, ProcessHolder processHolder) { - return reason -> { - AnalyticsProcess process = processHolder.process; - if (process == null) { - LOGGER.error(new ParameterizedMessage("[{}] Process does not exist", jobId)); - return; - } - try { - process.kill(); - } catch (IOException e) { - LOGGER.error(new ParameterizedMessage("[{}] Failed to kill process", jobId), e); - } - }; - } - /** * Extracts {@link MemoryUsageEstimationResult} from process' output. */ diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java index 5a647c8178b..9790e0618da 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.dataframe.process; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -65,7 +66,6 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase { executorServiceForJob = EsExecutors.newDirectExecutorService(); executorServiceForProcess = mock(ExecutorService.class); process = mock(AnalyticsProcess.class); - when(process.isProcessAlive()).thenReturn(true); when(process.readAnalyticsResults()).thenReturn(Arrays.asList(PROCESS_RESULT).iterator()); processFactory = mock(AnalyticsProcessFactory.class); when(processFactory.createAnalyticsProcess(anyString(), any(), any(), any())).thenReturn(process); @@ -93,24 +93,6 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase { verifyNoMoreInteractions(process, listener); } - public void testRunJob_ProcessNotAlive() { - when(process.isProcessAlive()).thenReturn(false); - when(process.readError()).thenReturn("Error from inside the process"); - - processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener); - - verify(listener).onFailure(exceptionCaptor.capture()); - ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue(); - assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR)); - assertThat(exception.getMessage(), containsString(TASK_ID)); - assertThat(exception.getMessage(), containsString("Error while starting process")); - assertThat(exception.getMessage(), containsString("Error from inside the process")); - - verify(process).isProcessAlive(); - verify(process).readError(); - verifyNoMoreInteractions(process, listener); - } - public void testRunJob_NoResults() throws Exception { when(process.readAnalyticsResults()).thenReturn(Arrays.asList().iterator()); @@ -123,8 +105,8 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase { assertThat(exception.getMessage(), containsString("no results")); InOrder inOrder = inOrder(process); - inOrder.verify(process).isProcessAlive(); inOrder.verify(process).readAnalyticsResults(); + inOrder.verify(process).readError(); inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close(); verifyNoMoreInteractions(process, listener); @@ -142,12 +124,30 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase { assertThat(exception.getMessage(), containsString("more than one result")); InOrder inOrder = inOrder(process); - inOrder.verify(process).isProcessAlive(); inOrder.verify(process).readAnalyticsResults(); + inOrder.verify(process).readError(); inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close(); verifyNoMoreInteractions(process, listener); + } + public void testRunJob_OneResult_ParseException() throws Exception { + when(process.readAnalyticsResults()).thenThrow(new ElasticsearchParseException("cannot parse result")); + + processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener); + + verify(listener).onFailure(exceptionCaptor.capture()); + ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue(); + assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR)); + assertThat(exception.getMessage(), containsString(TASK_ID)); + assertThat(exception.getMessage(), containsString("cannot parse result")); + + InOrder inOrder = inOrder(process); + inOrder.verify(process).readAnalyticsResults(); + inOrder.verify(process).readError(); + inOrder.verify(process).consumeAndCloseOutputStream(); + inOrder.verify(process).close(); + verifyNoMoreInteractions(process, listener); } public void testRunJob_FailsOnClose() throws Exception { @@ -162,10 +162,32 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase { assertThat(exception.getMessage(), containsString("Error while closing process")); InOrder inOrder = inOrder(process); - inOrder.verify(process).isProcessAlive(); inOrder.verify(process).readAnalyticsResults(); inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close(); + inOrder.verify(process).readError(); + verifyNoMoreInteractions(process, listener); + } + + public void testRunJob_FailsOnClose_ProcessReportsError() throws Exception { + doThrow(ExceptionsHelper.serverError("some LOG(ERROR) lines coming from cpp process")).when(process).close(); + when(process.readError()).thenReturn("Error from inside the process"); + + processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener); + + verify(listener).onFailure(exceptionCaptor.capture()); + ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue(); + assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR)); + assertThat(exception.getMessage(), containsString(TASK_ID)); + assertThat(exception.getMessage(), containsString("Error while closing process")); + assertThat(exception.getMessage(), containsString("some LOG(ERROR) lines coming from cpp process")); + assertThat(exception.getMessage(), containsString("Error from inside the process")); + + InOrder inOrder = inOrder(process); + inOrder.verify(process).readAnalyticsResults(); + inOrder.verify(process).consumeAndCloseOutputStream(); + inOrder.verify(process).close(); + inOrder.verify(process).readError(); verifyNoMoreInteractions(process, listener); } @@ -177,7 +199,6 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase { assertThat(result, equalTo(PROCESS_RESULT)); InOrder inOrder = inOrder(process); - inOrder.verify(process).isProcessAlive(); inOrder.verify(process).readAnalyticsResults(); inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close();