remove FixBlockingClientOperations usage in ElasticsearchBatchedDocumentsIterator as it is ok to make blocking from there. It is only used during remormalization which happens from a prelert thread and not a network thread.

Also removed some used code.

Original commit: elastic/x-pack-elasticsearch@2fe506099a
This commit is contained in:
Martijn van Groningen 2017-01-09 15:05:57 +01:00
parent 1a0151d020
commit 1d81509616
3 changed files with 12 additions and 75 deletions

View File

@ -6,10 +6,8 @@
package org.elasticsearch.xpack.prelert.job.persistence;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollAction;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.ParseFieldMatcher;
@ -19,7 +17,6 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.utils.FixBlockingClientOperations;
import java.util.ArrayDeque;
import java.util.Arrays;
@ -88,9 +85,13 @@ abstract class ElasticsearchBatchedDocumentsIterator<T> implements BatchedDocume
throw new NoSuchElementException();
}
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId).scroll(CONTEXT_ALIVE_DURATION);
SearchResponse searchResponse = (scrollId == null) ? initScroll()
: FixBlockingClientOperations.executeBlocking(client, SearchScrollAction.INSTANCE, searchScrollRequest);
SearchResponse searchResponse;
if (scrollId == null) {
searchResponse = initScroll();
} else {
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId).scroll(CONTEXT_ALIVE_DURATION);
searchResponse = client.searchScroll(searchScrollRequest).actionGet();
}
scrollId = searchResponse.getScrollId();
return mapHits(searchResponse);
}
@ -108,7 +109,7 @@ abstract class ElasticsearchBatchedDocumentsIterator<T> implements BatchedDocume
.query(filterBuilder.build())
.sort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)));
SearchResponse searchResponse = FixBlockingClientOperations.executeBlocking(client, SearchAction.INSTANCE, searchRequest);
SearchResponse searchResponse = client.search(searchRequest).actionGet();
totalHits = searchResponse.getHits().getTotalHits();
scrollId = searchResponse.getScrollId();
return searchResponse;

View File

@ -1,42 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.persistence;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import java.io.IOException;
class ElasticsearchBatchedModelSnapshotIterator extends ElasticsearchBatchedDocumentsIterator<ModelSnapshot> {
public ElasticsearchBatchedModelSnapshotIterator(Client client, String jobId, ParseFieldMatcher parserFieldMatcher) {
super(client, AnomalyDetectorsIndex.jobStateIndexName(), parserFieldMatcher);
}
@Override
protected String getType() {
return ModelSnapshot.TYPE.getPreferredName();
}
@Override
protected ModelSnapshot map(SearchHit hit) {
BytesReference source = hit.getSourceRef();
XContentParser parser;
try {
parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parser model snapshot", e);
}
return ModelSnapshot.PARSER.apply(parser, () -> parseFieldMatcher);
}
}

View File

@ -629,26 +629,15 @@ public class JobProvider {
.score(AnomalyRecord.NORMALIZED_PROBABILITY.getPreferredName(), query.getNormalizedProbabilityThreshold())
.interim(AnomalyRecord.IS_INTERIM.getPreferredName(), query.isIncludeInterim())
.term(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), query.getPartitionFieldValue()).build();
return records(jobId, query.getFrom(), query.getSize(), fb, query.getSortField(), query.isSortDescending());
}
private QueryPage<AnomalyRecord> records(String jobId,
int from, int size, QueryBuilder recordFilter,
String sortField, boolean descending)
throws ResourceNotFoundException {
FieldSortBuilder sb = null;
if (sortField != null) {
sb = new FieldSortBuilder(sortField)
if (query.getSortField() != null) {
sb = new FieldSortBuilder(query.getSortField())
.missing("_last")
.order(descending ? SortOrder.DESC : SortOrder.ASC);
.order(query.isSortDescending() ? SortOrder.DESC : SortOrder.ASC);
}
return records(jobId, from, size, recordFilter, sb, SECONDARY_SORT, descending);
return records(jobId, query.getFrom(), query.getSize(), fb, sb, SECONDARY_SORT, query.isSortDescending());
}
/**
* The returned records have their id set.
*/
@ -757,17 +746,6 @@ public class JobProvider {
return new ElasticsearchBatchedInfluencersIterator(client, jobId, parseFieldMatcher);
}
/**
* Returns a {@link BatchedDocumentsIterator} that allows querying
* and iterating over a number of model snapshots of the given job
*
* @param jobId the id of the job for which model snapshots are requested
* @return a model snapshot {@link BatchedDocumentsIterator}
*/
public BatchedDocumentsIterator<ModelSnapshot> newBatchedModelSnapshotIterator(String jobId) {
return new ElasticsearchBatchedModelSnapshotIterator(client, jobId, parseFieldMatcher);
}
/**
* Get the persisted quantiles state for the job
*/