diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index bc78fd6eba7..8d49a0bfdbc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -33,7 +33,7 @@ import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.CountingInputStream; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; -import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; +import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; @@ -66,7 +66,7 @@ public class AutodetectCommunicator implements Closeable { private final AutodetectProcess autodetectProcess; private final StateStreamer stateStreamer; private final DataCountsReporter dataCountsReporter; - private final AutoDetectResultProcessor autoDetectResultProcessor; + private final AutodetectResultProcessor autodetectResultProcessor; private final BiConsumer onFinishHandler; private final ExecutorService autodetectWorkerExecutor; private final NamedXContentRegistry xContentRegistry; @@ -75,7 +75,7 @@ public class AutodetectCommunicator implements Closeable { private volatile boolean processKilled; AutodetectCommunicator(Job job, Environment environment, AutodetectProcess process, StateStreamer stateStreamer, - DataCountsReporter dataCountsReporter, AutoDetectResultProcessor autoDetectResultProcessor, + DataCountsReporter dataCountsReporter, AutodetectResultProcessor autodetectResultProcessor, BiConsumer onFinishHandler, NamedXContentRegistry xContentRegistry, ExecutorService autodetectWorkerExecutor) { this.job = job; @@ -83,7 +83,7 @@ public class AutodetectCommunicator implements Closeable { this.autodetectProcess = process; this.stateStreamer = stateStreamer; this.dataCountsReporter = dataCountsReporter; - this.autoDetectResultProcessor = autoDetectResultProcessor; + this.autodetectResultProcessor = autodetectResultProcessor; this.onFinishHandler = onFinishHandler; this.xContentRegistry = xContentRegistry; this.autodetectWorkerExecutor = autodetectWorkerExecutor; @@ -120,7 +120,7 @@ public class AutodetectCommunicator implements Closeable { } CountingInputStream countingStream = new CountingInputStream(inputStream, dataCountsReporter); - DataToProcessWriter autoDetectWriter = createProcessWriter(params.getDataDescription()); + DataToProcessWriter autodetectWriter = createProcessWriter(params.getDataDescription()); if (includeTokensField && categorizationAnalyzer == null) { createCategorizationAnalyzer(analysisRegistry); @@ -129,14 +129,14 @@ public class AutodetectCommunicator implements Closeable { CountDownLatch latch = new CountDownLatch(1); AtomicReference dataCountsAtomicReference = new AtomicReference<>(); AtomicReference exceptionAtomicReference = new AtomicReference<>(); - autoDetectWriter.write(countingStream, categorizationAnalyzer, xContentType, (dataCounts, e) -> { + autodetectWriter.write(countingStream, categorizationAnalyzer, xContentType, (dataCounts, e) -> { dataCountsAtomicReference.set(dataCounts); exceptionAtomicReference.set(e); latch.countDown(); }); latch.await(); - autoDetectWriter.flushStream(); + autodetectWriter.flushStream(); if (exceptionAtomicReference.get() != null) { throw exceptionAtomicReference.get(); @@ -168,7 +168,7 @@ public class AutodetectCommunicator implements Closeable { killProcess(false, false); stateStreamer.cancel(); } - autoDetectResultProcessor.awaitCompletion(); + autodetectResultProcessor.awaitCompletion(); } finally { onFinishHandler.accept(restart ? new ElasticsearchException(reason) : null, true); } @@ -199,13 +199,13 @@ public class AutodetectCommunicator implements Closeable { public void killProcess(boolean awaitCompletion, boolean finish, boolean finalizeJob) throws IOException { try { processKilled = true; - autoDetectResultProcessor.setProcessKilled(); + autodetectResultProcessor.setProcessKilled(); autodetectWorkerExecutor.shutdown(); autodetectProcess.kill(); if (awaitCompletion) { try { - autoDetectResultProcessor.awaitCompletion(); + autodetectResultProcessor.awaitCompletion(); } catch (TimeoutException e) { LOGGER.warn(new ParameterizedMessage("[{}] Timed out waiting for killed job", job.getId()), e); } @@ -289,20 +289,20 @@ public class AutodetectCommunicator implements Closeable { FlushAcknowledgement flushAcknowledgement; try { - flushAcknowledgement = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY); + flushAcknowledgement = autodetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY); while (flushAcknowledgement == null) { checkProcessIsAlive(); checkResultsProcessorIsAlive(); - flushAcknowledgement = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY); + flushAcknowledgement = autodetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY); } } finally { - autoDetectResultProcessor.clearAwaitingFlush(flushId); + autodetectResultProcessor.clearAwaitingFlush(flushId); } if (processKilled == false) { // We also have to wait for the normalizer to become idle so that we block // clients from querying results in the middle of normalization. - autoDetectResultProcessor.waitUntilRenormalizerIsIdle(); + autodetectResultProcessor.waitUntilRenormalizerIsIdle(); LOGGER.debug("[{}] Flush completed", job.getId()); } @@ -321,7 +321,7 @@ public class AutodetectCommunicator implements Closeable { } private void checkResultsProcessorIsAlive() { - if (autoDetectResultProcessor.isFailed()) { + if (autodetectResultProcessor.isFailed()) { // Don't log here - it just causes double logging when the exception gets logged throw new ElasticsearchException("[{}] Unexpected death of the result processor", job.getId()); } @@ -332,11 +332,11 @@ public class AutodetectCommunicator implements Closeable { } public ModelSizeStats getModelSizeStats() { - return autoDetectResultProcessor.modelSizeStats(); + return autodetectResultProcessor.modelSizeStats(); } public TimingStats getTimingStats() { - return autoDetectResultProcessor.timingStats(); + return autodetectResultProcessor.timingStats(); } public DataCounts getDataCounts() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 513661a9794..9bca7dc512e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -57,7 +57,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; -import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; +import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; @@ -500,7 +500,7 @@ public class AutodetectProcessManager implements ClusterStateListener { } // A TP with no queue, so that we fail immediately if there are no threads available - ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME); + ExecutorService autodetectExecutorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME); DataCountsReporter dataCountsReporter = new DataCountsReporter(job, autodetectParams.dataCounts(), jobDataCountsPersister); ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobResultsProvider, new JobRenormalizedResultsPersister(job.getId(), client), normalizerFactory); @@ -508,10 +508,10 @@ public class AutodetectProcessManager implements ClusterStateListener { Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater, renormalizerExecutorService); - AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autoDetectExecutorService, + AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autodetectExecutorService, onProcessCrash(jobTask)); - AutoDetectResultProcessor processor = - new AutoDetectResultProcessor( + AutodetectResultProcessor processor = + new AutodetectResultProcessor( client, auditor, jobId, @@ -521,8 +521,8 @@ public class AutodetectProcessManager implements ClusterStateListener { autodetectParams.timingStats()); ExecutorService autodetectWorkerExecutor; try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { - autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService); - autoDetectExecutorService.submit(() -> processor.process(process)); + autodetectWorkerExecutor = createAutodetectExecutorService(autodetectExecutorService); + autodetectExecutorService.submit(() -> processor.process(process)); } catch (EsRejectedExecutionException e) { // If submitting the operation to read the results from the process fails we need to close // the process too, so that other submitted operations to threadpool are stopped. @@ -734,9 +734,9 @@ public class AutodetectProcessManager implements ClusterStateListener { } ExecutorService createAutodetectExecutorService(ExecutorService executorService) { - AutodetectWorkerExecutorService autoDetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext()); - executorService.submit(autoDetectWorkerExecutor::start); - return autoDetectWorkerExecutor; + AutodetectWorkerExecutorService autodetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext()); + executorService.submit(autodetectWorkerExecutor::start); + return autodetectWorkerExecutor; } public ByteSizeValue getMinLocalStorageAvailable() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java similarity index 99% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java index 37e2d626b4c..c02aef907be 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java @@ -68,9 +68,9 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; * interim results and the old interim results have to be cleared out * before the new ones are written. */ -public class AutoDetectResultProcessor { +public class AutodetectResultProcessor { - private static final Logger LOGGER = LogManager.getLogger(AutoDetectResultProcessor.class); + private static final Logger LOGGER = LogManager.getLogger(AutodetectResultProcessor.class); private final Client client; private final Auditor auditor; @@ -100,14 +100,14 @@ public class AutoDetectResultProcessor { */ private TimingStats persistedTimingStats; // only used from the process() thread, so doesn't need to be volatile - public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, + public AutodetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, JobResultsPersister persister, ModelSizeStats latestModelSizeStats, TimingStats timingStats) { this(client, auditor, jobId, renormalizer, persister, latestModelSizeStats, timingStats, new FlushListener()); } - AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, + AutodetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, JobResultsPersister persister, ModelSizeStats latestModelSizeStats, TimingStats timingStats, FlushListener flushListener) { this.client = Objects.requireNonNull(client); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index c5451cd739d..7b5ab84dd18 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -39,7 +39,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; -import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; +import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.job.results.BucketTests; @@ -75,7 +75,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase { private JobResultsProvider jobResultsProvider; private List capturedUpdateModelSnapshotOnJobRequests; - private AutoDetectResultProcessor resultProcessor; + private AutodetectResultProcessor resultProcessor; private Renormalizer renormalizer; @Override @@ -91,7 +91,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase { jobResultsProvider = new JobResultsProvider(client(), builder.build()); renormalizer = mock(Renormalizer.class); capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>(); - resultProcessor = new AutoDetectResultProcessor(client(), auditor, JOB_ID, renormalizer, + resultProcessor = new AutodetectResultProcessor(client(), auditor, JOB_ID, renormalizer, new JobResultsPersister(client()), new ModelSizeStats.Builder(JOB_ID).build(), new TimingStats(JOB_ID)) { @Override protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index 6705773dc73..9f6d5295073 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -27,7 +27,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknow import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; -import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; +import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; @@ -79,7 +79,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { public void testWriteResetBucketsControlMessage() throws IOException { DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("2").build(), Optional.empty()); AutodetectProcess process = mockAutodetectProcessWithOutputStream(); - try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) { + try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutodetectResultProcessor.class))) { communicator.writeToJob(new ByteArrayInputStream(new byte[0]), analysisRegistry, randomFrom(XContentType.values()), params, (dataCounts, e) -> {}); verify(process).writeResetBucketsControlMessage(params); @@ -89,7 +89,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { public void testWriteUpdateProcessMessage() throws IOException { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isReady()).thenReturn(true); - AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class)); + AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutodetectResultProcessor.class)); DetectionRule updatedRule = new DetectionRule.Builder(RuleScope.builder().exclude("foo", "bar")).build(); List detectorUpdates = Collections.singletonList( @@ -111,7 +111,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { public void testFlushJob() throws IOException, InterruptedException { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isProcessAlive()).thenReturn(true); - AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class); + AutodetectResultProcessor processor = mock(AutodetectResultProcessor.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(flushAcknowledgement); try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor)) { @@ -126,7 +126,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { public void testWaitForFlushReturnsIfParserFails() throws IOException, InterruptedException { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isProcessAlive()).thenReturn(true); - AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class); + AutodetectResultProcessor processor = mock(AutodetectResultProcessor.class); when(processor.isFailed()).thenReturn(true); when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(null); AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor); @@ -137,7 +137,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isProcessAlive()).thenReturn(false); when(process.readError()).thenReturn("Mock process is dead"); - AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class)); + AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutodetectResultProcessor.class)); FlushJobParams params = FlushJobParams.builder().build(); Exception[] holder = new ElasticsearchException[1]; communicator.flushJob(params, (aVoid, e1) -> holder[0] = e1); @@ -147,17 +147,17 @@ public class AutodetectCommunicatorTests extends ESTestCase { public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException, InterruptedException { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isProcessAlive()).thenReturn(true); - AutoDetectResultProcessor autoDetectResultProcessor = Mockito.mock(AutoDetectResultProcessor.class); + AutodetectResultProcessor autodetectResultProcessor = Mockito.mock(AutodetectResultProcessor.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); - when(autoDetectResultProcessor.waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1)))) + when(autodetectResultProcessor.waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1)))) .thenReturn(null).thenReturn(flushAcknowledgement); FlushJobParams params = FlushJobParams.builder().build(); - try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, autoDetectResultProcessor)) { + try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, autodetectResultProcessor)) { communicator.flushJob(params, (aVoid, e) -> {}); } - verify(autoDetectResultProcessor, times(2)).waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1))); + verify(autodetectResultProcessor, times(2)).waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1))); // First in checkAndRun, second due to check between calls to waitForFlushAcknowledgement and third due to close() verify(process, times(3)).isProcessAlive(); } @@ -165,7 +165,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { public void testCloseGivenProcessIsReady() throws IOException { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isReady()).thenReturn(true); - AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class)); + AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutodetectResultProcessor.class)); communicator.close(); @@ -177,7 +177,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { public void testCloseGivenProcessIsNotReady() throws IOException { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isReady()).thenReturn(false); - AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class)); + AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutodetectResultProcessor.class)); communicator.close(); @@ -188,7 +188,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { public void testKill() throws IOException, TimeoutException { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); - AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class); + AutodetectResultProcessor resultProcessor = mock(AutodetectResultProcessor.class); ExecutorService executorService = mock(ExecutorService.class); AtomicBoolean finishCalled = new AtomicBoolean(false); @@ -232,7 +232,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { @SuppressWarnings("unchecked") private AutodetectCommunicator createAutodetectCommunicator(ExecutorService executorService, AutodetectProcess autodetectProcess, - AutoDetectResultProcessor autoDetectResultProcessor, + AutodetectResultProcessor autodetectResultProcessor, BiConsumer finishHandler) throws IOException { DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class); doAnswer(invocation -> { @@ -240,13 +240,13 @@ public class AutodetectCommunicatorTests extends ESTestCase { return null; }).when(dataCountsReporter).finishReporting(any()); return new AutodetectCommunicator(createJobDetails(), environment, autodetectProcess, - stateStreamer, dataCountsReporter, autoDetectResultProcessor, finishHandler, + stateStreamer, dataCountsReporter, autodetectResultProcessor, finishHandler, new NamedXContentRegistry(Collections.emptyList()), executorService); } @SuppressWarnings("unchecked") private AutodetectCommunicator createAutodetectCommunicator(AutodetectProcess autodetectProcess, - AutoDetectResultProcessor autoDetectResultProcessor) throws IOException { + AutodetectResultProcessor autodetectResultProcessor) throws IOException { ExecutorService executorService = mock(ExecutorService.class); when(executorService.submit(any(Callable.class))).thenReturn(mock(Future.class)); doAnswer(invocationOnMock -> { @@ -259,7 +259,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { return null; }).when(executorService).execute(any(Runnable.class)); - return createAutodetectCommunicator(executorService, autodetectProcess, autoDetectResultProcessor, (e, b) -> {}); + return createAutodetectCommunicator(executorService, autodetectProcess, autodetectResultProcessor, (e, b) -> {}); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java similarity index 95% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java index bb151ecefb3..c8c730be708 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java @@ -67,7 +67,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -public class AutoDetectResultProcessorTests extends ESTestCase { +public class AutodetectResultProcessorTests extends ESTestCase { private static final String JOB_ID = "valid_id"; private static final long BUCKET_SPAN_MS = 1000; @@ -78,7 +78,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { private Renormalizer renormalizer; private JobResultsPersister persister; private FlushListener flushListener; - private AutoDetectResultProcessor processorUnderTest; + private AutodetectResultProcessor processorUnderTest; private ScheduledThreadPoolExecutor executor; @Before @@ -94,7 +94,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { when(persister.persistModelSnapshot(any(), any())) .thenReturn(new IndexResponse(new ShardId("ml", "uid", 0), "doc", "1", 0L, 0L, 0L, true)); flushListener = mock(FlushListener.class); - processorUnderTest = new AutoDetectResultProcessor( + processorUnderTest = new AutodetectResultProcessor( client, auditor, JOB_ID, @@ -132,7 +132,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder); when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); + AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); Bucket bucket = mock(Bucket.class); @@ -152,7 +152,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder); when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); + AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = true; AutodetectResult result = mock(AutodetectResult.class); Bucket bucket = mock(Bucket.class); @@ -171,7 +171,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", bulkBuilder); + AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context("foo", bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); AnomalyRecord record1 = new AnomalyRecord("foo", new Date(123), 123); @@ -190,7 +190,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); + AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); Influencer influencer1 = new Influencer(JOB_ID, "infField", "infValue", new Date(123), 123); @@ -208,7 +208,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); + AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); CategoryDefinition categoryDefinition = mock(CategoryDefinition.class); @@ -224,7 +224,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); + AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); @@ -242,7 +242,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { public void testProcessResult_flushAcknowledgementMustBeProcessedLast() { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); + AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); @@ -265,7 +265,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { public void testProcessResult_modelPlot() { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); + AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); ModelPlot modelPlot = mock(ModelPlot.class); @@ -279,7 +279,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { public void testProcessResult_modelSizeStats() { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); + AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); ModelSizeStats modelSizeStats = mock(ModelSizeStats.class); @@ -296,7 +296,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { setupScheduleDelayTime(TimeValue.timeValueSeconds(5)); - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); + AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); @@ -333,7 +333,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { public void testProcessResult_modelSnapshot() { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); + AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID) @@ -355,7 +355,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { public void testProcessResult_quantiles_givenRenormalizationIsEnabled() { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); + AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); Quantiles quantiles = mock(Quantiles.class); @@ -375,7 +375,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { public void testProcessResult_quantiles_givenRenormalizationIsDisabled() { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); + AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); Quantiles quantiles = mock(Quantiles.class);