From a7d95951a6c813c2dbf5c27194def59d33155765 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 3 Feb 2017 16:23:39 +0100 Subject: [PATCH] Removed forgotten blocking call when opening a job. Original commit: elastic/x-pack-elasticsearch@e1dfa542401044dfe879228c135844cc38d0e7df --- .../autodetect/AutodetectProcessManager.java | 56 +++++++------------ .../AutodetectProcessManagerTests.java | 18 ++++-- 2 files changed, 33 insertions(+), 41 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 84595d02253..632aa0daaa3 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -51,10 +51,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; public class AutodetectProcessManager extends AbstractComponent { @@ -183,9 +181,10 @@ public class AutodetectProcessManager extends AbstractComponent { } public void openJob(String jobId, boolean ignoreDowntime, Consumer handler) { - gatherRequiredInformation(jobId, (modelSnapshot, quantiles, filters) -> { + gatherRequiredInformation(jobId, (dataCounts, modelSnapshot, quantiles, filters) -> { autoDetectCommunicatorByJob.computeIfAbsent(jobId, id -> { - AutodetectCommunicator communicator = create(id, modelSnapshot, quantiles, filters, ignoreDowntime, handler); + AutodetectCommunicator communicator = + create(id, dataCounts, modelSnapshot, quantiles, filters, ignoreDowntime, handler); try { communicator.writeJobInputHeader(); } catch (IOException ioe) { @@ -199,25 +198,30 @@ public class AutodetectProcessManager extends AbstractComponent { }, handler); } - void gatherRequiredInformation(String jobId, TriConsumer handler, Consumer errorHandler) { + // TODO: add a method on JobProvider that fetches all required info via 1 msearch call, so that we have a single lambda + // instead of 4 nested lambdas. + void gatherRequiredInformation(String jobId, MultiConsumer handler, Consumer errorHandler) { Job job = jobManager.getJobOrThrowIfUnknown(jobId); - jobProvider.modelSnapshots(jobId, 0, 1, page -> { - ModelSnapshot modelSnapshot = page.results().isEmpty() ? null : page.results().get(0); - jobProvider.getQuantiles(jobId, quantiles -> { - Set ids = job.getAnalysisConfig().extractReferencedFilters(); - jobProvider.getFilters(filterDocument -> handler.accept(modelSnapshot, quantiles, filterDocument), errorHandler, ids); + jobProvider.dataCounts(jobId, dataCounts -> { + jobProvider.modelSnapshots(jobId, 0, 1, page -> { + ModelSnapshot modelSnapshot = page.results().isEmpty() ? null : page.results().get(0); + jobProvider.getQuantiles(jobId, quantiles -> { + Set ids = job.getAnalysisConfig().extractReferencedFilters(); + jobProvider.getFilters(filterDocument -> handler.accept(dataCounts, modelSnapshot, quantiles, filterDocument), + errorHandler, ids); + }, errorHandler); }, errorHandler); }, errorHandler); } - interface TriConsumer { + interface MultiConsumer { - void accept(ModelSnapshot modelSnapshot, Quantiles quantiles, Set filters); + void accept(DataCounts dataCounts, ModelSnapshot modelSnapshot, Quantiles quantiles, Set filters); } - AutodetectCommunicator create(String jobId, ModelSnapshot modelSnapshot, Quantiles quantiles, Set filters, - boolean ignoreDowntime, Consumer handler) { + AutodetectCommunicator create(String jobId, DataCounts dataCounts, ModelSnapshot modelSnapshot, Quantiles quantiles, + Set filters, boolean ignoreDowntime, Consumer handler) { if (autoDetectCommunicatorByJob.size() == maxAllowedRunningJobs) { throw new ElasticsearchStatusException("max running job capacity [" + maxAllowedRunningJobs + "] reached", RestStatus.CONFLICT); @@ -229,7 +233,7 @@ public class AutodetectProcessManager extends AbstractComponent { // A TP with no queue, so that we fail immediately if there are no threads available ExecutorService executorService = threadPool.executor(MlPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME); - try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job.getId(), fetchDataCounts(jobId), + try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job.getId(), dataCounts, jobDataCountsPersister)) { ScoresUpdater scoresUpdator = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client), normalizerFactory); @@ -253,28 +257,6 @@ public class AutodetectProcessManager extends AbstractComponent { } } - private DataCounts fetchDataCounts(String jobId) { - CountDownLatch latch = new CountDownLatch(1); - AtomicReference holder = new AtomicReference<>(); - AtomicReference errorHolder = new AtomicReference<>(); - jobProvider.dataCounts(jobId, dataCounts -> { - holder.set(dataCounts); - latch.countDown(); - }, e -> { - errorHolder.set(e); - latch.countDown(); - }); - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - if (errorHolder.get() != null) { - throw org.elasticsearch.ExceptionsHelper.convertToElastic(errorHolder.get()); - } - return holder.get(); - } - /** * Stop the running job and mark it as finished.
* @param jobId The job to stop diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index f337d797b2e..076817b6c88 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -80,6 +80,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { private JobDataCountsPersister jobDataCountsPersister; private NormalizerFactory normalizerFactory; + private DataCounts dataCounts = new DataCounts("foo"); private ModelSnapshot modelSnapshot = new ModelSnapshot("foo"); private Quantiles quantiles = new Quantiles("foo", new Date(), "state"); private Set filters = new HashSet<>(); @@ -94,6 +95,12 @@ public class AutodetectProcessManagerTests extends ESTestCase { givenAllocationWithState(JobState.OPENED); when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo")); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + Consumer handler = (Consumer) invocationOnMock.getArguments()[1]; + handler.accept(dataCounts); + return null; + }).when(jobProvider).dataCounts(any(), any(), any()); doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") Consumer> handler = (Consumer>) invocationOnMock.getArguments()[3]; @@ -168,12 +175,13 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory)); + DataCounts dataCounts = new DataCounts("foo"); ModelSnapshot modelSnapshot = new ModelSnapshot("foo"); Quantiles quantiles = new Quantiles("foo", new Date(), "state"); Set filters = new HashSet<>(); doAnswer(invocationOnMock -> { - AutodetectProcessManager.TriConsumer consumer = (AutodetectProcessManager.TriConsumer) invocationOnMock.getArguments()[1]; - consumer.accept(modelSnapshot, quantiles, filters); + AutodetectProcessManager.MultiConsumer consumer = (AutodetectProcessManager.MultiConsumer) invocationOnMock.getArguments()[1]; + consumer.accept(dataCounts, modelSnapshot, quantiles, filters); return null; }).when(manager).gatherRequiredInformation(any(), any(), any()); @@ -319,7 +327,8 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory); - expectThrows(EsRejectedExecutionException.class, () -> manager.create("my_id", modelSnapshot, quantiles, filters, false, e -> {})); + expectThrows(EsRejectedExecutionException.class, + () -> manager.create("my_id", dataCounts, modelSnapshot, quantiles, filters, false, e -> {})); verify(autodetectProcess, times(1)).close(); } @@ -341,7 +350,8 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory); manager = spy(manager); - doReturn(communicator).when(manager).create(any(), eq(modelSnapshot), eq(quantiles), eq(filters), anyBoolean(), any()); + doReturn(communicator).when(manager) + .create(any(), eq(dataCounts), eq(modelSnapshot), eq(quantiles), eq(filters), anyBoolean(), any()); return manager; }