diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index b00747f0979..ed857c252aa 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -49,18 +49,15 @@ import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.action.util.QueryPage; +import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.MlFilter; +import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder.BucketsQuery; +import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery; import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; -import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState; -import org.elasticsearch.xpack.ml.notifications.AuditActivity; -import org.elasticsearch.xpack.ml.notifications.AuditMessage; -import org.elasticsearch.xpack.ml.notifications.Auditor; -import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder.BucketsQuery; -import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.ml.job.results.Bucket; @@ -69,6 +66,9 @@ import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.ModelDebugOutput; import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.ml.job.results.Result; +import org.elasticsearch.xpack.ml.notifications.AuditActivity; +import org.elasticsearch.xpack.ml.notifications.AuditMessage; +import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.io.IOException; @@ -338,9 +338,9 @@ public class JobProvider { })); } - private void mget(String indexName, String type, String[] ids, Consumer> handler, Consumer errorHandler, + private void mget(String indexName, String type, Set ids, Consumer> handler, Consumer errorHandler, BiFunction objectParser) { - if (ids.length == 0) { + if (ids.isEmpty()) { handler.accept(Collections.emptySet()); return; } @@ -1079,7 +1079,7 @@ public class JobProvider { * * @param ids the id of the requested filter */ - public void getFilters(Consumer> handler, Consumer errorHandler, String... ids) { + public void getFilters(Consumer> handler, Consumer errorHandler, Set ids) { mget(ML_META_INDEX, MlFilter.TYPE.getPreferredName(), ids, handler, errorHandler, MlFilter.PARSER); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 65b7f9eec8b..9fae3b95955 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -34,11 +34,11 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsPa import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.process.normalizer.ScoresUpdater; import org.elasticsearch.xpack.ml.job.process.normalizer.ShortCircuitingRenormalizer; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.io.IOException; @@ -201,9 +201,9 @@ public class AutodetectProcessManager extends AbstractComponent { void gatherRequiredInformation(String jobId, TriConsumer handler, Consumer errorHandler) { Job job = jobManager.getJobOrThrowIfUnknown(jobId); jobProvider.modelSnapshots(jobId, 0, 1, page -> { - ModelSnapshot modelSnapshot = page.results().isEmpty() ? null : page.results().get(1); + ModelSnapshot modelSnapshot = page.results().isEmpty() ? null : page.results().get(0); jobProvider.getQuantiles(jobId, quantiles -> { - String[] ids = job.getAnalysisConfig().extractReferencedFilters().toArray(new String[0]); + Set ids = job.getAnalysisConfig().extractReferencedFilters(); jobProvider.getFilters(filterDocument -> handler.accept(modelSnapshot, quantiles, filterDocument), errorHandler, ids); }, errorHandler); }, errorHandler); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index e223be6f123..d89736fee45 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MlPlugin; import org.elasticsearch.xpack.ml.action.UpdateJobStatusAction; +import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; @@ -31,8 +32,8 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsPa import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; -import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; +import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.junit.Before; import org.mockito.Mockito; @@ -60,6 +61,7 @@ import static org.elasticsearch.mock.orig.Mockito.when; import static org.hamcrest.core.IsEqual.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -78,6 +80,10 @@ public class AutodetectProcessManagerTests extends ESTestCase { private JobDataCountsPersister jobDataCountsPersister; private NormalizerFactory normalizerFactory; + private ModelSnapshot modelSnapshot = new ModelSnapshot("foo"); + private Quantiles quantiles = new Quantiles("foo", new Date(), "state"); + private Set filters = new HashSet<>(); + @Before public void initMocks() { jobManager = mock(JobManager.class); @@ -86,6 +92,26 @@ public class AutodetectProcessManagerTests extends ESTestCase { jobDataCountsPersister = mock(JobDataCountsPersister.class); normalizerFactory = mock(NormalizerFactory.class); givenAllocationWithStatus(JobStatus.OPENED); + + when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo")); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + Consumer> handler = (Consumer>) invocationOnMock.getArguments()[3]; + handler.accept(new QueryPage<>(Collections.singletonList(modelSnapshot), 1, ModelSnapshot.RESULTS_FIELD)); + return null; + }).when(jobProvider).modelSnapshots(any(), anyInt(), anyInt(), any(), any()); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + Consumer handler = (Consumer) invocationOnMock.getArguments()[1]; + handler.accept(quantiles); + return null; + }).when(jobProvider).getQuantiles(any(), any(), any()); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + Consumer> handler = (Consumer>) invocationOnMock.getArguments()[0]; + handler.accept(filters); + return null; + }).when(jobProvider).getFilters(any(), any(), any()); } public void testOpenJob() { @@ -191,7 +217,6 @@ public class AutodetectProcessManagerTests extends ESTestCase { public void testCloseJob() { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); - when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo")); AutodetectProcessManager manager = createManager(communicator); assertEquals(0, manager.numberOfOpenJobs()); @@ -218,7 +243,6 @@ public class AutodetectProcessManagerTests extends ESTestCase { public void testFlush() throws IOException { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); AutodetectProcessManager manager = createManager(communicator); - when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo")); InputStream inputStream = createInputStream(""); manager.openJob("foo", false, e -> {}); @@ -267,7 +291,6 @@ public class AutodetectProcessManagerTests extends ESTestCase { Job job = createJobDetails("foo"); - when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(job); givenAllocationWithStatus(JobStatus.OPENED); InputStream inputStream = createInputStream(""); @@ -295,16 +318,8 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectResultsParser parser = mock(AutodetectResultsParser.class); AutodetectProcess autodetectProcess = mock(AutodetectProcess.class); AutodetectProcessFactory autodetectProcessFactory = (j, modelSnapshot, quantiles, filters, i, e) -> autodetectProcess; - AutodetectProcessManager manager = spy(new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider, - jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory)); - ModelSnapshot modelSnapshot = new ModelSnapshot("foo"); - Quantiles quantiles = new Quantiles("foo", new Date(), "state"); - Set filters = new HashSet<>(); - doAnswer(invocationOnMock -> { - AutodetectProcessManager.TriConsumer consumer = (AutodetectProcessManager.TriConsumer) invocationOnMock.getArguments()[1]; - consumer.accept(modelSnapshot, quantiles, filters); - return null; - }).when(manager).gatherRequiredInformation(any(), any(), any()); + AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider, + jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory); expectThrows(EsRejectedExecutionException.class, () -> manager.create("my_id", modelSnapshot, quantiles, filters, false, e -> {})); verify(autodetectProcess, times(1)).close(); @@ -328,14 +343,6 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory); manager = spy(manager); - ModelSnapshot modelSnapshot = new ModelSnapshot("foo"); - Quantiles quantiles = new Quantiles("foo", new Date(), "state"); - Set filters = new HashSet<>(); - doAnswer(invocationOnMock -> { - AutodetectProcessManager.TriConsumer consumer = (AutodetectProcessManager.TriConsumer) invocationOnMock.getArguments()[1]; - consumer.accept(modelSnapshot, quantiles, filters); - return null; - }).when(manager).gatherRequiredInformation(any(), any(), any()); doReturn(communicator).when(manager).create(any(), eq(modelSnapshot), eq(quantiles), eq(filters), anyBoolean(), any()); return manager; }