Rename result iterators (elastic/elasticsearch#740)

Original commit: elastic/x-pack-elasticsearch@c462e9595a
Author: David Kyle, 2017-01-17 16:02:02 +00:00 (committed by GitHub)
parent 92c808fd97
commit cfb94b6627
11 changed files with 182 additions and 191 deletions

ElasticsearchBatchedBucketsIterator.java → BatchedBucketsIterator.java

@@ -16,9 +16,9 @@ import org.elasticsearch.xpack.ml.job.results.Bucket;
 import java.io.IOException;
 
-class ElasticsearchBatchedBucketsIterator extends ElasticsearchBatchedResultsIterator<Bucket> {
+class BatchedBucketsIterator extends BatchedResultsIterator<Bucket> {
 
-    public ElasticsearchBatchedBucketsIterator(Client client, String jobId) {
+    public BatchedBucketsIterator(Client client, String jobId) {
         super(client, jobId, Bucket.RESULT_TYPE_VALUE);
     }

BatchedDocumentsIterator.java

@@ -5,14 +5,59 @@
  */
 package org.elasticsearch.xpack.ml.job.persistence;
 
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortBuilders;
+import org.elasticsearch.xpack.ml.job.results.Bucket;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
 import java.util.Deque;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 
 /**
  * An iterator useful to fetch a big number of documents of type T
  * and iterate through them in batches.
  */
-public interface BatchedDocumentsIterator<T> {
+public abstract class BatchedDocumentsIterator<T> {
+    private static final Logger LOGGER = Loggers.getLogger(BatchedDocumentsIterator.class);
+
+    private static final String CONTEXT_ALIVE_DURATION = "5m";
+    private static final int BATCH_SIZE = 10000;
+
+    private final Client client;
+    private final String index;
+    private final ResultsFilterBuilder filterBuilder;
+    private volatile long count;
+    private volatile long totalHits;
+    private volatile String scrollId;
+    private volatile boolean isScrollInitialised;
+
+    public BatchedDocumentsIterator(Client client, String index) {
+        this(client, index, new ResultsFilterBuilder());
+    }
+
+    protected BatchedDocumentsIterator(Client client, String index, QueryBuilder queryBuilder) {
+        this(client, index, new ResultsFilterBuilder(queryBuilder));
+    }
+
+    private BatchedDocumentsIterator(Client client, String index, ResultsFilterBuilder resultsFilterBuilder) {
+        this.client = Objects.requireNonNull(client);
+        this.index = Objects.requireNonNull(index);
+        totalHits = 0;
+        count = 0;
+        filterBuilder = Objects.requireNonNull(resultsFilterBuilder);
+        isScrollInitialised = false;
+    }
 
     /**
      * Query documents whose timestamp is within the given time range
     *
@@ -20,14 +65,31 @@ public interface BatchedDocumentsIterator<T> {
      * @param endEpochMs the end time as epoch milliseconds (exclusive)
      * @return the iterator itself
      */
-    BatchedDocumentsIterator<T> timeRange(long startEpochMs, long endEpochMs);
+    public BatchedDocumentsIterator<T> timeRange(long startEpochMs, long endEpochMs) {
+        filterBuilder.timeRange(Bucket.TIMESTAMP.getPreferredName(), startEpochMs, endEpochMs);
+        return this;
+    }
 
     /**
      * Include interim documents
      *
      * @param interimFieldName Name of the include interim field
      */
-    BatchedDocumentsIterator<T> includeInterim(String interimFieldName);
+    public BatchedDocumentsIterator<T> includeInterim(String interimFieldName) {
+        filterBuilder.interim(interimFieldName, true);
+        return this;
+    }
+
+    /**
+     * Returns {@code true} if the iteration has more elements.
+     * (In other words, returns {@code true} if {@link #next} would
+     * return an element rather than throwing an exception.)
+     *
+     * @return {@code true} if the iteration has more elements
+     */
+    public boolean hasNext() {
+        return !isScrollInitialised || count != totalHits;
+    }
 
     /**
      * The first time next() is called, the search will be performed and the first
@@ -39,14 +101,66 @@ public interface BatchedDocumentsIterator<T> {
      * @return a {@code Deque} with the next batch of documents
      * @throws NoSuchElementException if the iteration has no more elements
      */
-    Deque<T> next();
+    public Deque<T> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+
+        SearchResponse searchResponse;
+        if (scrollId == null) {
+            searchResponse = initScroll();
+        } else {
+            SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId).scroll(CONTEXT_ALIVE_DURATION);
+            searchResponse = client.searchScroll(searchScrollRequest).actionGet();
+        }
+        scrollId = searchResponse.getScrollId();
+        return mapHits(searchResponse);
+    }
+
+    private SearchResponse initScroll() {
+        LOGGER.trace("ES API CALL: search all of type {} from index {}", getType(), index);
+
+        isScrollInitialised = true;
+
+        SearchRequest searchRequest = new SearchRequest(index);
+        searchRequest.types(getType());
+        searchRequest.scroll(CONTEXT_ALIVE_DURATION);
+        searchRequest.source(new SearchSourceBuilder()
+                .size(BATCH_SIZE)
+                .query(filterBuilder.build())
+                .sort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)));
+
+        SearchResponse searchResponse = client.search(searchRequest).actionGet();
+        totalHits = searchResponse.getHits().getTotalHits();
+        scrollId = searchResponse.getScrollId();
+        return searchResponse;
+    }
+
+    private Deque<T> mapHits(SearchResponse searchResponse) {
+        Deque<T> results = new ArrayDeque<>();
+
+        SearchHit[] hits = searchResponse.getHits().getHits();
+        for (SearchHit hit : hits) {
+            T mapped = map(hit);
+            if (mapped != null) {
+                results.add(mapped);
+            }
+        }
+        count += hits.length;
+
+        if (!hasNext() && scrollId != null) {
+            client.prepareClearScroll().setScrollIds(Arrays.asList(scrollId)).get();
+        }
+
+        return results;
+    }
+
+    protected abstract String getType();
 
     /**
-     * Returns {@code true} if the iteration has more elements.
-     * (In other words, returns {@code true} if {@link #next} would
-     * return an element rather than throwing an exception.)
-     *
-     * @return {@code true} if the iteration has more elements
+     * Maps the search hit to the document type
+     * @param hit
+     *            the search hit
+     * @return The mapped document or {@code null} if the mapping failed
      */
-    boolean hasNext();
+    protected abstract T map(SearchHit hit);
 }
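With the old interface and its Elasticsearch* implementation folded into this one abstract class, a concrete iterator now only names the result type it queries and maps each SearchHit to a document. A minimal sketch of a subclass and of a caller draining it batch by batch (StringDocIterator, the index name, and the time range are illustrative only; the loop mirrors the real usage in ScoresUpdater further down):

// Hypothetical subclass: getType() and map() are the only abstract methods.
class StringDocIterator extends BatchedDocumentsIterator<String> {
    StringDocIterator(Client client, String index) {
        super(client, index);
    }

    @Override
    protected String getType() {
        return "result";                    // mapping type handed to the scroll search
    }

    @Override
    protected String map(SearchHit hit) {
        return hit.getSourceAsString();     // returning null drops the hit from the batch
    }
}

// Each next() call fetches one scroll page of up to BATCH_SIZE (10000) documents.
long startMs = 0L, endMs = Long.MAX_VALUE;  // example range
BatchedDocumentsIterator<String> docs =
        new StringDocIterator(client, ".ml-anomalies-foo").timeRange(startMs, endMs);
while (docs.hasNext()) {
    Deque<String> batch = docs.next();
    // ... consume batch ...
}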

ElasticsearchBatchedInfluencersIterator.java → BatchedInfluencersIterator.java

@@ -16,8 +16,8 @@ import org.elasticsearch.xpack.ml.job.results.Influencer;
 import java.io.IOException;
 
-class ElasticsearchBatchedInfluencersIterator extends ElasticsearchBatchedResultsIterator<Influencer> {
+class BatchedInfluencersIterator extends BatchedResultsIterator<Influencer> {
 
-    public ElasticsearchBatchedInfluencersIterator(Client client, String jobId) {
+    public BatchedInfluencersIterator(Client client, String jobId) {
         super(client, jobId, Influencer.RESULT_TYPE_VALUE);
     }

ElasticsearchBatchedRecordsIterator.java → BatchedRecordsIterator.java

@@ -16,9 +16,9 @@ import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
 import java.io.IOException;
 
-class ElasticsearchBatchedRecordsIterator extends ElasticsearchBatchedResultsIterator<AnomalyRecord> {
+class BatchedRecordsIterator extends BatchedResultsIterator<AnomalyRecord> {
 
-    public ElasticsearchBatchedRecordsIterator(Client client, String jobId) {
+    public BatchedRecordsIterator(Client client, String jobId) {
         super(client, jobId, AnomalyRecord.RESULT_TYPE_VALUE);
     }

ElasticsearchBatchedResultsIterator.java → BatchedResultsIterator.java

@@ -9,10 +9,10 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.index.query.TermsQueryBuilder;
 import org.elasticsearch.xpack.ml.job.results.Result;
 
-public abstract class ElasticsearchBatchedResultsIterator<T>
-        extends ElasticsearchBatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<T>> {
+public abstract class BatchedResultsIterator<T>
+        extends BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<T>> {
 
-    public ElasticsearchBatchedResultsIterator(Client client, String jobId, String resultType) {
+    public BatchedResultsIterator(Client client, String jobId, String resultType) {
         super(client, AnomalyDetectorsIndex.jobResultsIndexName(jobId),
                 new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), resultType));
     }
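ResultWithIndex itself is untouched by this commit, so its body never appears in the diff. The call sites below (current.result, current.indexName, new ResultWithIndex<>("foo", bucket)) show it is a small holder pairing a parsed result with the index it was read from; a speculative sketch inferred from that usage, not the actual source:

public static class ResultWithIndex<T> {
    public final String indexName;  // index the hit was read from
    public final T result;          // the parsed result document

    public ResultWithIndex(String indexName, T result) {
        this.indexName = indexName;
        this.result = result;
    }
}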

ElasticsearchBatchedDocumentsIterator.java (deleted)

@@ -1,137 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-package org.elasticsearch.xpack.ml.job.persistence;
-
-import org.apache.logging.log4j.Logger;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.search.SearchScrollRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.search.sort.SortBuilders;
-import org.elasticsearch.xpack.ml.job.results.Bucket;
-
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Deque;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-
-abstract class ElasticsearchBatchedDocumentsIterator<T> implements BatchedDocumentsIterator<T> {
-    private static final Logger LOGGER = Loggers.getLogger(ElasticsearchBatchedDocumentsIterator.class);
-
-    private static final String CONTEXT_ALIVE_DURATION = "5m";
-    private static final int BATCH_SIZE = 10000;
-
-    private final Client client;
-    private final String index;
-    private final ResultsFilterBuilder filterBuilder;
-    private volatile long count;
-    private volatile long totalHits;
-    private volatile String scrollId;
-    private volatile boolean isScrollInitialised;
-
-    public ElasticsearchBatchedDocumentsIterator(Client client, String index) {
-        this(client, index, new ResultsFilterBuilder());
-    }
-
-    protected ElasticsearchBatchedDocumentsIterator(Client client, String index, QueryBuilder queryBuilder) {
-        this(client, index, new ResultsFilterBuilder(queryBuilder));
-    }
-
-    private ElasticsearchBatchedDocumentsIterator(Client client, String index, ResultsFilterBuilder resultsFilterBuilder) {
-        this.client = Objects.requireNonNull(client);
-        this.index = Objects.requireNonNull(index);
-        totalHits = 0;
-        count = 0;
-        filterBuilder = Objects.requireNonNull(resultsFilterBuilder);
-        isScrollInitialised = false;
-    }
-
-    @Override
-    public BatchedDocumentsIterator<T> timeRange(long startEpochMs, long endEpochMs) {
-        filterBuilder.timeRange(Bucket.TIMESTAMP.getPreferredName(), startEpochMs, endEpochMs);
-        return this;
-    }
-
-    @Override
-    public BatchedDocumentsIterator<T> includeInterim(String interimFieldName) {
-        filterBuilder.interim(interimFieldName, true);
-        return this;
-    }
-
-    @Override
-    public boolean hasNext() {
-        return !isScrollInitialised || count != totalHits;
-    }
-
-    @Override
-    public Deque<T> next() {
-        if (!hasNext()) {
-            throw new NoSuchElementException();
-        }
-
-        SearchResponse searchResponse;
-        if (scrollId == null) {
-            searchResponse = initScroll();
-        } else {
-            SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId).scroll(CONTEXT_ALIVE_DURATION);
-            searchResponse = client.searchScroll(searchScrollRequest).actionGet();
-        }
-        scrollId = searchResponse.getScrollId();
-        return mapHits(searchResponse);
-    }
-
-    private SearchResponse initScroll() {
-        LOGGER.trace("ES API CALL: search all of type {} from index {}", getType(), index);
-
-        isScrollInitialised = true;
-
-        SearchRequest searchRequest = new SearchRequest(index);
-        searchRequest.types(getType());
-        searchRequest.scroll(CONTEXT_ALIVE_DURATION);
-        searchRequest.source(new SearchSourceBuilder()
-                .size(BATCH_SIZE)
-                .query(filterBuilder.build())
-                .sort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)));
-
-        SearchResponse searchResponse = client.search(searchRequest).actionGet();
-        totalHits = searchResponse.getHits().getTotalHits();
-        scrollId = searchResponse.getScrollId();
-        return searchResponse;
-    }
-
-    private Deque<T> mapHits(SearchResponse searchResponse) {
-        Deque<T> results = new ArrayDeque<>();
-
-        SearchHit[] hits = searchResponse.getHits().getHits();
-        for (SearchHit hit : hits) {
-            T mapped = map(hit);
-            if (mapped != null) {
-                results.add(mapped);
-            }
-        }
-        count += hits.length;
-
-        if (!hasNext() && scrollId != null) {
-            client.prepareClearScroll().setScrollIds(Arrays.asList(scrollId)).get();
-        }
-
-        return results;
-    }
-
-    protected abstract String getType();
-
-    /**
-     * Maps the search hit to the document type
-     * @param hit
-     *            the search hit
-     * @return The mapped document or {@code null} if the mapping failed
-     */
-    protected abstract T map(SearchHit hit);
-}

JobProvider.java

@@ -491,8 +491,8 @@ public class JobProvider {
      * @param jobId the id of the job for which buckets are requested
      * @return a bucket {@link BatchedDocumentsIterator}
      */
-    public BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket>> newBatchedBucketsIterator(String jobId) {
-        return new ElasticsearchBatchedBucketsIterator(client, jobId);
+    public BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<Bucket>> newBatchedBucketsIterator(String jobId) {
+        return new BatchedBucketsIterator(client, jobId);
     }
 
     /**
@@ -503,9 +503,9 @@ public class JobProvider {
      * @param jobId the id of the job for which buckets are requested
      * @return a record {@link BatchedDocumentsIterator}
      */
-    public BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>>
+    public BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<AnomalyRecord>>
             newBatchedRecordsIterator(String jobId) {
-        return new ElasticsearchBatchedRecordsIterator(client, jobId);
+        return new BatchedRecordsIterator(client, jobId);
     }
 
     // TODO (norelease): Use scroll search instead of multiple searches with increasing from
@@ -761,9 +761,9 @@ public class JobProvider {
      * @param jobId the id of the job for which influencers are requested
      * @return an influencer {@link BatchedDocumentsIterator}
      */
-    public BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<Influencer>>
+    public BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<Influencer>>
            newBatchedInfluencersIterator(String jobId) {
-        return new ElasticsearchBatchedInfluencersIterator(client, jobId);
+        return new BatchedInfluencersIterator(client, jobId);
     }
 
     /**

ScoresUpdater.java

@@ -10,7 +10,7 @@ import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.xpack.ml.job.AnalysisConfig;
 import org.elasticsearch.xpack.ml.job.Job;
 import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator;
-import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchBatchedResultsIterator;
+import org.elasticsearch.xpack.ml.job.persistence.BatchedResultsIterator;
 import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
 import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
@@ -101,7 +101,7 @@ public class ScoresUpdater {
     private void updateBuckets(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
                                long windowExtensionMs, int[] counts, boolean perPartitionNormalization) {
-        BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket>> bucketsIterator =
+        BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<Bucket>> bucketsIterator =
                 jobProvider.newBatchedBucketsIterator(job.getId())
                         .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs);
@@ -119,13 +119,13 @@ public class ScoresUpdater {
         while (bucketsIterator.hasNext()) {
             // Get a batch of buckets without their records to calculate
             // how many buckets can be sensibly retrieved
-            Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket>> buckets = bucketsIterator.next();
+            Deque<BatchedResultsIterator.ResultWithIndex<Bucket>> buckets = bucketsIterator.next();
             if (buckets.isEmpty()) {
                 break;
             }
 
             while (!buckets.isEmpty()) {
-                ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket> current = buckets.removeFirst();
+                BatchedResultsIterator.ResultWithIndex<Bucket> current = buckets.removeFirst();
                 Bucket currentBucket = current.result;
                 if (currentBucket.isNormalizable()) {
                     BucketNormalizable bucketNormalizable = new BucketNormalizable(current.result, current.indexName);
@@ -156,13 +156,13 @@ public class ScoresUpdater {
     }
 
     private List<RecordNormalizable> bucketRecordsAsNormalizables(long bucketTimeStamp) {
-        BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>>
+        BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<AnomalyRecord>>
                 recordsIterator = jobProvider.newBatchedRecordsIterator(job.getId())
                         .timeRange(bucketTimeStamp, bucketTimeStamp + 1);
 
         List<RecordNormalizable> recordNormalizables = new ArrayList<>();
         while (recordsIterator.hasNext()) {
-            for (ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord> record : recordsIterator.next() ) {
+            for (BatchedResultsIterator.ResultWithIndex<AnomalyRecord> record : recordsIterator.next() ) {
                 recordNormalizables.add(new RecordNormalizable(record.result, record.indexName));
             }
         }
@@ -211,12 +211,12 @@ public class ScoresUpdater {
     private void updateInfluencers(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
                                    long windowExtensionMs, int[] counts) {
-        BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<Influencer>> influencersIterator =
+        BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<Influencer>> influencersIterator =
                 jobProvider.newBatchedInfluencersIterator(job.getId())
                         .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs);
 
         while (influencersIterator.hasNext()) {
-            Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<Influencer>> influencers = influencersIterator.next();
+            Deque<BatchedResultsIterator.ResultWithIndex<Influencer>> influencers = influencersIterator.next();
             if (influencers.isEmpty()) {
                 LOGGER.debug("[{}] No influencers to renormalize for job", job.getId());
                 break;

ElasticsearchBatchedDocumentsIteratorTests.java → BatchedDocumentsIteratorTests.java

@@ -30,7 +30,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
-public class ElasticsearchBatchedDocumentsIteratorTests extends ESTestCase {
+public class BatchedDocumentsIteratorTests extends ESTestCase {
 
     private static final String INDEX_NAME = ".ml-anomalies-foo";
     private static final String SCROLL_ID = "someScrollId";
@@ -187,7 +187,7 @@ public class ElasticsearchBatchedDocumentsIteratorTests extends ESTestCase {
         }
     }
 
-    private static class TestIterator extends ElasticsearchBatchedDocumentsIterator<String> {
+    private static class TestIterator extends BatchedDocumentsIterator<String> {
         public TestIterator(Client client, String jobId) {
             super(client, jobId);
         }

MockBatchedDocumentsIterator.java

@@ -5,19 +5,23 @@
  */
 package org.elasticsearch.xpack.ml.job.persistence;
 
+import org.elasticsearch.client.Client;
+import org.elasticsearch.search.SearchHit;
+
 import java.util.Deque;
 import java.util.List;
 import java.util.NoSuchElementException;
 
-import static org.junit.Assert.assertEquals;
+import static org.elasticsearch.mock.orig.Mockito.mock;
 
-public class MockBatchedDocumentsIterator<T> implements BatchedDocumentsIterator<T> {
+public class MockBatchedDocumentsIterator<T> extends BatchedDocumentsIterator<T> {
     private final List<Deque<T>> batches;
     private int index;
     private boolean wasTimeRangeCalled;
     private String interimFieldName;
 
     public MockBatchedDocumentsIterator(List<Deque<T>> batches) {
+        super(mock(Client.class), "foo");
         this.batches = batches;
         index = 0;
         wasTimeRangeCalled = false;
@@ -44,6 +48,16 @@ public class MockBatchedDocumentsIterator<T> implements BatchedDocumentsIterator
         return batches.get(index++);
     }
 
+    @Override
+    protected String getType() {
+        return null;
+    }
+
+    @Override
+    protected T map(SearchHit hit) {
+        return null;
+    }
+
     @Override
     public boolean hasNext() {
         return index != batches.size();
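Because BatchedDocumentsIterator is now a class rather than an interface, the mock has to invoke a real constructor, hence the added super(mock(Client.class), "foo"); the mocked Client is never touched since hasNext() and next() are overridden to serve canned batches. A sketch of typical test wiring under that assumption (java.util imports omitted), following the pattern in ScoresUpdaterTests below:

// Two documents in a single canned batch.
List<Deque<String>> batches = new ArrayList<>();
batches.add(new ArrayDeque<>(Arrays.asList("doc-1", "doc-2")));

BatchedDocumentsIterator<String> iter = new MockBatchedDocumentsIterator<>(batches);
while (iter.hasNext()) {
    Deque<String> batch = iter.next();  // returns the canned deques in order
    // ... assert on batch contents ...
}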

ScoresUpdaterTests.java

@@ -18,7 +18,7 @@ import org.elasticsearch.xpack.ml.job.AnalysisConfig;
 import org.elasticsearch.xpack.ml.job.Detector;
 import org.elasticsearch.xpack.ml.job.Job;
 import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator;
-import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchBatchedResultsIterator;
+import org.elasticsearch.xpack.ml.job.persistence.BatchedResultsIterator;
 import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
 import org.elasticsearch.xpack.ml.job.persistence.MockBatchedDocumentsIterator;
@@ -192,10 +192,10 @@ public class ScoresUpdaterTests extends ESTestCase {
         bucket1.setAnomalyScore(42.0);
         bucket1.addBucketInfluencer(createTimeBucketInfluencer(bucket1.getTimestamp(), 0.04, 42.0));
         bucket1.setMaxNormalizedProbability(50.0);
-        List<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>> records = new ArrayList<>();
+        List<BatchedResultsIterator.ResultWithIndex<AnomalyRecord>> records = new ArrayList<>();
         Date date = new Date();
         for (int i=0; i<100000; i++) {
-            records.add(new ElasticsearchBatchedResultsIterator.ResultWithIndex<>("foo", new AnomalyRecord("foo", date, 1, i)));
+            records.add(new BatchedResultsIterator.ResultWithIndex<>("foo", new AnomalyRecord("foo", date, 1, i)));
         }
 
         Bucket bucket2 = generateBucket(new Date(10000 * 1000));
@@ -209,9 +209,9 @@ public class ScoresUpdaterTests extends ESTestCase {
         givenProviderReturnsBuckets(batch);
 
-        List<Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>>> recordBatches = new ArrayList<>();
+        List<Deque<BatchedResultsIterator.ResultWithIndex<AnomalyRecord>>> recordBatches = new ArrayList<>();
         recordBatches.add(new ArrayDeque<>(records));
-        BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>> recordIter =
+        BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<AnomalyRecord>> recordIter =
                 new MockBatchedDocumentsIterator<>(recordBatches);
         when(jobProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter);
@@ -341,29 +341,29 @@ public class ScoresUpdaterTests extends ESTestCase {
     }
 
     private void givenBuckets(List<Deque<Bucket>> batches) {
-        List<Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket>>> batchesWithIndex = new ArrayList<>();
+        List<Deque<BatchedResultsIterator.ResultWithIndex<Bucket>>> batchesWithIndex = new ArrayList<>();
         for (Deque<Bucket> deque : batches) {
-            Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket>> queueWithIndex = new ArrayDeque<>();
+            Deque<BatchedResultsIterator.ResultWithIndex<Bucket>> queueWithIndex = new ArrayDeque<>();
             for (Bucket bucket : deque) {
-                queueWithIndex.add(new ElasticsearchBatchedResultsIterator.ResultWithIndex<>("foo", bucket));
+                queueWithIndex.add(new BatchedResultsIterator.ResultWithIndex<>("foo", bucket));
             }
             batchesWithIndex.add(queueWithIndex);
         }
 
-        BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket>> bucketIter =
+        BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<Bucket>> bucketIter =
                 new MockBatchedDocumentsIterator<>(batchesWithIndex);
         when(jobProvider.newBatchedBucketsIterator(JOB_ID)).thenReturn(bucketIter);
     }
 
     private void givenProviderReturnsRecords(Deque<AnomalyRecord> records) {
-        Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>> batch = new ArrayDeque<>();
-        List<Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>>> batches = new ArrayList<>();
+        Deque<BatchedResultsIterator.ResultWithIndex<AnomalyRecord>> batch = new ArrayDeque<>();
+        List<Deque<BatchedResultsIterator.ResultWithIndex<AnomalyRecord>>> batches = new ArrayList<>();
         for (AnomalyRecord record : records) {
-            batch.add(new ElasticsearchBatchedResultsIterator.ResultWithIndex<>("foo", record));
+            batch.add(new BatchedResultsIterator.ResultWithIndex<>("foo", record));
        }
         batches.add(batch);
 
-        BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>> recordIter =
+        BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<AnomalyRecord>> recordIter =
                 new MockBatchedDocumentsIterator<>(batches);
         when(jobProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter);
     }
@@ -373,13 +373,13 @@ public class ScoresUpdaterTests extends ESTestCase {
     }
 
     private void givenProviderReturnsInfluencers(Deque<Influencer> influencers) {
-        List<Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<Influencer>>> batches = new ArrayList<>();
-        Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<Influencer>> queue = new ArrayDeque<>();
+        List<Deque<BatchedResultsIterator.ResultWithIndex<Influencer>>> batches = new ArrayList<>();
+        Deque<BatchedResultsIterator.ResultWithIndex<Influencer>> queue = new ArrayDeque<>();
         for (Influencer inf : influencers) {
-            queue.add(new ElasticsearchBatchedResultsIterator.ResultWithIndex<>("foo", inf));
+            queue.add(new BatchedResultsIterator.ResultWithIndex<>("foo", inf));
         }
         batches.add(queue);
 
-        BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<Influencer>> iterator =
+        BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<Influencer>> iterator =
                 new MockBatchedDocumentsIterator<>(batches);
         when(jobProvider.newBatchedInfluencersIterator(JOB_ID)).thenReturn(iterator);
     }