Do not throw an exception if the process finished quickly but without any error. (#46073) (#46113)

This commit is contained in:
Przemysław Witek 2019-08-29 10:47:17 +02:00 committed by GitHub
parent 3666bcfbd8
commit fbe9e8a530
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 53 additions and 54 deletions

View File

@ -17,12 +17,10 @@ import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory; import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
import org.elasticsearch.xpack.ml.dataframe.process.results.MemoryUsageEstimationResult; import org.elasticsearch.xpack.ml.dataframe.process.results.MemoryUsageEstimationResult;
import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
public class MemoryUsageEstimationProcessManager { public class MemoryUsageEstimationProcessManager {
@ -74,24 +72,21 @@ public class MemoryUsageEstimationProcessManager {
"", "",
categoricalFields, categoricalFields,
config.getAnalysis()); config.getAnalysis());
ProcessHolder processHolder = new ProcessHolder();
AnalyticsProcess<MemoryUsageEstimationResult> process = AnalyticsProcess<MemoryUsageEstimationResult> process =
processFactory.createAnalyticsProcess( processFactory.createAnalyticsProcess(
jobId, jobId,
processConfig, processConfig,
executorServiceForProcess, executorServiceForProcess,
onProcessCrash(jobId, processHolder)); // The handler passed here will never be called as AbstractNativeProcess.detectCrash method returns early when
processHolder.process = process; // (processInStream == null) which is the case for MemoryUsageEstimationProcess.
if (process.isProcessAlive() == false) { reason -> {});
String errorMsg =
new ParameterizedMessage("[{}] Error while starting process: {}", jobId, process.readError()).getFormattedMessage();
throw ExceptionsHelper.serverError(errorMsg);
}
try { try {
return readResult(jobId, process); return readResult(jobId, process);
} catch (Exception e) { } catch (Exception e) {
String errorMsg = String errorMsg =
new ParameterizedMessage("[{}] Error while processing result [{}]", jobId, e.getMessage()).getFormattedMessage(); new ParameterizedMessage(
"[{}] Error while processing process output [{}], process errors: [{}]",
jobId, e.getMessage(), process.readError()).getFormattedMessage();
throw ExceptionsHelper.serverError(errorMsg, e); throw ExceptionsHelper.serverError(errorMsg, e);
} finally { } finally {
process.consumeAndCloseOutputStream(); process.consumeAndCloseOutputStream();
@ -101,31 +96,14 @@ public class MemoryUsageEstimationProcessManager {
LOGGER.info("[{}] Closed process", jobId); LOGGER.info("[{}] Closed process", jobId);
} catch (Exception e) { } catch (Exception e) {
String errorMsg = String errorMsg =
new ParameterizedMessage("[{}] Error while closing process [{}]", jobId, e.getMessage()).getFormattedMessage(); new ParameterizedMessage(
"[{}] Error while closing process [{}], process errors: [{}]",
jobId, e.getMessage(), process.readError()).getFormattedMessage();
throw ExceptionsHelper.serverError(errorMsg, e); throw ExceptionsHelper.serverError(errorMsg, e);
} }
} }
} }
private static class ProcessHolder {
volatile AnalyticsProcess<MemoryUsageEstimationResult> process;
}
private static Consumer<String> onProcessCrash(String jobId, ProcessHolder processHolder) {
return reason -> {
AnalyticsProcess<MemoryUsageEstimationResult> process = processHolder.process;
if (process == null) {
LOGGER.error(new ParameterizedMessage("[{}] Process does not exist", jobId));
return;
}
try {
process.kill();
} catch (IOException e) {
LOGGER.error(new ParameterizedMessage("[{}] Failed to kill process", jobId), e);
}
};
}
/** /**
* Extracts {@link MemoryUsageEstimationResult} from process' output. * Extracts {@link MemoryUsageEstimationResult} from process' output.
*/ */

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.dataframe.process; package org.elasticsearch.xpack.ml.dataframe.process;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
@ -65,7 +66,6 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase {
executorServiceForJob = EsExecutors.newDirectExecutorService(); executorServiceForJob = EsExecutors.newDirectExecutorService();
executorServiceForProcess = mock(ExecutorService.class); executorServiceForProcess = mock(ExecutorService.class);
process = mock(AnalyticsProcess.class); process = mock(AnalyticsProcess.class);
when(process.isProcessAlive()).thenReturn(true);
when(process.readAnalyticsResults()).thenReturn(Arrays.asList(PROCESS_RESULT).iterator()); when(process.readAnalyticsResults()).thenReturn(Arrays.asList(PROCESS_RESULT).iterator());
processFactory = mock(AnalyticsProcessFactory.class); processFactory = mock(AnalyticsProcessFactory.class);
when(processFactory.createAnalyticsProcess(anyString(), any(), any(), any())).thenReturn(process); when(processFactory.createAnalyticsProcess(anyString(), any(), any(), any())).thenReturn(process);
@ -93,24 +93,6 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase {
verifyNoMoreInteractions(process, listener); verifyNoMoreInteractions(process, listener);
} }
public void testRunJob_ProcessNotAlive() {
when(process.isProcessAlive()).thenReturn(false);
when(process.readError()).thenReturn("Error from inside the process");
processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener);
verify(listener).onFailure(exceptionCaptor.capture());
ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue();
assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
assertThat(exception.getMessage(), containsString(TASK_ID));
assertThat(exception.getMessage(), containsString("Error while starting process"));
assertThat(exception.getMessage(), containsString("Error from inside the process"));
verify(process).isProcessAlive();
verify(process).readError();
verifyNoMoreInteractions(process, listener);
}
public void testRunJob_NoResults() throws Exception { public void testRunJob_NoResults() throws Exception {
when(process.readAnalyticsResults()).thenReturn(Arrays.<MemoryUsageEstimationResult>asList().iterator()); when(process.readAnalyticsResults()).thenReturn(Arrays.<MemoryUsageEstimationResult>asList().iterator());
@ -123,8 +105,8 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase {
assertThat(exception.getMessage(), containsString("no results")); assertThat(exception.getMessage(), containsString("no results"));
InOrder inOrder = inOrder(process); InOrder inOrder = inOrder(process);
inOrder.verify(process).isProcessAlive();
inOrder.verify(process).readAnalyticsResults(); inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).readError();
inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close(); inOrder.verify(process).close();
verifyNoMoreInteractions(process, listener); verifyNoMoreInteractions(process, listener);
@ -142,12 +124,30 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase {
assertThat(exception.getMessage(), containsString("more than one result")); assertThat(exception.getMessage(), containsString("more than one result"));
InOrder inOrder = inOrder(process); InOrder inOrder = inOrder(process);
inOrder.verify(process).isProcessAlive();
inOrder.verify(process).readAnalyticsResults(); inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).readError();
inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close(); inOrder.verify(process).close();
verifyNoMoreInteractions(process, listener); verifyNoMoreInteractions(process, listener);
}
public void testRunJob_OneResult_ParseException() throws Exception {
when(process.readAnalyticsResults()).thenThrow(new ElasticsearchParseException("cannot parse result"));
processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener);
verify(listener).onFailure(exceptionCaptor.capture());
ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue();
assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
assertThat(exception.getMessage(), containsString(TASK_ID));
assertThat(exception.getMessage(), containsString("cannot parse result"));
InOrder inOrder = inOrder(process);
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).readError();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
verifyNoMoreInteractions(process, listener);
} }
public void testRunJob_FailsOnClose() throws Exception { public void testRunJob_FailsOnClose() throws Exception {
@ -162,10 +162,32 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase {
assertThat(exception.getMessage(), containsString("Error while closing process")); assertThat(exception.getMessage(), containsString("Error while closing process"));
InOrder inOrder = inOrder(process); InOrder inOrder = inOrder(process);
inOrder.verify(process).isProcessAlive();
inOrder.verify(process).readAnalyticsResults(); inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close(); inOrder.verify(process).close();
inOrder.verify(process).readError();
verifyNoMoreInteractions(process, listener);
}
public void testRunJob_FailsOnClose_ProcessReportsError() throws Exception {
doThrow(ExceptionsHelper.serverError("some LOG(ERROR) lines coming from cpp process")).when(process).close();
when(process.readError()).thenReturn("Error from inside the process");
processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener);
verify(listener).onFailure(exceptionCaptor.capture());
ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue();
assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
assertThat(exception.getMessage(), containsString(TASK_ID));
assertThat(exception.getMessage(), containsString("Error while closing process"));
assertThat(exception.getMessage(), containsString("some LOG(ERROR) lines coming from cpp process"));
assertThat(exception.getMessage(), containsString("Error from inside the process"));
InOrder inOrder = inOrder(process);
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
inOrder.verify(process).readError();
verifyNoMoreInteractions(process, listener); verifyNoMoreInteractions(process, listener);
} }
@ -177,7 +199,6 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase {
assertThat(result, equalTo(PROCESS_RESULT)); assertThat(result, equalTo(PROCESS_RESULT));
InOrder inOrder = inOrder(process); InOrder inOrder = inOrder(process);
inOrder.verify(process).isProcessAlive();
inOrder.verify(process).readAnalyticsResults(); inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close(); inOrder.verify(process).close();