first step to get things working again, added norelease work around for blocking client calls

Original commit: elastic/x-pack-elasticsearch@2c2a6d9ae8
This commit is contained in:
Martijn van Groningen 2016-12-27 17:25:45 +01:00
parent d431ebba6f
commit a2aaef90b7
9 changed files with 211 additions and 271 deletions

View File

@ -6,14 +6,20 @@
package org.elasticsearch.xpack.prelert.job.persistence; package org.elasticsearch.xpack.prelert.job.persistence;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollAction;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.xpack.prelert.job.results.Bucket; import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.utils.FixBlockingClientOperations;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Arrays; import java.util.Arrays;
@ -81,8 +87,10 @@ abstract class ElasticsearchBatchedDocumentsIterator<T> implements BatchedDocume
if (!hasNext()) { if (!hasNext()) {
throw new NoSuchElementException(); throw new NoSuchElementException();
} }
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId).scroll(CONTEXT_ALIVE_DURATION);
SearchResponse searchResponse = (scrollId == null) ? initScroll() SearchResponse searchResponse = (scrollId == null) ? initScroll()
: client.prepareSearchScroll(scrollId).setScroll(CONTEXT_ALIVE_DURATION).get(); : FixBlockingClientOperations.executeBlocking(client, SearchScrollAction.INSTANCE, searchScrollRequest);
scrollId = searchResponse.getScrollId(); scrollId = searchResponse.getScrollId();
return mapHits(searchResponse); return mapHits(searchResponse);
} }
@ -92,8 +100,15 @@ abstract class ElasticsearchBatchedDocumentsIterator<T> implements BatchedDocume
isScrollInitialised = true; isScrollInitialised = true;
SearchResponse searchResponse = client.prepareSearch(index).setScroll(CONTEXT_ALIVE_DURATION).setSize(BATCH_SIZE) SearchRequest searchRequest = new SearchRequest(index);
.setTypes(getType()).setQuery(filterBuilder.build()).addSort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)).get(); searchRequest.types(getType());
searchRequest.scroll(CONTEXT_ALIVE_DURATION);
searchRequest.source(new SearchSourceBuilder()
.size(BATCH_SIZE)
.query(filterBuilder.build())
.sort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)));
SearchResponse searchResponse = FixBlockingClientOperations.executeBlocking(client, SearchAction.INSTANCE, searchRequest);
totalHits = searchResponse.getHits().getTotalHits(); totalHits = searchResponse.getHits().getTotalHits();
scrollId = searchResponse.getScrollId(); scrollId = searchResponse.getScrollId();
return searchResponse; return searchResponse;

View File

@ -16,7 +16,11 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
@ -40,6 +44,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortBuilders;
@ -67,6 +72,7 @@ import org.elasticsearch.xpack.prelert.job.results.Result;
import org.elasticsearch.xpack.prelert.job.usage.Usage; import org.elasticsearch.xpack.prelert.job.usage.Usage;
import org.elasticsearch.xpack.prelert.lists.ListDocument; import org.elasticsearch.xpack.prelert.lists.ListDocument;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import org.elasticsearch.xpack.prelert.utils.FixBlockingClientOperations;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
@ -333,8 +339,8 @@ public class JobProvider {
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
try { try {
GetResponse response = client.prepareGet(indexName, DataCounts.TYPE.getPreferredName(), GetRequest getRequest = new GetRequest(indexName, DataCounts.TYPE.getPreferredName(), jobId + DataCounts.DOCUMENT_SUFFIX);
jobId + DataCounts.DOCUMENT_SUFFIX).get(); GetResponse response = FixBlockingClientOperations.executeBlocking(client, GetAction.INSTANCE, getRequest);
if (response.isExists() == false) { if (response.isExists() == false) {
return new DataCounts(jobId); return new DataCounts(jobId);
} else { } else {
@ -426,12 +432,15 @@ public class JobProvider {
LOGGER.trace("ES API CALL: search all of result type {} from index {} with filter from {} size {}", LOGGER.trace("ES API CALL: search all of result type {} from index {} with filter from {} size {}",
Bucket.RESULT_TYPE_VALUE, indexName, from, size); Bucket.RESULT_TYPE_VALUE, indexName, from, size);
searchResponse = client.prepareSearch(indexName) SearchRequest searchRequest = new SearchRequest(indexName);
.setTypes(Result.TYPE.getPreferredName()) searchRequest.types(Result.TYPE.getPreferredName());
.addSort(sb) SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
.setQuery(new ConstantScoreQueryBuilder(boolQuery)) searchSourceBuilder.sort(sb);
.setFrom(from).setSize(size) searchSourceBuilder.query(new ConstantScoreQueryBuilder(boolQuery));
.get(); searchSourceBuilder.from(from);
searchSourceBuilder.size(size);
searchRequest.source(searchSourceBuilder);
searchResponse = FixBlockingClientOperations.executeBlocking(client, SearchAction.INSTANCE, searchRequest);
} catch (IndexNotFoundException e) { } catch (IndexNotFoundException e) {
throw ExceptionsHelper.missingJobException(jobId); throw ExceptionsHelper.missingJobException(jobId);
} }
@ -475,11 +484,13 @@ public class JobProvider {
.filter(matchQuery) .filter(matchQuery)
.filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE)); .filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE));
SearchResponse searchResponse = client.prepareSearch(indexName) SearchRequest searchRequest = new SearchRequest(indexName);
.setTypes(Result.TYPE.getPreferredName()) searchRequest.types(Result.TYPE.getPreferredName());
.setQuery(boolQuery) SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
.addSort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)) sourceBuilder.query(boolQuery);
.get(); sourceBuilder.sort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC));
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = FixBlockingClientOperations.executeBlocking(client, SearchAction.INSTANCE, searchRequest);
hits = searchResponse.getHits(); hits = searchResponse.getHits();
} catch (IndexNotFoundException e) { } catch (IndexNotFoundException e) {
throw ExceptionsHelper.missingJobException(jobId); throw ExceptionsHelper.missingJobException(jobId);
@ -678,14 +689,13 @@ public class JobProvider {
LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} from {} size {}", LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} from {} size {}",
CategoryDefinition.TYPE.getPreferredName(), indexName, CategoryDefinition.CATEGORY_ID.getPreferredName(), from, size); CategoryDefinition.TYPE.getPreferredName(), indexName, CategoryDefinition.CATEGORY_ID.getPreferredName(), from, size);
SearchRequestBuilder searchBuilder = client.prepareSearch(indexName) SearchRequest searchRequest = new SearchRequest(indexName);
.setTypes(CategoryDefinition.TYPE.getPreferredName()) searchRequest.types(CategoryDefinition.TYPE.getPreferredName());
.setFrom(from).setSize(size) searchRequest.source(new SearchSourceBuilder().from(from).size(size)
.addSort(new FieldSortBuilder(CategoryDefinition.CATEGORY_ID.getPreferredName()).order(SortOrder.ASC)); .sort(new FieldSortBuilder(CategoryDefinition.CATEGORY_ID.getPreferredName()).order(SortOrder.ASC)));
SearchResponse searchResponse; SearchResponse searchResponse;
try { try {
searchResponse = searchBuilder.get(); searchResponse = FixBlockingClientOperations.executeBlocking(client, SearchAction.INSTANCE, searchRequest);
} catch (IndexNotFoundException e) { } catch (IndexNotFoundException e) {
throw ExceptionsHelper.missingJobException(jobId); throw ExceptionsHelper.missingJobException(jobId);
} }
@ -721,7 +731,8 @@ public class JobProvider {
LOGGER.trace("ES API CALL: get ID {} type {} from index {}", LOGGER.trace("ES API CALL: get ID {} type {} from index {}",
categoryId, CategoryDefinition.TYPE, indexName); categoryId, CategoryDefinition.TYPE, indexName);
response = client.prepareGet(indexName, CategoryDefinition.TYPE.getPreferredName(), categoryId).get(); GetRequest getRequest = new GetRequest(indexName, CategoryDefinition.TYPE.getPreferredName(), categoryId);
response = FixBlockingClientOperations.executeBlocking(client, GetAction.INSTANCE, getRequest);
} catch (IndexNotFoundException e) { } catch (IndexNotFoundException e) {
throw ExceptionsHelper.missingJobException(jobId); throw ExceptionsHelper.missingJobException(jobId);
} }
@ -787,15 +798,18 @@ public class JobProvider {
.filter(recordFilter) .filter(recordFilter)
.filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), AnomalyRecord.RESULT_TYPE_VALUE)); .filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), AnomalyRecord.RESULT_TYPE_VALUE));
SearchRequestBuilder searchBuilder = client.prepareSearch(indexName) SearchRequest searchRequest = new SearchRequest(indexName);
.setTypes(Result.TYPE.getPreferredName()) searchRequest.types(Result.TYPE.getPreferredName());
.setQuery(recordFilter) searchRequest.source(new SearchSourceBuilder()
.setFrom(from).setSize(size) .from(from)
.addSort(sb == null ? SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC) : sb) .size(size)
.setFetchSource(true); // the field option turns off source so request it explicitly .query(recordFilter)
.sort(sb == null ? SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC) : sb)
.fetchSource(true)
);
for (String sortField : secondarySort) { for (String sortField : secondarySort) {
searchBuilder.addSort(sortField, descending ? SortOrder.DESC : SortOrder.ASC); searchRequest.source().sort(sortField, descending ? SortOrder.DESC : SortOrder.ASC);
} }
SearchResponse searchResponse; SearchResponse searchResponse;
@ -804,7 +818,7 @@ public class JobProvider {
AnomalyRecord.RESULT_TYPE_VALUE, indexName, (sb != null) ? " with sort" : "", AnomalyRecord.RESULT_TYPE_VALUE, indexName, (sb != null) ? " with sort" : "",
secondarySort.isEmpty() ? "" : " with secondary sort", from, size); secondarySort.isEmpty() ? "" : " with secondary sort", from, size);
searchResponse = searchBuilder.get(); searchResponse = FixBlockingClientOperations.executeBlocking(client, SearchAction.INSTANCE, searchRequest);
} catch (IndexNotFoundException e) { } catch (IndexNotFoundException e) {
throw ExceptionsHelper.missingJobException(jobId); throw ExceptionsHelper.missingJobException(jobId);
} }
@ -846,7 +860,7 @@ public class JobProvider {
query.isSortDescending()); query.isSortDescending());
} }
private QueryPage<Influencer> influencers(String jobId, int from, int size, QueryBuilder filterBuilder, String sortField, private QueryPage<Influencer> influencers(String jobId, int from, int size, QueryBuilder queryBuilder, String sortField,
boolean sortDescending) throws ResourceNotFoundException { boolean sortDescending) throws ResourceNotFoundException {
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
LOGGER.trace("ES API CALL: search all of result type {} from index {}{} with filter from {} size {}", LOGGER.trace("ES API CALL: search all of result type {} from index {}{} with filter from {} size {}",
@ -855,22 +869,19 @@ public class JobProvider {
" with sort " + (sortDescending ? "descending" : "ascending") + " on field " + sortField : "", " with sort " + (sortDescending ? "descending" : "ascending") + " on field " + sortField : "",
() -> from, () -> size); () -> from, () -> size);
filterBuilder = new BoolQueryBuilder() queryBuilder = new BoolQueryBuilder()
.filter(filterBuilder) .filter(queryBuilder)
.filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), Influencer.RESULT_TYPE_VALUE)); .filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), Influencer.RESULT_TYPE_VALUE));
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indexName) SearchRequest searchRequest = new SearchRequest(indexName);
.setTypes(Result.TYPE.getPreferredName()) searchRequest.types(Result.TYPE.getPreferredName());
.setQuery(filterBuilder)
.setFrom(from).setSize(size);
FieldSortBuilder sb = sortField == null ? SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC) FieldSortBuilder sb = sortField == null ? SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)
: new FieldSortBuilder(sortField).order(sortDescending ? SortOrder.DESC : SortOrder.ASC); : new FieldSortBuilder(sortField).order(sortDescending ? SortOrder.DESC : SortOrder.ASC);
searchRequestBuilder.addSort(sb); searchRequest.source(new SearchSourceBuilder().query(queryBuilder).from(from).size(size).sort(sb));
SearchResponse response; SearchResponse response;
try { try {
response = searchRequestBuilder.get(); response = FixBlockingClientOperations.executeBlocking(client, SearchAction.INSTANCE, searchRequest);
} catch (IndexNotFoundException e) { } catch (IndexNotFoundException e) {
throw ExceptionsHelper.missingJobException(jobId); throw ExceptionsHelper.missingJobException(jobId);
} }
@ -933,7 +944,8 @@ public class JobProvider {
String quantilesId = Quantiles.quantilesId(jobId); String quantilesId = Quantiles.quantilesId(jobId);
LOGGER.trace("ES API CALL: get ID {} type {} from index {}", quantilesId, Quantiles.TYPE.getPreferredName(), indexName); LOGGER.trace("ES API CALL: get ID {} type {} from index {}", quantilesId, Quantiles.TYPE.getPreferredName(), indexName);
GetResponse response = client.prepareGet(indexName, Quantiles.TYPE.getPreferredName(), quantilesId).get(); GetRequest getRequest = new GetRequest(indexName, Quantiles.TYPE.getPreferredName(), quantilesId);
GetResponse response = FixBlockingClientOperations.executeBlocking(client, GetAction.INSTANCE, getRequest);
if (!response.isExists()) { if (!response.isExists()) {
LOGGER.info("There are currently no quantiles for job " + jobId); LOGGER.info("There are currently no quantiles for job " + jobId);
return Optional.empty(); return Optional.empty();
@ -998,13 +1010,13 @@ public class JobProvider {
} }
private QueryPage<ModelSnapshot> modelSnapshots(String jobId, int from, int size, private QueryPage<ModelSnapshot> modelSnapshots(String jobId, int from, int size,
String sortField, boolean sortDescending, QueryBuilder fb) { String sortField, boolean sortDescending, QueryBuilder qb) {
FieldSortBuilder sb = new FieldSortBuilder(sortField) FieldSortBuilder sb = new FieldSortBuilder(sortField)
.order(sortDescending ? SortOrder.DESC : SortOrder.ASC); .order(sortDescending ? SortOrder.DESC : SortOrder.ASC);
// Wrap in a constant_score because we always want to // Wrap in a constant_score because we always want to
// run it as a filter // run it as a filter
fb = new ConstantScoreQueryBuilder(fb); qb = new ConstantScoreQueryBuilder(qb);
SearchResponse searchResponse; SearchResponse searchResponse;
try { try {
@ -1012,12 +1024,15 @@ public class JobProvider {
LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} with filter after sort from {} size {}", LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} with filter after sort from {} size {}",
ModelSnapshot.TYPE, indexName, sortField, from, size); ModelSnapshot.TYPE, indexName, sortField, from, size);
searchResponse = client.prepareSearch(indexName) SearchRequest searchRequest = new SearchRequest(indexName);
.setTypes(ModelSnapshot.TYPE.getPreferredName()) searchRequest.types(ModelSnapshot.TYPE.getPreferredName());
.addSort(sb) SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
.setQuery(fb) sourceBuilder.sort(sb);
.setFrom(from).setSize(size) sourceBuilder.query(qb);
.get(); sourceBuilder.from(from);
sourceBuilder.size(size);
searchRequest.source(sourceBuilder);
searchResponse = FixBlockingClientOperations.executeBlocking(client, SearchAction.INSTANCE, searchRequest);
} catch (IndexNotFoundException e) { } catch (IndexNotFoundException e) {
LOGGER.error("Failed to read modelSnapshots", e); LOGGER.error("Failed to read modelSnapshots", e);
throw e; throw e;
@ -1163,8 +1178,9 @@ public class JobProvider {
LOGGER.trace("ES API CALL: get result type {} ID {} from index {}", LOGGER.trace("ES API CALL: get result type {} ID {} from index {}",
ModelSizeStats.RESULT_TYPE_VALUE, ModelSizeStats.RESULT_TYPE_FIELD, indexName); ModelSizeStats.RESULT_TYPE_VALUE, ModelSizeStats.RESULT_TYPE_FIELD, indexName);
GetResponse modelSizeStatsResponse = client.prepareGet( GetRequest getRequest =
indexName, Result.TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName()).get(); new GetRequest(indexName, Result.TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName());
GetResponse modelSizeStatsResponse = FixBlockingClientOperations.executeBlocking(client, GetAction.INSTANCE, getRequest);
if (!modelSizeStatsResponse.isExists()) { if (!modelSizeStatsResponse.isExists()) {
String msg = "No memory usage details for job with id " + jobId; String msg = "No memory usage details for job with id " + jobId;
@ -1194,7 +1210,8 @@ public class JobProvider {
* @return the matching list if it exists * @return the matching list if it exists
*/ */
public Optional<ListDocument> getList(String listId) { public Optional<ListDocument> getList(String listId) {
GetResponse response = client.prepareGet(PRELERT_INFO_INDEX, ListDocument.TYPE.getPreferredName(), listId).get(); GetRequest getRequest = new GetRequest(PRELERT_INFO_INDEX, ListDocument.TYPE.getPreferredName(), listId);
GetResponse response = FixBlockingClientOperations.executeBlocking(client, GetAction.INSTANCE, getRequest);
if (!response.isExists()) { if (!response.isExists()) {
return Optional.empty(); return Optional.empty();
} }

View File

@ -6,9 +6,13 @@
package org.elasticsearch.xpack.prelert.job.persistence; package org.elasticsearch.xpack.prelert.job.persistence;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
@ -26,6 +30,7 @@ import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput; import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.prelert.job.results.Result; import org.elasticsearch.xpack.prelert.job.results.Result;
import org.elasticsearch.xpack.prelert.utils.FixBlockingClientOperations;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
@ -65,14 +70,14 @@ public class JobResultsPersister extends AbstractComponent {
} }
public class Builder { public class Builder {
private BulkRequestBuilder bulkRequest; private BulkRequest bulkRequest;
private final String jobId; private final String jobId;
private final String indexName; private final String indexName;
private Builder(String jobId) { private Builder(String jobId) {
this.jobId = Objects.requireNonNull(jobId); this.jobId = Objects.requireNonNull(jobId);
indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
bulkRequest = client.prepareBulk(); bulkRequest = new BulkRequest();
} }
/** /**
@ -95,8 +100,8 @@ public class JobResultsPersister extends AbstractComponent {
logger.trace("[{}] ES API CALL: index result type {} to index {} at epoch {}", logger.trace("[{}] ES API CALL: index result type {} to index {} at epoch {}",
jobId, Bucket.RESULT_TYPE_VALUE, indexName, bucketWithoutRecords.getEpoch()); jobId, Bucket.RESULT_TYPE_VALUE, indexName, bucketWithoutRecords.getEpoch());
bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), bulkRequest.add(new IndexRequest(indexName, Result.TYPE.getPreferredName(),
bucketWithoutRecords.getId()).setSource(content)); bucketWithoutRecords.getId()).source(content));
persistBucketInfluencersStandalone(jobId, bucketWithoutRecords.getBucketInfluencers()); persistBucketInfluencersStandalone(jobId, bucketWithoutRecords.getBucketInfluencers());
} catch (IOException e) { } catch (IOException e) {
@ -115,7 +120,7 @@ public class JobResultsPersister extends AbstractComponent {
String id = bucketInfluencer.getId(); String id = bucketInfluencer.getId();
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
jobId, BucketInfluencer.RESULT_TYPE_VALUE, indexName, id); jobId, BucketInfluencer.RESULT_TYPE_VALUE, indexName, id);
bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), id).setSource(content)); bulkRequest.add(new IndexRequest(indexName, Result.TYPE.getPreferredName(), id).source(content));
} }
} }
} }
@ -133,8 +138,7 @@ public class JobResultsPersister extends AbstractComponent {
XContentBuilder content = toXContentBuilder(record); XContentBuilder content = toXContentBuilder(record);
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName, record.getId()); jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName, record.getId());
bulkRequest.add( bulkRequest.add(new IndexRequest(indexName, Result.TYPE.getPreferredName(), record.getId()).source(content));
client.prepareIndex(indexName, Result.TYPE.getPreferredName(), record.getId()).setSource(content));
} }
} catch (IOException e) { } catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error serialising records", new Object [] {jobId}), e); logger.error(new ParameterizedMessage("[{}] Error serialising records", new Object [] {jobId}), e);
@ -156,8 +160,7 @@ public class JobResultsPersister extends AbstractComponent {
XContentBuilder content = toXContentBuilder(influencer); XContentBuilder content = toXContentBuilder(influencer);
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
jobId, Influencer.RESULT_TYPE_VALUE, indexName, influencer.getId()); jobId, Influencer.RESULT_TYPE_VALUE, indexName, influencer.getId());
bulkRequest.add( bulkRequest.add(new IndexRequest(indexName, Result.TYPE.getPreferredName(), influencer.getId()).source(content));
client.prepareIndex(indexName, Result.TYPE.getPreferredName(), influencer.getId()).setSource(content));
} }
} catch (IOException e) { } catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error serialising influencers", new Object[] {jobId}), e); logger.error(new ParameterizedMessage("[{}] Error serialising influencers", new Object[] {jobId}), e);
@ -178,8 +181,8 @@ public class JobResultsPersister extends AbstractComponent {
logger.trace("[{}] ES API CALL: index result type {} to index {} at timestamp {} with ID {}", logger.trace("[{}] ES API CALL: index result type {} to index {} at timestamp {} with ID {}",
jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp(), jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp(),
partitionProbabilities.getId()); partitionProbabilities.getId());
bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), partitionProbabilities.getId()) bulkRequest.add(
.setSource(builder)); new IndexRequest(indexName, Result.TYPE.getPreferredName(), partitionProbabilities.getId()).source(builder));
} catch (IOException e) { } catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] error serialising bucket per partition max normalized scores", logger.error(new ParameterizedMessage("[{}] error serialising bucket per partition max normalized scores",
new Object[]{jobId}), e); new Object[]{jobId}), e);
@ -197,7 +200,7 @@ public class JobResultsPersister extends AbstractComponent {
} }
logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, bulkRequest.numberOfActions()); logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, bulkRequest.numberOfActions());
BulkResponse addRecordsResponse = bulkRequest.execute().actionGet(); BulkResponse addRecordsResponse = FixBlockingClientOperations.executeBlocking(client, BulkAction.INSTANCE, bulkRequest);
if (addRecordsResponse.hasFailures()) { if (addRecordsResponse.hasFailures()) {
logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage()); logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage());
} }
@ -319,7 +322,8 @@ public class JobResultsPersister extends AbstractComponent {
String indexName = AnomalyDetectorsIndex.jobStateIndexName(); String indexName = AnomalyDetectorsIndex.jobStateIndexName();
// Refresh should wait for Lucene to make the data searchable // Refresh should wait for Lucene to make the data searchable
logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName); logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName);
client.admin().indices().refresh(new RefreshRequest(indexName)).actionGet(); RefreshRequest refreshRequest = new RefreshRequest(indexName);
FixBlockingClientOperations.executeBlocking(client, RefreshAction.INSTANCE, refreshRequest);
return true; return true;
} }
@ -358,9 +362,9 @@ public class JobResultsPersister extends AbstractComponent {
logCall(indexName); logCall(indexName);
try { try {
client.prepareIndex(indexName, type, id) IndexRequest indexRequest = new IndexRequest(indexName, type, id)
.setSource(toXContentBuilder(object)) .source(toXContentBuilder(object));
.execute().actionGet(); FixBlockingClientOperations.executeBlocking(client, IndexAction.INSTANCE, indexRequest);
return true; return true;
} catch (IOException e) { } catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, type}), e); logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, type}), e);

View File

@ -7,6 +7,8 @@ package org.elasticsearch.xpack.prelert.scheduler.http;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
@ -22,6 +24,7 @@ import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig; import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
import org.elasticsearch.xpack.prelert.utils.FixBlockingClientOperations;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
@ -55,7 +58,8 @@ public class HttpDataExtractorFactory implements DataExtractorFactory {
} }
private String getBaseUrl() { private String getBaseUrl() {
NodesInfoResponse nodesInfoResponse = client.admin().cluster().prepareNodesInfo().get(); NodesInfoRequest request = new NodesInfoRequest();
NodesInfoResponse nodesInfoResponse = FixBlockingClientOperations.executeBlocking(client, NodesInfoAction.INSTANCE, request);
TransportAddress address = nodesInfoResponse.getNodes().get(0).getHttp().getAddress().publishAddress(); TransportAddress address = nodesInfoResponse.getNodes().get(0).getHttp().getAddress().publishAddress();
String baseUrl = "http://" + address.getAddress() + ":" + address.getPort() + "/"; String baseUrl = "http://" + address.getAddress() + ":" + address.getPort() + "/";
LOGGER.info("Base URL: " + baseUrl); LOGGER.info("Base URL: " + baseUrl);

View File

@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.utils;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
// TODO (#127): norelease Placeholder fix until: https://github.com/elastic/prelert-legacy/issues/127 gets in.
public class FixBlockingClientOperations {
@SuppressWarnings({"unchecked", "rawtypes"})
public static <Res extends ActionResponse> Res executeBlocking(Client client, Action action, ActionRequest request) {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Res> response = new AtomicReference<>();
AtomicReference<Exception> exception = new AtomicReference<>();
ActionListener<Res> listener = new ActionListener<Res>() {
@Override
public void onResponse(Res r) {
response.set(r);
latch.countDown();
}
@Override
public void onFailure(Exception e) {
exception.set(e);
latch.countDown();
}
};
client.execute(action, request, listener);
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (exception.get() != null) {
throw new RuntimeException(exception.get());
} else {
return response.get();
}
}
}

View File

@ -5,16 +5,7 @@
*/ */
package org.elasticsearch.xpack.prelert.job.persistence; package org.elasticsearch.xpack.prelert.job.persistence;
import static org.mockito.Matchers.any; import org.apache.lucene.util.LuceneTestCase;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.NoSuchElementException;
import org.elasticsearch.action.search.ClearScrollRequestBuilder; import org.elasticsearch.action.search.ClearScrollRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
@ -29,6 +20,17 @@ import org.elasticsearch.test.ESTestCase;
import org.junit.Before; import org.junit.Before;
import org.mockito.Mockito; import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.NoSuchElementException;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public class ElasticsearchBatchedDocumentsIteratorTests extends ESTestCase { public class ElasticsearchBatchedDocumentsIteratorTests extends ESTestCase {
private static final String INDEX_NAME = ".ml-anomalies-foo"; private static final String INDEX_NAME = ".ml-anomalies-foo";
private static final String SCROLL_ID = "someScrollId"; private static final String SCROLL_ID = "someScrollId";

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.prelert.job.persistence; package org.elasticsearch.xpack.prelert.job.persistence;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@ -38,7 +39,6 @@ import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.prelert.job.results.Result; import org.elasticsearch.xpack.prelert.job.results.Result;
import org.elasticsearch.xpack.prelert.job.usage.Usage; import org.elasticsearch.xpack.prelert.job.usage.Usage;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
@ -63,14 +63,12 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public class JobProviderTests extends ESTestCase { public class JobProviderTests extends ESTestCase {
private static final String CLUSTER_NAME = "myCluster"; private static final String CLUSTER_NAME = "myCluster";
private static final String JOB_ID = "foo"; private static final String JOB_ID = "foo";
private static final String STATE_INDEX_NAME = ".ml-state"; private static final String STATE_INDEX_NAME = ".ml-state";
@Captor
private ArgumentCaptor<Map<String, Object>> mapCaptor;
public void testGetQuantiles_GivenNoQuantilesForJob() throws Exception { public void testGetQuantiles_GivenNoQuantilesForJob() throws Exception {
GetResponse getResponse = createGetResponse(false, null); GetResponse getResponse = createGetResponse(false, null);

View File

@ -5,14 +5,14 @@
*/ */
package org.elasticsearch.xpack.prelert.job.persistence; package org.elasticsearch.xpack.prelert.job.persistence;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.results.Result;
import org.mockito.ArgumentCaptor;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord; import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket; import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer; import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
@ -23,7 +23,10 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -33,15 +36,8 @@ public class JobResultsPersisterTests extends ESTestCase {
private static final String JOB_ID = "foo"; private static final String JOB_ID = "foo";
public void testPersistBucket_OneRecord() throws IOException { public void testPersistBucket_OneRecord() throws IOException {
ArgumentCaptor<XContentBuilder> captor = ArgumentCaptor.forClass(XContentBuilder.class); AtomicReference<BulkRequest> reference = new AtomicReference<>();
BulkResponse response = mock(BulkResponse.class); Client client = mockClient(reference);
String responseId = "abcXZY54321";
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME)
.prepareIndex(AnomalyDetectorsIndex.jobResultsIndexName(JOB_ID), Result.TYPE.getPreferredName(), responseId, captor)
.prepareIndex(AnomalyDetectorsIndex.jobResultsIndexName(JOB_ID), Result.TYPE.getPreferredName(), "", captor)
.prepareBulk(response);
Client client = clientBuilder.build();
Bucket bucket = new Bucket("foo", new Date(), 123456); Bucket bucket = new Bucket("foo", new Date(), 123456);
bucket.setAnomalyScore(99.9); bucket.setAnomalyScore(99.9);
bucket.setEventCount(57); bucket.setEventCount(57);
@ -65,10 +61,10 @@ public class JobResultsPersisterTests extends ESTestCase {
JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client); JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
persister.bulkPersisterBuilder(JOB_ID).persistBucket(bucket).executeRequest(); persister.bulkPersisterBuilder(JOB_ID).persistBucket(bucket).executeRequest();
List<XContentBuilder> list = captor.getAllValues(); BulkRequest bulkRequest = reference.get();
assertEquals(2, list.size()); assertEquals(2, bulkRequest.numberOfActions());
String s = list.get(0).string(); String s = ((IndexRequest)bulkRequest.requests().get(0)).source().utf8ToString();
assertTrue(s.matches(".*anomaly_score.:99\\.9.*")); assertTrue(s.matches(".*anomaly_score.:99\\.9.*"));
assertTrue(s.matches(".*initial_anomaly_score.:88\\.8.*")); assertTrue(s.matches(".*initial_anomaly_score.:88\\.8.*"));
assertTrue(s.matches(".*max_normalized_probability.:42\\.0.*")); assertTrue(s.matches(".*max_normalized_probability.:42\\.0.*"));
@ -79,7 +75,7 @@ public class JobResultsPersisterTests extends ESTestCase {
// There should NOT be any nested records // There should NOT be any nested records
assertFalse(s.matches(".*records*")); assertFalse(s.matches(".*records*"));
s = list.get(1).string(); s = ((IndexRequest)bulkRequest.requests().get(1)).source().utf8ToString();
assertTrue(s.matches(".*probability.:0\\.0054.*")); assertTrue(s.matches(".*probability.:0\\.0054.*"));
assertTrue(s.matches(".*influencer_field_name.:.biOne.*")); assertTrue(s.matches(".*influencer_field_name.:.biOne.*"));
assertTrue(s.matches(".*initial_anomaly_score.:18\\.12.*")); assertTrue(s.matches(".*initial_anomaly_score.:18\\.12.*"));
@ -88,12 +84,8 @@ public class JobResultsPersisterTests extends ESTestCase {
} }
public void testPersistRecords() throws IOException { public void testPersistRecords() throws IOException {
ArgumentCaptor<XContentBuilder> captor = ArgumentCaptor.forClass(XContentBuilder.class); AtomicReference<BulkRequest> reference = new AtomicReference<>();
BulkResponse response = mock(BulkResponse.class); Client client = mockClient(reference);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME)
.prepareIndex(AnomalyDetectorsIndex.jobResultsIndexName(JOB_ID), Result.TYPE.getPreferredName(), "", captor)
.prepareBulk(response);
Client client = clientBuilder.build();
List<AnomalyRecord> records = new ArrayList<>(); List<AnomalyRecord> records = new ArrayList<>();
AnomalyRecord r1 = new AnomalyRecord(JOB_ID, new Date(), 42, 1); AnomalyRecord r1 = new AnomalyRecord(JOB_ID, new Date(), 42, 1);
@ -124,10 +116,10 @@ public class JobResultsPersisterTests extends ESTestCase {
JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client); JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
persister.bulkPersisterBuilder(JOB_ID).persistRecords(records).executeRequest(); persister.bulkPersisterBuilder(JOB_ID).persistRecords(records).executeRequest();
List<XContentBuilder> captured = captor.getAllValues(); BulkRequest bulkRequest = reference.get();
assertEquals(1, captured.size()); assertEquals(1, bulkRequest.numberOfActions());
String s = captured.get(0).string(); String s = ((IndexRequest) bulkRequest.requests().get(0)).source().utf8ToString();
assertTrue(s.matches(".*detector_index.:3.*")); assertTrue(s.matches(".*detector_index.:3.*"));
assertTrue(s.matches(".*\"probability\":0\\.1.*")); assertTrue(s.matches(".*\"probability\":0\\.1.*"));
assertTrue(s.matches(".*\"anomaly_score\":99\\.8.*")); assertTrue(s.matches(".*\"anomaly_score\":99\\.8.*"));
@ -149,12 +141,8 @@ public class JobResultsPersisterTests extends ESTestCase {
} }
public void testPersistInfluencers() throws IOException { public void testPersistInfluencers() throws IOException {
ArgumentCaptor<XContentBuilder> captor = ArgumentCaptor.forClass(XContentBuilder.class); AtomicReference<BulkRequest> reference = new AtomicReference<>();
BulkResponse response = mock(BulkResponse.class); Client client = mockClient(reference);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME)
.prepareIndex(AnomalyDetectorsIndex.jobResultsIndexName(JOB_ID), Result.TYPE.getPreferredName(), "", captor)
.prepareBulk(response);
Client client = clientBuilder.build();
List<Influencer> influencers = new ArrayList<>(); List<Influencer> influencers = new ArrayList<>();
Influencer inf = new Influencer(JOB_ID, "infName1", "infValue1", new Date(), 600, 1); Influencer inf = new Influencer(JOB_ID, "infName1", "infValue1", new Date(), 600, 1);
@ -165,14 +153,26 @@ public class JobResultsPersisterTests extends ESTestCase {
JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client); JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
persister.bulkPersisterBuilder(JOB_ID).persistInfluencers(influencers).executeRequest(); persister.bulkPersisterBuilder(JOB_ID).persistInfluencers(influencers).executeRequest();
List<XContentBuilder> captured = captor.getAllValues(); BulkRequest bulkRequest = reference.get();
assertEquals(1, captured.size()); assertEquals(1, bulkRequest.numberOfActions());
String s = captured.get(0).string(); String s = ((IndexRequest) bulkRequest.requests().get(0)).source().utf8ToString();
assertTrue(s.matches(".*probability.:0\\.4.*")); assertTrue(s.matches(".*probability.:0\\.4.*"));
assertTrue(s.matches(".*influencer_field_name.:.infName1.*")); assertTrue(s.matches(".*influencer_field_name.:.infName1.*"));
assertTrue(s.matches(".*influencer_field_value.:.infValue1.*")); assertTrue(s.matches(".*influencer_field_value.:.infValue1.*"));
assertTrue(s.matches(".*initial_anomaly_score.:55\\.5.*")); assertTrue(s.matches(".*initial_anomaly_score.:55\\.5.*"));
assertTrue(s.matches(".*anomaly_score.:16\\.0.*")); assertTrue(s.matches(".*anomaly_score.:16\\.0.*"));
} }
@SuppressWarnings({"unchecked", "rawtypes"})
private Client mockClient(AtomicReference reference) {
Client client = mock(Client.class);
doAnswer(invocationOnMock -> {
reference.set((BulkRequest) invocationOnMock.getArguments()[1]);
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
listener.onResponse(new BulkResponse(new BulkItemResponse[0], 0L));
return null;
}).when(client).execute(any(), any(), any());
return client;
}
} }

View File

@ -36,12 +36,9 @@ import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.ClusterAdminClient; import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.script.Script; import org.elasticsearch.script.Script;
import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortBuilder;
@ -49,7 +46,6 @@ import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.prelert.action.DeleteJobAction; import org.elasticsearch.xpack.prelert.action.DeleteJobAction;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@ -64,7 +60,6 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset; import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -80,8 +75,6 @@ public class MockClientBuilder {
private ClusterAdminClient clusterAdminClient; private ClusterAdminClient clusterAdminClient;
@Mock @Mock
private IndicesAdminClient indicesAdminClient; private IndicesAdminClient indicesAdminClient;
@Mock
private ActionFuture<IndicesExistsResponse> indexNotExistsResponseFuture;
public MockClientBuilder(String clusterName) { public MockClientBuilder(String clusterName) {
client = mock(Client.class); client = mock(Client.class);
@ -96,24 +89,6 @@ public class MockClientBuilder {
when(client.settings()).thenReturn(settings); when(client.settings()).thenReturn(settings);
} }
public MockClientBuilder addClusterStatusYellowResponse(String index, TimeValue timeout)
throws InterruptedException, ExecutionException {
ClusterHealthRequestBuilder clusterHealthRequestBuilder = mock(ClusterHealthRequestBuilder.class);
when(clusterAdminClient.prepareHealth(index)).thenReturn(clusterHealthRequestBuilder);
when(clusterHealthRequestBuilder.get(timeout)).thenReturn(mock(ClusterHealthResponse.class));
return this;
}
public MockClientBuilder addClusterStatusYellowResponse(String index, TimeValue timeout, Exception e)
throws InterruptedException, ExecutionException {
ClusterHealthRequestBuilder clusterHealthRequestBuilder = mock(ClusterHealthRequestBuilder.class);
when(clusterAdminClient.prepareHealth(index)).thenReturn(clusterHealthRequestBuilder);
doAnswer(invocation -> {
throw e;
}).when(clusterHealthRequestBuilder).get(eq(timeout));
return this;
}
@SuppressWarnings({ "unchecked" }) @SuppressWarnings({ "unchecked" })
public MockClientBuilder addClusterStatusYellowResponse() throws InterruptedException, ExecutionException { public MockClientBuilder addClusterStatusYellowResponse() throws InterruptedException, ExecutionException {
ListenableActionFuture<ClusterHealthResponse> actionFuture = mock(ListenableActionFuture.class); ListenableActionFuture<ClusterHealthResponse> actionFuture = mock(ListenableActionFuture.class);
@ -138,20 +113,6 @@ public class MockClientBuilder {
return this; return this;
} }
@SuppressWarnings({ "unchecked" })
public MockClientBuilder addClusterStatusRedResponse() throws InterruptedException, ExecutionException {
ListenableActionFuture<ClusterHealthResponse> actionFuture = mock(ListenableActionFuture.class);
ClusterHealthRequestBuilder clusterHealthRequestBuilder = mock(ClusterHealthRequestBuilder.class);
when(clusterAdminClient.prepareHealth()).thenReturn(clusterHealthRequestBuilder);
when(clusterHealthRequestBuilder.setWaitForYellowStatus()).thenReturn(clusterHealthRequestBuilder);
when(clusterHealthRequestBuilder.execute()).thenReturn(actionFuture);
ClusterHealthResponse response = mock(ClusterHealthResponse.class);
when(response.getStatus()).thenReturn(ClusterHealthStatus.RED);
when(actionFuture.actionGet()).thenReturn(response);
return this;
}
@SuppressWarnings({ "rawtypes", "unchecked" }) @SuppressWarnings({ "rawtypes", "unchecked" })
public MockClientBuilder addIndicesExistsResponse(String index, boolean exists) throws InterruptedException, ExecutionException { public MockClientBuilder addIndicesExistsResponse(String index, boolean exists) throws InterruptedException, ExecutionException {
ActionFuture actionFuture = mock(ActionFuture.class); ActionFuture actionFuture = mock(ActionFuture.class);
@ -199,16 +160,6 @@ public class MockClientBuilder {
return this; return this;
} }
public MockClientBuilder prepareGet(String index, String type, String id, Exception exception) {
GetRequestBuilder getRequestBuilder = mock(GetRequestBuilder.class);
doAnswer(invocation -> {
throw exception;
}).when(getRequestBuilder).get();
when(getRequestBuilder.setFetchSource(false)).thenReturn(getRequestBuilder);
when(client.prepareGet(index, type, id)).thenReturn(getRequestBuilder);
return this;
}
public MockClientBuilder prepareCreate(String index) { public MockClientBuilder prepareCreate(String index) {
CreateIndexRequestBuilder createIndexRequestBuilder = mock(CreateIndexRequestBuilder.class); CreateIndexRequestBuilder createIndexRequestBuilder = mock(CreateIndexRequestBuilder.class);
CreateIndexResponse response = mock(CreateIndexResponse.class); CreateIndexResponse response = mock(CreateIndexResponse.class);
@ -229,40 +180,6 @@ public class MockClientBuilder {
return this; return this;
} }
public MockClientBuilder prepareCreate(String index, RuntimeException e) {
CreateIndexRequestBuilder createIndexRequestBuilder = mock(CreateIndexRequestBuilder.class);
when(createIndexRequestBuilder.setSettings(any(Settings.Builder.class))).thenReturn(createIndexRequestBuilder);
when(createIndexRequestBuilder.addMapping(any(String.class), any(XContentBuilder.class))).thenReturn(createIndexRequestBuilder);
doThrow(e).when(createIndexRequestBuilder).get();
when(indicesAdminClient.prepareCreate(eq(index))).thenReturn(createIndexRequestBuilder);
return this;
}
public MockClientBuilder prepareCreate(String index, Exception e) {
CreateIndexRequestBuilder createIndexRequestBuilder = mock(CreateIndexRequestBuilder.class);
when(createIndexRequestBuilder.setSettings(any(Settings.Builder.class))).thenReturn(createIndexRequestBuilder);
when(createIndexRequestBuilder.addMapping(any(String.class), any(XContentBuilder.class))).thenReturn(createIndexRequestBuilder);
doAnswer(invocation -> {
throw e;
}).when(createIndexRequestBuilder).get();
when(indicesAdminClient.prepareCreate(eq(index))).thenReturn(createIndexRequestBuilder);
return this;
}
public MockClientBuilder prepareSearch(String index, String type, SearchResponse response) {
SearchRequestBuilder searchRequestBuilder = mock(SearchRequestBuilder.class);
when(searchRequestBuilder.get()).thenReturn(response);
when(searchRequestBuilder.setTypes(eq(type))).thenReturn(searchRequestBuilder);
when(searchRequestBuilder.setFrom(anyInt())).thenReturn(searchRequestBuilder);
when(searchRequestBuilder.setSize(anyInt())).thenReturn(searchRequestBuilder);
when(searchRequestBuilder.addSort(any(SortBuilder.class))).thenReturn(searchRequestBuilder);
when(searchRequestBuilder.setQuery(any())).thenReturn(searchRequestBuilder);
when(searchRequestBuilder.setFetchSource(anyBoolean())).thenReturn(searchRequestBuilder);
when(searchRequestBuilder.setScroll(anyString())).thenReturn(searchRequestBuilder);
when(client.prepareSearch(eq(index))).thenReturn(searchRequestBuilder);
return this;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public MockClientBuilder prepareSearchExecuteListener(String index, SearchResponse response) { public MockClientBuilder prepareSearchExecuteListener(String index, SearchResponse response) {
SearchRequestBuilder builder = mock(SearchRequestBuilder.class); SearchRequestBuilder builder = mock(SearchRequestBuilder.class);
@ -342,32 +259,6 @@ public class MockClientBuilder {
return this; return this;
} }
@SuppressWarnings("unchecked")
public MockClientBuilder prepareIndex(String index, String source) {
IndexRequestBuilder builder = mock(IndexRequestBuilder.class);
ListenableActionFuture<IndexResponse> actionFuture = mock(ListenableActionFuture.class);
when(client.prepareIndex(eq(index), any(), any())).thenReturn(builder);
when(builder.setSource(eq(source))).thenReturn(builder);
when(builder.setRefreshPolicy(eq(RefreshPolicy.IMMEDIATE))).thenReturn(builder);
when(builder.execute()).thenReturn(actionFuture);
when(actionFuture.actionGet()).thenReturn(mock(IndexResponse.class));
return this;
}
@SuppressWarnings("unchecked")
public MockClientBuilder prepareIndex(String index, ArgumentCaptor<String> getSource) {
IndexRequestBuilder builder = mock(IndexRequestBuilder.class);
ListenableActionFuture<IndexResponse> actionFuture = mock(ListenableActionFuture.class);
when(client.prepareIndex(eq(index), any(), any())).thenReturn(builder);
when(builder.setSource(getSource.capture())).thenReturn(builder);
when(builder.setRefreshPolicy(eq(RefreshPolicy.IMMEDIATE))).thenReturn(builder);
when(builder.execute()).thenReturn(actionFuture);
when(actionFuture.actionGet()).thenReturn(mock(IndexResponse.class));
return this;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public MockClientBuilder prepareIndex(String index, String type, String responseId, ArgumentCaptor<XContentBuilder> getSource) { public MockClientBuilder prepareIndex(String index, String type, String responseId, ArgumentCaptor<XContentBuilder> getSource) {
IndexRequestBuilder builder = mock(IndexRequestBuilder.class); IndexRequestBuilder builder = mock(IndexRequestBuilder.class);
@ -411,43 +302,6 @@ public class MockClientBuilder {
return this; return this;
} }
@SuppressWarnings("unchecked")
public MockClientBuilder prepareBulkExecuteListener(BulkResponse response) {
BulkRequestBuilder builder = mock(BulkRequestBuilder.class);
when(client.prepareBulk()).thenReturn(builder);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
ActionListener<BulkResponse> listener = (ActionListener<BulkResponse>) invocationOnMock.getArguments()[0];
listener.onResponse(response);
return null;
}
}).when(builder).execute(any());
return this;
}
public MockClientBuilder prepareUpdate(String index, String type, String id, ArgumentCaptor<Map<String, Object>> getSource) {
UpdateRequestBuilder builder = mock(UpdateRequestBuilder.class);
when(client.prepareUpdate(index, type, id)).thenReturn(builder);
when(builder.setDoc(getSource.capture())).thenReturn(builder);
when(builder.setRetryOnConflict(any(int.class))).thenReturn(builder);
when(builder.get()).thenReturn(mock(UpdateResponse.class));
return this;
}
public MockClientBuilder prepareUpdate(String index, String type, String id, ArgumentCaptor<Map<String, Object>> getSource,
Exception e) {
UpdateRequestBuilder builder = mock(UpdateRequestBuilder.class);
when(client.prepareUpdate(index, type, id)).thenReturn(builder);
when(builder.setDoc(getSource.capture())).thenReturn(builder);
when(builder.setRetryOnConflict(any(int.class))).thenReturn(builder);
doAnswer(invocation -> {
throw e;
}).when(builder).get();
return this;
}
public MockClientBuilder prepareUpdateScript(String index, String type, String id, ArgumentCaptor<Script> getSource, public MockClientBuilder prepareUpdateScript(String index, String type, String id, ArgumentCaptor<Script> getSource,
ArgumentCaptor<Map<String, Object>> getParams) { ArgumentCaptor<Map<String, Object>> getParams) {
UpdateRequestBuilder builder = mock(UpdateRequestBuilder.class); UpdateRequestBuilder builder = mock(UpdateRequestBuilder.class);
@ -472,11 +326,6 @@ public class MockClientBuilder {
return this; return this;
} }
public MockClientBuilder throwMissingIndexOnPrepareGet(String index, String type, String id) {
doThrow(new IndexNotFoundException(index)).when(client).prepareGet(index, type, id);
return this;
}
public Client build() { public Client build() {
return client; return client;
} }