Made client calls non blocking in JobProvider#influencers(...)

and re-enabled some quantiles persistence unit tests (which can remain to be blocking as they aren't used on a network thread)

Original commit: elastic/x-pack-elasticsearch@cf8e78f42d
This commit is contained in:
Martijn van Groningen 2017-01-09 10:07:44 +01:00
parent 9e5245fd64
commit 1a132e2c8b
4 changed files with 75 additions and 104 deletions

View File

@ -324,9 +324,7 @@ extends Action<GetInfluencersAction.Request, GetInfluencersAction.Response, GetI
InfluencersQueryBuilder.InfluencersQuery query = new InfluencersQueryBuilder().includeInterim(request.includeInterim)
.start(request.start).end(request.end).from(request.pageParams.getFrom()).size(request.pageParams.getSize())
.anomalyScoreThreshold(request.anomalyScoreFilter).sortField(request.sort).sortDescending(request.decending).build();
QueryPage<Influencer> page = jobProvider.influencers(request.jobId, query);
listener.onResponse(new Response(page));
jobProvider.influencers(request.jobId, query, page -> listener.onResponse(new Response(page)), listener::onFailure);
}
}

View File

@ -705,67 +705,45 @@ public class JobProvider {
*
* @param jobId The job ID for which influencers are requested
* @param query the query
* @return QueryPage of Influencer
*/
public QueryPage<Influencer> influencers(String jobId, InfluencersQuery query) throws ResourceNotFoundException {
public void influencers(String jobId, InfluencersQuery query, Consumer<QueryPage<Influencer>> handler,
Consumer<Exception> errorHandler) {
QueryBuilder fb = new ResultsFilterBuilder()
.timeRange(Bucket.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd())
.score(Bucket.ANOMALY_SCORE.getPreferredName(), query.getAnomalyScoreFilter())
.interim(Bucket.IS_INTERIM.getPreferredName(), query.isIncludeInterim())
.build();
return influencers(jobId, query.getFrom(), query.getSize(), fb, query.getSortField(),
query.isSortDescending());
}
private QueryPage<Influencer> influencers(String jobId, int from, int size, QueryBuilder queryBuilder, String sortField,
boolean sortDescending) throws ResourceNotFoundException {
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
LOGGER.trace("ES API CALL: search all of result type {} from index {}{} with filter from {} size {}",
() -> Influencer.RESULT_TYPE_VALUE, () -> indexName,
() -> (sortField != null) ?
" with sort " + (sortDescending ? "descending" : "ascending") + " on field " + sortField : "",
() -> from, () -> size);
() -> (query.getSortField() != null) ?
" with sort " + (query.isSortDescending() ? "descending" : "ascending") + " on field " + query.getSortField() : "",
query::getFrom, query::getSize);
queryBuilder = new BoolQueryBuilder()
.filter(queryBuilder)
QueryBuilder qb = new BoolQueryBuilder()
.filter(fb)
.filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), Influencer.RESULT_TYPE_VALUE));
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(Result.TYPE.getPreferredName());
FieldSortBuilder sb = sortField == null ? SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)
: new FieldSortBuilder(sortField).order(sortDescending ? SortOrder.DESC : SortOrder.ASC);
searchRequest.source(new SearchSourceBuilder().query(queryBuilder).from(from).size(size).sort(sb));
FieldSortBuilder sb = query.getSortField() == null ? SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)
: new FieldSortBuilder(query.getSortField()).order(query.isSortDescending() ? SortOrder.DESC : SortOrder.ASC);
searchRequest.source(new SearchSourceBuilder().query(qb).from(query.getFrom()).size(query.getSize()).sort(sb));
SearchResponse response;
try {
response = FixBlockingClientOperations.executeBlocking(client, SearchAction.INSTANCE, searchRequest);
} catch (IndexNotFoundException e) {
throw ExceptionsHelper.missingJobException(jobId);
}
List<Influencer> influencers = new ArrayList<>();
for (SearchHit hit : response.getHits().getHits()) {
BytesReference source = hit.getSourceRef();
try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
influencers.add(Influencer.PARSER.apply(parser, () -> parseFieldMatcher));
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse influencer", e);
client.search(searchRequest, ActionListener.wrap(response -> {
List<Influencer> influencers = new ArrayList<>();
for (SearchHit hit : response.getHits().getHits()) {
BytesReference source = hit.getSourceRef();
try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
influencers.add(Influencer.PARSER.apply(parser, () -> parseFieldMatcher));
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse influencer", e);
}
}
}
return new QueryPage<>(influencers, response.getHits().getTotalHits(), Influencer.RESULTS_FIELD);
}
/**
* Get the influencer for the given job for id
*
* @param jobId the job id
* @param influencerId The unique influencer Id
* @return Optional Influencer
*/
public Optional<Influencer> influencer(String jobId, String influencerId) {
throw new IllegalStateException();
QueryPage<Influencer> result = new QueryPage<>(influencers, response.getHits().getTotalHits(), Influencer.RESULTS_FIELD);
handler.accept(result);
}, errorHandler));
}
/**

View File

@ -126,8 +126,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
QueryPage<AnomalyRecord> persistedRecords = jobProvider.records(JOB_ID, new RecordsQueryBuilder().includeInterim(true).build());
assertResultsAreSame(records, persistedRecords);
QueryPage<Influencer> persistedInfluencers =
jobProvider.influencers(JOB_ID, new InfluencersQueryBuilder().includeInterim(true).build());
QueryPage<Influencer> persistedInfluencers = getInfluencers();
assertResultsAreSame(influencers, persistedInfluencers);
QueryPage<CategoryDefinition> persistedDefinition = getCategoryDefinition(Long.toString(categoryDefinition.getCategoryId()));
@ -188,7 +187,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
nonInterimBucket.setRecords(Collections.emptyList());
assertEquals(nonInterimBucket, persistedBucket.results().get(0));
QueryPage<Influencer> persistedInfluencers = jobProvider.influencers(JOB_ID, new InfluencersQueryBuilder().build());
QueryPage<Influencer> persistedInfluencers = getInfluencers();
assertEquals(0, persistedInfluencers.count());
QueryPage<AnomalyRecord> persistedRecords = jobProvider.records(JOB_ID, new RecordsQueryBuilder().includeInterim(true).build());
@ -502,4 +501,22 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
}
return resultHolder.get();
}
private QueryPage<Influencer> getInfluencers() throws Exception {
AtomicReference<Exception> errorHolder = new AtomicReference<>();
AtomicReference<QueryPage<Influencer>> resultHolder = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
jobProvider.influencers(JOB_ID, new InfluencersQueryBuilder().build(), page -> {
resultHolder.set(page);
latch.countDown();
}, e -> {
errorHolder.set(e);
latch.countDown();
});
latch.await();
if (errorHolder.get() != null) {
throw errorHolder.get();
}
return resultHolder.get();
}
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.prelert.job.persistence;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.get.GetResponse;
@ -73,21 +74,17 @@ public class JobProviderTests extends ESTestCase {
private static final String JOB_ID = "foo";
private static final String STATE_INDEX_NAME = ".ml-state";
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testGetQuantiles_GivenNoQuantilesForJob() throws Exception {
GetResponse getResponse = createGetResponse(false, null);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.prepareGet(STATE_INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId(JOB_ID), getResponse);
JobProvider provider = createProvider(clientBuilder.build());
Client client = getMockedClient(getResponse);
JobProvider provider = createProvider(client);
Optional<Quantiles> quantiles = provider.getQuantiles(JOB_ID);
assertFalse(quantiles.isPresent());
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testGetQuantiles_GivenQuantilesHaveNonEmptyState() throws Exception {
Map<String, Object> source = new HashMap<>();
source.put(Job.ID.getPreferredName(), "foo");
@ -95,10 +92,8 @@ public class JobProviderTests extends ESTestCase {
source.put(Quantiles.QUANTILE_STATE.getPreferredName(), "state");
GetResponse getResponse = createGetResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.prepareGet(STATE_INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId(JOB_ID), getResponse);
JobProvider provider = createProvider(clientBuilder.build());
Client client = getMockedClient(getResponse);
JobProvider provider = createProvider(client);
Optional<Quantiles> quantiles = provider.getQuantiles(JOB_ID);
@ -106,7 +101,6 @@ public class JobProviderTests extends ESTestCase {
assertEquals("state", quantiles.get().getQuantileState());
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testGetQuantiles_GivenQuantilesHaveEmptyState() throws Exception {
Map<String, Object> source = new HashMap<>();
source.put(Job.ID.getPreferredName(), "foo");
@ -114,10 +108,8 @@ public class JobProviderTests extends ESTestCase {
source.put(Quantiles.QUANTILE_STATE.getPreferredName(), "");
GetResponse getResponse = createGetResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.prepareGet(STATE_INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId("foo"), getResponse);
JobProvider provider = createProvider(clientBuilder.build());
Client client = getMockedClient(getResponse);
JobProvider provider = createProvider(client);
Optional<Quantiles> quantiles = provider.getQuantiles(JOB_ID);
@ -784,9 +776,7 @@ public class JobProviderTests extends ESTestCase {
assertEquals(terms, categoryDefinitions.results().get(0).getTerms());
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testInfluencers_NoInterim()
throws InterruptedException, ExecutionException, IOException {
public void testInfluencers_NoInterim() throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentificationForInfluencers";
Date now = new Date();
List<Map<String, Object>> source = new ArrayList<>();
@ -816,20 +806,19 @@ public class JobProviderTests extends ESTestCase {
int from = 4;
int size = 3;
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
QueryBuilder[] qbHolder = new QueryBuilder[1];
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), Result.TYPE.getPreferredName(),
from, size, response, queryBuilder);
Client client = clientBuilder.build();
Client client = getMockedClient(q -> qbHolder[0] = q, response);
JobProvider provider = createProvider(client);
@SuppressWarnings({"unchecked", "rawtypes"})
QueryPage<Influencer>[] holder = new QueryPage[1];
InfluencersQuery query = new InfluencersQueryBuilder().from(from).size(size).includeInterim(false).build();
QueryPage<Influencer> page = provider.influencers(jobId, query);
provider.influencers(jobId, query, page -> holder[0] = page, RuntimeException::new);
QueryPage<Influencer> page = holder[0];
assertEquals(2L, page.count());
String queryString = queryBuilder.getValue().toString();
String queryString = qbHolder[0].toString();
assertTrue(queryString.matches("(?s).*must_not[^}]*term[^}]*is_interim.*value. : .true.*"));
List<Influencer> records = page.results();
@ -849,9 +838,7 @@ public class JobProviderTests extends ESTestCase {
assertEquals(5.0, records.get(1).getInitialAnomalyScore(), 0.00001);
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testInfluencers_WithInterim()
throws InterruptedException, ExecutionException, IOException {
public void testInfluencers_WithInterim() throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentificationForInfluencers";
Date now = new Date();
List<Map<String, Object>> source = new ArrayList<>();
@ -881,21 +868,20 @@ public class JobProviderTests extends ESTestCase {
int from = 4;
int size = 3;
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
QueryBuilder[] qbHolder = new QueryBuilder[1];
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), Result.TYPE.getPreferredName(), from, size, response,
queryBuilder);
Client client = clientBuilder.build();
Client client = getMockedClient(q -> qbHolder[0] = q, response);
JobProvider provider = createProvider(client);
@SuppressWarnings({"unchecked", "rawtypes"})
QueryPage<Influencer>[] holder = new QueryPage[1];
InfluencersQuery query = new InfluencersQueryBuilder().from(from).size(size).start("0").end("0").sortField("sort")
.sortDescending(true).anomalyScoreThreshold(0.0).includeInterim(true).build();
QueryPage<Influencer> page = provider.influencers(jobId, query);
provider.influencers(jobId, query, page -> holder[0] = page, RuntimeException::new);
QueryPage<Influencer> page = holder[0];
assertEquals(2L, page.count());
String queryString = queryBuilder.getValue().toString();
String queryString = qbHolder[0].toString();
assertFalse(queryString.matches("(?s).*isInterim.*"));
List<Influencer> records = page.results();
@ -914,23 +900,6 @@ public class JobProviderTests extends ESTestCase {
assertEquals(5.0, records.get(1).getInitialAnomalyScore(), 0.00001);
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testInfluencer() throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentificationForInfluencers";
String influencerId = "ThisIsAnInfluencerId";
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse();
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
try {
provider.influencer(jobId, influencerId);
assertTrue(false);
} catch (IllegalStateException e) {
}
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testModelSnapshots() throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentificationForInfluencers";
@ -1225,4 +1194,13 @@ public class JobProviderTests extends ESTestCase {
}).when(client).search(any(), any());
return client;
}
private Client getMockedClient(GetResponse response) {
Client client = mock(Client.class);
@SuppressWarnings("unchecked")
ActionFuture<GetResponse> actionFuture = mock(ActionFuture.class);
when(client.get(any())).thenReturn(actionFuture);
when(actionFuture.actionGet()).thenReturn(response);
return client;
}
}