Rename AutoDetectResultProcessor* to AutodetectResultProcessor* for consistency with other classes where the spelling is "Autodetect" (#43359) (#43366)

This commit is contained in:
Przemysław Witek 2019-06-19 15:31:26 +02:00 committed by GitHub
parent 8578aba654
commit 86b58d9ff3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 67 additions and 67 deletions

View File

@ -33,7 +33,7 @@ import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.CountingInputStream; import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
@ -66,7 +66,7 @@ public class AutodetectCommunicator implements Closeable {
private final AutodetectProcess autodetectProcess; private final AutodetectProcess autodetectProcess;
private final StateStreamer stateStreamer; private final StateStreamer stateStreamer;
private final DataCountsReporter dataCountsReporter; private final DataCountsReporter dataCountsReporter;
private final AutoDetectResultProcessor autoDetectResultProcessor; private final AutodetectResultProcessor autodetectResultProcessor;
private final BiConsumer<Exception, Boolean> onFinishHandler; private final BiConsumer<Exception, Boolean> onFinishHandler;
private final ExecutorService autodetectWorkerExecutor; private final ExecutorService autodetectWorkerExecutor;
private final NamedXContentRegistry xContentRegistry; private final NamedXContentRegistry xContentRegistry;
@ -75,7 +75,7 @@ public class AutodetectCommunicator implements Closeable {
private volatile boolean processKilled; private volatile boolean processKilled;
AutodetectCommunicator(Job job, Environment environment, AutodetectProcess process, StateStreamer stateStreamer, AutodetectCommunicator(Job job, Environment environment, AutodetectProcess process, StateStreamer stateStreamer,
DataCountsReporter dataCountsReporter, AutoDetectResultProcessor autoDetectResultProcessor, DataCountsReporter dataCountsReporter, AutodetectResultProcessor autodetectResultProcessor,
BiConsumer<Exception, Boolean> onFinishHandler, NamedXContentRegistry xContentRegistry, BiConsumer<Exception, Boolean> onFinishHandler, NamedXContentRegistry xContentRegistry,
ExecutorService autodetectWorkerExecutor) { ExecutorService autodetectWorkerExecutor) {
this.job = job; this.job = job;
@ -83,7 +83,7 @@ public class AutodetectCommunicator implements Closeable {
this.autodetectProcess = process; this.autodetectProcess = process;
this.stateStreamer = stateStreamer; this.stateStreamer = stateStreamer;
this.dataCountsReporter = dataCountsReporter; this.dataCountsReporter = dataCountsReporter;
this.autoDetectResultProcessor = autoDetectResultProcessor; this.autodetectResultProcessor = autodetectResultProcessor;
this.onFinishHandler = onFinishHandler; this.onFinishHandler = onFinishHandler;
this.xContentRegistry = xContentRegistry; this.xContentRegistry = xContentRegistry;
this.autodetectWorkerExecutor = autodetectWorkerExecutor; this.autodetectWorkerExecutor = autodetectWorkerExecutor;
@ -120,7 +120,7 @@ public class AutodetectCommunicator implements Closeable {
} }
CountingInputStream countingStream = new CountingInputStream(inputStream, dataCountsReporter); CountingInputStream countingStream = new CountingInputStream(inputStream, dataCountsReporter);
DataToProcessWriter autoDetectWriter = createProcessWriter(params.getDataDescription()); DataToProcessWriter autodetectWriter = createProcessWriter(params.getDataDescription());
if (includeTokensField && categorizationAnalyzer == null) { if (includeTokensField && categorizationAnalyzer == null) {
createCategorizationAnalyzer(analysisRegistry); createCategorizationAnalyzer(analysisRegistry);
@ -129,14 +129,14 @@ public class AutodetectCommunicator implements Closeable {
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
AtomicReference<DataCounts> dataCountsAtomicReference = new AtomicReference<>(); AtomicReference<DataCounts> dataCountsAtomicReference = new AtomicReference<>();
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>(); AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
autoDetectWriter.write(countingStream, categorizationAnalyzer, xContentType, (dataCounts, e) -> { autodetectWriter.write(countingStream, categorizationAnalyzer, xContentType, (dataCounts, e) -> {
dataCountsAtomicReference.set(dataCounts); dataCountsAtomicReference.set(dataCounts);
exceptionAtomicReference.set(e); exceptionAtomicReference.set(e);
latch.countDown(); latch.countDown();
}); });
latch.await(); latch.await();
autoDetectWriter.flushStream(); autodetectWriter.flushStream();
if (exceptionAtomicReference.get() != null) { if (exceptionAtomicReference.get() != null) {
throw exceptionAtomicReference.get(); throw exceptionAtomicReference.get();
@ -168,7 +168,7 @@ public class AutodetectCommunicator implements Closeable {
killProcess(false, false); killProcess(false, false);
stateStreamer.cancel(); stateStreamer.cancel();
} }
autoDetectResultProcessor.awaitCompletion(); autodetectResultProcessor.awaitCompletion();
} finally { } finally {
onFinishHandler.accept(restart ? new ElasticsearchException(reason) : null, true); onFinishHandler.accept(restart ? new ElasticsearchException(reason) : null, true);
} }
@ -199,13 +199,13 @@ public class AutodetectCommunicator implements Closeable {
public void killProcess(boolean awaitCompletion, boolean finish, boolean finalizeJob) throws IOException { public void killProcess(boolean awaitCompletion, boolean finish, boolean finalizeJob) throws IOException {
try { try {
processKilled = true; processKilled = true;
autoDetectResultProcessor.setProcessKilled(); autodetectResultProcessor.setProcessKilled();
autodetectWorkerExecutor.shutdown(); autodetectWorkerExecutor.shutdown();
autodetectProcess.kill(); autodetectProcess.kill();
if (awaitCompletion) { if (awaitCompletion) {
try { try {
autoDetectResultProcessor.awaitCompletion(); autodetectResultProcessor.awaitCompletion();
} catch (TimeoutException e) { } catch (TimeoutException e) {
LOGGER.warn(new ParameterizedMessage("[{}] Timed out waiting for killed job", job.getId()), e); LOGGER.warn(new ParameterizedMessage("[{}] Timed out waiting for killed job", job.getId()), e);
} }
@ -289,20 +289,20 @@ public class AutodetectCommunicator implements Closeable {
FlushAcknowledgement flushAcknowledgement; FlushAcknowledgement flushAcknowledgement;
try { try {
flushAcknowledgement = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY); flushAcknowledgement = autodetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
while (flushAcknowledgement == null) { while (flushAcknowledgement == null) {
checkProcessIsAlive(); checkProcessIsAlive();
checkResultsProcessorIsAlive(); checkResultsProcessorIsAlive();
flushAcknowledgement = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY); flushAcknowledgement = autodetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
} }
} finally { } finally {
autoDetectResultProcessor.clearAwaitingFlush(flushId); autodetectResultProcessor.clearAwaitingFlush(flushId);
} }
if (processKilled == false) { if (processKilled == false) {
// We also have to wait for the normalizer to become idle so that we block // We also have to wait for the normalizer to become idle so that we block
// clients from querying results in the middle of normalization. // clients from querying results in the middle of normalization.
autoDetectResultProcessor.waitUntilRenormalizerIsIdle(); autodetectResultProcessor.waitUntilRenormalizerIsIdle();
LOGGER.debug("[{}] Flush completed", job.getId()); LOGGER.debug("[{}] Flush completed", job.getId());
} }
@ -321,7 +321,7 @@ public class AutodetectCommunicator implements Closeable {
} }
private void checkResultsProcessorIsAlive() { private void checkResultsProcessorIsAlive() {
if (autoDetectResultProcessor.isFailed()) { if (autodetectResultProcessor.isFailed()) {
// Don't log here - it just causes double logging when the exception gets logged // Don't log here - it just causes double logging when the exception gets logged
throw new ElasticsearchException("[{}] Unexpected death of the result processor", job.getId()); throw new ElasticsearchException("[{}] Unexpected death of the result processor", job.getId());
} }
@ -332,11 +332,11 @@ public class AutodetectCommunicator implements Closeable {
} }
public ModelSizeStats getModelSizeStats() { public ModelSizeStats getModelSizeStats() {
return autoDetectResultProcessor.modelSizeStats(); return autodetectResultProcessor.modelSizeStats();
} }
public TimingStats getTimingStats() { public TimingStats getTimingStats() {
return autoDetectResultProcessor.timingStats(); return autodetectResultProcessor.timingStats();
} }
public DataCounts getDataCounts() { public DataCounts getDataCounts() {

View File

@ -57,7 +57,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
@ -500,7 +500,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
} }
// A TP with no queue, so that we fail immediately if there are no threads available // A TP with no queue, so that we fail immediately if there are no threads available
ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME); ExecutorService autodetectExecutorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME);
DataCountsReporter dataCountsReporter = new DataCountsReporter(job, autodetectParams.dataCounts(), jobDataCountsPersister); DataCountsReporter dataCountsReporter = new DataCountsReporter(job, autodetectParams.dataCounts(), jobDataCountsPersister);
ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobResultsProvider, ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobResultsProvider,
new JobRenormalizedResultsPersister(job.getId(), client), normalizerFactory); new JobRenormalizedResultsPersister(job.getId(), client), normalizerFactory);
@ -508,10 +508,10 @@ public class AutodetectProcessManager implements ClusterStateListener {
Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater, Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater,
renormalizerExecutorService); renormalizerExecutorService);
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autoDetectExecutorService, AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autodetectExecutorService,
onProcessCrash(jobTask)); onProcessCrash(jobTask));
AutoDetectResultProcessor processor = AutodetectResultProcessor processor =
new AutoDetectResultProcessor( new AutodetectResultProcessor(
client, client,
auditor, auditor,
jobId, jobId,
@ -521,8 +521,8 @@ public class AutodetectProcessManager implements ClusterStateListener {
autodetectParams.timingStats()); autodetectParams.timingStats());
ExecutorService autodetectWorkerExecutor; ExecutorService autodetectWorkerExecutor;
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService); autodetectWorkerExecutor = createAutodetectExecutorService(autodetectExecutorService);
autoDetectExecutorService.submit(() -> processor.process(process)); autodetectExecutorService.submit(() -> processor.process(process));
} catch (EsRejectedExecutionException e) { } catch (EsRejectedExecutionException e) {
// If submitting the operation to read the results from the process fails we need to close // If submitting the operation to read the results from the process fails we need to close
// the process too, so that other submitted operations to threadpool are stopped. // the process too, so that other submitted operations to threadpool are stopped.
@ -734,9 +734,9 @@ public class AutodetectProcessManager implements ClusterStateListener {
} }
ExecutorService createAutodetectExecutorService(ExecutorService executorService) { ExecutorService createAutodetectExecutorService(ExecutorService executorService) {
AutodetectWorkerExecutorService autoDetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext()); AutodetectWorkerExecutorService autodetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext());
executorService.submit(autoDetectWorkerExecutor::start); executorService.submit(autodetectWorkerExecutor::start);
return autoDetectWorkerExecutor; return autodetectWorkerExecutor;
} }
public ByteSizeValue getMinLocalStorageAvailable() { public ByteSizeValue getMinLocalStorageAvailable() {

View File

@ -68,9 +68,9 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
* interim results and the old interim results have to be cleared out * interim results and the old interim results have to be cleared out
* before the new ones are written. * before the new ones are written.
*/ */
public class AutoDetectResultProcessor { public class AutodetectResultProcessor {
private static final Logger LOGGER = LogManager.getLogger(AutoDetectResultProcessor.class); private static final Logger LOGGER = LogManager.getLogger(AutodetectResultProcessor.class);
private final Client client; private final Client client;
private final Auditor auditor; private final Auditor auditor;
@ -100,14 +100,14 @@ public class AutoDetectResultProcessor {
*/ */
private TimingStats persistedTimingStats; // only used from the process() thread, so doesn't need to be volatile private TimingStats persistedTimingStats; // only used from the process() thread, so doesn't need to be volatile
public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, public AutodetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
JobResultsPersister persister, JobResultsPersister persister,
ModelSizeStats latestModelSizeStats, ModelSizeStats latestModelSizeStats,
TimingStats timingStats) { TimingStats timingStats) {
this(client, auditor, jobId, renormalizer, persister, latestModelSizeStats, timingStats, new FlushListener()); this(client, auditor, jobId, renormalizer, persister, latestModelSizeStats, timingStats, new FlushListener());
} }
AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, AutodetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
JobResultsPersister persister, ModelSizeStats latestModelSizeStats, TimingStats timingStats, JobResultsPersister persister, ModelSizeStats latestModelSizeStats, TimingStats timingStats,
FlushListener flushListener) { FlushListener flushListener) {
this.client = Objects.requireNonNull(client); this.client = Objects.requireNonNull(client);

View File

@ -39,7 +39,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.job.results.BucketTests; import org.elasticsearch.xpack.ml.job.results.BucketTests;
@ -75,7 +75,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
private JobResultsProvider jobResultsProvider; private JobResultsProvider jobResultsProvider;
private List<ModelSnapshot> capturedUpdateModelSnapshotOnJobRequests; private List<ModelSnapshot> capturedUpdateModelSnapshotOnJobRequests;
private AutoDetectResultProcessor resultProcessor; private AutodetectResultProcessor resultProcessor;
private Renormalizer renormalizer; private Renormalizer renormalizer;
@Override @Override
@ -91,7 +91,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
jobResultsProvider = new JobResultsProvider(client(), builder.build()); jobResultsProvider = new JobResultsProvider(client(), builder.build());
renormalizer = mock(Renormalizer.class); renormalizer = mock(Renormalizer.class);
capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>(); capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>();
resultProcessor = new AutoDetectResultProcessor(client(), auditor, JOB_ID, renormalizer, resultProcessor = new AutodetectResultProcessor(client(), auditor, JOB_ID, renormalizer,
new JobResultsPersister(client()), new ModelSizeStats.Builder(JOB_ID).build(), new TimingStats(JOB_ID)) { new JobResultsPersister(client()), new ModelSizeStats.Builder(JOB_ID).build(), new TimingStats(JOB_ID)) {
@Override @Override
protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) {

View File

@ -27,7 +27,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknow
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
@ -79,7 +79,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
public void testWriteResetBucketsControlMessage() throws IOException { public void testWriteResetBucketsControlMessage() throws IOException {
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("2").build(), Optional.empty()); DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("2").build(), Optional.empty());
AutodetectProcess process = mockAutodetectProcessWithOutputStream(); AutodetectProcess process = mockAutodetectProcessWithOutputStream();
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) { try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutodetectResultProcessor.class))) {
communicator.writeToJob(new ByteArrayInputStream(new byte[0]), analysisRegistry, communicator.writeToJob(new ByteArrayInputStream(new byte[0]), analysisRegistry,
randomFrom(XContentType.values()), params, (dataCounts, e) -> {}); randomFrom(XContentType.values()), params, (dataCounts, e) -> {});
verify(process).writeResetBucketsControlMessage(params); verify(process).writeResetBucketsControlMessage(params);
@ -89,7 +89,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
public void testWriteUpdateProcessMessage() throws IOException { public void testWriteUpdateProcessMessage() throws IOException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream(); AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isReady()).thenReturn(true); when(process.isReady()).thenReturn(true);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class)); AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutodetectResultProcessor.class));
DetectionRule updatedRule = new DetectionRule.Builder(RuleScope.builder().exclude("foo", "bar")).build(); DetectionRule updatedRule = new DetectionRule.Builder(RuleScope.builder().exclude("foo", "bar")).build();
List<JobUpdate.DetectorUpdate> detectorUpdates = Collections.singletonList( List<JobUpdate.DetectorUpdate> detectorUpdates = Collections.singletonList(
@ -111,7 +111,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
public void testFlushJob() throws IOException, InterruptedException { public void testFlushJob() throws IOException, InterruptedException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream(); AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isProcessAlive()).thenReturn(true); when(process.isProcessAlive()).thenReturn(true);
AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class); AutodetectResultProcessor processor = mock(AutodetectResultProcessor.class);
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(flushAcknowledgement); when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(flushAcknowledgement);
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor)) { try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor)) {
@ -126,7 +126,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
public void testWaitForFlushReturnsIfParserFails() throws IOException, InterruptedException { public void testWaitForFlushReturnsIfParserFails() throws IOException, InterruptedException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream(); AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isProcessAlive()).thenReturn(true); when(process.isProcessAlive()).thenReturn(true);
AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class); AutodetectResultProcessor processor = mock(AutodetectResultProcessor.class);
when(processor.isFailed()).thenReturn(true); when(processor.isFailed()).thenReturn(true);
when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(null); when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(null);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor); AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor);
@ -137,7 +137,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
AutodetectProcess process = mockAutodetectProcessWithOutputStream(); AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isProcessAlive()).thenReturn(false); when(process.isProcessAlive()).thenReturn(false);
when(process.readError()).thenReturn("Mock process is dead"); when(process.readError()).thenReturn("Mock process is dead");
AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class)); AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutodetectResultProcessor.class));
FlushJobParams params = FlushJobParams.builder().build(); FlushJobParams params = FlushJobParams.builder().build();
Exception[] holder = new ElasticsearchException[1]; Exception[] holder = new ElasticsearchException[1];
communicator.flushJob(params, (aVoid, e1) -> holder[0] = e1); communicator.flushJob(params, (aVoid, e1) -> holder[0] = e1);
@ -147,17 +147,17 @@ public class AutodetectCommunicatorTests extends ESTestCase {
public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException, InterruptedException { public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException, InterruptedException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream(); AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isProcessAlive()).thenReturn(true); when(process.isProcessAlive()).thenReturn(true);
AutoDetectResultProcessor autoDetectResultProcessor = Mockito.mock(AutoDetectResultProcessor.class); AutodetectResultProcessor autodetectResultProcessor = Mockito.mock(AutodetectResultProcessor.class);
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
when(autoDetectResultProcessor.waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1)))) when(autodetectResultProcessor.waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1))))
.thenReturn(null).thenReturn(flushAcknowledgement); .thenReturn(null).thenReturn(flushAcknowledgement);
FlushJobParams params = FlushJobParams.builder().build(); FlushJobParams params = FlushJobParams.builder().build();
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, autoDetectResultProcessor)) { try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, autodetectResultProcessor)) {
communicator.flushJob(params, (aVoid, e) -> {}); communicator.flushJob(params, (aVoid, e) -> {});
} }
verify(autoDetectResultProcessor, times(2)).waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1))); verify(autodetectResultProcessor, times(2)).waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1)));
// First in checkAndRun, second due to check between calls to waitForFlushAcknowledgement and third due to close() // First in checkAndRun, second due to check between calls to waitForFlushAcknowledgement and third due to close()
verify(process, times(3)).isProcessAlive(); verify(process, times(3)).isProcessAlive();
} }
@ -165,7 +165,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
public void testCloseGivenProcessIsReady() throws IOException { public void testCloseGivenProcessIsReady() throws IOException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream(); AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isReady()).thenReturn(true); when(process.isReady()).thenReturn(true);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class)); AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutodetectResultProcessor.class));
communicator.close(); communicator.close();
@ -177,7 +177,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
public void testCloseGivenProcessIsNotReady() throws IOException { public void testCloseGivenProcessIsNotReady() throws IOException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream(); AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isReady()).thenReturn(false); when(process.isReady()).thenReturn(false);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class)); AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutodetectResultProcessor.class));
communicator.close(); communicator.close();
@ -188,7 +188,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
public void testKill() throws IOException, TimeoutException { public void testKill() throws IOException, TimeoutException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream(); AutodetectProcess process = mockAutodetectProcessWithOutputStream();
AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class); AutodetectResultProcessor resultProcessor = mock(AutodetectResultProcessor.class);
ExecutorService executorService = mock(ExecutorService.class); ExecutorService executorService = mock(ExecutorService.class);
AtomicBoolean finishCalled = new AtomicBoolean(false); AtomicBoolean finishCalled = new AtomicBoolean(false);
@ -232,7 +232,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private AutodetectCommunicator createAutodetectCommunicator(ExecutorService executorService, AutodetectProcess autodetectProcess, private AutodetectCommunicator createAutodetectCommunicator(ExecutorService executorService, AutodetectProcess autodetectProcess,
AutoDetectResultProcessor autoDetectResultProcessor, AutodetectResultProcessor autodetectResultProcessor,
BiConsumer<Exception, Boolean> finishHandler) throws IOException { BiConsumer<Exception, Boolean> finishHandler) throws IOException {
DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class); DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class);
doAnswer(invocation -> { doAnswer(invocation -> {
@ -240,13 +240,13 @@ public class AutodetectCommunicatorTests extends ESTestCase {
return null; return null;
}).when(dataCountsReporter).finishReporting(any()); }).when(dataCountsReporter).finishReporting(any());
return new AutodetectCommunicator(createJobDetails(), environment, autodetectProcess, return new AutodetectCommunicator(createJobDetails(), environment, autodetectProcess,
stateStreamer, dataCountsReporter, autoDetectResultProcessor, finishHandler, stateStreamer, dataCountsReporter, autodetectResultProcessor, finishHandler,
new NamedXContentRegistry(Collections.emptyList()), executorService); new NamedXContentRegistry(Collections.emptyList()), executorService);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private AutodetectCommunicator createAutodetectCommunicator(AutodetectProcess autodetectProcess, private AutodetectCommunicator createAutodetectCommunicator(AutodetectProcess autodetectProcess,
AutoDetectResultProcessor autoDetectResultProcessor) throws IOException { AutodetectResultProcessor autodetectResultProcessor) throws IOException {
ExecutorService executorService = mock(ExecutorService.class); ExecutorService executorService = mock(ExecutorService.class);
when(executorService.submit(any(Callable.class))).thenReturn(mock(Future.class)); when(executorService.submit(any(Callable.class))).thenReturn(mock(Future.class));
doAnswer(invocationOnMock -> { doAnswer(invocationOnMock -> {
@ -259,7 +259,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
return null; return null;
}).when(executorService).execute(any(Runnable.class)); }).when(executorService).execute(any(Runnable.class));
return createAutodetectCommunicator(executorService, autodetectProcess, autoDetectResultProcessor, (e, b) -> {}); return createAutodetectCommunicator(executorService, autodetectProcess, autodetectResultProcessor, (e, b) -> {});
} }
} }

