A few improvements to AnalyticsProcessManager class that make the code more readable. (#50026) (#50069)
This commit is contained in:
parent
3b613c36f4
commit
9b116c8fef
|
@ -233,15 +233,7 @@ public class DataFrameAnalyticsManager {
|
|||
DataFrameAnalyticsTaskState analyzingState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.ANALYZING,
|
||||
task.getAllocationId(), null);
|
||||
task.updatePersistentTaskState(analyzingState, ActionListener.wrap(
|
||||
updatedTask -> processManager.runJob(task, config, dataExtractorFactory,
|
||||
error -> {
|
||||
if (error != null) {
|
||||
task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage());
|
||||
} else {
|
||||
auditor.info(config.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS);
|
||||
task.markAsCompleted();
|
||||
}
|
||||
}),
|
||||
updatedTask -> processManager.runJob(task, config, dataExtractorFactory),
|
||||
error -> {
|
||||
if (ExceptionsHelper.unwrapCause(error) instanceof ResourceNotFoundException) {
|
||||
// Task has stopped
|
||||
|
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.dataframe.process;
|
|||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
|
||||
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
|
@ -90,19 +91,19 @@ public class AnalyticsProcessManager {
|
|||
this.trainedModelProvider = Objects.requireNonNull(trainedModelProvider);
|
||||
}
|
||||
|
||||
public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractorFactory dataExtractorFactory,
|
||||
Consumer<Exception> finishHandler) {
|
||||
public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractorFactory dataExtractorFactory) {
|
||||
executorServiceForJob.execute(() -> {
|
||||
ProcessContext processContext = new ProcessContext(config.getId());
|
||||
ProcessContext processContext = new ProcessContext(config);
|
||||
synchronized (processContextByAllocation) {
|
||||
if (task.isStopping()) {
|
||||
// The task was requested to stop before we created the process context
|
||||
finishHandler.accept(null);
|
||||
auditor.info(config.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS);
|
||||
task.markAsCompleted();
|
||||
return;
|
||||
}
|
||||
if (processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) {
|
||||
finishHandler.accept(
|
||||
ExceptionsHelper.serverError("[" + config.getId() + "] Could not create process as one already exists"));
|
||||
task.updateState(
|
||||
DataFrameAnalyticsState.FAILED, "[" + config.getId() + "] Could not create process as one already exists");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -113,13 +114,13 @@ public class AnalyticsProcessManager {
|
|||
// Fetch existing model state (if any)
|
||||
BytesReference state = getModelState(config);
|
||||
|
||||
if (processContext.startProcess(dataExtractorFactory, config, task, state)) {
|
||||
executorServiceForProcess.execute(() -> processResults(processContext));
|
||||
executorServiceForProcess.execute(() -> processData(task, config, processContext.dataExtractor,
|
||||
processContext.process, processContext.resultProcessor, finishHandler, state));
|
||||
if (processContext.startProcess(dataExtractorFactory, task, state)) {
|
||||
executorServiceForProcess.execute(() -> processContext.resultProcessor.get().process(processContext.process.get()));
|
||||
executorServiceForProcess.execute(() -> processData(task, processContext, state));
|
||||
} else {
|
||||
processContextByAllocation.remove(task.getAllocationId());
|
||||
finishHandler.accept(null);
|
||||
auditor.info(config.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS);
|
||||
task.markAsCompleted();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -140,26 +141,18 @@ public class AnalyticsProcessManager {
|
|||
}
|
||||
}
|
||||
|
||||
private void processResults(ProcessContext processContext) {
|
||||
private void processData(DataFrameAnalyticsTask task, ProcessContext processContext, BytesReference state) {
|
||||
DataFrameAnalyticsConfig config = processContext.config;
|
||||
DataFrameDataExtractor dataExtractor = processContext.dataExtractor.get();
|
||||
AnalyticsProcess<AnalyticsResult> process = processContext.process.get();
|
||||
AnalyticsResultProcessor resultProcessor = processContext.resultProcessor.get();
|
||||
try {
|
||||
processContext.resultProcessor.process(processContext.process);
|
||||
} catch (Exception e) {
|
||||
processContext.setFailureReason(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor,
|
||||
AnalyticsProcess<AnalyticsResult> process, AnalyticsResultProcessor resultProcessor,
|
||||
Consumer<Exception> finishHandler, BytesReference state) {
|
||||
|
||||
try {
|
||||
ProcessContext processContext = processContextByAllocation.get(task.getAllocationId());
|
||||
writeHeaderRecord(dataExtractor, process);
|
||||
writeDataRows(dataExtractor, process, config.getAnalysis(), task.getProgressTracker());
|
||||
process.writeEndOfDataMessage();
|
||||
process.flushStream();
|
||||
|
||||
restoreState(config, state, process, finishHandler);
|
||||
restoreState(task, config, state, process);
|
||||
|
||||
LOGGER.info("[{}] Waiting for result processor to complete", config.getId());
|
||||
resultProcessor.awaitForCompletion();
|
||||
|
@ -168,26 +161,34 @@ public class AnalyticsProcessManager {
|
|||
refreshDest(config);
|
||||
LOGGER.info("[{}] Result processor has completed", config.getId());
|
||||
} catch (Exception e) {
|
||||
if (task.isStopping() == false) {
|
||||
String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage())
|
||||
.getFormattedMessage();
|
||||
if (task.isStopping()) {
|
||||
// Errors during task stopping are expected but we still want to log them just in case.
|
||||
String errorMsg =
|
||||
new ParameterizedMessage(
|
||||
"[{}] Error while processing data [{}]; task is stopping", config.getId(), e.getMessage()).getFormattedMessage();
|
||||
LOGGER.debug(errorMsg, e);
|
||||
} else {
|
||||
String errorMsg =
|
||||
new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage()).getFormattedMessage();
|
||||
LOGGER.error(errorMsg, e);
|
||||
processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg);
|
||||
processContext.setFailureReason(errorMsg);
|
||||
}
|
||||
} finally {
|
||||
closeProcess(task);
|
||||
|
||||
ProcessContext processContext = processContextByAllocation.remove(task.getAllocationId());
|
||||
processContextByAllocation.remove(task.getAllocationId());
|
||||
LOGGER.debug("Removed process context for task [{}]; [{}] processes still running", config.getId(),
|
||||
processContextByAllocation.size());
|
||||
|
||||
if (processContext.getFailureReason() == null) {
|
||||
// This results in marking the persistent task as complete
|
||||
LOGGER.info("[{}] Marking task completed", config.getId());
|
||||
finishHandler.accept(null);
|
||||
auditor.info(config.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS);
|
||||
task.markAsCompleted();
|
||||
} else {
|
||||
LOGGER.error("[{}] Marking task failed; {}", config.getId(), processContext.getFailureReason());
|
||||
task.updateState(DataFrameAnalyticsState.FAILED, processContext.getFailureReason());
|
||||
// Note: We are not marking the task as failed here as we want the user to be able to inspect the failure reason.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -239,8 +240,8 @@ public class AnalyticsProcessManager {
|
|||
process.writeRecord(headerRecord);
|
||||
}
|
||||
|
||||
private void restoreState(DataFrameAnalyticsConfig config, @Nullable BytesReference state, AnalyticsProcess<AnalyticsResult> process,
|
||||
Consumer<Exception> failureHandler) {
|
||||
private void restoreState(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, @Nullable BytesReference state,
|
||||
AnalyticsProcess<AnalyticsResult> process) {
|
||||
if (config.getAnalysis().persistsState() == false) {
|
||||
LOGGER.debug("[{}] Analysis does not support state", config.getId());
|
||||
return;
|
||||
|
@ -258,7 +259,7 @@ public class AnalyticsProcessManager {
|
|||
process.restoreState(state);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(new ParameterizedMessage("[{}] Failed to restore state", process.getConfig().jobId()), e);
|
||||
failureHandler.accept(ExceptionsHelper.serverError("Failed to restore state", e));
|
||||
task.updateState(DataFrameAnalyticsState.FAILED, "Failed to restore state: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -293,9 +294,10 @@ public class AnalyticsProcessManager {
|
|||
|
||||
ProcessContext processContext = processContextByAllocation.get(task.getAllocationId());
|
||||
try {
|
||||
processContext.process.close();
|
||||
processContext.process.get().close();
|
||||
LOGGER.info("[{}] Closed process", configId);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("[" + configId + "] Error closing data frame analyzer process", e);
|
||||
String errorMsg = new ParameterizedMessage(
|
||||
"[{}] Error closing data frame analyzer process [{}]", configId, e.getMessage()).getFormattedMessage();
|
||||
processContext.setFailureReason(errorMsg);
|
||||
|
@ -323,42 +325,41 @@ public class AnalyticsProcessManager {
|
|||
|
||||
class ProcessContext {
|
||||
|
||||
private final String id;
|
||||
private volatile AnalyticsProcess<AnalyticsResult> process;
|
||||
private volatile DataFrameDataExtractor dataExtractor;
|
||||
private volatile AnalyticsResultProcessor resultProcessor;
|
||||
private volatile boolean processKilled;
|
||||
private volatile String failureReason;
|
||||
private final DataFrameAnalyticsConfig config;
|
||||
private final SetOnce<AnalyticsProcess<AnalyticsResult>> process = new SetOnce<>();
|
||||
private final SetOnce<DataFrameDataExtractor> dataExtractor = new SetOnce<>();
|
||||
private final SetOnce<AnalyticsResultProcessor> resultProcessor = new SetOnce<>();
|
||||
private final SetOnce<String> failureReason = new SetOnce<>();
|
||||
|
||||
ProcessContext(String id) {
|
||||
this.id = Objects.requireNonNull(id);
|
||||
ProcessContext(DataFrameAnalyticsConfig config) {
|
||||
this.config = Objects.requireNonNull(config);
|
||||
}
|
||||
|
||||
synchronized String getFailureReason() {
|
||||
return failureReason;
|
||||
String getFailureReason() {
|
||||
return failureReason.get();
|
||||
}
|
||||
|
||||
synchronized void setFailureReason(String failureReason) {
|
||||
// Only set the new reason if there isn't one already as we want to keep the first reason
|
||||
if (this.failureReason == null && failureReason != null) {
|
||||
this.failureReason = failureReason;
|
||||
void setFailureReason(String failureReason) {
|
||||
if (failureReason == null) {
|
||||
return;
|
||||
}
|
||||
// Only set the new reason if there isn't one already as we want to keep the first reason (most likely the root cause).
|
||||
this.failureReason.trySet(failureReason);
|
||||
}
|
||||
|
||||
synchronized void stop() {
|
||||
LOGGER.debug("[{}] Stopping process", id);
|
||||
processKilled = true;
|
||||
if (dataExtractor != null) {
|
||||
dataExtractor.cancel();
|
||||
LOGGER.debug("[{}] Stopping process", config.getId());
|
||||
if (dataExtractor.get() != null) {
|
||||
dataExtractor.get().cancel();
|
||||
}
|
||||
if (resultProcessor != null) {
|
||||
resultProcessor.cancel();
|
||||
if (resultProcessor.get() != null) {
|
||||
resultProcessor.get().cancel();
|
||||
}
|
||||
if (process != null) {
|
||||
if (process.get() != null) {
|
||||
try {
|
||||
process.kill();
|
||||
process.get().kill();
|
||||
} catch (IOException e) {
|
||||
LOGGER.error(new ParameterizedMessage("[{}] Failed to kill process", id), e);
|
||||
LOGGER.error(new ParameterizedMessage("[{}] Failed to kill process", config.getId()), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -366,16 +367,17 @@ public class AnalyticsProcessManager {
|
|||
/**
|
||||
* @return {@code true} if the process was started or {@code false} if it was not because it was stopped in the meantime
|
||||
*/
|
||||
synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtractorFactory, DataFrameAnalyticsConfig config,
|
||||
DataFrameAnalyticsTask task, @Nullable BytesReference state) {
|
||||
if (processKilled) {
|
||||
synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtractorFactory,
|
||||
DataFrameAnalyticsTask task,
|
||||
@Nullable BytesReference state) {
|
||||
if (task.isStopping()) {
|
||||
// The job was stopped before we started the process so no need to start it
|
||||
return false;
|
||||
}
|
||||
|
||||
dataExtractor = dataExtractorFactory.newExtractor(false);
|
||||
dataExtractor.set(dataExtractorFactory.newExtractor(false));
|
||||
AnalyticsProcessConfig analyticsProcessConfig =
|
||||
createProcessConfig(config, dataExtractor, dataExtractorFactory.getExtractedFields());
|
||||
createProcessConfig(dataExtractor.get(), dataExtractorFactory.getExtractedFields());
|
||||
LOGGER.trace("[{}] creating analytics process with config [{}]", config.getId(), Strings.toString(analyticsProcessConfig));
|
||||
// If we have no rows, that means there is no data so no point in starting the native process
|
||||
// just finish the task
|
||||
|
@ -383,19 +385,16 @@ public class AnalyticsProcessManager {
|
|||
LOGGER.info("[{}] no data found to analyze. Will not start analytics native process.", config.getId());
|
||||
return false;
|
||||
}
|
||||
process = createProcess(task, config, analyticsProcessConfig, state);
|
||||
DataFrameRowsJoiner dataFrameRowsJoiner = new DataFrameRowsJoiner(config.getId(), client,
|
||||
dataExtractorFactory.newExtractor(true));
|
||||
resultProcessor = new AnalyticsResultProcessor(
|
||||
config, dataFrameRowsJoiner, task.getProgressTracker(), trainedModelProvider, auditor, dataExtractor.getFieldNames());
|
||||
process.set(createProcess(task, config, analyticsProcessConfig, state));
|
||||
resultProcessor.set(createResultProcessor(task, dataExtractorFactory));
|
||||
return true;
|
||||
}
|
||||
|
||||
private AnalyticsProcessConfig createProcessConfig(
|
||||
DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor, ExtractedFields extractedFields) {
|
||||
private AnalyticsProcessConfig createProcessConfig(DataFrameDataExtractor dataExtractor,
|
||||
ExtractedFields extractedFields) {
|
||||
DataFrameDataExtractor.DataSummary dataSummary = dataExtractor.collectDataSummary();
|
||||
Set<String> categoricalFields = dataExtractor.getCategoricalFields(config.getAnalysis());
|
||||
AnalyticsProcessConfig processConfig = new AnalyticsProcessConfig(
|
||||
return new AnalyticsProcessConfig(
|
||||
config.getId(),
|
||||
dataSummary.rows,
|
||||
dataSummary.cols,
|
||||
|
@ -405,7 +404,14 @@ public class AnalyticsProcessManager {
|
|||
categoricalFields,
|
||||
config.getAnalysis(),
|
||||
extractedFields);
|
||||
return processConfig;
|
||||
}
|
||||
|
||||
private AnalyticsResultProcessor createResultProcessor(DataFrameAnalyticsTask task,
|
||||
DataFrameDataExtractorFactory dataExtractorFactory) {
|
||||
DataFrameRowsJoiner dataFrameRowsJoiner =
|
||||
new DataFrameRowsJoiner(config.getId(), client, dataExtractorFactory.newExtractor(true));
|
||||
return new AnalyticsResultProcessor(
|
||||
config, dataFrameRowsJoiner, task.getProgressTracker(), trainedModelProvider, auditor, dataExtractor.get().getFieldNames());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
|
||||
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests;
|
||||
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
|
||||
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
|
||||
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
|
||||
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
|
||||
|
@ -22,12 +23,10 @@ import org.elasticsearch.xpack.ml.extractor.ExtractedFields;
|
|||
import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider;
|
||||
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
|
||||
import org.junit.Before;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.InOrder;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
@ -37,7 +36,6 @@ import static org.mockito.Matchers.anyBoolean;
|
|||
import static org.mockito.Mockito.inOrder;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -66,8 +64,6 @@ public class AnalyticsProcessManagerTests extends ESTestCase {
|
|||
private DataFrameAnalyticsConfig dataFrameAnalyticsConfig;
|
||||
private DataFrameDataExtractorFactory dataExtractorFactory;
|
||||
private DataFrameDataExtractor dataExtractor;
|
||||
private Consumer<Exception> finishHandler;
|
||||
private ArgumentCaptor<Exception> exceptionCaptor;
|
||||
private AnalyticsProcessManager processManager;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -97,9 +93,6 @@ public class AnalyticsProcessManagerTests extends ESTestCase {
|
|||
dataExtractorFactory = mock(DataFrameDataExtractorFactory.class);
|
||||
when(dataExtractorFactory.newExtractor(anyBoolean())).thenReturn(dataExtractor);
|
||||
when(dataExtractorFactory.getExtractedFields()).thenReturn(mock(ExtractedFields.class));
|
||||
finishHandler = mock(Consumer.class);
|
||||
|
||||
exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
|
||||
|
||||
processManager = new AnalyticsProcessManager(
|
||||
client, executorServiceForJob, executorServiceForProcess, processFactory, auditor, trainedModelProvider);
|
||||
|
@ -108,54 +101,68 @@ public class AnalyticsProcessManagerTests extends ESTestCase {
|
|||
public void testRunJob_TaskIsStopping() {
|
||||
when(task.isStopping()).thenReturn(true);
|
||||
|
||||
processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory, finishHandler);
|
||||
processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory);
|
||||
assertThat(processManager.getProcessContextCount(), equalTo(0));
|
||||
|
||||
verify(finishHandler).accept(null);
|
||||
verifyNoMoreInteractions(finishHandler);
|
||||
InOrder inOrder = inOrder(task);
|
||||
inOrder.verify(task).isStopping();
|
||||
inOrder.verify(task).markAsCompleted();
|
||||
verifyNoMoreInteractions(task);
|
||||
}
|
||||
|
||||
public void testRunJob_ProcessContextAlreadyExists() {
|
||||
processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory, finishHandler);
|
||||
processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory);
|
||||
assertThat(processManager.getProcessContextCount(), equalTo(1));
|
||||
processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory, finishHandler);
|
||||
processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory);
|
||||
assertThat(processManager.getProcessContextCount(), equalTo(1));
|
||||
|
||||
verify(finishHandler).accept(exceptionCaptor.capture());
|
||||
verifyNoMoreInteractions(finishHandler);
|
||||
|
||||
Exception e = exceptionCaptor.getValue();
|
||||
assertThat(e.getMessage(), equalTo("[config-id] Could not create process as one already exists"));
|
||||
InOrder inOrder = inOrder(task);
|
||||
inOrder.verify(task).isStopping();
|
||||
inOrder.verify(task).getAllocationId();
|
||||
inOrder.verify(task).isStopping();
|
||||
inOrder.verify(task).getProgressTracker();
|
||||
inOrder.verify(task).isStopping();
|
||||
inOrder.verify(task).getAllocationId();
|
||||
inOrder.verify(task).updateState(DataFrameAnalyticsState.FAILED, "[config-id] Could not create process as one already exists");
|
||||
verifyNoMoreInteractions(task);
|
||||
}
|
||||
|
||||
public void testRunJob_EmptyDataFrame() {
|
||||
when(dataExtractor.collectDataSummary()).thenReturn(new DataFrameDataExtractor.DataSummary(0, NUM_COLS));
|
||||
|
||||
processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory, finishHandler);
|
||||
processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory);
|
||||
assertThat(processManager.getProcessContextCount(), equalTo(0)); // Make sure the process context did not leak
|
||||
|
||||
InOrder inOrder = inOrder(dataExtractor, executorServiceForProcess, process, finishHandler);
|
||||
InOrder inOrder = inOrder(dataExtractor, executorServiceForProcess, process, task);
|
||||
inOrder.verify(task).isStopping();
|
||||
inOrder.verify(task).getAllocationId();
|
||||
inOrder.verify(task).isStopping();
|
||||
inOrder.verify(dataExtractor).collectDataSummary();
|
||||
inOrder.verify(dataExtractor).getCategoricalFields(dataFrameAnalyticsConfig.getAnalysis());
|
||||
inOrder.verify(finishHandler).accept(null);
|
||||
verifyNoMoreInteractions(dataExtractor, executorServiceForProcess, process, finishHandler);
|
||||
inOrder.verify(task).getAllocationId();
|
||||
inOrder.verify(task).markAsCompleted();
|
||||
verifyNoMoreInteractions(dataExtractor, executorServiceForProcess, process, task);
|
||||
}
|
||||
|
||||
public void testRunJob_Ok() {
|
||||
processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory, finishHandler);
|
||||
processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory);
|
||||
assertThat(processManager.getProcessContextCount(), equalTo(1));
|
||||
|
||||
InOrder inOrder = inOrder(dataExtractor, executorServiceForProcess, process, finishHandler);
|
||||
InOrder inOrder = inOrder(dataExtractor, executorServiceForProcess, process, task);
|
||||
inOrder.verify(task).isStopping();
|
||||
inOrder.verify(task).getAllocationId();
|
||||
inOrder.verify(task).isStopping();
|
||||
inOrder.verify(dataExtractor).collectDataSummary();
|
||||
inOrder.verify(dataExtractor).getCategoricalFields(dataFrameAnalyticsConfig.getAnalysis());
|
||||
inOrder.verify(process).isProcessAlive();
|
||||
inOrder.verify(task).getProgressTracker();
|
||||
inOrder.verify(dataExtractor).getFieldNames();
|
||||
inOrder.verify(executorServiceForProcess, times(2)).execute(any()); // 'processData' and 'processResults' threads
|
||||
verifyNoMoreInteractions(dataExtractor, executorServiceForProcess, process, finishHandler);
|
||||
verifyNoMoreInteractions(dataExtractor, executorServiceForProcess, process, task);
|
||||
}
|
||||
|
||||
public void testProcessContext_GetSetFailureReason() {
|
||||
AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(CONFIG_ID);
|
||||
AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(dataFrameAnalyticsConfig);
|
||||
assertThat(processContext.getFailureReason(), is(nullValue()));
|
||||
|
||||
processContext.setFailureReason("reason1");
|
||||
|
@ -167,50 +174,57 @@ public class AnalyticsProcessManagerTests extends ESTestCase {
|
|||
processContext.setFailureReason("reason2");
|
||||
assertThat(processContext.getFailureReason(), equalTo("reason1"));
|
||||
|
||||
verifyNoMoreInteractions(dataExtractor, process, finishHandler);
|
||||
verifyNoMoreInteractions(dataExtractor, process, task);
|
||||
}
|
||||
|
||||
public void testProcessContext_StartProcess_ProcessAlreadyKilled() {
|
||||
AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(CONFIG_ID);
|
||||
processContext.stop();
|
||||
assertThat(processContext.startProcess(dataExtractorFactory, dataFrameAnalyticsConfig, task, null), is(false));
|
||||
public void testProcessContext_StartProcess_TaskAlreadyStopped() {
|
||||
when(task.isStopping()).thenReturn(true);
|
||||
|
||||
verifyNoMoreInteractions(dataExtractor, process, finishHandler);
|
||||
AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(dataFrameAnalyticsConfig);
|
||||
processContext.stop();
|
||||
assertThat(processContext.startProcess(dataExtractorFactory, task, null), is(false));
|
||||
|
||||
InOrder inOrder = inOrder(dataExtractor, process, task);
|
||||
inOrder.verify(task).isStopping();
|
||||
verifyNoMoreInteractions(dataExtractor, process, task);
|
||||
}
|
||||
|
||||
public void testProcessContext_StartProcess_EmptyDataFrame() {
|
||||
when(dataExtractor.collectDataSummary()).thenReturn(new DataFrameDataExtractor.DataSummary(0, NUM_COLS));
|
||||
|
||||
AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(CONFIG_ID);
|
||||
assertThat(processContext.startProcess(dataExtractorFactory, dataFrameAnalyticsConfig, task, null), is(false));
|
||||
AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(dataFrameAnalyticsConfig);
|
||||
assertThat(processContext.startProcess(dataExtractorFactory, task, null), is(false));
|
||||
|
||||
InOrder inOrder = inOrder(dataExtractor, process, finishHandler);
|
||||
InOrder inOrder = inOrder(dataExtractor, process, task);
|
||||
inOrder.verify(task).isStopping();
|
||||
inOrder.verify(dataExtractor).collectDataSummary();
|
||||
inOrder.verify(dataExtractor).getCategoricalFields(dataFrameAnalyticsConfig.getAnalysis());
|
||||
verifyNoMoreInteractions(dataExtractor, process, finishHandler);
|
||||
verifyNoMoreInteractions(dataExtractor, process, task);
|
||||
}
|
||||
|
||||
public void testProcessContext_StartAndStop() throws Exception {
|
||||
AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(CONFIG_ID);
|
||||
assertThat(processContext.startProcess(dataExtractorFactory, dataFrameAnalyticsConfig, task, null), is(true));
|
||||
AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(dataFrameAnalyticsConfig);
|
||||
assertThat(processContext.startProcess(dataExtractorFactory, task, null), is(true));
|
||||
processContext.stop();
|
||||
|
||||
InOrder inOrder = inOrder(dataExtractor, process, finishHandler);
|
||||
InOrder inOrder = inOrder(dataExtractor, process, task);
|
||||
// startProcess
|
||||
inOrder.verify(task).isStopping();
|
||||
inOrder.verify(dataExtractor).collectDataSummary();
|
||||
inOrder.verify(dataExtractor).getCategoricalFields(dataFrameAnalyticsConfig.getAnalysis());
|
||||
inOrder.verify(process).isProcessAlive();
|
||||
inOrder.verify(task).getProgressTracker();
|
||||
inOrder.verify(dataExtractor).getFieldNames();
|
||||
// stop
|
||||
inOrder.verify(dataExtractor).cancel();
|
||||
inOrder.verify(process).kill();
|
||||
verifyNoMoreInteractions(dataExtractor, process, finishHandler);
|
||||
verifyNoMoreInteractions(dataExtractor, process, task);
|
||||
}
|
||||
|
||||
public void testProcessContext_Stop() {
|
||||
AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(CONFIG_ID);
|
||||
AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(dataFrameAnalyticsConfig);
|
||||
processContext.stop();
|
||||
|
||||
verifyNoMoreInteractions(dataExtractor, process, finishHandler);
|
||||
verifyNoMoreInteractions(dataExtractor, process, task);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue