diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedBucketsIterator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedBucketsIterator.java similarity index 88% rename from elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedBucketsIterator.java rename to elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedBucketsIterator.java index 2fec4c7511e..dad65d80ba9 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedBucketsIterator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedBucketsIterator.java @@ -16,9 +16,9 @@ import org.elasticsearch.xpack.ml.job.results.Bucket; import java.io.IOException; -class ElasticsearchBatchedBucketsIterator extends ElasticsearchBatchedResultsIterator { +class BatchedBucketsIterator extends BatchedResultsIterator { - public ElasticsearchBatchedBucketsIterator(Client client, String jobId) { + public BatchedBucketsIterator(Client client, String jobId) { super(client, jobId, Bucket.RESULT_TYPE_VALUE); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java index 3292a3f17ca..63d91b9ef18 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java @@ -5,14 +5,59 @@ */ package org.elasticsearch.xpack.ml.job.persistence; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortBuilders; +import org.elasticsearch.xpack.ml.job.results.Bucket; + +import java.util.ArrayDeque; +import java.util.Arrays; import java.util.Deque; import java.util.NoSuchElementException; +import java.util.Objects; /** * An iterator useful to fetch a big number of documents of type T * and iterate through them in batches. */ -public interface BatchedDocumentsIterator { +public abstract class BatchedDocumentsIterator { + private static final Logger LOGGER = Loggers.getLogger(BatchedDocumentsIterator.class); + + private static final String CONTEXT_ALIVE_DURATION = "5m"; + private static final int BATCH_SIZE = 10000; + + private final Client client; + private final String index; + private final ResultsFilterBuilder filterBuilder; + private volatile long count; + private volatile long totalHits; + private volatile String scrollId; + private volatile boolean isScrollInitialised; + + public BatchedDocumentsIterator(Client client, String index) { + this(client, index, new ResultsFilterBuilder()); + } + + protected BatchedDocumentsIterator(Client client, String index, QueryBuilder queryBuilder) { + this(client, index, new ResultsFilterBuilder(queryBuilder)); + } + + private BatchedDocumentsIterator(Client client, String index, ResultsFilterBuilder resultsFilterBuilder) { + this.client = Objects.requireNonNull(client); + this.index = Objects.requireNonNull(index); + totalHits = 0; + count = 0; + filterBuilder = Objects.requireNonNull(resultsFilterBuilder); + isScrollInitialised = false; + } + /** * Query documents whose timestamp is within the given time range * @@ -20,14 +65,31 @@ public interface BatchedDocumentsIterator { * @param endEpochMs the end time as epoch milliseconds (exclusive) * @return the iterator itself */ - BatchedDocumentsIterator timeRange(long startEpochMs, long endEpochMs); + public BatchedDocumentsIterator timeRange(long startEpochMs, long endEpochMs) { + filterBuilder.timeRange(Bucket.TIMESTAMP.getPreferredName(), startEpochMs, endEpochMs); + return this; + } /** * Include interim documents * * @param interimFieldName Name of the include interim field */ - BatchedDocumentsIterator includeInterim(String interimFieldName); + public BatchedDocumentsIterator includeInterim(String interimFieldName) { + filterBuilder.interim(interimFieldName, true); + return this; + } + + /** + * Returns {@code true} if the iteration has more elements. + * (In other words, returns {@code true} if {@link #next} would + * return an element rather than throwing an exception.) + * + * @return {@code true} if the iteration has more elements + */ + public boolean hasNext() { + return !isScrollInitialised || count != totalHits; + } /** * The first time next() is called, the search will be performed and the first @@ -39,14 +101,66 @@ public interface BatchedDocumentsIterator { * @return a {@code Deque} with the next batch of documents * @throws NoSuchElementException if the iteration has no more elements */ - Deque next(); + public Deque next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + SearchResponse searchResponse; + if (scrollId == null) { + searchResponse = initScroll(); + } else { + SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId).scroll(CONTEXT_ALIVE_DURATION); + searchResponse = client.searchScroll(searchScrollRequest).actionGet(); + } + scrollId = searchResponse.getScrollId(); + return mapHits(searchResponse); + } + + private SearchResponse initScroll() { + LOGGER.trace("ES API CALL: search all of type {} from index {}", getType(), index); + + isScrollInitialised = true; + + SearchRequest searchRequest = new SearchRequest(index); + searchRequest.types(getType()); + searchRequest.scroll(CONTEXT_ALIVE_DURATION); + searchRequest.source(new SearchSourceBuilder() + .size(BATCH_SIZE) + .query(filterBuilder.build()) + .sort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC))); + + SearchResponse searchResponse = client.search(searchRequest).actionGet(); + totalHits = searchResponse.getHits().getTotalHits(); + scrollId = searchResponse.getScrollId(); + return searchResponse; + } + + private Deque mapHits(SearchResponse searchResponse) { + Deque results = new ArrayDeque<>(); + + SearchHit[] hits = searchResponse.getHits().getHits(); + for (SearchHit hit : hits) { + T mapped = map(hit); + if (mapped != null) { + results.add(mapped); + } + } + count += hits.length; + + if (!hasNext() && scrollId != null) { + client.prepareClearScroll().setScrollIds(Arrays.asList(scrollId)).get(); + } + return results; + } + + protected abstract String getType(); /** - * Returns {@code true} if the iteration has more elements. - * (In other words, returns {@code true} if {@link #next} would - * return an element rather than throwing an exception.) - * - * @return {@code true} if the iteration has more elements + * Maps the search hit to the document type + * @param hit + * the search hit + * @return The mapped document or {@code null} if the mapping failed */ - boolean hasNext(); + protected abstract T map(SearchHit hit); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedInfluencersIterator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedInfluencersIterator.java similarity index 88% rename from elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedInfluencersIterator.java rename to elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedInfluencersIterator.java index 2e499fbadc0..6efa361da5d 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedInfluencersIterator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedInfluencersIterator.java @@ -16,8 +16,8 @@ import org.elasticsearch.xpack.ml.job.results.Influencer; import java.io.IOException; -class ElasticsearchBatchedInfluencersIterator extends ElasticsearchBatchedResultsIterator { - public ElasticsearchBatchedInfluencersIterator(Client client, String jobId) { +class BatchedInfluencersIterator extends BatchedResultsIterator { + public BatchedInfluencersIterator(Client client, String jobId) { super(client, jobId, Influencer.RESULT_TYPE_VALUE); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedRecordsIterator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedRecordsIterator.java similarity index 88% rename from elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedRecordsIterator.java rename to elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedRecordsIterator.java index 77aef798487..11fe417b169 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedRecordsIterator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedRecordsIterator.java @@ -16,9 +16,9 @@ import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import java.io.IOException; -class ElasticsearchBatchedRecordsIterator extends ElasticsearchBatchedResultsIterator { +class BatchedRecordsIterator extends BatchedResultsIterator { - public ElasticsearchBatchedRecordsIterator(Client client, String jobId) { + public BatchedRecordsIterator(Client client, String jobId) { super(client, jobId, AnomalyRecord.RESULT_TYPE_VALUE); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedResultsIterator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedResultsIterator.java similarity index 78% rename from elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedResultsIterator.java rename to elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedResultsIterator.java index eea8efa96b5..eac6a2e1a79 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedResultsIterator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedResultsIterator.java @@ -9,10 +9,10 @@ import org.elasticsearch.client.Client; import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.xpack.ml.job.results.Result; -public abstract class ElasticsearchBatchedResultsIterator - extends ElasticsearchBatchedDocumentsIterator> { +public abstract class BatchedResultsIterator + extends BatchedDocumentsIterator> { - public ElasticsearchBatchedResultsIterator(Client client, String jobId, String resultType) { + public BatchedResultsIterator(Client client, String jobId, String resultType) { super(client, AnomalyDetectorsIndex.jobResultsIndexName(jobId), new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), resultType)); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedDocumentsIterator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedDocumentsIterator.java deleted file mode 100644 index 9c90bcc1aa0..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedDocumentsIterator.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.job.persistence; - -import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchScrollRequest; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.search.sort.SortBuilders; -import org.elasticsearch.xpack.ml.job.results.Bucket; - -import java.util.ArrayDeque; -import java.util.Arrays; -import java.util.Deque; -import java.util.NoSuchElementException; -import java.util.Objects; - -abstract class ElasticsearchBatchedDocumentsIterator implements BatchedDocumentsIterator { - private static final Logger LOGGER = Loggers.getLogger(ElasticsearchBatchedDocumentsIterator.class); - - private static final String CONTEXT_ALIVE_DURATION = "5m"; - private static final int BATCH_SIZE = 10000; - - private final Client client; - private final String index; - private final ResultsFilterBuilder filterBuilder; - private volatile long count; - private volatile long totalHits; - private volatile String scrollId; - private volatile boolean isScrollInitialised; - - public ElasticsearchBatchedDocumentsIterator(Client client, String index) { - this(client, index, new ResultsFilterBuilder()); - } - - protected ElasticsearchBatchedDocumentsIterator(Client client, String index, QueryBuilder queryBuilder) { - this(client, index, new ResultsFilterBuilder(queryBuilder)); - } - - private ElasticsearchBatchedDocumentsIterator(Client client, String index, ResultsFilterBuilder resultsFilterBuilder) { - this.client = Objects.requireNonNull(client); - this.index = Objects.requireNonNull(index); - totalHits = 0; - count = 0; - filterBuilder = Objects.requireNonNull(resultsFilterBuilder); - isScrollInitialised = false; - } - - @Override - public BatchedDocumentsIterator timeRange(long startEpochMs, long endEpochMs) { - filterBuilder.timeRange(Bucket.TIMESTAMP.getPreferredName(), startEpochMs, endEpochMs); - return this; - } - - @Override - public BatchedDocumentsIterator includeInterim(String interimFieldName) { - filterBuilder.interim(interimFieldName, true); - return this; - } - - @Override - public boolean hasNext() { - return !isScrollInitialised || count != totalHits; - } - - @Override - public Deque next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - - SearchResponse searchResponse; - if (scrollId == null) { - searchResponse = initScroll(); - } else { - SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId).scroll(CONTEXT_ALIVE_DURATION); - searchResponse = client.searchScroll(searchScrollRequest).actionGet(); - } - scrollId = searchResponse.getScrollId(); - return mapHits(searchResponse); - } - - private SearchResponse initScroll() { - LOGGER.trace("ES API CALL: search all of type {} from index {}", getType(), index); - - isScrollInitialised = true; - - SearchRequest searchRequest = new SearchRequest(index); - searchRequest.types(getType()); - searchRequest.scroll(CONTEXT_ALIVE_DURATION); - searchRequest.source(new SearchSourceBuilder() - .size(BATCH_SIZE) - .query(filterBuilder.build()) - .sort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC))); - - SearchResponse searchResponse = client.search(searchRequest).actionGet(); - totalHits = searchResponse.getHits().getTotalHits(); - scrollId = searchResponse.getScrollId(); - return searchResponse; - } - - private Deque mapHits(SearchResponse searchResponse) { - Deque results = new ArrayDeque<>(); - - SearchHit[] hits = searchResponse.getHits().getHits(); - for (SearchHit hit : hits) { - T mapped = map(hit); - if (mapped != null) { - results.add(mapped); - } - } - count += hits.length; - - if (!hasNext() && scrollId != null) { - client.prepareClearScroll().setScrollIds(Arrays.asList(scrollId)).get(); - } - return results; - } - - protected abstract String getType(); - - /** - * Maps the search hit to the document type - * @param hit - * the search hit - * @return The mapped document or {@code null} if the mapping failed - */ - protected abstract T map(SearchHit hit); -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index b3a69cdcba2..cba85210857 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -491,8 +491,8 @@ public class JobProvider { * @param jobId the id of the job for which buckets are requested * @return a bucket {@link BatchedDocumentsIterator} */ - public BatchedDocumentsIterator> newBatchedBucketsIterator(String jobId) { - return new ElasticsearchBatchedBucketsIterator(client, jobId); + public BatchedDocumentsIterator> newBatchedBucketsIterator(String jobId) { + return new BatchedBucketsIterator(client, jobId); } /** @@ -503,9 +503,9 @@ public class JobProvider { * @param jobId the id of the job for which buckets are requested * @return a record {@link BatchedDocumentsIterator} */ - public BatchedDocumentsIterator> + public BatchedDocumentsIterator> newBatchedRecordsIterator(String jobId) { - return new ElasticsearchBatchedRecordsIterator(client, jobId); + return new BatchedRecordsIterator(client, jobId); } // TODO (norelease): Use scroll search instead of multiple searches with increasing from @@ -761,9 +761,9 @@ public class JobProvider { * @param jobId the id of the job for which influencers are requested * @return an influencer {@link BatchedDocumentsIterator} */ - public BatchedDocumentsIterator> + public BatchedDocumentsIterator> newBatchedInfluencersIterator(String jobId) { - return new ElasticsearchBatchedInfluencersIterator(client, jobId); + return new BatchedInfluencersIterator(client, jobId); } /** diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java index 7b830a898c6..29f4d8d862e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java @@ -10,7 +10,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.xpack.ml.job.AnalysisConfig; import org.elasticsearch.xpack.ml.job.Job; import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator; -import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchBatchedResultsIterator; +import org.elasticsearch.xpack.ml.job.persistence.BatchedResultsIterator; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; @@ -101,7 +101,7 @@ public class ScoresUpdater { private void updateBuckets(Normalizer normalizer, String quantilesState, long endBucketEpochMs, long windowExtensionMs, int[] counts, boolean perPartitionNormalization) { - BatchedDocumentsIterator> bucketsIterator = + BatchedDocumentsIterator> bucketsIterator = jobProvider.newBatchedBucketsIterator(job.getId()) .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs); @@ -119,13 +119,13 @@ public class ScoresUpdater { while (bucketsIterator.hasNext()) { // Get a batch of buckets without their records to calculate // how many buckets can be sensibly retrieved - Deque> buckets = bucketsIterator.next(); + Deque> buckets = bucketsIterator.next(); if (buckets.isEmpty()) { break; } while (!buckets.isEmpty()) { - ElasticsearchBatchedResultsIterator.ResultWithIndex current = buckets.removeFirst(); + BatchedResultsIterator.ResultWithIndex current = buckets.removeFirst(); Bucket currentBucket = current.result; if (currentBucket.isNormalizable()) { BucketNormalizable bucketNormalizable = new BucketNormalizable(current.result, current.indexName); @@ -156,13 +156,13 @@ public class ScoresUpdater { } private List bucketRecordsAsNormalizables(long bucketTimeStamp) { - BatchedDocumentsIterator> + BatchedDocumentsIterator> recordsIterator = jobProvider.newBatchedRecordsIterator(job.getId()) .timeRange(bucketTimeStamp, bucketTimeStamp + 1); List recordNormalizables = new ArrayList<>(); while (recordsIterator.hasNext()) { - for (ElasticsearchBatchedResultsIterator.ResultWithIndex record : recordsIterator.next() ) { + for (BatchedResultsIterator.ResultWithIndex record : recordsIterator.next() ) { recordNormalizables.add(new RecordNormalizable(record.result, record.indexName)); } } @@ -211,12 +211,12 @@ public class ScoresUpdater { private void updateInfluencers(Normalizer normalizer, String quantilesState, long endBucketEpochMs, long windowExtensionMs, int[] counts) { - BatchedDocumentsIterator> influencersIterator = + BatchedDocumentsIterator> influencersIterator = jobProvider.newBatchedInfluencersIterator(job.getId()) .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs); while (influencersIterator.hasNext()) { - Deque> influencers = influencersIterator.next(); + Deque> influencers = influencersIterator.next(); if (influencers.isEmpty()) { LOGGER.debug("[{}] No influencers to renormalize for job", job.getId()); break; diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedDocumentsIteratorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIteratorTests.java similarity index 97% rename from elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedDocumentsIteratorTests.java rename to elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIteratorTests.java index 006216b3cd2..7fcc08ff99c 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedDocumentsIteratorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIteratorTests.java @@ -30,7 +30,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127") -public class ElasticsearchBatchedDocumentsIteratorTests extends ESTestCase { +public class BatchedDocumentsIteratorTests extends ESTestCase { private static final String INDEX_NAME = ".ml-anomalies-foo"; private static final String SCROLL_ID = "someScrollId"; @@ -187,7 +187,7 @@ public class ElasticsearchBatchedDocumentsIteratorTests extends ESTestCase { } } - private static class TestIterator extends ElasticsearchBatchedDocumentsIterator { + private static class TestIterator extends BatchedDocumentsIterator { public TestIterator(Client client, String jobId) { super(client, jobId); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockBatchedDocumentsIterator.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockBatchedDocumentsIterator.java index 1a57325a970..06a0ce676ac 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockBatchedDocumentsIterator.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockBatchedDocumentsIterator.java @@ -5,19 +5,23 @@ */ package org.elasticsearch.xpack.ml.job.persistence; +import org.elasticsearch.client.Client; +import org.elasticsearch.search.SearchHit; + import java.util.Deque; import java.util.List; import java.util.NoSuchElementException; -import static org.junit.Assert.assertEquals; +import static org.elasticsearch.mock.orig.Mockito.mock; -public class MockBatchedDocumentsIterator implements BatchedDocumentsIterator { +public class MockBatchedDocumentsIterator extends BatchedDocumentsIterator { private final List> batches; private int index; private boolean wasTimeRangeCalled; private String interimFieldName; public MockBatchedDocumentsIterator(List> batches) { + super(mock(Client.class), "foo"); this.batches = batches; index = 0; wasTimeRangeCalled = false; @@ -44,6 +48,16 @@ public class MockBatchedDocumentsIterator implements BatchedDocumentsIterator return batches.get(index++); } + @Override + protected String getType() { + return null; + } + + @Override + protected T map(SearchHit hit) { + return null; + } + @Override public boolean hasNext() { return index != batches.size(); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java index b256379e78e..76c4e2c0577 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java @@ -18,7 +18,7 @@ import org.elasticsearch.xpack.ml.job.AnalysisConfig; import org.elasticsearch.xpack.ml.job.Detector; import org.elasticsearch.xpack.ml.job.Job; import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator; -import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchBatchedResultsIterator; +import org.elasticsearch.xpack.ml.job.persistence.BatchedResultsIterator; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.MockBatchedDocumentsIterator; @@ -192,10 +192,10 @@ public class ScoresUpdaterTests extends ESTestCase { bucket1.setAnomalyScore(42.0); bucket1.addBucketInfluencer(createTimeBucketInfluencer(bucket1.getTimestamp(), 0.04, 42.0)); bucket1.setMaxNormalizedProbability(50.0); - List> records = new ArrayList<>(); + List> records = new ArrayList<>(); Date date = new Date(); for (int i=0; i<100000; i++) { - records.add(new ElasticsearchBatchedResultsIterator.ResultWithIndex<>("foo", new AnomalyRecord("foo", date, 1, i))); + records.add(new BatchedResultsIterator.ResultWithIndex<>("foo", new AnomalyRecord("foo", date, 1, i))); } Bucket bucket2 = generateBucket(new Date(10000 * 1000)); @@ -209,9 +209,9 @@ public class ScoresUpdaterTests extends ESTestCase { givenProviderReturnsBuckets(batch); - List>> recordBatches = new ArrayList<>(); + List>> recordBatches = new ArrayList<>(); recordBatches.add(new ArrayDeque<>(records)); - BatchedDocumentsIterator> recordIter = + BatchedDocumentsIterator> recordIter = new MockBatchedDocumentsIterator<>(recordBatches); when(jobProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter); @@ -341,29 +341,29 @@ public class ScoresUpdaterTests extends ESTestCase { } private void givenBuckets(List> batches) { - List>> batchesWithIndex = new ArrayList<>(); + List>> batchesWithIndex = new ArrayList<>(); for (Deque deque : batches) { - Deque> queueWithIndex = new ArrayDeque<>(); + Deque> queueWithIndex = new ArrayDeque<>(); for (Bucket bucket : deque) { - queueWithIndex.add(new ElasticsearchBatchedResultsIterator.ResultWithIndex<>("foo", bucket)); + queueWithIndex.add(new BatchedResultsIterator.ResultWithIndex<>("foo", bucket)); } batchesWithIndex.add(queueWithIndex); } - BatchedDocumentsIterator> bucketIter = + BatchedDocumentsIterator> bucketIter = new MockBatchedDocumentsIterator<>(batchesWithIndex); when(jobProvider.newBatchedBucketsIterator(JOB_ID)).thenReturn(bucketIter); } private void givenProviderReturnsRecords(Deque records) { - Deque> batch = new ArrayDeque<>(); - List>> batches = new ArrayList<>(); + Deque> batch = new ArrayDeque<>(); + List>> batches = new ArrayList<>(); for (AnomalyRecord record : records) { - batch.add(new ElasticsearchBatchedResultsIterator.ResultWithIndex<>("foo", record)); + batch.add(new BatchedResultsIterator.ResultWithIndex<>("foo", record)); } batches.add(batch); - BatchedDocumentsIterator> recordIter = + BatchedDocumentsIterator> recordIter = new MockBatchedDocumentsIterator<>(batches); when(jobProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter); } @@ -373,13 +373,13 @@ public class ScoresUpdaterTests extends ESTestCase { } private void givenProviderReturnsInfluencers(Deque influencers) { - List>> batches = new ArrayList<>(); - Deque> queue = new ArrayDeque<>(); + List>> batches = new ArrayList<>(); + Deque> queue = new ArrayDeque<>(); for (Influencer inf : influencers) { - queue.add(new ElasticsearchBatchedResultsIterator.ResultWithIndex<>("foo", inf)); + queue.add(new BatchedResultsIterator.ResultWithIndex<>("foo", inf)); } batches.add(queue); - BatchedDocumentsIterator> iterator = + BatchedDocumentsIterator> iterator = new MockBatchedDocumentsIterator<>(batches); when(jobProvider.newBatchedInfluencersIterator(JOB_ID)).thenReturn(iterator); }