View File

@ -67,7 +67,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class AutoDetectResultProcessorTests extends ESTestCase { public class AutodetectResultProcessorTests extends ESTestCase {
private static final String JOB_ID = "valid_id"; private static final String JOB_ID = "valid_id";
private static final long BUCKET_SPAN_MS = 1000; private static final long BUCKET_SPAN_MS = 1000;
@ -78,7 +78,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
private Renormalizer renormalizer; private Renormalizer renormalizer;
private JobResultsPersister persister; private JobResultsPersister persister;
private FlushListener flushListener; private FlushListener flushListener;
private AutoDetectResultProcessor processorUnderTest; private AutodetectResultProcessor processorUnderTest;
private ScheduledThreadPoolExecutor executor; private ScheduledThreadPoolExecutor executor;
@Before @Before
@ -94,7 +94,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(persister.persistModelSnapshot(any(), any())) when(persister.persistModelSnapshot(any(), any()))
.thenReturn(new IndexResponse(new ShardId("ml", "uid", 0), "doc", "1", 0L, 0L, 0L, true)); .thenReturn(new IndexResponse(new ShardId("ml", "uid", 0), "doc", "1", 0L, 0L, 0L, true));
flushListener = mock(FlushListener.class); flushListener = mock(FlushListener.class);
processorUnderTest = new AutoDetectResultProcessor( processorUnderTest = new AutodetectResultProcessor(
client, client,
auditor, auditor,
JOB_ID, JOB_ID,
@ -132,7 +132,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder); when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false; context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class); AutodetectResult result = mock(AutodetectResult.class);
Bucket bucket = mock(Bucket.class); Bucket bucket = mock(Bucket.class);
@ -152,7 +152,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder); when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = true; context.deleteInterimRequired = true;
AutodetectResult result = mock(AutodetectResult.class); AutodetectResult result = mock(AutodetectResult.class);
Bucket bucket = mock(Bucket.class); Bucket bucket = mock(Bucket.class);
@ -171,7 +171,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", bulkBuilder); AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context("foo", bulkBuilder);
context.deleteInterimRequired = false; context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class); AutodetectResult result = mock(AutodetectResult.class);
AnomalyRecord record1 = new AnomalyRecord("foo", new Date(123), 123); AnomalyRecord record1 = new AnomalyRecord("foo", new Date(123), 123);
@ -190,7 +190,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false; context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class); AutodetectResult result = mock(AutodetectResult.class);
Influencer influencer1 = new Influencer(JOB_ID, "infField", "infValue", new Date(123), 123); Influencer influencer1 = new Influencer(JOB_ID, "infField", "infValue", new Date(123), 123);
@ -208,7 +208,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false; context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class); AutodetectResult result = mock(AutodetectResult.class);
CategoryDefinition categoryDefinition = mock(CategoryDefinition.class); CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
@ -224,7 +224,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false; context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class); AutodetectResult result = mock(AutodetectResult.class);
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
@ -242,7 +242,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_flushAcknowledgementMustBeProcessedLast() { public void testProcessResult_flushAcknowledgementMustBeProcessedLast() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false; context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class); AutodetectResult result = mock(AutodetectResult.class);
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
@ -265,7 +265,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_modelPlot() { public void testProcessResult_modelPlot() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false; context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class); AutodetectResult result = mock(AutodetectResult.class);
ModelPlot modelPlot = mock(ModelPlot.class); ModelPlot modelPlot = mock(ModelPlot.class);
@ -279,7 +279,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_modelSizeStats() { public void testProcessResult_modelSizeStats() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false; context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class); AutodetectResult result = mock(AutodetectResult.class);
ModelSizeStats modelSizeStats = mock(ModelSizeStats.class); ModelSizeStats modelSizeStats = mock(ModelSizeStats.class);
@ -296,7 +296,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
setupScheduleDelayTime(TimeValue.timeValueSeconds(5)); setupScheduleDelayTime(TimeValue.timeValueSeconds(5));
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false; context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class); AutodetectResult result = mock(AutodetectResult.class);
@ -333,7 +333,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_modelSnapshot() { public void testProcessResult_modelSnapshot() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false; context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class); AutodetectResult result = mock(AutodetectResult.class);
ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID) ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID)
@ -355,7 +355,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_quantiles_givenRenormalizationIsEnabled() { public void testProcessResult_quantiles_givenRenormalizationIsEnabled() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false; context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class); AutodetectResult result = mock(AutodetectResult.class);
Quantiles quantiles = mock(Quantiles.class); Quantiles quantiles = mock(Quantiles.class);
@ -375,7 +375,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_quantiles_givenRenormalizationIsDisabled() { public void testProcessResult_quantiles_givenRenormalizationIsDisabled() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false; context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class); AutodetectResult result = mock(AutodetectResult.class);
Quantiles quantiles = mock(Quantiles.class); Quantiles quantiles = mock(Quantiles.class);