From 9b116c8fefb94905112f7a314c146e5e5117807a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Witek?= Date: Wed, 11 Dec 2019 09:35:05 +0100 Subject: [PATCH] A few improvements to AnalyticsProcessManager class that make the code more readable. (#50026) (#50069) --- .../dataframe/DataFrameAnalyticsManager.java | 10 +- .../process/AnalyticsProcessManager.java | 150 +++++++++--------- .../process/AnalyticsProcessManagerTests.java | 98 +++++++----- 3 files changed, 135 insertions(+), 123 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index 76fc5880279..8e89113be7e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -233,15 +233,7 @@ public class DataFrameAnalyticsManager { DataFrameAnalyticsTaskState analyzingState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.ANALYZING, task.getAllocationId(), null); task.updatePersistentTaskState(analyzingState, ActionListener.wrap( - updatedTask -> processManager.runJob(task, config, dataExtractorFactory, - error -> { - if (error != null) { - task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()); - } else { - auditor.info(config.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS); - task.markAsCompleted(); - } - }), + updatedTask -> processManager.runJob(task, config, dataExtractorFactory), error -> { if (ExceptionsHelper.unwrapCause(error) instanceof ResourceNotFoundException) { // Task has stopped diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index 815d8478a52..ce981ad17a9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.dataframe.process; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.search.SearchResponse; @@ -90,19 +91,19 @@ public class AnalyticsProcessManager { this.trainedModelProvider = Objects.requireNonNull(trainedModelProvider); } - public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractorFactory dataExtractorFactory, - Consumer finishHandler) { + public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractorFactory dataExtractorFactory) { executorServiceForJob.execute(() -> { - ProcessContext processContext = new ProcessContext(config.getId()); + ProcessContext processContext = new ProcessContext(config); synchronized (processContextByAllocation) { if (task.isStopping()) { // The task was requested to stop before we created the process context - finishHandler.accept(null); + auditor.info(config.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS); + task.markAsCompleted(); return; } if (processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) { - finishHandler.accept( - ExceptionsHelper.serverError("[" + config.getId() + "] Could not create process as one already exists")); + task.updateState( + DataFrameAnalyticsState.FAILED, "[" + config.getId() + "] Could not create process as one already exists"); return; } } @@ -113,13 +114,13 @@ public class AnalyticsProcessManager { // Fetch existing model state (if any) BytesReference state = getModelState(config); - if (processContext.startProcess(dataExtractorFactory, config, task, state)) { - executorServiceForProcess.execute(() -> processResults(processContext)); - executorServiceForProcess.execute(() -> processData(task, config, processContext.dataExtractor, - processContext.process, processContext.resultProcessor, finishHandler, state)); + if (processContext.startProcess(dataExtractorFactory, task, state)) { + executorServiceForProcess.execute(() -> processContext.resultProcessor.get().process(processContext.process.get())); + executorServiceForProcess.execute(() -> processData(task, processContext, state)); } else { processContextByAllocation.remove(task.getAllocationId()); - finishHandler.accept(null); + auditor.info(config.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS); + task.markAsCompleted(); } }); } @@ -140,26 +141,18 @@ public class AnalyticsProcessManager { } } - private void processResults(ProcessContext processContext) { + private void processData(DataFrameAnalyticsTask task, ProcessContext processContext, BytesReference state) { + DataFrameAnalyticsConfig config = processContext.config; + DataFrameDataExtractor dataExtractor = processContext.dataExtractor.get(); + AnalyticsProcess process = processContext.process.get(); + AnalyticsResultProcessor resultProcessor = processContext.resultProcessor.get(); try { - processContext.resultProcessor.process(processContext.process); - } catch (Exception e) { - processContext.setFailureReason(e.getMessage()); - } - } - - private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor, - AnalyticsProcess process, AnalyticsResultProcessor resultProcessor, - Consumer finishHandler, BytesReference state) { - - try { - ProcessContext processContext = processContextByAllocation.get(task.getAllocationId()); writeHeaderRecord(dataExtractor, process); writeDataRows(dataExtractor, process, config.getAnalysis(), task.getProgressTracker()); process.writeEndOfDataMessage(); process.flushStream(); - restoreState(config, state, process, finishHandler); + restoreState(task, config, state, process); LOGGER.info("[{}] Waiting for result processor to complete", config.getId()); resultProcessor.awaitForCompletion(); @@ -168,26 +161,34 @@ public class AnalyticsProcessManager { refreshDest(config); LOGGER.info("[{}] Result processor has completed", config.getId()); } catch (Exception e) { - if (task.isStopping() == false) { - String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage()) - .getFormattedMessage(); + if (task.isStopping()) { + // Errors during task stopping are expected but we still want to log them just in case. + String errorMsg = + new ParameterizedMessage( + "[{}] Error while processing data [{}]; task is stopping", config.getId(), e.getMessage()).getFormattedMessage(); + LOGGER.debug(errorMsg, e); + } else { + String errorMsg = + new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage()).getFormattedMessage(); LOGGER.error(errorMsg, e); - processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg); + processContext.setFailureReason(errorMsg); } } finally { closeProcess(task); - ProcessContext processContext = processContextByAllocation.remove(task.getAllocationId()); + processContextByAllocation.remove(task.getAllocationId()); LOGGER.debug("Removed process context for task [{}]; [{}] processes still running", config.getId(), processContextByAllocation.size()); if (processContext.getFailureReason() == null) { // This results in marking the persistent task as complete LOGGER.info("[{}] Marking task completed", config.getId()); - finishHandler.accept(null); + auditor.info(config.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS); + task.markAsCompleted(); } else { LOGGER.error("[{}] Marking task failed; {}", config.getId(), processContext.getFailureReason()); task.updateState(DataFrameAnalyticsState.FAILED, processContext.getFailureReason()); + // Note: We are not marking the task as failed here as we want the user to be able to inspect the failure reason. } } } @@ -239,8 +240,8 @@ public class AnalyticsProcessManager { process.writeRecord(headerRecord); } - private void restoreState(DataFrameAnalyticsConfig config, @Nullable BytesReference state, AnalyticsProcess process, - Consumer failureHandler) { + private void restoreState(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, @Nullable BytesReference state, + AnalyticsProcess process) { if (config.getAnalysis().persistsState() == false) { LOGGER.debug("[{}] Analysis does not support state", config.getId()); return; @@ -258,7 +259,7 @@ public class AnalyticsProcessManager { process.restoreState(state); } catch (Exception e) { LOGGER.error(new ParameterizedMessage("[{}] Failed to restore state", process.getConfig().jobId()), e); - failureHandler.accept(ExceptionsHelper.serverError("Failed to restore state", e)); + task.updateState(DataFrameAnalyticsState.FAILED, "Failed to restore state: " + e.getMessage()); } } @@ -293,9 +294,10 @@ public class AnalyticsProcessManager { ProcessContext processContext = processContextByAllocation.get(task.getAllocationId()); try { - processContext.process.close(); + processContext.process.get().close(); LOGGER.info("[{}] Closed process", configId); } catch (Exception e) { + LOGGER.error("[" + configId + "] Error closing data frame analyzer process", e); String errorMsg = new ParameterizedMessage( "[{}] Error closing data frame analyzer process [{}]", configId, e.getMessage()).getFormattedMessage(); processContext.setFailureReason(errorMsg); @@ -323,42 +325,41 @@ public class AnalyticsProcessManager { class ProcessContext { - private final String id; - private volatile AnalyticsProcess process; - private volatile DataFrameDataExtractor dataExtractor; - private volatile AnalyticsResultProcessor resultProcessor; - private volatile boolean processKilled; - private volatile String failureReason; + private final DataFrameAnalyticsConfig config; + private final SetOnce> process = new SetOnce<>(); + private final SetOnce dataExtractor = new SetOnce<>(); + private final SetOnce resultProcessor = new SetOnce<>(); + private final SetOnce failureReason = new SetOnce<>(); - ProcessContext(String id) { - this.id = Objects.requireNonNull(id); + ProcessContext(DataFrameAnalyticsConfig config) { + this.config = Objects.requireNonNull(config); } - synchronized String getFailureReason() { - return failureReason; + String getFailureReason() { + return failureReason.get(); } - synchronized void setFailureReason(String failureReason) { - // Only set the new reason if there isn't one already as we want to keep the first reason - if (this.failureReason == null && failureReason != null) { - this.failureReason = failureReason; + void setFailureReason(String failureReason) { + if (failureReason == null) { + return; } + // Only set the new reason if there isn't one already as we want to keep the first reason (most likely the root cause). + this.failureReason.trySet(failureReason); } synchronized void stop() { - LOGGER.debug("[{}] Stopping process", id); - processKilled = true; - if (dataExtractor != null) { - dataExtractor.cancel(); + LOGGER.debug("[{}] Stopping process", config.getId()); + if (dataExtractor.get() != null) { + dataExtractor.get().cancel(); } - if (resultProcessor != null) { - resultProcessor.cancel(); + if (resultProcessor.get() != null) { + resultProcessor.get().cancel(); } - if (process != null) { + if (process.get() != null) { try { - process.kill(); + process.get().kill(); } catch (IOException e) { - LOGGER.error(new ParameterizedMessage("[{}] Failed to kill process", id), e); + LOGGER.error(new ParameterizedMessage("[{}] Failed to kill process", config.getId()), e); } } } @@ -366,16 +367,17 @@ public class AnalyticsProcessManager { /** * @return {@code true} if the process was started or {@code false} if it was not because it was stopped in the meantime */ - synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtractorFactory, DataFrameAnalyticsConfig config, - DataFrameAnalyticsTask task, @Nullable BytesReference state) { - if (processKilled) { + synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtractorFactory, + DataFrameAnalyticsTask task, + @Nullable BytesReference state) { + if (task.isStopping()) { // The job was stopped before we started the process so no need to start it return false; } - dataExtractor = dataExtractorFactory.newExtractor(false); + dataExtractor.set(dataExtractorFactory.newExtractor(false)); AnalyticsProcessConfig analyticsProcessConfig = - createProcessConfig(config, dataExtractor, dataExtractorFactory.getExtractedFields()); + createProcessConfig(dataExtractor.get(), dataExtractorFactory.getExtractedFields()); LOGGER.trace("[{}] creating analytics process with config [{}]", config.getId(), Strings.toString(analyticsProcessConfig)); // If we have no rows, that means there is no data so no point in starting the native process // just finish the task @@ -383,19 +385,16 @@ public class AnalyticsProcessManager { LOGGER.info("[{}] no data found to analyze. Will not start analytics native process.", config.getId()); return false; } - process = createProcess(task, config, analyticsProcessConfig, state); - DataFrameRowsJoiner dataFrameRowsJoiner = new DataFrameRowsJoiner(config.getId(), client, - dataExtractorFactory.newExtractor(true)); - resultProcessor = new AnalyticsResultProcessor( - config, dataFrameRowsJoiner, task.getProgressTracker(), trainedModelProvider, auditor, dataExtractor.getFieldNames()); + process.set(createProcess(task, config, analyticsProcessConfig, state)); + resultProcessor.set(createResultProcessor(task, dataExtractorFactory)); return true; } - private AnalyticsProcessConfig createProcessConfig( - DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor, ExtractedFields extractedFields) { + private AnalyticsProcessConfig createProcessConfig(DataFrameDataExtractor dataExtractor, + ExtractedFields extractedFields) { DataFrameDataExtractor.DataSummary dataSummary = dataExtractor.collectDataSummary(); Set categoricalFields = dataExtractor.getCategoricalFields(config.getAnalysis()); - AnalyticsProcessConfig processConfig = new AnalyticsProcessConfig( + return new AnalyticsProcessConfig( config.getId(), dataSummary.rows, dataSummary.cols, @@ -405,7 +404,14 @@ public class AnalyticsProcessManager { categoricalFields, config.getAnalysis(), extractedFields); - return processConfig; + } + + private AnalyticsResultProcessor createResultProcessor(DataFrameAnalyticsTask task, + DataFrameDataExtractorFactory dataExtractorFactory) { + DataFrameRowsJoiner dataFrameRowsJoiner = + new DataFrameRowsJoiner(config.getId(), client, dataExtractorFactory.newExtractor(true)); + return new AnalyticsResultProcessor( + config, dataFrameRowsJoiner, task.getProgressTracker(), trainedModelProvider, auditor, dataExtractor.get().getFieldNames()); } } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java index 2668a3d1d46..21c06b96c40 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask; import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor; import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory; @@ -22,12 +23,10 @@ import org.elasticsearch.xpack.ml.extractor.ExtractedFields; import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider; import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; import org.junit.Before; -import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import java.util.Arrays; import java.util.concurrent.ExecutorService; -import java.util.function.Consumer; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -37,7 +36,6 @@ import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -66,8 +64,6 @@ public class AnalyticsProcessManagerTests extends ESTestCase { private DataFrameAnalyticsConfig dataFrameAnalyticsConfig; private DataFrameDataExtractorFactory dataExtractorFactory; private DataFrameDataExtractor dataExtractor; - private Consumer finishHandler; - private ArgumentCaptor exceptionCaptor; private AnalyticsProcessManager processManager; @SuppressWarnings("unchecked") @@ -97,9 +93,6 @@ public class AnalyticsProcessManagerTests extends ESTestCase { dataExtractorFactory = mock(DataFrameDataExtractorFactory.class); when(dataExtractorFactory.newExtractor(anyBoolean())).thenReturn(dataExtractor); when(dataExtractorFactory.getExtractedFields()).thenReturn(mock(ExtractedFields.class)); - finishHandler = mock(Consumer.class); - - exceptionCaptor = ArgumentCaptor.forClass(Exception.class); processManager = new AnalyticsProcessManager( client, executorServiceForJob, executorServiceForProcess, processFactory, auditor, trainedModelProvider); @@ -108,54 +101,68 @@ public class AnalyticsProcessManagerTests extends ESTestCase { public void testRunJob_TaskIsStopping() { when(task.isStopping()).thenReturn(true); - processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory, finishHandler); + processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory); assertThat(processManager.getProcessContextCount(), equalTo(0)); - verify(finishHandler).accept(null); - verifyNoMoreInteractions(finishHandler); + InOrder inOrder = inOrder(task); + inOrder.verify(task).isStopping(); + inOrder.verify(task).markAsCompleted(); + verifyNoMoreInteractions(task); } public void testRunJob_ProcessContextAlreadyExists() { - processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory, finishHandler); + processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory); assertThat(processManager.getProcessContextCount(), equalTo(1)); - processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory, finishHandler); + processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory); assertThat(processManager.getProcessContextCount(), equalTo(1)); - verify(finishHandler).accept(exceptionCaptor.capture()); - verifyNoMoreInteractions(finishHandler); - - Exception e = exceptionCaptor.getValue(); - assertThat(e.getMessage(), equalTo("[config-id] Could not create process as one already exists")); + InOrder inOrder = inOrder(task); + inOrder.verify(task).isStopping(); + inOrder.verify(task).getAllocationId(); + inOrder.verify(task).isStopping(); + inOrder.verify(task).getProgressTracker(); + inOrder.verify(task).isStopping(); + inOrder.verify(task).getAllocationId(); + inOrder.verify(task).updateState(DataFrameAnalyticsState.FAILED, "[config-id] Could not create process as one already exists"); + verifyNoMoreInteractions(task); } public void testRunJob_EmptyDataFrame() { when(dataExtractor.collectDataSummary()).thenReturn(new DataFrameDataExtractor.DataSummary(0, NUM_COLS)); - processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory, finishHandler); + processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory); assertThat(processManager.getProcessContextCount(), equalTo(0)); // Make sure the process context did not leak - InOrder inOrder = inOrder(dataExtractor, executorServiceForProcess, process, finishHandler); + InOrder inOrder = inOrder(dataExtractor, executorServiceForProcess, process, task); + inOrder.verify(task).isStopping(); + inOrder.verify(task).getAllocationId(); + inOrder.verify(task).isStopping(); inOrder.verify(dataExtractor).collectDataSummary(); inOrder.verify(dataExtractor).getCategoricalFields(dataFrameAnalyticsConfig.getAnalysis()); - inOrder.verify(finishHandler).accept(null); - verifyNoMoreInteractions(dataExtractor, executorServiceForProcess, process, finishHandler); + inOrder.verify(task).getAllocationId(); + inOrder.verify(task).markAsCompleted(); + verifyNoMoreInteractions(dataExtractor, executorServiceForProcess, process, task); } public void testRunJob_Ok() { - processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory, finishHandler); + processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory); assertThat(processManager.getProcessContextCount(), equalTo(1)); - InOrder inOrder = inOrder(dataExtractor, executorServiceForProcess, process, finishHandler); + InOrder inOrder = inOrder(dataExtractor, executorServiceForProcess, process, task); + inOrder.verify(task).isStopping(); + inOrder.verify(task).getAllocationId(); + inOrder.verify(task).isStopping(); inOrder.verify(dataExtractor).collectDataSummary(); inOrder.verify(dataExtractor).getCategoricalFields(dataFrameAnalyticsConfig.getAnalysis()); inOrder.verify(process).isProcessAlive(); + inOrder.verify(task).getProgressTracker(); inOrder.verify(dataExtractor).getFieldNames(); inOrder.verify(executorServiceForProcess, times(2)).execute(any()); // 'processData' and 'processResults' threads - verifyNoMoreInteractions(dataExtractor, executorServiceForProcess, process, finishHandler); + verifyNoMoreInteractions(dataExtractor, executorServiceForProcess, process, task); } public void testProcessContext_GetSetFailureReason() { - AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(CONFIG_ID); + AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(dataFrameAnalyticsConfig); assertThat(processContext.getFailureReason(), is(nullValue())); processContext.setFailureReason("reason1"); @@ -167,50 +174,57 @@ public class AnalyticsProcessManagerTests extends ESTestCase { processContext.setFailureReason("reason2"); assertThat(processContext.getFailureReason(), equalTo("reason1")); - verifyNoMoreInteractions(dataExtractor, process, finishHandler); + verifyNoMoreInteractions(dataExtractor, process, task); } - public void testProcessContext_StartProcess_ProcessAlreadyKilled() { - AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(CONFIG_ID); - processContext.stop(); - assertThat(processContext.startProcess(dataExtractorFactory, dataFrameAnalyticsConfig, task, null), is(false)); + public void testProcessContext_StartProcess_TaskAlreadyStopped() { + when(task.isStopping()).thenReturn(true); - verifyNoMoreInteractions(dataExtractor, process, finishHandler); + AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(dataFrameAnalyticsConfig); + processContext.stop(); + assertThat(processContext.startProcess(dataExtractorFactory, task, null), is(false)); + + InOrder inOrder = inOrder(dataExtractor, process, task); + inOrder.verify(task).isStopping(); + verifyNoMoreInteractions(dataExtractor, process, task); } public void testProcessContext_StartProcess_EmptyDataFrame() { when(dataExtractor.collectDataSummary()).thenReturn(new DataFrameDataExtractor.DataSummary(0, NUM_COLS)); - AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(CONFIG_ID); - assertThat(processContext.startProcess(dataExtractorFactory, dataFrameAnalyticsConfig, task, null), is(false)); + AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(dataFrameAnalyticsConfig); + assertThat(processContext.startProcess(dataExtractorFactory, task, null), is(false)); - InOrder inOrder = inOrder(dataExtractor, process, finishHandler); + InOrder inOrder = inOrder(dataExtractor, process, task); + inOrder.verify(task).isStopping(); inOrder.verify(dataExtractor).collectDataSummary(); inOrder.verify(dataExtractor).getCategoricalFields(dataFrameAnalyticsConfig.getAnalysis()); - verifyNoMoreInteractions(dataExtractor, process, finishHandler); + verifyNoMoreInteractions(dataExtractor, process, task); } public void testProcessContext_StartAndStop() throws Exception { - AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(CONFIG_ID); - assertThat(processContext.startProcess(dataExtractorFactory, dataFrameAnalyticsConfig, task, null), is(true)); + AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(dataFrameAnalyticsConfig); + assertThat(processContext.startProcess(dataExtractorFactory, task, null), is(true)); processContext.stop(); - InOrder inOrder = inOrder(dataExtractor, process, finishHandler); + InOrder inOrder = inOrder(dataExtractor, process, task); // startProcess + inOrder.verify(task).isStopping(); inOrder.verify(dataExtractor).collectDataSummary(); inOrder.verify(dataExtractor).getCategoricalFields(dataFrameAnalyticsConfig.getAnalysis()); inOrder.verify(process).isProcessAlive(); + inOrder.verify(task).getProgressTracker(); inOrder.verify(dataExtractor).getFieldNames(); // stop inOrder.verify(dataExtractor).cancel(); inOrder.verify(process).kill(); - verifyNoMoreInteractions(dataExtractor, process, finishHandler); + verifyNoMoreInteractions(dataExtractor, process, task); } public void testProcessContext_Stop() { - AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(CONFIG_ID); + AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(dataFrameAnalyticsConfig); processContext.stop(); - verifyNoMoreInteractions(dataExtractor, process, finishHandler); + verifyNoMoreInteractions(dataExtractor, process, task); } }