Removed last blocking client calls on network threads.

Closes elastic/elasticsearch#127

Original commit: elastic/x-pack-elasticsearch@3441f51764
Martijn van Groningen 2017-01-12 14:03:12 +01:00
parent d3e4ebcc0e
commit 9ec22efcba
10 changed files with 189 additions and 247 deletions
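Every hunk below applies the same transformation: a call that blocked a network thread on a synchronous client request now takes a result handler and an error handler and returns immediately. A minimal sketch of the before/after shape, assuming hypothetical names (fetchThing, Thing, thingFromResponse are illustrative, not from the commit):

    // Before: occupies the calling (network) thread until the search returns.
    public Thing fetchThing(SearchRequest request) {
        SearchResponse response = client.search(request).actionGet();
        return thingFromResponse(response);
    }

    // After: returns at once; the callbacks run when the response arrives.
    public void fetchThing(SearchRequest request,
                           Consumer<Thing> handler, Consumer<Exception> errorHandler) {
        client.search(request, ActionListener.wrap(
                response -> handler.accept(thingFromResponse(response)),
                errorHandler));
    }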

View File

@ -369,9 +369,7 @@ public class GetRecordsAction extends Action<GetRecordsAction.Request, GetRecord
.sortField(request.sort)
.sortDescending(request.decending)
.build();
-QueryPage<AnomalyRecord> page = jobProvider.records(request.jobId, query);
-listener.onResponse(new Response(page));
+jobProvider.records(request.jobId, query, page -> listener.onResponse(new Response(page)), listener::onFailure);
}
}

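The transport action above shows the wiring used throughout: the REST layer's ActionListener supplies both callbacks, its onResponse wrapped as the success handler and its onFailure passed as the error handler. Annotated for clarity (a restatement of the diff line, not new behavior):

    jobProvider.records(request.jobId, query,
            page -> listener.onResponse(new Response(page)), // Consumer<QueryPage<AnomalyRecord>>
            listener::onFailure);                            // Consumer<Exception>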
View File

@ -273,15 +273,13 @@ UpdateModelSnapshotAction.RequestBuilder> {
}
ModelSnapshot modelSnapshot = changeCandidates.get(0);
modelSnapshot.setDescription(request.getDescriptionString());
-jobManager.updateModelSnapshot(request.getJobId(), modelSnapshot, false);
-modelSnapshot.setDescription(request.getDescriptionString());
-// The quantiles can be large, and totally dominate the output -
-// it's clearer to remove them
-modelSnapshot.setQuantiles(null);
-listener.onResponse(new Response(modelSnapshot));
+jobManager.updateModelSnapshot(modelSnapshot, b -> {
+modelSnapshot.setDescription(request.getDescriptionString());
+// The quantiles can be large, and totally dominate the output -
+// it's clearer to remove them
+modelSnapshot.setQuantiles(null);
+listener.onResponse(new Response(modelSnapshot));
+}, listener::onFailure);
}, listener::onFailure);
}

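Dependent asynchronous steps compose by nesting each step inside the previous handler, which is why the rewritten action ends in a stack of `}, listener::onFailure);` closers, one per step, all funneling failures to the same listener. Schematically, with illustrative names (getSnapshot/updateSnapshot are not from the commit):

    // Step two runs only from step one's success callback; both steps
    // report failures to the same listener.
    getSnapshot(jobId, snapshot -> {
        updateSnapshot(snapshot, ack -> {
            listener.onResponse(new Response(snapshot));
        }, listener::onFailure);
    }, listener::onFailure);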
View File

@ -10,7 +10,6 @@ import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
@ -31,11 +30,11 @@ import org.elasticsearch.xpack.ml.job.IgnoreDowntime;
import org.elasticsearch.xpack.ml.job.Job;
import org.elasticsearch.xpack.ml.job.JobStatus;
import org.elasticsearch.xpack.ml.job.ModelSnapshot;
-import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.audit.Auditor;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.metadata.Allocation;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
+import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.QueryPage;
@ -419,26 +418,10 @@ public class JobManager extends AbstractComponent {
* Update a persisted model snapshot metadata document to match the
* argument supplied.
*
-* @param jobId the job id
* @param modelSnapshot the updated model snapshot object to be stored
-* @param restoreModelSizeStats should the model size stats in this
-* snapshot be made the current ones for this job?
*/
-public void updateModelSnapshot(String jobId, ModelSnapshot modelSnapshot, boolean restoreModelSizeStats) {
-// For Elasticsearch the update can be done in exactly the same way as
-// the original persist
-jobResultsPersister.persistModelSnapshot(modelSnapshot);
-if (restoreModelSizeStats) {
-if (modelSnapshot.getModelSizeStats() != null) {
-jobResultsPersister.persistModelSizeStats(modelSnapshot.getModelSizeStats());
-}
-if (modelSnapshot.getQuantiles() != null) {
-jobResultsPersister.persistQuantiles(modelSnapshot.getQuantiles());
-}
-}
-// Commit so that when the REST API call that triggered the update
-// returns the updated document is searchable
-jobResultsPersister.commitStateWrites(jobId);
+public void updateModelSnapshot(ModelSnapshot modelSnapshot, Consumer<Boolean> handler, Consumer<Exception> errorHandler) {
+jobResultsPersister.updateModelSnapshot(modelSnapshot, handler, errorHandler);
}
private static MlMetadata.Builder createMlMetadataBuilder(ClusterState currentState) {

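JobManager.updateModelSnapshot now simply forwards to the persister, so callers supply the acknowledgement and error callbacks themselves. A hypothetical call site, assuming a log4j-style logger:

    // 'acknowledged' is true whenever the index request succeeded;
    // failures arrive through the second callback instead of being thrown.
    jobManager.updateModelSnapshot(modelSnapshot,
            acknowledged -> logger.info("updated snapshot [{}]", modelSnapshot.getSnapshotId()),
            e -> logger.error("snapshot update failed", e));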
View File

@ -17,7 +17,6 @@ import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
-import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.PlainActionFuture;
@ -72,7 +71,6 @@ import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.job.usage.Usage;
import org.elasticsearch.xpack.ml.lists.ListDocument;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
-import org.elasticsearch.xpack.ml.utils.FixBlockingClientOperations;
import java.io.IOException;
import java.io.OutputStream;
@ -85,6 +83,8 @@ import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ -404,23 +404,36 @@ public class JobProvider {
List<PerPartitionMaxProbabilities> partitionProbs =
handlePartitionMaxNormailizedProbabilitiesResponse(item2.getResponse());
mergePartitionScoresIntoBucket(partitionProbs, buckets.results(), query.getPartitionValue());
-for (Bucket b : buckets.results()) {
-if (query.isExpand() && b.getRecordCount() > 0) {
-this.expandBucketForPartitionValue(jobId, query.isIncludeInterim(), b, query.getPartitionValue());
-}
-b.setAnomalyScore(b.partitionAnomalyScore(query.getPartitionValue()));
+if (query.isExpand()) {
+Iterator<Bucket> bucketsToExpand = buckets.results().stream()
+.filter(bucket -> bucket.getRecordCount() > 0).iterator();
+expandBuckets(jobId, query, buckets, bucketsToExpand, 0, handler, errorHandler);
+return;
+}
} else {
-for (Bucket b : buckets.results()) {
-if (query.isExpand() && b.getRecordCount() > 0) {
-expandBucket(jobId, query.isIncludeInterim(), b);
-}
+if (query.isExpand()) {
+Iterator<Bucket> bucketsToExpand = buckets.results().stream()
+.filter(bucket -> bucket.getRecordCount() > 0).iterator();
+expandBuckets(jobId, query, buckets, bucketsToExpand, 0, handler, errorHandler);
+return;
+}
}
handler.accept(buckets);
}, errorHandler));
}
+private void expandBuckets(String jobId, BucketsQuery query, QueryPage<Bucket> buckets, Iterator<Bucket> bucketsToExpand,
+int from, Consumer<QueryPage<Bucket>> handler, Consumer<Exception> errorHandler) {
+if (bucketsToExpand.hasNext()) {
+Consumer<Integer> c = i -> {
+expandBuckets(jobId, query, buckets, bucketsToExpand, from + RECORDS_SIZE_PARAM, handler, errorHandler);
+};
+expandBucket(jobId, query.isIncludeInterim(), bucketsToExpand.next(), query.getPartitionValue(), from, c, errorHandler);
+} else {
+handler.accept(buckets);
+}
+}
void mergePartitionScoresIntoBucket(List<PerPartitionMaxProbabilities> partitionProbs, List<Bucket> buckets, String partitionValue) {
Iterator<PerPartitionMaxProbabilities> itr = partitionProbs.iterator();
PerPartitionMaxProbabilities partitionProb = itr.hasNext() ? itr.next() : null;
@ -472,28 +485,6 @@ public class JobProvider {
return results;
}
-// TODO (norelease): Use scroll search instead of multiple searches with increasing from
-private int expandBucketForPartitionValue(String jobId, boolean includeInterim, Bucket bucket,
-String partitionFieldValue) throws ResourceNotFoundException {
-int from = 0;
-QueryPage<AnomalyRecord> page = bucketRecords(
-jobId, bucket, from, RECORDS_SIZE_PARAM, includeInterim,
-AnomalyRecord.PROBABILITY.getPreferredName(), false, partitionFieldValue);
-bucket.setRecords(page.results());
-while (page.count() > from + RECORDS_SIZE_PARAM) {
-from += RECORDS_SIZE_PARAM;
-page = bucketRecords(
-jobId, bucket, from, RECORDS_SIZE_PARAM, includeInterim,
-AnomalyRecord.PROBABILITY.getPreferredName(), false, partitionFieldValue);
-bucket.getRecords().addAll(page.results());
-}
-return bucket.getRecords().size();
-}
/**
* Returns a {@link BatchedDocumentsIterator} that allows querying
* and iterating over a large number of buckets of the given job
@ -511,32 +502,53 @@ public class JobProvider {
* @param jobId the job id
* @param includeInterim Include interim results
* @param bucket The bucket to be expanded
-* @return The number of records added to the bucket
*/
// TODO (norelease): Use scroll search instead of multiple searches with increasing from
-public int expandBucket(String jobId, boolean includeInterim, Bucket bucket) throws ResourceNotFoundException {
-int from = 0;
-QueryPage<AnomalyRecord> page = bucketRecords(
-jobId, bucket, from, RECORDS_SIZE_PARAM, includeInterim,
-AnomalyRecord.PROBABILITY.getPreferredName(), false, null);
-bucket.setRecords(page.results());
-while (page.count() > from + RECORDS_SIZE_PARAM) {
-from += RECORDS_SIZE_PARAM;
-page = bucketRecords(
-jobId, bucket, from, RECORDS_SIZE_PARAM, includeInterim,
-AnomalyRecord.PROBABILITY.getPreferredName(), false, null);
+public void expandBucket(String jobId, boolean includeInterim, Bucket bucket, String partitionFieldValue, int from,
+Consumer<Integer> consumer, Consumer<Exception> errorHandler) {
+Consumer<QueryPage<AnomalyRecord>> h = page -> {
bucket.getRecords().addAll(page.results());
-}
-return bucket.getRecords().size();
+if (partitionFieldValue != null) {
+bucket.setAnomalyScore(bucket.partitionAnomalyScore(partitionFieldValue));
+}
+if (page.count() > from + RECORDS_SIZE_PARAM) {
+expandBucket(jobId, includeInterim, bucket, partitionFieldValue, from + RECORDS_SIZE_PARAM, consumer, errorHandler);
+} else {
+consumer.accept(bucket.getRecords().size());
+}
+};
+bucketRecords(jobId, bucket, from, RECORDS_SIZE_PARAM, includeInterim, AnomalyRecord.PROBABILITY.getPreferredName(),
+false, partitionFieldValue, h, errorHandler);
}
-QueryPage<AnomalyRecord> bucketRecords(String jobId,
-Bucket bucket, int from, int size, boolean includeInterim,
-String sortField, boolean descending, String partitionFieldValue)
-throws ResourceNotFoundException {
+// Keep the blocking variant around for ScoresUpdater: it can stay blocking because it runs on a dedicated ml threadpool,
+// and refactoring it to be non-blocking is a lot of work.
+public int expandBucket(String jobId, boolean includeInterim, Bucket bucket) {
+CountDownLatch latch = new CountDownLatch(1);
+AtomicReference<Integer> holder = new AtomicReference<>();
+AtomicReference<Exception> errorHolder = new AtomicReference<>();
+expandBucket(jobId, includeInterim, bucket, null, 0, records -> {
+holder.set(records);
+latch.countDown();
+}, e -> {
+errorHolder.set(e);
+latch.countDown();
+});
+try {
+latch.await();
+} catch (InterruptedException e) {
+Thread.currentThread().interrupt();
+}
+if (errorHolder.get() != null) {
+throw new RuntimeException(errorHolder.get());
+} else {
+return holder.get();
+}
+}
+void bucketRecords(String jobId, Bucket bucket, int from, int size, boolean includeInterim, String sortField,
+boolean descending, String partitionFieldValue, Consumer<QueryPage<AnomalyRecord>> handler,
+Consumer<Exception> errorHandler) {
// Find the records using the time stamp rather than a parent-child
// relationship. The parent-child filter involves two queries behind
// the scenes, and Elasticsearch documentation claims it's significantly
@ -544,10 +556,12 @@ public class JobProvider {
// bucket timestamp.
QueryBuilder recordFilter = QueryBuilders.termQuery(Bucket.TIMESTAMP.getPreferredName(), bucket.getTimestamp().getTime());
-recordFilter = new ResultsFilterBuilder(recordFilter)
-.interim(AnomalyRecord.IS_INTERIM.getPreferredName(), includeInterim)
-.term(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue)
-.build();
+ResultsFilterBuilder builder = new ResultsFilterBuilder(recordFilter)
+.interim(AnomalyRecord.IS_INTERIM.getPreferredName(), includeInterim);
+if (partitionFieldValue != null) {
+builder.term(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue);
+}
+recordFilter = builder.build();
FieldSortBuilder sb = null;
if (sortField != null) {
@ -556,8 +570,7 @@ public class JobProvider {
.order(descending ? SortOrder.DESC : SortOrder.ASC);
}
-return records(jobId, from, size, recordFilter, sb, SECONDARY_SORT,
-descending);
+records(jobId, from, size, recordFilter, sb, SECONDARY_SORT, descending, handler, errorHandler);
}
/**
@ -619,11 +632,9 @@ public class JobProvider {
/**
* Search for anomaly records with the parameters in the
* {@link org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder.RecordsQuery}
-*
-* @return QueryPage of AnomalyRecords
*/
-public QueryPage<AnomalyRecord> records(String jobId, RecordsQueryBuilder.RecordsQuery query)
-throws ResourceNotFoundException {
+public void records(String jobId, RecordsQueryBuilder.RecordsQuery query, Consumer<QueryPage<AnomalyRecord>> handler,
+Consumer<Exception> errorHandler) {
QueryBuilder fb = new ResultsFilterBuilder()
.timeRange(Bucket.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd())
.score(AnomalyRecord.ANOMALY_SCORE.getPreferredName(), query.getAnomalyScoreThreshold())
@ -636,15 +647,16 @@ public class JobProvider {
.missing("_last")
.order(query.isSortDescending() ? SortOrder.DESC : SortOrder.ASC);
}
-return records(jobId, query.getFrom(), query.getSize(), fb, sb, SECONDARY_SORT, query.isSortDescending());
+records(jobId, query.getFrom(), query.getSize(), fb, sb, SECONDARY_SORT, query.isSortDescending(), handler, errorHandler);
}
/**
* The returned records have their id set.
*/
-private QueryPage<AnomalyRecord> records(String jobId, int from, int size,
+private void records(String jobId, int from, int size,
QueryBuilder recordFilter, FieldSortBuilder sb, List<String> secondarySort,
-boolean descending) throws ResourceNotFoundException {
+boolean descending, Consumer<QueryPage<AnomalyRecord>> handler,
+Consumer<Exception> errorHandler) {
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
recordFilter = new BoolQueryBuilder()
@ -665,28 +677,29 @@ public class JobProvider {
searchRequest.source().sort(sortField, descending ? SortOrder.DESC : SortOrder.ASC);
}
-SearchResponse searchResponse;
-try {
-LOGGER.trace("ES API CALL: search all of result type {} from index {}{}{} with filter after sort from {} size {}",
-AnomalyRecord.RESULT_TYPE_VALUE, indexName, (sb != null) ? " with sort" : "",
-secondarySort.isEmpty() ? "" : " with secondary sort", from, size);
-searchResponse = FixBlockingClientOperations.executeBlocking(client, SearchAction.INSTANCE, searchRequest);
-} catch (IndexNotFoundException e) {
-throw ExceptionsHelper.missingJobException(jobId);
-}
-List<AnomalyRecord> results = new ArrayList<>();
-for (SearchHit hit : searchResponse.getHits().getHits()) {
-BytesReference source = hit.getSourceRef();
-try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
-results.add(AnomalyRecord.PARSER.apply(parser, () -> parseFieldMatcher));
-} catch (IOException e) {
-throw new ElasticsearchParseException("failed to parse records", e);
+LOGGER.trace("ES API CALL: search all of result type {} from index {}{}{} with filter after sort from {} size {}",
+AnomalyRecord.RESULT_TYPE_VALUE, indexName, (sb != null) ? " with sort" : "",
+secondarySort.isEmpty() ? "" : " with secondary sort", from, size);
+client.search(searchRequest, ActionListener.wrap(searchResponse -> {
+List<AnomalyRecord> results = new ArrayList<>();
+for (SearchHit hit : searchResponse.getHits().getHits()) {
+BytesReference source = hit.getSourceRef();
+try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
+results.add(AnomalyRecord.PARSER.apply(parser, () -> parseFieldMatcher));
+} catch (IOException e) {
+throw new ElasticsearchParseException("failed to parse records", e);
+}
+}
-}
-return new QueryPage<>(results, searchResponse.getHits().getTotalHits(), AnomalyRecord.RESULTS_FIELD);
+QueryPage<AnomalyRecord> queryPage =
+new QueryPage<>(results, searchResponse.getHits().getTotalHits(), AnomalyRecord.RESULTS_FIELD);
+handler.accept(queryPage);
+}, e -> {
+if (e instanceof IndexNotFoundException) {
+errorHandler.accept(ExceptionsHelper.missingJobException(jobId));
+} else {
+errorHandler.accept(e);
+}
+}));
}
/**

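The new expandBuckets/expandBucket pair replaces the while loops with recursion from the response handler: each page's callback requests the next page at from + RECORDS_SIZE_PARAM until the reported total is exhausted, so no thread waits between pages. The pattern reduced to its essentials, with illustrative names (fetchPage, Item, PAGE_SIZE are assumptions):

    // Callback-style pagination: the "loop" re-enters from the async
    // handler, so the calling thread is released between pages.
    void fetchAll(int from, List<Item> acc,
                  Consumer<List<Item>> done, Consumer<Exception> onError) {
        fetchPage(from, PAGE_SIZE, page -> {
            acc.addAll(page.results());
            if (page.count() > from + PAGE_SIZE) {
                fetchAll(from + PAGE_SIZE, acc, done, onError); // next page
            } else {
                done.accept(acc); // total reached: hand back everything
            }
        }, onError);
    }

The norelease TODO kept in the file still applies: this from/size paging is slated to become a scroll search.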
View File

@ -6,12 +6,10 @@
package org.elasticsearch.xpack.ml.job.persistence;
import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
+import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
@ -30,12 +28,12 @@ import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.ml.job.results.Result;
-import org.elasticsearch.xpack.ml.utils.FixBlockingClientOperations;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.function.Consumer;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -200,7 +198,7 @@ public class JobResultsPersister extends AbstractComponent {
}
logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, bulkRequest.numberOfActions());
-BulkResponse addRecordsResponse = FixBlockingClientOperations.executeBlocking(client, BulkAction.INSTANCE, bulkRequest);
+BulkResponse addRecordsResponse = client.bulk(bulkRequest).actionGet();
if (addRecordsResponse.hasFailures()) {
logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage());
}
@ -245,6 +243,17 @@ public class JobResultsPersister extends AbstractComponent {
persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(modelSnapshot.getJobId()));
}
+public void updateModelSnapshot(ModelSnapshot modelSnapshot, Consumer<Boolean> handler, Consumer<Exception> errorHandler) {
+String index = AnomalyDetectorsIndex.jobResultsIndexName(modelSnapshot.getJobId());
+IndexRequest indexRequest = new IndexRequest(index, ModelSnapshot.TYPE.getPreferredName(), modelSnapshot.getSnapshotId());
+try {
+indexRequest.source(toXContentBuilder(modelSnapshot));
+} catch (IOException e) {
+errorHandler.accept(e);
+}
+client.index(indexRequest, ActionListener.wrap(r -> handler.accept(true), errorHandler));
+}
/**
* Persist the memory usage data
*/
@ -322,7 +331,7 @@ public class JobResultsPersister extends AbstractComponent {
// Refresh should wait for Lucene to make the data searchable
logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName);
RefreshRequest refreshRequest = new RefreshRequest(indexName);
-FixBlockingClientOperations.executeBlocking(client, RefreshAction.INSTANCE, refreshRequest);
+client.admin().indices().refresh(refreshRequest).actionGet();
return true;
}
@ -363,7 +372,7 @@ public class JobResultsPersister extends AbstractComponent {
try {
IndexRequest indexRequest = new IndexRequest(indexName, type, id)
.source(toXContentBuilder(object));
-FixBlockingClientOperations.executeBlocking(client, IndexAction.INSTANCE, indexRequest);
+client.index(indexRequest).actionGet();
return true;
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, type}), e);

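One sharp edge in the added updateModelSnapshot: when toXContentBuilder throws, errorHandler is invoked but control still falls through to client.index with a source-less request. A defensive variant would return after reporting the error; a sketch of that adjustment, not the committed code:

    try {
        indexRequest.source(toXContentBuilder(modelSnapshot));
    } catch (IOException e) {
        errorHandler.accept(e);
        return; // don't send an index request that has no source
    }
    client.index(indexRequest, ActionListener.wrap(r -> handler.accept(true), errorHandler));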
View File

@ -93,7 +93,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
private double initialAnomalyScore;
private double maxNormalizedProbability;
private int recordCount;
-private List<AnomalyRecord> records = Collections.emptyList();
+private List<AnomalyRecord> records = new ArrayList<>();
private long eventCount;
private boolean isInterim;
private boolean hadBigNormalizedUpdate;

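The one-line Bucket change is load-bearing: the async expandBucket appends with bucket.getRecords().addAll(...), and Collections.emptyList() is immutable, so the previous default would fail at the first append:

    List<AnomalyRecord> records = Collections.emptyList();
    records.addAll(page.results()); // throws UnsupportedOperationException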
View File

@ -1,51 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.utils;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
// TODO (#127): norelease Placeholder fix until: https://github.com/elastic/prelert-legacy/issues/127 gets in.
public class FixBlockingClientOperations {
@SuppressWarnings({"unchecked", "rawtypes"})
public static <Res extends ActionResponse> Res executeBlocking(Client client, Action action, ActionRequest request) {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Res> response = new AtomicReference<>();
AtomicReference<Exception> exception = new AtomicReference<>();
ActionListener<Res> listener = new ActionListener<Res>() {
@Override
public void onResponse(Res r) {
response.set(r);
latch.countDown();
}
@Override
public void onFailure(Exception e) {
exception.set(e);
latch.countDown();
}
};
client.execute(action, request, listener);
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (exception.get() != null) {
throw org.elasticsearch.ExceptionsHelper.convertToElastic(exception.get());
} else {
return response.get();
}
}
}

View File

@ -123,7 +123,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
bucket.setRecords(Collections.emptyList());
assertEquals(bucket, persistedBucket.results().get(0));
-QueryPage<AnomalyRecord> persistedRecords = jobProvider.records(JOB_ID, new RecordsQueryBuilder().includeInterim(true).build());
+QueryPage<AnomalyRecord> persistedRecords = getRecords(new RecordsQueryBuilder().includeInterim(true).build());
assertResultsAreSame(records, persistedRecords);
QueryPage<Influencer> persistedInfluencers = getInfluencers();
@ -190,7 +190,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
QueryPage<Influencer> persistedInfluencers = getInfluencers();
assertEquals(0, persistedInfluencers.count());
-QueryPage<AnomalyRecord> persistedRecords = jobProvider.records(JOB_ID, new RecordsQueryBuilder().includeInterim(true).build());
+QueryPage<AnomalyRecord> persistedRecords = getRecords(new RecordsQueryBuilder().includeInterim(true).build());
assertEquals(0, persistedRecords.count());
}
@ -236,7 +236,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
finalBucket.setRecords(Collections.emptyList());
assertEquals(finalBucket, persistedBucket.results().get(0));
-QueryPage<AnomalyRecord> persistedRecords = jobProvider.records(JOB_ID, new RecordsQueryBuilder().includeInterim(true).build());
+QueryPage<AnomalyRecord> persistedRecords = getRecords(new RecordsQueryBuilder().includeInterim(true).build());
assertResultsAreSame(finalAnomalyRecords, persistedRecords);
}
@ -273,9 +273,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build());
assertEquals(1, persistedBucket.count());
-QueryPage<AnomalyRecord> persistedRecords = jobProvider.records(JOB_ID,
-new RecordsQueryBuilder().size(200).includeInterim(true).build());
+QueryPage<AnomalyRecord> persistedRecords = getRecords(new RecordsQueryBuilder().size(200).includeInterim(true).build());
List<AnomalyRecord> allRecords = new ArrayList<>(firstSetOfRecords);
allRecords.addAll(secondSetOfRecords);
assertResultsAreSame(allRecords, persistedRecords);
@ -519,4 +517,21 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
}
return resultHolder.get();
}
+private QueryPage<AnomalyRecord> getRecords(RecordsQueryBuilder.RecordsQuery recordsQuery) throws Exception {
+AtomicReference<Exception> errorHolder = new AtomicReference<>();
+AtomicReference<QueryPage<AnomalyRecord>> resultHolder = new AtomicReference<>();
+CountDownLatch latch = new CountDownLatch(1);
+jobProvider.records(JOB_ID, recordsQuery, page -> {
+resultHolder.set(page);
+latch.countDown();
+}, e -> {
+errorHolder.set(e);
+latch.countDown();
+});
+latch.await();
+if (errorHolder.get() != null) {
+throw errorHolder.get();
+}
+return resultHolder.get();
+}
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.ml.job.persistence;
-import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
@ -495,7 +494,6 @@ public class JobProviderTests extends ESTestCase {
assertEquals(ResourceNotFoundException.class, holder[0].getClass());
}
-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testRecords() throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentification";
Date now = new Date();
@ -523,20 +521,18 @@ public class JobProviderTests extends ESTestCase {
int from = 14;
int size = 2;
String sortfield = "minefield";
-ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(true, source);
-MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
-.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
-Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
-Client client = clientBuilder.build();
+Client client = getMockedClient(qb -> {}, response);
JobProvider provider = createProvider(client);
RecordsQueryBuilder rqb = new RecordsQueryBuilder().from(from).size(size).epochStart(String.valueOf(now.getTime()))
.epochEnd(String.valueOf(now.getTime())).includeInterim(true).sortField(sortfield).anomalyScoreThreshold(11.1)
.normalizedProbability(2.2);
-QueryPage<AnomalyRecord> recordPage = provider.records(jobId, rqb.build());
+@SuppressWarnings({"unchecked", "rawtypes"})
+QueryPage<AnomalyRecord>[] holder = new QueryPage[1];
+provider.records(jobId, rqb.build(), page -> holder[0] = page, RuntimeException::new);
+QueryPage<AnomalyRecord> recordPage = holder[0];
assertEquals(2L, recordPage.count());
List<AnomalyRecord> records = recordPage.results();
assertEquals(22.4, records.get(0).getTypical().get(0), 0.000001);
@ -547,7 +543,6 @@ public class JobProviderTests extends ESTestCase {
assertEquals("irrascible", records.get(1).getFunction());
}
-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testRecords_UsingBuilder()
throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentification";
@ -576,13 +571,9 @@ public class JobProviderTests extends ESTestCase {
int from = 14;
int size = 2;
String sortfield = "minefield";
-ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(true, source);
-MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
-.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
-Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
-Client client = clientBuilder.build();
+Client client = getMockedClient(qb -> {}, response);
JobProvider provider = createProvider(client);
RecordsQueryBuilder rqb = new RecordsQueryBuilder();
@ -595,7 +586,10 @@ public class JobProviderTests extends ESTestCase {
rqb.anomalyScoreThreshold(11.1);
rqb.normalizedProbability(2.2);
-QueryPage<AnomalyRecord> recordPage = provider.records(jobId, rqb.build());
+@SuppressWarnings({"unchecked", "rawtypes"})
+QueryPage<AnomalyRecord>[] holder = new QueryPage[1];
+provider.records(jobId, rqb.build(), page -> holder[0] = page, RuntimeException::new);
+QueryPage<AnomalyRecord> recordPage = holder[0];
assertEquals(2L, recordPage.count());
List<AnomalyRecord> records = recordPage.results();
assertEquals(22.4, records.get(0).getTypical().get(0), 0.000001);
@ -606,7 +600,6 @@ public class JobProviderTests extends ESTestCase {
assertEquals("irrascible", records.get(1).getFunction());
}
-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testBucketRecords() throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentification";
Date now = new Date();
@ -636,17 +629,14 @@ public class JobProviderTests extends ESTestCase {
int from = 14;
int size = 2;
String sortfield = "minefield";
-ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(true, source);
-MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
-.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
-Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
-Client client = clientBuilder.build();
+Client client = getMockedClient(qb -> {}, response);
JobProvider provider = createProvider(client);
-QueryPage<AnomalyRecord> recordPage = provider.bucketRecords(jobId, bucket, from, size, true, sortfield, true, "");
+@SuppressWarnings({"unchecked", "rawtypes"})
+QueryPage<AnomalyRecord>[] holder = new QueryPage[1];
+provider.bucketRecords(jobId, bucket, from, size, true, sortfield, true, "", page -> holder[0] = page, RuntimeException::new);
+QueryPage<AnomalyRecord> recordPage = holder[0];
assertEquals(2L, recordPage.count());
List<AnomalyRecord> records = recordPage.results();
assertEquals(22.4, records.get(0).getTypical().get(0), 0.000001);
@ -657,7 +647,6 @@ public class JobProviderTests extends ESTestCase {
assertEquals("irrascible", records.get(1).getFunction());
}
-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testexpandBucket() throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentification";
Date now = new Date();
@ -676,20 +665,16 @@ public class JobProviderTests extends ESTestCase {
source.add(recordMap);
}
-ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(true, source);
-MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
-.prepareSearchAnySize(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
-Result.TYPE.getPreferredName(), response, queryBuilder);
-Client client = clientBuilder.build();
+Client client = getMockedClient(qb -> {}, response);
JobProvider provider = createProvider(client);
-int records = provider.expandBucket(jobId, false, bucket);
+Integer[] holder = new Integer[1];
+provider.expandBucket(jobId, false, bucket, null, 0, records -> holder[0] = records, RuntimeException::new);
+int records = holder[0];
assertEquals(400L, records);
}
-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testexpandBucket_WithManyRecords()
throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentification";
@ -709,16 +694,13 @@ public class JobProviderTests extends ESTestCase {
source.add(recordMap);
}
-ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(true, source);
-MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
-.prepareSearchAnySize(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
-Result.TYPE.getPreferredName(), response, queryBuilder);
-Client client = clientBuilder.build();
+Client client = getMockedClient(qb -> {}, response);
JobProvider provider = createProvider(client);
-int records = provider.expandBucket(jobId, false, bucket);
+Integer[] holder = new Integer[1];
+provider.expandBucket(jobId, false, bucket, null, 0, records -> holder[0] = records, RuntimeException::new);
+int records = holder[0];
// This is not realistic, but is an artifact of the fact that the mock
// query
// returns all the records, not a subset
@ -900,7 +882,6 @@ public class JobProviderTests extends ESTestCase {
assertEquals(5.0, records.get(1).getInitialAnomalyScore(), 0.00001);
}
-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testModelSnapshots() throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentificationForInfluencers";
Date now = new Date();

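These tests now call a getMockedClient helper whose definition lies outside the hunks shown. Presumably it stubs client.search to hand the query to the verifier and complete the listener with the canned response; a plausible shape, offered as an assumption rather than the actual helper:

    @SuppressWarnings("unchecked")
    private Client getMockedClient(Consumer<QueryBuilder> queryVerifier, SearchResponse response) {
        Client client = mock(Client.class);
        doAnswer(invocation -> {
            SearchRequest request = (SearchRequest) invocation.getArguments()[0];
            queryVerifier.accept(request.source().query()); // let the test inspect the query
            ActionListener<SearchResponse> listener =
                    (ActionListener<SearchResponse>) invocation.getArguments()[1];
            listener.onResponse(response); // complete synchronously on the test thread
            return null;
        }).when(client).search(any(), any());
        return client;
    }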
View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.persistence;
-import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
@ -17,17 +17,16 @@ import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;
import org.elasticsearch.xpack.ml.job.results.Influencer;
+import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class JobResultsPersisterTests extends ESTestCase {
@ -35,8 +34,8 @@ public class JobResultsPersisterTests extends ESTestCase {
private static final String JOB_ID = "foo";
public void testPersistBucket_OneRecord() throws IOException {
-AtomicReference<BulkRequest> reference = new AtomicReference<>();
-Client client = mockClient(reference);
+ArgumentCaptor<BulkRequest> captor = ArgumentCaptor.forClass(BulkRequest.class);
+Client client = mockClient(captor);
Bucket bucket = new Bucket("foo", new Date(), 123456);
bucket.setAnomalyScore(99.9);
bucket.setEventCount(57);
@ -60,7 +59,7 @@ public class JobResultsPersisterTests extends ESTestCase {
JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
persister.bulkPersisterBuilder(JOB_ID).persistBucket(bucket).executeRequest();
-BulkRequest bulkRequest = reference.get();
+BulkRequest bulkRequest = captor.getValue();
assertEquals(2, bulkRequest.numberOfActions());
String s = ((IndexRequest)bulkRequest.requests().get(0)).source().utf8ToString();
@ -83,8 +82,8 @@ public class JobResultsPersisterTests extends ESTestCase {
}
public void testPersistRecords() throws IOException {
-AtomicReference<BulkRequest> reference = new AtomicReference<>();
-Client client = mockClient(reference);
+ArgumentCaptor<BulkRequest> captor = ArgumentCaptor.forClass(BulkRequest.class);
+Client client = mockClient(captor);
List<AnomalyRecord> records = new ArrayList<>();
AnomalyRecord r1 = new AnomalyRecord(JOB_ID, new Date(), 42, 1);
@ -115,7 +114,7 @@ public class JobResultsPersisterTests extends ESTestCase {
JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
persister.bulkPersisterBuilder(JOB_ID).persistRecords(records).executeRequest();
-BulkRequest bulkRequest = reference.get();
+BulkRequest bulkRequest = captor.getValue();
assertEquals(1, bulkRequest.numberOfActions());
String s = ((IndexRequest) bulkRequest.requests().get(0)).source().utf8ToString();
@ -140,8 +139,8 @@ public class JobResultsPersisterTests extends ESTestCase {
}
public void testPersistInfluencers() throws IOException {
-AtomicReference<BulkRequest> reference = new AtomicReference<>();
-Client client = mockClient(reference);
+ArgumentCaptor<BulkRequest> captor = ArgumentCaptor.forClass(BulkRequest.class);
+Client client = mockClient(captor);
List<Influencer> influencers = new ArrayList<>();
Influencer inf = new Influencer(JOB_ID, "infName1", "infValue1", new Date(), 600, 1);
@ -152,7 +151,7 @@ public class JobResultsPersisterTests extends ESTestCase {
JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
persister.bulkPersisterBuilder(JOB_ID).persistInfluencers(influencers).executeRequest();
-BulkRequest bulkRequest = reference.get();
+BulkRequest bulkRequest = captor.getValue();
assertEquals(1, bulkRequest.numberOfActions());
String s = ((IndexRequest) bulkRequest.requests().get(0)).source().utf8ToString();
@ -164,14 +163,11 @@ public class JobResultsPersisterTests extends ESTestCase {
}
@SuppressWarnings({"unchecked", "rawtypes"})
-private Client mockClient(AtomicReference reference) {
+private Client mockClient(ArgumentCaptor<BulkRequest> captor) {
Client client = mock(Client.class);
-doAnswer(invocationOnMock -> {
-reference.set(invocationOnMock.getArguments()[1]);
-ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
-listener.onResponse(new BulkResponse(new BulkItemResponse[0], 0L));
-return null;
-}).when(client).execute(any(), any(), any());
+ActionFuture<BulkResponse> future = mock(ActionFuture.class);
+when(future.actionGet()).thenReturn(new BulkResponse(new BulkItemResponse[0], 0L));
+when(client.bulk(captor.capture())).thenReturn(future);
return client;
}
}
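The mock swap mirrors the production change: JobResultsPersister now calls client.bulk(bulkRequest).actionGet() rather than client.execute(...), so the test stubs the single-argument bulk overload with a mocked ActionFuture and lets Mockito's ArgumentCaptor record the request in place of the hand-rolled doAnswer. Usage in the assertions, as seen above:

    // captor.capture() records each BulkRequest passing through the stub;
    // captor.getValue() returns the most recent one for the assertions.
    BulkRequest bulkRequest = captor.getValue();
    assertEquals(2, bulkRequest.numberOfActions());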