diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetInfluencersAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetInfluencersAction.java index 8ac18ed107e..07fb0f59162 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetInfluencersAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetInfluencersAction.java @@ -324,9 +324,7 @@ extends Action page = jobProvider.influencers(request.jobId, query); - listener.onResponse(new Response(page)); + jobProvider.influencers(request.jobId, query, page -> listener.onResponse(new Response(page)), listener::onFailure); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java index 401202357b7..aa54982d8a6 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java @@ -705,67 +705,45 @@ public class JobProvider { * * @param jobId The job ID for which influencers are requested * @param query the query - * @return QueryPage of Influencer */ - public QueryPage influencers(String jobId, InfluencersQuery query) throws ResourceNotFoundException { + public void influencers(String jobId, InfluencersQuery query, Consumer> handler, + Consumer errorHandler) { QueryBuilder fb = new ResultsFilterBuilder() .timeRange(Bucket.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd()) .score(Bucket.ANOMALY_SCORE.getPreferredName(), query.getAnomalyScoreFilter()) .interim(Bucket.IS_INTERIM.getPreferredName(), query.isIncludeInterim()) .build(); - return influencers(jobId, query.getFrom(), query.getSize(), fb, query.getSortField(), - query.isSortDescending()); - } - - private QueryPage influencers(String jobId, int from, int size, QueryBuilder queryBuilder, String sortField, - boolean sortDescending) throws ResourceNotFoundException { String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); LOGGER.trace("ES API CALL: search all of result type {} from index {}{} with filter from {} size {}", () -> Influencer.RESULT_TYPE_VALUE, () -> indexName, - () -> (sortField != null) ? - " with sort " + (sortDescending ? "descending" : "ascending") + " on field " + sortField : "", - () -> from, () -> size); + () -> (query.getSortField() != null) ? + " with sort " + (query.isSortDescending() ? "descending" : "ascending") + " on field " + query.getSortField() : "", + query::getFrom, query::getSize); - queryBuilder = new BoolQueryBuilder() - .filter(queryBuilder) + QueryBuilder qb = new BoolQueryBuilder() + .filter(fb) .filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), Influencer.RESULT_TYPE_VALUE)); SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.types(Result.TYPE.getPreferredName()); - FieldSortBuilder sb = sortField == null ? SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC) - : new FieldSortBuilder(sortField).order(sortDescending ? SortOrder.DESC : SortOrder.ASC); - searchRequest.source(new SearchSourceBuilder().query(queryBuilder).from(from).size(size).sort(sb)); + FieldSortBuilder sb = query.getSortField() == null ? SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC) + : new FieldSortBuilder(query.getSortField()).order(query.isSortDescending() ? SortOrder.DESC : SortOrder.ASC); + searchRequest.source(new SearchSourceBuilder().query(qb).from(query.getFrom()).size(query.getSize()).sort(sb)); - SearchResponse response; - try { - response = FixBlockingClientOperations.executeBlocking(client, SearchAction.INSTANCE, searchRequest); - } catch (IndexNotFoundException e) { - throw ExceptionsHelper.missingJobException(jobId); - } - - List influencers = new ArrayList<>(); - for (SearchHit hit : response.getHits().getHits()) { - BytesReference source = hit.getSourceRef(); - try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) { - influencers.add(Influencer.PARSER.apply(parser, () -> parseFieldMatcher)); - } catch (IOException e) { - throw new ElasticsearchParseException("failed to parse influencer", e); + client.search(searchRequest, ActionListener.wrap(response -> { + List influencers = new ArrayList<>(); + for (SearchHit hit : response.getHits().getHits()) { + BytesReference source = hit.getSourceRef(); + try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) { + influencers.add(Influencer.PARSER.apply(parser, () -> parseFieldMatcher)); + } catch (IOException e) { + throw new ElasticsearchParseException("failed to parse influencer", e); + } } - } - - return new QueryPage<>(influencers, response.getHits().getTotalHits(), Influencer.RESULTS_FIELD); - } - - /** - * Get the influencer for the given job for id - * - * @param jobId the job id - * @param influencerId The unique influencer Id - * @return Optional Influencer - */ - public Optional influencer(String jobId, String influencerId) { - throw new IllegalStateException(); + QueryPage result = new QueryPage<>(influencers, response.getHits().getTotalHits(), Influencer.RESULTS_FIELD); + handler.accept(result); + }, errorHandler)); } /** diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java index ea57dc401e0..9071cc6333d 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java @@ -126,8 +126,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { QueryPage persistedRecords = jobProvider.records(JOB_ID, new RecordsQueryBuilder().includeInterim(true).build()); assertResultsAreSame(records, persistedRecords); - QueryPage persistedInfluencers = - jobProvider.influencers(JOB_ID, new InfluencersQueryBuilder().includeInterim(true).build()); + QueryPage persistedInfluencers = getInfluencers(); assertResultsAreSame(influencers, persistedInfluencers); QueryPage persistedDefinition = getCategoryDefinition(Long.toString(categoryDefinition.getCategoryId())); @@ -188,7 +187,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { nonInterimBucket.setRecords(Collections.emptyList()); assertEquals(nonInterimBucket, persistedBucket.results().get(0)); - QueryPage persistedInfluencers = jobProvider.influencers(JOB_ID, new InfluencersQueryBuilder().build()); + QueryPage persistedInfluencers = getInfluencers(); assertEquals(0, persistedInfluencers.count()); QueryPage persistedRecords = jobProvider.records(JOB_ID, new RecordsQueryBuilder().includeInterim(true).build()); @@ -502,4 +501,22 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { } return resultHolder.get(); } + + private QueryPage getInfluencers() throws Exception { + AtomicReference errorHolder = new AtomicReference<>(); + AtomicReference> resultHolder = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + jobProvider.influencers(JOB_ID, new InfluencersQueryBuilder().build(), page -> { + resultHolder.set(page); + latch.countDown(); + }, e -> { + errorHolder.set(e); + latch.countDown(); + }); + latch.await(); + if (errorHolder.get() != null) { + throw errorHolder.get(); + } + return resultHolder.get(); + } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobProviderTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobProviderTests.java index 3d149d0dffd..e05a2680d09 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobProviderTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobProviderTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.prelert.job.persistence; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.get.GetResponse; @@ -73,21 +74,17 @@ public class JobProviderTests extends ESTestCase { private static final String JOB_ID = "foo"; private static final String STATE_INDEX_NAME = ".ml-state"; - @LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127") public void testGetQuantiles_GivenNoQuantilesForJob() throws Exception { GetResponse getResponse = createGetResponse(false, null); - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .prepareGet(STATE_INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId(JOB_ID), getResponse); - - JobProvider provider = createProvider(clientBuilder.build()); + Client client = getMockedClient(getResponse); + JobProvider provider = createProvider(client); Optional quantiles = provider.getQuantiles(JOB_ID); assertFalse(quantiles.isPresent()); } - @LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127") public void testGetQuantiles_GivenQuantilesHaveNonEmptyState() throws Exception { Map source = new HashMap<>(); source.put(Job.ID.getPreferredName(), "foo"); @@ -95,10 +92,8 @@ public class JobProviderTests extends ESTestCase { source.put(Quantiles.QUANTILE_STATE.getPreferredName(), "state"); GetResponse getResponse = createGetResponse(true, source); - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .prepareGet(STATE_INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId(JOB_ID), getResponse); - - JobProvider provider = createProvider(clientBuilder.build()); + Client client = getMockedClient(getResponse); + JobProvider provider = createProvider(client); Optional quantiles = provider.getQuantiles(JOB_ID); @@ -106,7 +101,6 @@ public class JobProviderTests extends ESTestCase { assertEquals("state", quantiles.get().getQuantileState()); } - @LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127") public void testGetQuantiles_GivenQuantilesHaveEmptyState() throws Exception { Map source = new HashMap<>(); source.put(Job.ID.getPreferredName(), "foo"); @@ -114,10 +108,8 @@ public class JobProviderTests extends ESTestCase { source.put(Quantiles.QUANTILE_STATE.getPreferredName(), ""); GetResponse getResponse = createGetResponse(true, source); - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .prepareGet(STATE_INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId("foo"), getResponse); - - JobProvider provider = createProvider(clientBuilder.build()); + Client client = getMockedClient(getResponse); + JobProvider provider = createProvider(client); Optional quantiles = provider.getQuantiles(JOB_ID); @@ -784,9 +776,7 @@ public class JobProviderTests extends ESTestCase { assertEquals(terms, categoryDefinitions.results().get(0).getTerms()); } - @LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127") - public void testInfluencers_NoInterim() - throws InterruptedException, ExecutionException, IOException { + public void testInfluencers_NoInterim() throws InterruptedException, ExecutionException, IOException { String jobId = "TestJobIdentificationForInfluencers"; Date now = new Date(); List> source = new ArrayList<>(); @@ -816,20 +806,19 @@ public class JobProviderTests extends ESTestCase { int from = 4; int size = 3; - ArgumentCaptor queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class); + QueryBuilder[] qbHolder = new QueryBuilder[1]; SearchResponse response = createSearchResponse(true, source); - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), Result.TYPE.getPreferredName(), - from, size, response, queryBuilder); - - Client client = clientBuilder.build(); + Client client = getMockedClient(q -> qbHolder[0] = q, response); JobProvider provider = createProvider(client); + @SuppressWarnings({"unchecked", "rawtypes"}) + QueryPage[] holder = new QueryPage[1]; InfluencersQuery query = new InfluencersQueryBuilder().from(from).size(size).includeInterim(false).build(); - QueryPage page = provider.influencers(jobId, query); + provider.influencers(jobId, query, page -> holder[0] = page, RuntimeException::new); + QueryPage page = holder[0]; assertEquals(2L, page.count()); - String queryString = queryBuilder.getValue().toString(); + String queryString = qbHolder[0].toString(); assertTrue(queryString.matches("(?s).*must_not[^}]*term[^}]*is_interim.*value. : .true.*")); List records = page.results(); @@ -849,9 +838,7 @@ public class JobProviderTests extends ESTestCase { assertEquals(5.0, records.get(1).getInitialAnomalyScore(), 0.00001); } - @LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127") - public void testInfluencers_WithInterim() - throws InterruptedException, ExecutionException, IOException { + public void testInfluencers_WithInterim() throws InterruptedException, ExecutionException, IOException { String jobId = "TestJobIdentificationForInfluencers"; Date now = new Date(); List> source = new ArrayList<>(); @@ -881,21 +868,20 @@ public class JobProviderTests extends ESTestCase { int from = 4; int size = 3; - ArgumentCaptor queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class); + QueryBuilder[] qbHolder = new QueryBuilder[1]; SearchResponse response = createSearchResponse(true, source); - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), Result.TYPE.getPreferredName(), from, size, response, - queryBuilder); - - Client client = clientBuilder.build(); + Client client = getMockedClient(q -> qbHolder[0] = q, response); JobProvider provider = createProvider(client); + @SuppressWarnings({"unchecked", "rawtypes"}) + QueryPage[] holder = new QueryPage[1]; InfluencersQuery query = new InfluencersQueryBuilder().from(from).size(size).start("0").end("0").sortField("sort") .sortDescending(true).anomalyScoreThreshold(0.0).includeInterim(true).build(); - QueryPage page = provider.influencers(jobId, query); + provider.influencers(jobId, query, page -> holder[0] = page, RuntimeException::new); + QueryPage page = holder[0]; assertEquals(2L, page.count()); - String queryString = queryBuilder.getValue().toString(); + String queryString = qbHolder[0].toString(); assertFalse(queryString.matches("(?s).*isInterim.*")); List records = page.results(); @@ -914,23 +900,6 @@ public class JobProviderTests extends ESTestCase { assertEquals(5.0, records.get(1).getInitialAnomalyScore(), 0.00001); } - @LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127") - public void testInfluencer() throws InterruptedException, ExecutionException, IOException { - String jobId = "TestJobIdentificationForInfluencers"; - String influencerId = "ThisIsAnInfluencerId"; - - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse(); - - Client client = clientBuilder.build(); - JobProvider provider = createProvider(client); - - try { - provider.influencer(jobId, influencerId); - assertTrue(false); - } catch (IllegalStateException e) { - } - } - @LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127") public void testModelSnapshots() throws InterruptedException, ExecutionException, IOException { String jobId = "TestJobIdentificationForInfluencers"; @@ -1225,4 +1194,13 @@ public class JobProviderTests extends ESTestCase { }).when(client).search(any(), any()); return client; } + + private Client getMockedClient(GetResponse response) { + Client client = mock(Client.class); + @SuppressWarnings("unchecked") + ActionFuture actionFuture = mock(ActionFuture.class); + when(client.get(any())).thenReturn(actionFuture); + when(actionFuture.actionGet()).thenReturn(response); + return client; + } }