* Add result_type field to bucket

* Query and delete buckets/records by result_type

* Add a filter to the ElasticsearchBatchedDocumentsIterator subclasses for result_type:bucket (see the query sketch below)

* De-type Influencers, BucketInfluencers and Category Definitions

* Revert de-typing CategoryDefinition

* Resolve merge errors after rebase

Original commit: elastic/x-pack-elasticsearch@65605432e8
Author: David Kyle, 2016-11-28 10:47:17 +00:00 (committed by GitHub)
Parent: 37cd03ad4d
Commit: 02a94ce729
25 changed files with 591 additions and 573 deletions
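
The bullets above all serve one change: every result document (bucket, record, influencer, bucket influencer) is stored under a single Elasticsearch type and carries a result_type field naming its kind. A minimal sketch of the query-side idea, using the Result and TermsQueryBuilder names that appear in the diffs below; the helper method itself is hypothetical, not part of the commit:

    import org.elasticsearch.index.query.BoolQueryBuilder;
    import org.elasticsearch.index.query.QueryBuilder;
    import org.elasticsearch.index.query.TermsQueryBuilder;
    import org.elasticsearch.xpack.prelert.job.results.Result;

    final class ResultTypeFilterSketch {
        // Restrict an existing query to one kind of result, e.g. "bucket".
        // This mirrors what the iterators and the bulk deleter below do inline.
        static QueryBuilder onlyResultType(QueryBuilder existing, String resultType) {
            return new BoolQueryBuilder()
                    .filter(existing)
                    .filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), resultType));
        }

        private ResultTypeFilterSketch() {}
    }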

View File

@ -364,8 +364,7 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
ModelSnapshot modelSnapshot = revertCandidates.get(0);
// The quantiles can be large, and totally dominate the output -
-// it's
-// clearer to remove them
+// it's clearer to remove them
modelSnapshot.setQuantiles(null);
return modelSnapshot;
}

View File

@ -17,16 +17,10 @@ import java.io.IOException;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
-class ElasticsearchBatchedBucketsIterator extends ElasticsearchBatchedDocumentsIterator<Bucket> {
-public ElasticsearchBatchedBucketsIterator(Client client, String jobId, ParseFieldMatcher parserFieldMatcher) {
-super(client, JobResultsPersister.getJobIndexName(jobId), parserFieldMatcher);
-}
-@Override
-protected String getType()
-{
-return Bucket.TYPE.getPreferredName();
-}
+class ElasticsearchBatchedBucketsIterator extends ElasticsearchBatchedResultsIterator<Bucket> {
+public ElasticsearchBatchedBucketsIterator(Client client, String jobId, ParseFieldMatcher parseFieldMatcher) {
+super(client, jobId, Bucket.RESULT_TYPE_VALUE, parseFieldMatcher);
+}
@Override

View File

@ -10,6 +10,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortBuilders;
@ -45,6 +46,18 @@ abstract class ElasticsearchBatchedDocumentsIterator<T> implements BatchedDocume
isScrollInitialised = false;
}
+protected ElasticsearchBatchedDocumentsIterator(Client client, String index, ParseFieldMatcher parseFieldMatcher,
+QueryBuilder queryBuilder) {
+this.client = Objects.requireNonNull(client);
+this.index = Objects.requireNonNull(index);
+this.parseFieldMatcher = Objects.requireNonNull(parseFieldMatcher);
+totalHits = 0;
+count = 0;
+filterBuilder = new ResultsFilterBuilder(queryBuilder);
+isScrollInitialised = false;
+}
@Override
public BatchedDocumentsIterator<T> timeRange(long startEpochMs, long endEpochMs) {
filterBuilder.timeRange(ElasticsearchMappings.ES_TIMESTAMP, startEpochMs, endEpochMs);
@ -74,7 +87,7 @@ abstract class ElasticsearchBatchedDocumentsIterator<T> implements BatchedDocume
}
private SearchResponse initScroll() {
LOGGER.trace("ES API CALL: search all of type " + getType() + " from index " + index);
LOGGER.trace("ES API CALL: search all of type {} from index {}", getType(), index);
isScrollInitialised = true;

View File

@ -17,18 +17,12 @@ import java.io.IOException;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
-class ElasticsearchBatchedInfluencersIterator extends ElasticsearchBatchedDocumentsIterator<Influencer>
-{
-public ElasticsearchBatchedInfluencersIterator(Client client, String jobId,
-ParseFieldMatcher parserFieldMatcher)
-{
-super(client, JobResultsPersister.getJobIndexName(jobId), parserFieldMatcher);
-}
-@Override
-protected String getType()
-{
-return Influencer.TYPE.getPreferredName();
-}
+class ElasticsearchBatchedInfluencersIterator extends ElasticsearchBatchedResultsIterator<Influencer>
+{
+public ElasticsearchBatchedInfluencersIterator(Client client, String jobId,
+ParseFieldMatcher parserFieldMatcher)
+{
+super(client, jobId, Influencer.RESULT_TYPE_VALUE, parserFieldMatcher);
+}
@Override

View File

@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.persistence;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.xpack.prelert.job.results.Result;
abstract class ElasticsearchBatchedResultsIterator<T> extends ElasticsearchBatchedDocumentsIterator<T> {
public ElasticsearchBatchedResultsIterator(Client client, String jobId, String resultType, ParseFieldMatcher parseFieldMatcher) {
super(client, JobResultsPersister.getJobIndexName(jobId), parseFieldMatcher,
new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), resultType));
}
@Override
protected String getType() {
return Result.TYPE.getPreferredName();
}
}
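
With the result_type filter pushed into this shared base class, a subclass only names its result type and the terms filter is built for it. A hedged usage sketch, assuming the BatchedDocumentsIterator contract used elsewhere in this package (hasNext() plus next() returning one batch as a Deque); the job id and logging are illustrative:

    // Drain all bucket results for one job in scroll-sized batches.
    BatchedDocumentsIterator<Bucket> iterator =
            new ElasticsearchBatchedBucketsIterator(client, "my-job", parseFieldMatcher);
    while (iterator.hasNext()) {
        Deque<Bucket> batch = iterator.next();
        for (Bucket bucket : batch) {
            LOGGER.trace("bucket at {}", bucket.getTimestamp());
        }
    }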

View File

@ -16,10 +16,11 @@ import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
@ -29,6 +30,7 @@ import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
+import org.elasticsearch.xpack.prelert.job.results.Result;
import java.util.Objects;
import java.util.function.LongSupplier;
@ -72,7 +74,7 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter {
deleteRecords(bucket);
deleteBucketInfluencers(bucket);
bulkRequestBuilder.add(
-client.prepareDelete(JobResultsPersister.getJobIndexName(jobId), Bucket.TYPE.getPreferredName(), bucket.getId()));
+client.prepareDelete(JobResultsPersister.getJobIndexName(jobId), Result.TYPE.getPreferredName(), bucket.getId()));
++deletedBucketCount;
}
@ -83,20 +85,22 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter {
// the scenes, and Elasticsearch documentation claims it's significantly
// slower. Here we rely on the record timestamps being identical to the
// bucket timestamp.
-deleteTypeByBucket(bucket, AnomalyRecord.TYPE.getPreferredName(), () -> ++deletedRecordCount);
+deleteResultTypeByBucket(bucket, AnomalyRecord.RESULT_TYPE_VALUE, () -> ++deletedRecordCount);
}
-private void deleteTypeByBucket(Bucket bucket, String type, LongSupplier deleteCounter) {
-QueryBuilder query = QueryBuilders.termQuery(ElasticsearchMappings.ES_TIMESTAMP,
-bucket.getTimestamp().getTime());
+private void deleteResultTypeByBucket(Bucket bucket, String resultType, LongSupplier deleteCounter) {
+QueryBuilder timeQuery = QueryBuilders.termQuery(ElasticsearchMappings.ES_TIMESTAMP, bucket.getTimestamp().getTime());
+QueryBuilder boolQuery = new BoolQueryBuilder()
+.filter(timeQuery)
+.filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), resultType));
int done = 0;
boolean finished = false;
while (finished == false) {
SearchResponse searchResponse = SearchAction.INSTANCE.newRequestBuilder(client)
.setIndices(JobResultsPersister.getJobIndexName(jobId))
-.setTypes(type)
-.setQuery(query)
+.setTypes(Result.TYPE.getPreferredName())
+.setQuery(boolQuery)
.addSort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC))
.setSize(SCROLL_SIZE)
.setFrom(done)
@ -124,17 +128,17 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter {
public void deleteBucketInfluencers(Bucket bucket) {
// Find the bucket influencers using the time stamp, relying on the
// bucket influencer timestamps being identical to the bucket timestamp.
-deleteTypeByBucket(bucket, BucketInfluencer.TYPE.getPreferredName(), () -> ++deletedBucketInfluencerCount);
+deleteResultTypeByBucket(bucket, BucketInfluencer.RESULT_TYPE_VALUE, () -> ++deletedBucketInfluencerCount);
}
public void deleteInfluencers(Bucket bucket) {
// Find the influencers using the time stamp, relying on the influencer
// timestamps being identical to the bucket timestamp.
-deleteTypeByBucket(bucket, Influencer.TYPE.getPreferredName(), () -> ++deletedInfluencerCount);
+deleteResultTypeByBucket(bucket, Influencer.RESULT_TYPE_VALUE, () -> ++deletedInfluencerCount);
}
public void deleteBucketByTime(Bucket bucket) {
-deleteTypeByBucket(bucket, Bucket.TYPE.getPreferredName(), () -> ++deletedBucketCount);
+deleteResultTypeByBucket(bucket, Bucket.RESULT_TYPE_VALUE, () -> ++deletedBucketCount);
}
@Override
@ -147,7 +151,7 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter {
return;
}
bulkRequestBuilder.add(
-client.prepareDelete(JobResultsPersister.getJobIndexName(jobId), Influencer.TYPE.getPreferredName(), id));
+client.prepareDelete(JobResultsPersister.getJobIndexName(jobId), Result.TYPE.getPreferredName(), id));
++deletedInfluencerCount;
}
@ -188,8 +192,7 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter {
QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true);
SearchResponse searchResponse = client.prepareSearch(JobResultsPersister.getJobIndexName(jobId))
-.setTypes(Bucket.TYPE.getPreferredName(), AnomalyRecord.TYPE.getPreferredName(), Influencer.TYPE.getPreferredName(),
-BucketInfluencer.TYPE.getPreferredName())
+.setTypes(Result.RESULT_TYPE.getPreferredName())
.setQuery(qb)
.addSort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC))
.setScroll(SCROLL_CONTEXT_DURATION)
@ -201,15 +204,15 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter {
long totalDeletedCount = 0;
while (totalDeletedCount < totalHits) {
for (SearchHit hit : searchResponse.getHits()) {
LOGGER.trace("Search hit for bucket: " + hit.toString() + ", " + hit.getId());
String type = hit.getType();
if (type.equals(Bucket.TYPE)) {
LOGGER.trace("Search hit for bucket: {}, {}", hit.toString(), hit.getId());
String type = (String) hit.getSource().get(Result.RESULT_TYPE.getPreferredName());
if (Bucket.RESULT_TYPE_VALUE.equals(type)) {
++deletedBucketCount;
} else if (type.equals(AnomalyRecord.TYPE)) {
} else if (AnomalyRecord.RESULT_TYPE_VALUE.equals(type)) {
++deletedRecordCount;
} else if (type.equals(BucketInfluencer.TYPE)) {
} else if (BucketInfluencer.RESULT_TYPE_VALUE.equals(type)) {
++deletedBucketInfluencerCount;
} else if (type.equals(Influencer.TYPE)) {
} else if (Influencer.RESULT_TYPE_VALUE.equals(type)) {
++deletedInfluencerCount;
}
++totalDeletedCount;
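
Because every result now shares one mapping type, hit.getType() can no longer distinguish them; the counting loop above reads result_type back out of each hit's source instead. The if/else chain is equivalent to the switch below (a sketch: the RESULT_TYPE_VALUE constants are compile-time string constants, so they are legal case labels; a null check guards hits that lack the field):

    String type = (String) hit.getSource().get(Result.RESULT_TYPE.getPreferredName());
    if (type != null) {
        switch (type) {
        case Bucket.RESULT_TYPE_VALUE:           ++deletedBucketCount;           break;
        case AnomalyRecord.RESULT_TYPE_VALUE:    ++deletedRecordCount;           break;
        case BucketInfluencer.RESULT_TYPE_VALUE: ++deletedBucketInfluencerCount; break;
        case Influencer.RESULT_TYPE_VALUE:       ++deletedInfluencerCount;       break;
        default: break; // unknown result type: leave the counters untouched
        }
    }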

View File

@ -57,11 +57,11 @@ import org.elasticsearch.xpack.prelert.job.persistence.InfluencersQueryBuilder.I
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.job.results.ReservedFieldNames;
+import org.elasticsearch.xpack.prelert.job.results.Result;
import org.elasticsearch.xpack.prelert.job.usage.Usage;
import org.elasticsearch.xpack.prelert.lists.ListDocument;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
@ -160,7 +160,7 @@ public class ElasticsearchJobProvider implements JobProvider
*/
private void createUsageMeteringIndex() {
try {
LOGGER.trace("ES API CALL: index exists? " + PRELERT_USAGE_INDEX);
LOGGER.trace("ES API CALL: index exists? {}", PRELERT_USAGE_INDEX);
boolean indexExists = client.admin().indices()
.exists(new IndicesExistsRequest(PRELERT_USAGE_INDEX))
.get().isExists();
@ -170,12 +170,12 @@ public class ElasticsearchJobProvider implements JobProvider
XContentBuilder usageMapping = ElasticsearchMappings.usageMapping();
LOGGER.trace("ES API CALL: create index " + PRELERT_USAGE_INDEX);
LOGGER.trace("ES API CALL: create index {}", PRELERT_USAGE_INDEX);
client.admin().indices().prepareCreate(PRELERT_USAGE_INDEX)
.setSettings(prelertIndexSettings())
.addMapping(Usage.TYPE, usageMapping)
.get();
LOGGER.trace("ES API CALL: wait for yellow status " + PRELERT_USAGE_INDEX);
LOGGER.trace("ES API CALL: wait for yellow status {}", PRELERT_USAGE_INDEX);
client.admin().cluster().prepareHealth(PRELERT_USAGE_INDEX).setWaitForYellowStatus().execute().actionGet();
}
} catch (InterruptedException | ExecutionException | IOException e) {
@ -217,38 +217,31 @@ public class ElasticsearchJobProvider implements JobProvider
*/
@Override
public void createJobRelatedIndices(Job job, ActionListener<Boolean> listener) {
-Collection<String> termFields = (job.getAnalysisConfig() != null) ? job.getAnalysisConfig().termFields() : null;
-Collection<String> influencers = (job.getAnalysisConfig() != null) ? job.getAnalysisConfig().getInfluencers() : null;
+Collection<String> termFields = (job.getAnalysisConfig() != null) ? job.getAnalysisConfig().termFields() : Collections.emptyList();
try {
-XContentBuilder bucketMapping = ElasticsearchMappings.bucketMapping();
-XContentBuilder bucketInfluencerMapping = ElasticsearchMappings.bucketInfluencerMapping();
+XContentBuilder resultsMapping = ElasticsearchMappings.resultsMapping(termFields);
XContentBuilder categorizerStateMapping = ElasticsearchMappings.categorizerStateMapping();
XContentBuilder categoryDefinitionMapping = ElasticsearchMappings.categoryDefinitionMapping();
-XContentBuilder recordMapping = ElasticsearchMappings.recordMapping(termFields);
XContentBuilder quantilesMapping = ElasticsearchMappings.quantilesMapping();
XContentBuilder modelStateMapping = ElasticsearchMappings.modelStateMapping();
XContentBuilder modelSnapshotMapping = ElasticsearchMappings.modelSnapshotMapping();
XContentBuilder modelSizeStatsMapping = ElasticsearchMappings.modelSizeStatsMapping();
-XContentBuilder influencerMapping = ElasticsearchMappings.influencerMapping(influencers);
XContentBuilder modelDebugMapping = ElasticsearchMappings.modelDebugOutputMapping(termFields);
XContentBuilder processingTimeMapping = ElasticsearchMappings.processingTimeMapping();
XContentBuilder partitionScoreMapping = ElasticsearchMappings.bucketPartitionMaxNormalizedScores();
XContentBuilder dataCountsMapping = ElasticsearchMappings.dataCountsMapping();
String jobId = job.getId();
LOGGER.trace("ES API CALL: create index " + job.getId());
LOGGER.trace("ES API CALL: create index {}", job.getId());
CreateIndexRequest createIndexRequest = new CreateIndexRequest(JobResultsPersister.getJobIndexName(jobId));
createIndexRequest.settings(prelertIndexSettings());
-createIndexRequest.mapping(Bucket.TYPE.getPreferredName(), bucketMapping);
-createIndexRequest.mapping(BucketInfluencer.TYPE.getPreferredName(), bucketInfluencerMapping);
+createIndexRequest.mapping(Result.TYPE.getPreferredName(), resultsMapping);
createIndexRequest.mapping(CategorizerState.TYPE, categorizerStateMapping);
createIndexRequest.mapping(CategoryDefinition.TYPE.getPreferredName(), categoryDefinitionMapping);
-createIndexRequest.mapping(AnomalyRecord.TYPE.getPreferredName(), recordMapping);
createIndexRequest.mapping(Quantiles.TYPE.getPreferredName(), quantilesMapping);
createIndexRequest.mapping(ModelState.TYPE, modelStateMapping);
createIndexRequest.mapping(ModelSnapshot.TYPE.getPreferredName(), modelSnapshotMapping);
createIndexRequest.mapping(ModelSizeStats.TYPE.getPreferredName(), modelSizeStatsMapping);
-createIndexRequest.mapping(Influencer.TYPE.getPreferredName(), influencerMapping);
createIndexRequest.mapping(ModelDebugOutput.TYPE.getPreferredName(), modelDebugMapping);
createIndexRequest.mapping(ReservedFieldNames.BUCKET_PROCESSING_TIME_TYPE, processingTimeMapping);
createIndexRequest.mapping(ReservedFieldNames.PARTITION_NORMALIZED_PROB_TYPE, partitionScoreMapping);
@ -273,7 +266,7 @@ public class ElasticsearchJobProvider implements JobProvider
@Override
public void deleteJobRelatedIndices(String jobId, ActionListener<DeleteJobAction.Response> listener) {
String indexName = JobResultsPersister.getJobIndexName(jobId);
LOGGER.trace("ES API CALL: delete index " + indexName);
LOGGER.trace("ES API CALL: delete index {}", indexName);
try {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
@ -330,6 +323,7 @@ public class ElasticsearchJobProvider implements JobProvider
SortBuilder<?> sortBuilder = new FieldSortBuilder(esSortField(query.getSortField()))
.order(query.isSortDescending() ? SortOrder.DESC : SortOrder.ASC);
QueryPage<Bucket> buckets = buckets(jobId, query.isIncludeInterim(), query.getFrom(), query.getSize(), fb, sortBuilder);
if (Strings.isNullOrEmpty(query.getPartitionValue())) {
@ -351,7 +345,6 @@ public class ElasticsearchJobProvider implements JobProvider
b.setAnomalyScore(b.partitionAnomalyScore(query.getPartitionValue()));
}
}
return buckets;
@ -376,16 +369,21 @@ public class ElasticsearchJobProvider implements JobProvider
private QueryPage<Bucket> buckets(String jobId, boolean includeInterim, int from, int size,
QueryBuilder fb, SortBuilder<?> sb) throws ResourceNotFoundException {
+QueryBuilder boolQuery = new BoolQueryBuilder()
+.filter(fb)
+.filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE));
SearchResponse searchResponse;
try {
String indexName = JobResultsPersister.getJobIndexName(jobId);
LOGGER.trace("ES API CALL: search all of type " + Bucket.TYPE +
" from index " + indexName + " sort ascending " + ElasticsearchMappings.ES_TIMESTAMP +
" with filter after sort from " + from + " size " + size);
LOGGER.trace("ES API CALL: search all of result type {} from index {} with filter from {} size {}",
Bucket.RESULT_TYPE_VALUE, indexName, from, size);
searchResponse = client.prepareSearch(indexName)
-.setTypes(Bucket.TYPE.getPreferredName())
+.setTypes(Result.TYPE.getPreferredName())
.addSort(sb)
-.setQuery(new ConstantScoreQueryBuilder(fb))
+.setQuery(new ConstantScoreQueryBuilder(boolQuery))
.setFrom(from).setSize(size)
.get();
} catch (IndexNotFoundException e) {
@ -419,14 +417,16 @@ public class ElasticsearchJobProvider implements JobProvider
String indexName = JobResultsPersister.getJobIndexName(jobId);
SearchHits hits;
try {
LOGGER.trace("ES API CALL: get Bucket with timestamp " + query.getTimestamp() +
" from index " + indexName);
QueryBuilder qb = QueryBuilders.matchQuery(ElasticsearchMappings.ES_TIMESTAMP,
query.getTimestamp());
LOGGER.trace("ES API CALL: get Bucket with timestamp {} from index {}", query.getTimestamp(), indexName);
QueryBuilder matchQuery = QueryBuilders.matchQuery(ElasticsearchMappings.ES_TIMESTAMP, query.getTimestamp());
QueryBuilder boolQuery = new BoolQueryBuilder()
.filter(matchQuery)
.filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE));
SearchResponse searchResponse = client.prepareSearch(indexName)
-.setTypes(Bucket.TYPE.getPreferredName())
-.setQuery(qb)
+.setTypes(Result.TYPE.getPreferredName())
+.setQuery(boolQuery)
.addSort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC))
.get();
hits = searchResponse.getHits();
@ -505,7 +505,7 @@ public class ElasticsearchJobProvider implements JobProvider
String indexName = JobResultsPersister.getJobIndexName(jobId);
SearchRequestBuilder searchBuilder = client
.prepareSearch(indexName)
-.setPostFilter(qb)
+.setQuery(qb)
.addSort(sb)
.setTypes(ReservedFieldNames.PARTITION_NORMALIZED_PROB_TYPE);
@ -601,8 +601,7 @@ public class ElasticsearchJobProvider implements JobProvider
// the scenes, and Elasticsearch documentation claims it's significantly
// slower. Here we rely on the record timestamps being identical to the
// bucket timestamp.
-QueryBuilder recordFilter = QueryBuilders.termQuery(ElasticsearchMappings.ES_TIMESTAMP,
-bucket.getTimestamp().getTime());
+QueryBuilder recordFilter = QueryBuilders.termQuery(ElasticsearchMappings.ES_TIMESTAMP, bucket.getTimestamp().getTime());
recordFilter = new ResultsFilterBuilder(recordFilter)
.interim(AnomalyRecord.IS_INTERIM.getPreferredName(), includeInterim)
@ -624,9 +623,9 @@ public class ElasticsearchJobProvider implements JobProvider
@Override
public QueryPage<CategoryDefinition> categoryDefinitions(String jobId, int from, int size) {
String indexName = JobResultsPersister.getJobIndexName(jobId);
LOGGER.trace("ES API CALL: search all of type " + CategoryDefinition.TYPE +
" from index " + indexName + " sort ascending " + CategoryDefinition.CATEGORY_ID +
" from " + from + " size " + size);
LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} from {} size {}",
CategoryDefinition.TYPE.getPreferredName(), indexName, CategoryDefinition.CATEGORY_ID.getPreferredName(), from, size);
SearchRequestBuilder searchBuilder = client.prepareSearch(indexName)
.setTypes(CategoryDefinition.TYPE.getPreferredName())
.setFrom(from).setSize(size)
@ -662,13 +661,15 @@ public class ElasticsearchJobProvider implements JobProvider
GetResponse response;
try {
LOGGER.trace("ES API CALL: get ID " + categoryId + " type " + CategoryDefinition.TYPE +
" from index " + indexName);
LOGGER.trace("ES API CALL: get ID {} type {} from index {}",
categoryId, CategoryDefinition.TYPE, indexName);
response = client.prepareGet(indexName, CategoryDefinition.TYPE.getPreferredName(), categoryId).get();
} catch (IndexNotFoundException e) {
throw ExceptionsHelper.missingJobException(jobId);
}
if (response.isExists()) {
BytesReference source = response.getSourceAsBytesRef();
XContentParser parser;
@ -680,6 +681,7 @@ public class ElasticsearchJobProvider implements JobProvider
CategoryDefinition definition = CategoryDefinition.PARSER.apply(parser, () -> parseFieldMatcher);
return new QueryPage<>(Collections.singletonList(definition), 1, CategoryDefinition.RESULTS_FIELD);
}
throw QueryPage.emptyQueryPage(Bucket.RESULTS_FIELD);
}
@ -724,10 +726,10 @@ public class ElasticsearchJobProvider implements JobProvider
recordFilter = new BoolQueryBuilder()
.filter(recordFilter)
-.filter(new TermsQueryBuilder(AnomalyRecord.RESULT_TYPE.getPreferredName(), AnomalyRecord.RESULT_TYPE_VALUE));
+.filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), AnomalyRecord.RESULT_TYPE_VALUE));
SearchRequestBuilder searchBuilder = client.prepareSearch(indexName)
-.setTypes(AnomalyRecord.TYPE.getPreferredName())
+.setTypes(Result.TYPE.getPreferredName())
.setQuery(recordFilter)
.setFrom(from).setSize(size)
.addSort(sb == null ? SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC) : sb)
@ -740,10 +742,10 @@ public class ElasticsearchJobProvider implements JobProvider
SearchResponse searchResponse;
try {
LOGGER.trace("ES API CALL: search all of type " + AnomalyRecord.TYPE +
" from index " + indexName + ((sb != null) ? " with sort" : "") +
(secondarySort.isEmpty() ? "" : " with secondary sort") +
" with filter after sort from " + from + " size " + size);
LOGGER.trace("ES API CALL: search all of result type {} from index {}{}{} with filter after sort from {} size {}",
AnomalyRecord.RESULT_TYPE_VALUE, indexName, (sb != null) ? " with sort" : "",
secondarySort.isEmpty() ? "" : " with secondary sort", from, size);
searchResponse = searchBuilder.get();
} catch (IndexNotFoundException e) {
throw ExceptionsHelper.missingJobException(jobId);
@ -786,14 +788,15 @@ public class ElasticsearchJobProvider implements JobProvider
private QueryPage<Influencer> influencers(String jobId, int from, int size, QueryBuilder filterBuilder, String sortField,
boolean sortDescending) throws ResourceNotFoundException {
String indexName = JobResultsPersister.getJobIndexName(jobId);
LOGGER.trace("ES API CALL: search all of type " + Influencer.TYPE + " from index " + indexName
+ ((sortField != null)
? " with sort " + (sortDescending ? "descending" : "ascending") + " on field " + esSortField(sortField) : "")
+ " with filter after sort from " + from + " size " + size);
LOGGER.trace("ES API CALL: search all of result type {} from index {}{} with filter from {} size {}",
() -> Influencer.RESULT_TYPE_VALUE, () -> indexName,
() -> (sortField != null) ?
" with sort " + (sortDescending ? "descending" : "ascending") + " on field " + esSortField(sortField) : "",
() -> from, () -> size);
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indexName)
-.setTypes(Influencer.TYPE.getPreferredName())
-.setPostFilter(filterBuilder)
+.setTypes(Result.TYPE.getPreferredName())
+.setQuery(filterBuilder)
.setFrom(from).setSize(size);
FieldSortBuilder sb = sortField == null ? SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)
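
Every read in this provider now follows the same shape: search the shared result type, filter on result_type, sort and page. Condensed from the bucket search above, with the names used in the diff (fb is the caller's field-level filter):

    QueryBuilder boolQuery = new BoolQueryBuilder()
            .filter(fb)
            .filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE));
    SearchResponse searchResponse = client.prepareSearch(JobResultsPersister.getJobIndexName(jobId))
            .setTypes(Result.TYPE.getPreferredName())
            .setQuery(new ConstantScoreQueryBuilder(boolQuery))
            .addSort(sb)
            .setFrom(from).setSize(size)
            .get();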

View File

@ -24,6 +24,7 @@ import org.elasticsearch.xpack.prelert.job.results.Influence;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.job.results.ReservedFieldNames;
+import org.elasticsearch.xpack.prelert.job.results.Result;
import org.elasticsearch.xpack.prelert.job.usage.Usage;
import java.io.IOException;
@ -93,6 +94,243 @@ public class ElasticsearchMappings {
private ElasticsearchMappings() {
}
/**
* Create the Elasticsearch mapping for results objects
* {@link Bucket}s, {@link AnomalyRecord}s, {@link Influencer},
* {@link BucketInfluencer} and {@link CategoryDefinition}
*
* The '_all' field is disabled as the document isn't meant to be searched.
*
* @param termFieldNames All the term fields (by, over, partition) and influencers
* included in the mapping
*
* @return The mapping
* @throws IOException On write error
*/
public static XContentBuilder resultsMapping(Collection<String> termFieldNames) throws IOException {
XContentBuilder builder = jsonBuilder()
.startObject()
.startObject(Result.TYPE.getPreferredName())
.startObject(ALL)
.field(ENABLED, false)
// analyzer must be specified even though _all is disabled
// because all types in the same index must have the same
// analyzer for a given field
.field(ANALYZER, WHITESPACE)
.endObject()
.startObject(PROPERTIES)
.startObject(Result.RESULT_TYPE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(ES_TIMESTAMP)
.field(TYPE, DATE)
.endObject()
.startObject(Bucket.ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(Bucket.INITIAL_ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(Bucket.IS_INTERIM.getPreferredName())
.field(TYPE, BOOLEAN)
.endObject()
.startObject(Bucket.RECORD_COUNT.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(Bucket.EVENT_COUNT.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(Bucket.BUCKET_SPAN.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(Bucket.PROCESSING_TIME_MS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(Bucket.PARTITION_SCORES.getPreferredName())
.field(TYPE, NESTED)
.startObject(PROPERTIES)
.startObject(AnomalyRecord.PARTITION_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyRecord.ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(AnomalyRecord.PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.endObject()
.endObject()
// bucket influencer mapping
.startObject(BucketInfluencer.INFLUENCER_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(BucketInfluencer.RAW_ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
// influencer mapping
.startObject(Influencer.INFLUENCER_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject();
addAnomalyRecordFieldsToMapping(builder);
if (termFieldNames != null) {
ElasticsearchDotNotationReverser reverser = new ElasticsearchDotNotationReverser();
for (String fieldName : termFieldNames) {
reverser.add(fieldName, "");
}
for (Map.Entry<String, Object> entry : reverser.getMappingsMap().entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
}
builder.endObject()
.endObject()
.endObject()
.endObject()
.endObject();
return builder;
}
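
The practical effect of this merged mapping is that buckets, records and influencers sit side by side in one type, distinguished only by data. Two invented example document sources (field names follow the ParseFields used above; the timestamp key is whatever ES_TIMESTAMP resolves to, shown here as "timestamp", and all values are made up):

    // Invented examples of sources stored under the single "result" type.
    String bucketSource = "{\"jobId\":\"my-job\",\"result_type\":\"bucket\","
            + "\"timestamp\":1480291200000,\"anomalyScore\":42.0,\"recordCount\":3}";
    String recordSource = "{\"jobId\":\"my-job\",\"result_type\":\"record\","
            + "\"timestamp\":1480291200000,\"probability\":1.0E-4,\"anomalyScore\":42.0}";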
/**
* AnomalyRecord fields to be added under the 'properties' section of the mapping
* @param builder Add properties to this builder
* @return builder
* @throws IOException On write error
*/
private static XContentBuilder addAnomalyRecordFieldsToMapping(XContentBuilder builder)
throws IOException {
builder.startObject(AnomalyRecord.DETECTOR_INDEX.getPreferredName())
.field(TYPE, INTEGER).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.ACTUAL.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.TYPICAL.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.FUNCTION.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.FUNCTION_DESCRIPTION.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.BY_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.BY_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyRecord.FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.PARTITION_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyRecord.OVER_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.OVER_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyRecord.NORMALIZED_PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.INITIAL_NORMALIZED_PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.CAUSES.getPreferredName())
.field(TYPE, NESTED)
.startObject(PROPERTIES)
.startObject(AnomalyCause.ACTUAL.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyCause.TYPICAL.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyCause.PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyCause.FUNCTION.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyCause.FUNCTION_DESCRIPTION.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyCause.BY_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyCause.BY_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyCause.CORRELATED_BY_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyCause.FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyCause.PARTITION_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyCause.PARTITION_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyCause.OVER_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyCause.OVER_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.endObject()
.endObject()
.startObject(AnomalyRecord.INFLUENCERS.getPreferredName())
/* Array of influences */
.field(TYPE, NESTED)
.startObject(PROPERTIES)
.startObject(Influence.INFLUENCER_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(Influence.INFLUENCER_FIELD_VALUES.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.endObject()
.endObject();
return builder;
}
public static XContentBuilder dataCountsMapping() throws IOException {
return jsonBuilder()
@ -144,138 +382,6 @@ public class ElasticsearchMappings {
.endObject();
}
/**
* Create the Elasticsearch mapping for {@linkplain org.elasticsearch.xpack.prelert.job.results.Bucket}.
* The '_all' field is disabled as the document isn't meant to be searched.
*/
public static XContentBuilder bucketMapping() throws IOException {
return jsonBuilder()
.startObject()
.startObject(Bucket.TYPE.getPreferredName())
.startObject(ALL)
.field(ENABLED, false)
// analyzer must be specified even though _all is disabled
// because all types in the same index must have the same
// analyzer for a given field
.field(ANALYZER, WHITESPACE)
.endObject()
.startObject(PROPERTIES)
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(ES_TIMESTAMP)
.field(TYPE, DATE)
.endObject()
.startObject(Bucket.ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(Bucket.INITIAL_ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(Bucket.IS_INTERIM.getPreferredName())
.field(TYPE, BOOLEAN)
.endObject()
.startObject(Bucket.RECORD_COUNT.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(Bucket.EVENT_COUNT.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(Bucket.BUCKET_SPAN.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(Bucket.PROCESSING_TIME_MS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(Bucket.BUCKET_INFLUENCERS.getPreferredName())
.field(TYPE, NESTED)
.startObject(PROPERTIES)
.startObject(BucketInfluencer.INFLUENCER_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(BucketInfluencer.INITIAL_ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(BucketInfluencer.ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(BucketInfluencer.RAW_ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(BucketInfluencer.PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.endObject()
.endObject()
.startObject(Bucket.PARTITION_SCORES.getPreferredName())
.field(TYPE, NESTED)
.startObject(PROPERTIES)
.startObject(AnomalyRecord.PARTITION_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyRecord.ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(AnomalyRecord.PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.endObject()
.endObject()
.endObject()
.endObject()
.endObject();
}
/**
* Create the Elasticsearch mapping for {@linkplain org.elasticsearch.xpack.prelert.job.results.BucketInfluencer}.
*/
public static XContentBuilder bucketInfluencerMapping() throws IOException {
return jsonBuilder()
.startObject()
.startObject(BucketInfluencer.TYPE.getPreferredName())
.startObject(ALL)
.field(ENABLED, false)
// analyzer must be specified even though _all is disabled
// because all types in the same index must have the same
// analyzer for a given field
.field(ANALYZER, WHITESPACE)
.endObject()
.startObject(PROPERTIES)
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(ES_TIMESTAMP)
.field(TYPE, DATE)
.endObject()
.startObject(Bucket.IS_INTERIM.getPreferredName())
.field(TYPE, BOOLEAN)
.endObject()
.startObject(BucketInfluencer.INFLUENCER_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(BucketInfluencer.INITIAL_ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(BucketInfluencer.ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(BucketInfluencer.RAW_ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(BucketInfluencer.PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.endObject()
.endObject()
.endObject();
}
/**
* Partition normalized scores. There is one per bucket
* so the timestamp is sufficient to uniquely identify
@ -333,6 +439,36 @@ public class ElasticsearchMappings {
.endObject();
}
/**
* Create the Elasticsearch mapping for {@linkplain Quantiles}.
* The '_all' field is disabled as the document isn't meant to be searched.
* <p>
* The quantile state string is not searchable (index = 'no') as it could be
* very large.
*/
public static XContentBuilder quantilesMapping() throws IOException {
return jsonBuilder()
.startObject()
.startObject(Quantiles.TYPE.getPreferredName())
.startObject(ALL)
.field(ENABLED, false)
// analyzer must be specified even though _all is disabled
// because all types in the same index must have the same
// analyzer for a given field
.field(ANALYZER, WHITESPACE)
.endObject()
.startObject(PROPERTIES)
.startObject(ES_TIMESTAMP)
.field(TYPE, DATE)
.endObject()
.startObject(Quantiles.QUANTILE_STATE.getPreferredName())
.field(TYPE, TEXT).field(INDEX, NO)
.endObject()
.endObject()
.endObject()
.endObject();
}
public static XContentBuilder categoryDefinitionMapping() throws IOException {
return jsonBuilder()
.startObject()
@ -368,178 +504,6 @@ public class ElasticsearchMappings {
.endObject();
}
/**
* @param termFieldNames Optionally, other field names to include in the
* mappings. Pass <code>null</code> if not required.
*/
public static XContentBuilder recordMapping(Collection<String> termFieldNames) throws IOException {
XContentBuilder builder = jsonBuilder()
.startObject()
.startObject(AnomalyRecord.TYPE.getPreferredName())
.startObject(ALL)
.field(ANALYZER, WHITESPACE)
.endObject()
.startObject(PROPERTIES)
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(ES_TIMESTAMP)
.field(TYPE, DATE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.DETECTOR_INDEX.getPreferredName())
.field(TYPE, INTEGER).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.ACTUAL.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.TYPICAL.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.FUNCTION.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.FUNCTION_DESCRIPTION.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.BY_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.BY_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyRecord.FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.PARTITION_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyRecord.OVER_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.OVER_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyRecord.CAUSES.getPreferredName())
.field(TYPE, NESTED)
.startObject(PROPERTIES)
.startObject(AnomalyCause.ACTUAL.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyCause.TYPICAL.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyCause.PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyCause.FUNCTION.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyCause.FUNCTION_DESCRIPTION.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyCause.BY_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyCause.BY_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyCause.CORRELATED_BY_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyCause.FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyCause.PARTITION_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyCause.PARTITION_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyCause.OVER_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyCause.OVER_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.endObject()
.endObject()
.startObject(AnomalyRecord.ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.NORMALIZED_PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.INITIAL_NORMALIZED_PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.IS_INTERIM.getPreferredName())
.field(TYPE, BOOLEAN).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.INFLUENCERS.getPreferredName())
/* Array of influences */
.field(TYPE, NESTED)
.startObject(PROPERTIES)
.startObject(Influence.INFLUENCER_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(Influence.INFLUENCER_FIELD_VALUES.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.endObject()
.endObject();
if (termFieldNames != null) {
ElasticsearchDotNotationReverser reverser = new ElasticsearchDotNotationReverser();
for (String fieldName : termFieldNames) {
reverser.add(fieldName, "");
}
for (Map.Entry<String, Object> entry : reverser.getMappingsMap().entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
}
return builder
.endObject()
.endObject()
.endObject();
}
/**
* Create the Elasticsearch mapping for {@linkplain Quantiles}.
* The '_all' field is disabled as the document isn't meant to be searched.
* <p>
* The quantile state string is not searchable (index = 'no') as it could be
* very large.
*/
public static XContentBuilder quantilesMapping() throws IOException {
return jsonBuilder()
.startObject()
.startObject(Quantiles.TYPE.getPreferredName())
.startObject(ALL)
.field(ENABLED, false)
// analyzer must be specified even though _all is disabled
// because all types in the same index must have the same
// analyzer for a given field
.field(ANALYZER, WHITESPACE)
.endObject()
.startObject(PROPERTIES)
.startObject(ES_TIMESTAMP)
.field(TYPE, DATE)
.endObject()
.startObject(Quantiles.QUANTILE_STATE.getPreferredName())
.field(TYPE, TEXT).field(INDEX, NO)
.endObject()
.endObject()
.endObject()
.endObject();
}
/**
* Create the Elasticsearch mapping for {@linkplain ModelState}.
* The model state could potentially be huge (over a gigabyte in size)
@ -763,61 +727,6 @@ public class ElasticsearchMappings {
.endObject();
}
/**
* Influence results mapping
*
* @param influencerFieldNames Optionally, other field names to include in the
* mappings. Pass <code>null</code> if not required.
*/
public static XContentBuilder influencerMapping(Collection<String> influencerFieldNames) throws IOException {
XContentBuilder builder = jsonBuilder()
.startObject()
.startObject(Influencer.TYPE.getPreferredName())
.startObject(ALL)
.field(ANALYZER, WHITESPACE)
.endObject()
.startObject(PROPERTIES)
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(ES_TIMESTAMP)
.field(TYPE, DATE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(Influencer.PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(Influencer.INITIAL_ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(Influencer.ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(Influencer.INFLUENCER_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(Influencer.INFLUENCER_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(Bucket.IS_INTERIM.getPreferredName())
.field(TYPE, BOOLEAN).field(INCLUDE_IN_ALL, false)
.endObject();
if (influencerFieldNames != null) {
ElasticsearchDotNotationReverser reverser = new ElasticsearchDotNotationReverser();
for (String fieldName : influencerFieldNames) {
reverser.add(fieldName, "");
}
for (Map.Entry<String, Object> entry : reverser.getMappingsMap().entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
}
return builder
.endObject()
.endObject()
.endObject();
}
/**
* The Elasticsearch mappings for the usage documents
*/

View File

@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
+import org.elasticsearch.xpack.prelert.job.results.Result;
import java.io.IOException;
import java.util.List;
@ -46,8 +47,9 @@ public class JobRenormaliser extends AbstractComponent {
String jobId = bucket.getJobId();
try {
String indexName = JobResultsPersister.getJobIndexName(jobId);
logger.trace("[{}] ES API CALL: index type {} to index {} with ID {}", jobId, Bucket.TYPE, indexName, bucket.getId());
client.prepareIndex(indexName, Bucket.TYPE.getPreferredName(), bucket.getId())
logger.trace("[{}] ES API CALL: update result type {} to index {} with ID {}", jobId, Bucket.RESULT_TYPE_VALUE, indexName,
bucket.getId());
client.prepareIndex(indexName, Result.TYPE.getPreferredName(), bucket.getId())
.setSource(jobResultsPersister.toXContentBuilder(bucket)).execute().actionGet();
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error updating bucket state", new Object[]{jobId}, e));
@ -83,11 +85,11 @@ public class JobRenormaliser extends AbstractComponent {
for (AnomalyRecord record : records) {
String recordId = record.getId();
String indexName = JobResultsPersister.getJobIndexName(jobId);
logger.trace("[{}] ES BULK ACTION: update ID {} type {} in index {} using map of new values, for bucket {}",
jobId, recordId, AnomalyRecord.TYPE, indexName, bucketId);
logger.trace("[{}] ES BULK ACTION: update ID {} result type {} in index {} using map of new values, for bucket {}",
jobId, recordId, AnomalyRecord.RESULT_TYPE_VALUE, indexName, bucketId);
bulkRequest.add(
client.prepareIndex(indexName, AnomalyRecord.TYPE.getPreferredName(), recordId)
client.prepareIndex(indexName, Result.TYPE.getPreferredName(), recordId)
.setSource(jobResultsPersister.toXContentBuilder(record)));
addedAny = true;

View File

@ -28,6 +28,7 @@ import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.job.results.ReservedFieldNames;
+import org.elasticsearch.xpack.prelert.job.results.Result;
import java.io.IOException;
import java.util.Date;
@ -73,13 +74,15 @@ public class JobResultsPersister extends AbstractComponent {
try {
XContentBuilder content = toXContentBuilder(bucket);
String indexName = getJobIndexName(jobId);
logger.trace("[{}] ES API CALL: index type {} to index {} at epoch {}", jobId, Bucket.TYPE, indexName, bucket.getEpoch());
IndexResponse response = client.prepareIndex(indexName, Bucket.TYPE.getPreferredName())
logger.trace("[{}] ES API CALL: index result type {} to index {} at epoch {}", jobId, Bucket.RESULT_TYPE_VALUE, indexName,
bucket.getEpoch());
IndexResponse response = client.prepareIndex(indexName, Result.TYPE.getPreferredName())
.setSource(content)
.execute().actionGet();
bucket.setId(response.getId());
persistBucketInfluencersStandalone(jobId, bucket.getId(), bucket.getBucketInfluencers(), bucket.getTimestamp(),
bucket.isInterim());
+persistPerPartitionMaxProbabilities(bucket);
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error persisting bucket", new Object[] {jobId}, e));
@ -102,11 +105,12 @@ public class JobResultsPersister extends AbstractComponent {
for (AnomalyRecord record : records) {
content = toXContentBuilder(record);
logger.trace("[{}] ES BULK ACTION: index type {} to index {} with auto-generated ID", jobId, AnomalyRecord.TYPE, indexName);
addRecordsRequest.add(client.prepareIndex(indexName, AnomalyRecord.TYPE.getPreferredName()).setSource(content));
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with auto-generated ID",
jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName);
addRecordsRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(content));
}
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error persisting records", new Object[] {jobId}, e));
logger.error(new ParameterizedMessage("[{}] Error persisting records", new Object [] {jobId}, e));
return;
}
@ -132,9 +136,9 @@ public class JobResultsPersister extends AbstractComponent {
try {
for (Influencer influencer : influencers) {
content = toXContentBuilder(influencer);
logger.trace("[{}] ES BULK ACTION: index type {} to index {} with auto-generated ID",
jobId, Influencer.TYPE, indexName);
addInfluencersRequest.add(client.prepareIndex(indexName, Influencer.TYPE.getPreferredName()).setSource(content));
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with auto-generated ID",
jobId, Influencer.RESULT_TYPE_VALUE, indexName);
addInfluencersRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(content));
}
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error persisting influencers", new Object[] {jobId}, e));
@ -216,7 +220,7 @@ public class JobResultsPersister extends AbstractComponent {
* Persist the influencer
*/
public void persistInfluencer(Influencer influencer) {
-Persistable persistable = new Persistable(influencer.getJobId(), influencer, Influencer.TYPE::getPreferredName,
+Persistable persistable = new Persistable(influencer.getJobId(), influencer, Result.TYPE::getPreferredName,
influencer::getId, () -> toXContentBuilder(influencer));
persistable.persist();
// Don't commit as we expect masses of these updates and they're not
@ -296,10 +300,10 @@ public class JobResultsPersister extends AbstractComponent {
// Need consistent IDs to ensure overwriting on renormalisation
String id = bucketId + bucketInfluencer.getInfluencerFieldName();
String indexName = getJobIndexName(jobId);
logger.trace("[{}] ES BULK ACTION: index type {} to index {} with ID {}", jobId, BucketInfluencer.TYPE, indexName, id);
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", jobId, BucketInfluencer.RESULT_TYPE_VALUE,
indexName, id);
addBucketInfluencersRequest.add(
client.prepareIndex(indexName, BucketInfluencer.TYPE.getPreferredName(), id)
.setSource(content));
client.prepareIndex(indexName, Result.TYPE.getPreferredName(), id).setSource(content));
}
logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, addBucketInfluencersRequest.numberOfActions());
BulkResponse addBucketInfluencersResponse = addBucketInfluencersRequest.execute().actionGet();
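
Writes mirror the reads: everything is indexed under Result.TYPE, and where renormalisation must overwrite earlier output the persister keeps document IDs deterministic. Condensed from the bucket-influencer path above (names as in the diff):

    // A deterministic ID (bucket ID + influencer field name) makes re-indexing
    // after renormalisation an overwrite rather than a duplicate document.
    String id = bucketId + bucketInfluencer.getInfluencerFieldName();
    addBucketInfluencersRequest.add(
            client.prepareIndex(indexName, Result.TYPE.getPreferredName(), id).setSource(content));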

View File

@ -29,16 +29,15 @@ import java.util.Objects;
* can be returned if the members have not been set.
*/
public class AnomalyRecord extends ToXContentToBytes implements Writeable {
-/**
-* Serialisation fields
-*/
-public static final ParseField TYPE = new ParseField("record");
+/**
+* Result type
+*/
+public static final String RESULT_TYPE_VALUE = "record";
/**
* Result fields (all detector types)
*/
public static final ParseField JOB_ID = new ParseField("jobId");
-public static final ParseField RESULT_TYPE = new ParseField("result_type");
public static final ParseField DETECTOR_INDEX = new ParseField("detectorIndex");
public static final ParseField PROBABILITY = new ParseField("probability");
public static final ParseField BY_FIELD_NAME = new ParseField("byFieldName");
@ -78,11 +77,11 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
public static final ParseField INITIAL_NORMALIZED_PROBABILITY = new ParseField("initialNormalizedProbability");
public static final ConstructingObjectParser<AnomalyRecord, ParseFieldMatcherSupplier> PARSER =
-new ConstructingObjectParser<>(TYPE.getPreferredName(), a -> new AnomalyRecord((String) a[0]));
+new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> new AnomalyRecord((String) a[0]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), JOB_ID);
-PARSER.declareString((anomalyRecord, s) -> {}, RESULT_TYPE);
+PARSER.declareString((anomalyRecord, s) -> {}, Result.RESULT_TYPE);
PARSER.declareDouble(AnomalyRecord::setProbability, PROBABILITY);
PARSER.declareDouble(AnomalyRecord::setAnomalyScore, ANOMALY_SCORE);
PARSER.declareDouble(AnomalyRecord::setNormalizedProbability, NORMALIZED_PROBABILITY);
@ -114,8 +113,6 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
PARSER.declareObjectArray(AnomalyRecord::setInfluencers, Influence.PARSER, INFLUENCERS);
}
public static final String RESULT_TYPE_VALUE = "record";
private final String jobId;
private String id;
private int detectorIndex;
@ -246,7 +243,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(JOB_ID.getPreferredName(), jobId);
-builder.field(RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE);
+builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE);
builder.field(PROBABILITY.getPreferredName(), probability);
builder.field(ANOMALY_SCORE.getPreferredName(), anomalyScore);
builder.field(NORMALIZED_PROBABILITY.getPreferredName(), normalizedProbability);
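
Serialisation is the other half of the scheme: each result's toXContent writes the discriminator next to its payload, and the parser tolerates it on the way back in (the no-op declareString above). A minimal round-trip sketch, with xcontent imports assumed from org.elasticsearch.common.xcontent and exception handling omitted:

    AnomalyRecord record = new AnomalyRecord("my-job");
    record.setProbability(1.0e-4);
    XContentBuilder builder = XContentFactory.jsonBuilder();
    record.toXContent(builder, ToXContent.EMPTY_PARAMS);
    // builder now holds {"jobId":"my-job","result_type":"record","probability":1.0E-4,...}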

View File

@ -26,8 +26,6 @@ import java.util.Objects;
public class AutodetectResult extends ToXContentToBytes implements Writeable {
public static final ParseField TYPE = new ParseField("autodetect_result");
-public static final ParseField RECORDS = new ParseField("records");
-public static final ParseField INFLUENCERS = new ParseField("influencers");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<AutodetectResult, ParseFieldMatcherSupplier> PARSER = new ConstructingObjectParser<>(
@ -36,9 +34,9 @@ public class AutodetectResult extends ToXContentToBytes implements Writeable {
(ModelDebugOutput) a[6], (CategoryDefinition) a[7], (FlushAcknowledgement) a[8]));
static {
-PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), Bucket.PARSER, Bucket.TYPE);
-PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), AnomalyRecord.PARSER, RECORDS);
-PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), Influencer.PARSER, INFLUENCERS);
+PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), Bucket.PARSER, Bucket.RESULT_TYPE_FIELD);
+PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), AnomalyRecord.PARSER, AnomalyRecord.RESULTS_FIELD);
+PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), Influencer.PARSER, Influencer.RESULTS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), Quantiles.PARSER, Quantiles.TYPE);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelSnapshot.PARSER, ModelSnapshot.TYPE);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelSizeStats.PARSER, ModelSizeStats.TYPE);
@ -151,9 +149,9 @@ public class AutodetectResult extends ToXContentToBytes implements Writeable {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
addNullableField(Bucket.TYPE, bucket, builder);
addNullableField(RECORDS, records, builder);
addNullableField(INFLUENCERS, influencers, builder);
addNullableField(Bucket.RESULT_TYPE_FIELD, bucket, builder);
addNullableField(AnomalyRecord.RESULTS_FIELD, records, builder);
addNullableField(Influencer.RESULTS_FIELD, influencers, builder);
addNullableField(Quantiles.TYPE, quantiles, builder);
addNullableField(ModelSnapshot.TYPE, modelSnapshot, builder);
addNullableField(ModelSizeStats.TYPE, modelSizeStats, builder);

View File

@ -54,12 +54,13 @@ public class Bucket extends ToXContentToBytes implements Writeable {
public static final ParseField RESULTS_FIELD = new ParseField("buckets");
/**
* Elasticsearch type
* Result type
*/
public static final ParseField TYPE = new ParseField("bucket");
public static final String RESULT_TYPE_VALUE = "bucket";
public static final ParseField RESULT_TYPE_FIELD = new ParseField(RESULT_TYPE_VALUE);
public static final ConstructingObjectParser<Bucket, ParseFieldMatcherSupplier> PARSER =
new ConstructingObjectParser<>(TYPE.getPreferredName(), a -> new Bucket((String) a[0]));
new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> new Bucket((String) a[0]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), JOB_ID);
@ -82,6 +83,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
PARSER.declareLong(Bucket::setBucketSpan, BUCKET_SPAN);
PARSER.declareLong(Bucket::setProcessingTimeMs, PROCESSING_TIME_MS);
PARSER.declareObjectArray(Bucket::setPartitionScores, PartitionScore.PARSER, PARTITION_SCORES);
PARSER.declareString((bucket, s) -> {}, Result.RESULT_TYPE);
}
private final String jobId;
@ -173,6 +175,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
builder.field(BUCKET_INFLUENCERS.getPreferredName(), bucketInfluencers);
builder.field(PROCESSING_TIME_MS.getPreferredName(), processingTimeMs);
builder.field(PARTITION_SCORES.getPreferredName(), partitionScores);
builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE);
builder.endObject();
return builder;
}

View File

@ -24,14 +24,10 @@ import java.util.Objects;
public class BucketInfluencer extends ToXContentToBytes implements Writeable {
/**
* Elasticsearch type
* Result type
*/
public static final ParseField TYPE = new ParseField("bucketInfluencer");
/**
* This is the field name of the time bucket influencer.
*/
public static final ParseField BUCKET_TIME = new ParseField("bucketTime");
public static final String RESULT_TYPE_VALUE = "bucketInfluencer";
public static final ParseField RESULT_TYPE_FIELD = new ParseField(RESULT_TYPE_VALUE);
/*
* Field names
@ -46,10 +42,11 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable {
public static final ParseField TIMESTAMP = new ParseField("timestamp");
public static final ConstructingObjectParser<BucketInfluencer, ParseFieldMatcherSupplier> PARSER =
new ConstructingObjectParser<>(TYPE.getPreferredName(), a -> new BucketInfluencer((String) a[0]));
new ConstructingObjectParser<>(RESULT_TYPE_FIELD.getPreferredName(), a -> new BucketInfluencer((String) a[0]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
PARSER.declareString((bucketInfluencer, s) -> {}, Result.RESULT_TYPE);
PARSER.declareString(BucketInfluencer::setInfluencerFieldName, INFLUENCER_FIELD_NAME);
PARSER.declareDouble(BucketInfluencer::setInitialAnomalyScore, INITIAL_ANOMALY_SCORE);
PARSER.declareDouble(BucketInfluencer::setAnomalyScore, ANOMALY_SCORE);
@ -123,6 +120,7 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE);
if (influenceField != null) {
builder.field(INFLUENCER_FIELD_NAME.getPreferredName(), influenceField);
}

View File

@ -23,9 +23,10 @@ import java.util.Objects;
public class Influencer extends ToXContentToBytes implements Writeable {
/**
* Elasticsearch type
* Result type
*/
public static final ParseField TYPE = new ParseField("influencer");
public static final String RESULT_TYPE_VALUE = "influencer";
public static final ParseField RESULT_TYPE_FIELD = new ParseField(RESULT_TYPE_VALUE);
/*
* Field names
@ -42,12 +43,13 @@ public class Influencer extends ToXContentToBytes implements Writeable {
public static final ParseField RESULTS_FIELD = new ParseField("influencers");
public static final ConstructingObjectParser<Influencer, ParseFieldMatcherSupplier> PARSER = new ConstructingObjectParser<>(
TYPE.getPreferredName(), a -> new Influencer((String) a[0], (String) a[1], (String) a[2]));
RESULT_TYPE_FIELD.getPreferredName(), a -> new Influencer((String) a[0], (String) a[1], (String) a[2]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), JOB_ID);
PARSER.declareString(ConstructingObjectParser.constructorArg(), INFLUENCER_FIELD_NAME);
PARSER.declareString(ConstructingObjectParser.constructorArg(), INFLUENCER_FIELD_VALUE);
PARSER.declareString((influencer, s) -> {}, Result.RESULT_TYPE);
PARSER.declareDouble(Influencer::setProbability, PROBABILITY);
PARSER.declareDouble(Influencer::setAnomalyScore, ANOMALY_SCORE);
PARSER.declareDouble(Influencer::setInitialAnomalyScore, INITIAL_ANOMALY_SCORE);
@ -116,6 +118,7 @@ public class Influencer extends ToXContentToBytes implements Writeable {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(JOB_ID.getPreferredName(), jobId);
builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE);
builder.field(INFLUENCER_FIELD_NAME.getPreferredName(), influenceField);
builder.field(INFLUENCER_FIELD_VALUE.getPreferredName(), influenceValue);
builder.field(ANOMALY_SCORE.getPreferredName(), anomalyScore);

View File

@ -91,13 +91,10 @@ public final class ReservedFieldNames {
Bucket.IS_INTERIM.getPreferredName(),
Bucket.RECORD_COUNT.getPreferredName(),
Bucket.EVENT_COUNT.getPreferredName(),
Bucket.RECORDS.getPreferredName(),
Bucket.BUCKET_INFLUENCERS.getPreferredName(),
Bucket.INITIAL_ANOMALY_SCORE.getPreferredName(),
Bucket.PROCESSING_TIME_MS.getPreferredName(),
Bucket.PARTITION_SCORES.getPreferredName(),
BucketInfluencer.BUCKET_TIME.getPreferredName(), BucketInfluencer.INFLUENCER_FIELD_NAME.getPreferredName(),
BucketInfluencer.INITIAL_ANOMALY_SCORE.getPreferredName(), BucketInfluencer.ANOMALY_SCORE.getPreferredName(),
BucketInfluencer.RAW_ANOMALY_SCORE.getPreferredName(), BucketInfluencer.PROBABILITY.getPreferredName(),
@ -151,18 +148,17 @@ public final class ReservedFieldNames {
ModelSnapshot.RESTORE_PRIORITY.getPreferredName(),
ModelSnapshot.SNAPSHOT_ID.getPreferredName(),
ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName(),
ModelSizeStats.TYPE.getPreferredName(),
ModelSnapshot.LATEST_RECORD_TIME.getPreferredName(),
ModelSnapshot.LATEST_RESULT_TIME.getPreferredName(),
Quantiles.QUANTILE_STATE.getPreferredName(),
Result.RESULT_TYPE.getPreferredName(),
Usage.INPUT_BYTES,
Usage.INPUT_FIELD_COUNT,
Usage.INPUT_RECORD_COUNT,
Usage.TIMESTAMP,
Usage.TYPE,
JOB_ID_NAME,
ES_TIMESTAMP

View File

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.results;
import org.elasticsearch.common.ParseField;
/**
* Common attributes of the result types
*/
public class Result {
/**
* Serialisation fields
*/
public static final ParseField TYPE = new ParseField("result");
public static final ParseField RESULT_TYPE = new ParseField("result_type");
}
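Result.RESULT_TYPE is the pivot for the whole change: every result document now carries a result_type value, and callers narrow a search to one result kind with a term filter on it. A sketch of such a filter follows; it assumes the timestamp field name matches the ES_TIMESTAMP constant ("timestamp") referenced elsewhere in this change, and the production iterators build the equivalent filter via ResultsFilterBuilder rather than this helper:

import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;

// Illustrative sketch, not code from this commit: select only bucket results
// in a time range; other result kinds swap in their own RESULT_TYPE_VALUE.
final class ResultQueries {
    private ResultQueries() {}

    static QueryBuilder bucketsBetween(long startEpochMs, long endEpochMs) {
        return QueryBuilders.boolQuery()
                .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE))
                .filter(QueryBuilders.rangeQuery("timestamp").gte(startEpochMs).lt(endEpochMs));
    }
}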

View File

@ -31,6 +31,18 @@ import static org.hamcrest.Matchers.not;
public class PrelertJobIT extends ESRestTestCase {
private static final String RESULT_MAPPING = "{ \"mappings\": {\"result\": { \"properties\": { " +
"\"result_type\": { \"type\" : \"keyword\" }," +
"\"timestamp\": { \"type\" : \"date\" }, " +
"\"anomalyScore\": { \"type\" : \"double\" }, " +
"\"normalizedProbability\": { \"type\" : \"double\" }, " +
"\"overFieldValue\": { \"type\" : \"keyword\" }, " +
"\"partitionFieldValue\": { \"type\" : \"keyword\" }, " +
"\"byFieldValue\": { \"type\" : \"keyword\" }, " +
"\"fieldName\": { \"type\" : \"keyword\" }, " +
"\"function\": { \"type\" : \"keyword\" } " +
"} } } }";
public void testPutJob_GivenFarequoteConfig() throws Exception {
Response response = createFarequoteJob();
@ -204,6 +216,31 @@ public class PrelertJobIT extends ESRestTestCase {
assertThat(responseAsString, not(isEmptyString()));
}
public void testGetRecordResults() throws Exception {
Map<String, String> params = new HashMap<>();
params.put("start", "1200"); // inclusive
params.put("end", "1400"); // exclusive
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest("get", PrelertPlugin.BASE_PATH + "results/1/records", params));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));
assertThat(e.getMessage(), containsString("No known job with id '1'"));
addRecordResult("1", "1234");
addRecordResult("1", "1235");
addRecordResult("1", "1236");
Response response = client().performRequest("get", PrelertPlugin.BASE_PATH + "results/1/records", params);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":3"));
params.put("end", "1235");
response = client().performRequest("get", PrelertPlugin.BASE_PATH + "results/1/records", params);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":1"));
}
public void testPauseAndResumeJob() throws Exception {
createFarequoteJob();
@ -251,17 +288,32 @@ public class PrelertJobIT extends ESRestTestCase {
}
private Response addBucketResult(String jobId, String timestamp) throws Exception {
String createIndexBody = "{ \"mappings\": {\"bucket\": { \"properties\": { \"timestamp\": { \"type\" : \"date\" } } } } }";
try {
client().performRequest("put", "prelertresults-" + jobId, Collections.emptyMap(), new StringEntity(createIndexBody));
client().performRequest("put", "prelertresults-" + jobId, Collections.emptyMap(), new StringEntity(RESULT_MAPPING));
} catch (ResponseException e) {
// it is ok: the index already exists
assertThat(e.getMessage(), containsString("index_already_exists_exception"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
}
String bucketResult = String.format(Locale.ROOT, "{\"jobId\":\"%s\", \"timestamp\": \"%s\"}", jobId, timestamp);
return client().performRequest("put", "prelertresults-" + jobId + "/bucket/" + timestamp,
String bucketResult =
String.format(Locale.ROOT, "{\"jobId\":\"%s\", \"timestamp\": \"%s\", \"result_type\":\"bucket\"}", jobId, timestamp);
return client().performRequest("put", "prelertresults-" + jobId + "/result/" + timestamp,
Collections.singletonMap("refresh", "true"), new StringEntity(bucketResult));
}
private Response addRecordResult(String jobId, String timestamp) throws Exception {
try {
client().performRequest("put", "prelertresults-" + jobId, Collections.emptyMap(), new StringEntity(RESULT_MAPPING));
} catch (ResponseException e) {
// it is ok: the index already exists
assertThat(e.getMessage(), containsString("index_already_exists_exception"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
}
String recordResult =
String.format(Locale.ROOT, "{\"jobId\":\"%s\", \"timestamp\": \"%s\", \"result_type\":\"record\"}", jobId, timestamp);
return client().performRequest("put", "prelertresults-" + jobId + "/result/" + timestamp,
Collections.singletonMap("refresh", "true"), new StringEntity(recordResult));
}
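After this change addBucketResult and addRecordResult differ only in the result_type value they write, so a single helper could serve both. A hypothetical consolidation, not part of the commit:

// Sketch only: factors out the shared index-creation-plus-put logic the two
// helpers above duplicate; callers pass "bucket" or "record" as resultType.
private Response addResult(String jobId, String timestamp, String resultType) throws Exception {
    String doc = String.format(Locale.ROOT,
            "{\"jobId\":\"%s\", \"timestamp\": \"%s\", \"result_type\":\"%s\"}", jobId, timestamp, resultType);
    return client().performRequest("put", "prelertresults-" + jobId + "/result/" + timestamp,
            Collections.singletonMap("refresh", "true"), new StringEntity(doc));
}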

View File

@ -30,6 +30,7 @@ import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.Result;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
@ -46,7 +47,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any;
@ -220,7 +220,7 @@ public class ElasticsearchJobProviderTests extends ESTestCase {
int size = 10;
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(ElasticsearchJobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, Bucket.TYPE.getPreferredName(), from, size, response, queryBuilder);
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
ElasticsearchJobProvider provider = createProvider(client);
@ -253,7 +253,7 @@ public class ElasticsearchJobProviderTests extends ESTestCase {
int size = 17;
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(ElasticsearchJobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, Bucket.TYPE.getPreferredName(), from, size, response, queryBuilder);
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
ElasticsearchJobProvider provider = createProvider(client);
@ -287,7 +287,7 @@ public class ElasticsearchJobProviderTests extends ESTestCase {
int size = 17;
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(ElasticsearchJobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, Bucket.TYPE.getPreferredName(), from, size, response, queryBuilder);
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
ElasticsearchJobProvider provider = createProvider(client);
@ -323,7 +323,7 @@ public class ElasticsearchJobProviderTests extends ESTestCase {
SearchResponse response = createSearchResponse(false, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(ElasticsearchJobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, Bucket.TYPE.getPreferredName(), 0, 0, response, queryBuilder);
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), 0, 0, response, queryBuilder);
Client client = clientBuilder.build();
ElasticsearchJobProvider provider = createProvider(client);
@ -349,7 +349,7 @@ public class ElasticsearchJobProviderTests extends ESTestCase {
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(ElasticsearchJobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, Bucket.TYPE.getPreferredName(), 0, 0, response, queryBuilder);
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), 0, 0, response, queryBuilder);
Client client = clientBuilder.build();
ElasticsearchJobProvider provider = createProvider(client);
@ -378,7 +378,7 @@ public class ElasticsearchJobProviderTests extends ESTestCase {
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(ElasticsearchJobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, Bucket.TYPE.getPreferredName(), 0, 0, response, queryBuilder);
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), 0, 0, response, queryBuilder);
Client client = clientBuilder.build();
ElasticsearchJobProvider provider = createProvider(client);
@ -418,7 +418,7 @@ public class ElasticsearchJobProviderTests extends ESTestCase {
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(ElasticsearchJobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, AnomalyRecord.TYPE.getPreferredName(), from, size, response, queryBuilder);
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
ElasticsearchJobProvider provider = createProvider(client);
@ -468,7 +468,7 @@ public class ElasticsearchJobProviderTests extends ESTestCase {
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(ElasticsearchJobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, AnomalyRecord.TYPE.getPreferredName(), from, size, response, queryBuilder);
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
ElasticsearchJobProvider provider = createProvider(client);
@ -525,7 +525,7 @@ public class ElasticsearchJobProviderTests extends ESTestCase {
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(ElasticsearchJobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, AnomalyRecord.TYPE.getPreferredName(), from, size, response, queryBuilder);
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
ElasticsearchJobProvider provider = createProvider(client);
@ -564,7 +564,7 @@ public class ElasticsearchJobProviderTests extends ESTestCase {
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(ElasticsearchJobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearchAnySize("prelertresults-" + jobId, AnomalyRecord.TYPE.getPreferredName(), response, queryBuilder);
.prepareSearchAnySize("prelertresults-" + jobId, Result.TYPE.getPreferredName(), response, queryBuilder);
Client client = clientBuilder.build();
ElasticsearchJobProvider provider = createProvider(client);
@ -596,7 +596,7 @@ public class ElasticsearchJobProviderTests extends ESTestCase {
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(ElasticsearchJobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearchAnySize("prelertresults-" + jobId, AnomalyRecord.TYPE.getPreferredName(), response, queryBuilder);
.prepareSearchAnySize("prelertresults-" + jobId, Result.TYPE.getPreferredName(), response, queryBuilder);
Client client = clientBuilder.build();
ElasticsearchJobProvider provider = createProvider(client);
@ -691,7 +691,8 @@ public class ElasticsearchJobProviderTests extends ESTestCase {
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(ElasticsearchJobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, Influencer.TYPE.getPreferredName(), from, size, response, queryBuilder);
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(),
from, size, response, queryBuilder);
Client client = clientBuilder.build();
ElasticsearchJobProvider provider = createProvider(client);
@ -751,7 +752,8 @@ public class ElasticsearchJobProviderTests extends ESTestCase {
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(ElasticsearchJobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, Influencer.TYPE.getPreferredName(), from, size, response, queryBuilder);
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response,
queryBuilder);
Client client = clientBuilder.build();
ElasticsearchJobProvider provider = createProvider(client);

View File

@ -18,13 +18,10 @@ import org.elasticsearch.xpack.prelert.job.audit.AuditActivity;
import org.elasticsearch.xpack.prelert.job.audit.AuditMessage;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.job.results.ReservedFieldNames;
import org.elasticsearch.xpack.prelert.job.results.Result;
import org.elasticsearch.xpack.prelert.job.usage.Usage;
import org.elasticsearch.xpack.prelert.lists.ListDocument;
@ -38,6 +35,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@ -48,15 +46,15 @@ public class ElasticsearchMappingsTests extends ESTestCase {
JsonToken token = parser.nextToken();
while (token != null && token != JsonToken.END_OBJECT) {
switch (token) {
case START_OBJECT:
parseJson(parser, expected);
break;
case FIELD_NAME:
String fieldName = parser.getCurrentName();
expected.add(fieldName);
break;
default:
break;
}
token = parser.nextToken();
}
@ -85,16 +83,13 @@ public class ElasticsearchMappingsTests extends ESTestCase {
overridden.add(ElasticsearchMappings.WHITESPACE);
// These are not reserved because they're data types, not field names
overridden.add(AnomalyRecord.TYPE.getPreferredName());
overridden.add(Result.TYPE.getPreferredName());
overridden.add(AuditActivity.TYPE.getPreferredName());
overridden.add(AuditMessage.TYPE.getPreferredName());
overridden.add(Bucket.TYPE.getPreferredName());
overridden.add(DataCounts.TYPE.getPreferredName());
overridden.add(ReservedFieldNames.BUCKET_PROCESSING_TIME_TYPE);
overridden.add(BucketInfluencer.TYPE.getPreferredName());
overridden.add(CategorizerState.TYPE);
overridden.add(CategoryDefinition.TYPE.getPreferredName());
overridden.add(Influencer.TYPE.getPreferredName());
overridden.add(Job.TYPE);
overridden.add(ListDocument.TYPE.getPreferredName());
overridden.add(ModelDebugOutput.TYPE.getPreferredName());
@ -129,12 +124,7 @@ public class ElasticsearchMappingsTests extends ESTestCase {
parser = new JsonFactory().createParser(inputStream);
parseJson(parser, expected);
builder = ElasticsearchMappings.bucketInfluencerMapping();
inputStream = new BufferedInputStream(new ByteArrayInputStream(builder.string().getBytes(StandardCharsets.UTF_8)));
parser = new JsonFactory().createParser(inputStream);
parseJson(parser, expected);
builder = ElasticsearchMappings.bucketMapping();
builder = ElasticsearchMappings.resultsMapping(Collections.emptyList());
inputStream = new BufferedInputStream(new ByteArrayInputStream(builder.string().getBytes(StandardCharsets.UTF_8)));
parser = new JsonFactory().createParser(inputStream);
parseJson(parser, expected);
@ -159,11 +149,6 @@ public class ElasticsearchMappingsTests extends ESTestCase {
parser = new JsonFactory().createParser(inputStream);
parseJson(parser, expected);
builder = ElasticsearchMappings.influencerMapping(null);
inputStream = new BufferedInputStream(new ByteArrayInputStream(builder.string().getBytes(StandardCharsets.UTF_8)));
parser = new JsonFactory().createParser(inputStream);
parseJson(parser, expected);
builder = ElasticsearchMappings.modelDebugOutputMapping(null);
inputStream = new BufferedInputStream(new ByteArrayInputStream(builder.string().getBytes(StandardCharsets.UTF_8)));
parser = new JsonFactory().createParser(inputStream);
@ -194,17 +179,24 @@ public class ElasticsearchMappingsTests extends ESTestCase {
parser = new JsonFactory().createParser(inputStream);
parseJson(parser, expected);
builder = ElasticsearchMappings.recordMapping(null);
inputStream = new BufferedInputStream(new ByteArrayInputStream(builder.string().getBytes(StandardCharsets.UTF_8)));
parser = new JsonFactory().createParser(inputStream);
parseJson(parser, expected);
builder = ElasticsearchMappings.usageMapping();
inputStream = new BufferedInputStream(new ByteArrayInputStream(builder.string().getBytes(StandardCharsets.UTF_8)));
parser = new JsonFactory().createParser(inputStream);
parseJson(parser, expected);
expected.removeAll(overridden);
if (ReservedFieldNames.RESERVED_FIELD_NAMES.size() != expected.size()) {
Set<String> diff = new HashSet<>(ReservedFieldNames.RESERVED_FIELD_NAMES);
diff.removeAll(expected);
System.out.println("Fields in ReservedFieldNames but not in expected: " + diff);
diff = new HashSet<>(expected);
diff.removeAll(ReservedFieldNames.RESERVED_FIELD_NAMES);
System.out.println("Fields in expected but not in ReservedFieldNames: " + diff);
}
assertEquals(ReservedFieldNames.RESERVED_FIELD_NAMES.size(), expected.size());
for (String s : expected) {
// By comparing like this the failure messages say which string is
// missing

View File

@ -10,11 +10,13 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.results.Result;
import org.mockito.ArgumentCaptor;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.util.ArrayList;
@ -35,8 +37,8 @@ public class JobResultsPersisterTests extends ESTestCase {
BulkResponse response = mock(BulkResponse.class);
String responseId = "abcXZY54321";
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME)
.prepareIndex("prelertresults-" + JOB_ID, Bucket.TYPE.getPreferredName(), responseId, captor)
.prepareIndex("prelertresults-" + JOB_ID, BucketInfluencer.TYPE.getPreferredName(), "", captor)
.prepareIndex("prelertresults-" + JOB_ID, Result.TYPE.getPreferredName(), responseId, captor)
.prepareIndex("prelertresults-" + JOB_ID, Result.TYPE.getPreferredName(), "", captor)
.prepareBulk(response);
Client client = clientBuilder.build();
@ -91,7 +93,7 @@ public class JobResultsPersisterTests extends ESTestCase {
ArgumentCaptor<XContentBuilder> captor = ArgumentCaptor.forClass(XContentBuilder.class);
BulkResponse response = mock(BulkResponse.class);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME)
.prepareIndex("prelertresults-" + JOB_ID, AnomalyRecord.TYPE.getPreferredName(), "", captor)
.prepareIndex("prelertresults-" + JOB_ID, Result.TYPE.getPreferredName(), "", captor)
.prepareBulk(response);
Client client = clientBuilder.build();
@ -153,7 +155,7 @@ public class JobResultsPersisterTests extends ESTestCase {
ArgumentCaptor<XContentBuilder> captor = ArgumentCaptor.forClass(XContentBuilder.class);
BulkResponse response = mock(BulkResponse.class);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME)
.prepareIndex("prelertresults-" + JOB_ID, Influencer.TYPE.getPreferredName(), "", captor)
.prepareIndex("prelertresults-" + JOB_ID, Result.TYPE.getPreferredName(), "", captor)
.prepareBulk(response);
Client client = clientBuilder.build();

View File

@ -4,16 +4,21 @@ setup:
index: prelertresults-farequote
body:
mappings:
bucket:
result:
properties:
"jobId":
type: keyword
"result_type":
type: keyword
"timestamp":
type: date
- do:
index:
index: prelertresults-farequote
type: bucket
type: result
id: 1
body: { "jobId": "farequote", "timestamp": "2016-06-01T00:00:00Z" }
body: { "jobId": "farequote", "result_type": "bucket", "timestamp": "2016-06-01T00:00:00Z" }
- do:
indices.refresh:
@ -30,9 +35,10 @@ setup:
- match: { count: 1 }
- match: { buckets.0.timestamp: 1464739200000 }
- match: { buckets.0.jobId: farequote}
- match: { buckets.0.result_type: bucket}
---
"Test result bucket api":
"Test result single bucket api":
- do:
xpack.prelert.get_buckets:
jobId: "farequote"
@ -40,3 +46,4 @@ setup:
- match: { buckets.0.timestamp: 1464739200000}
- match: { buckets.0.jobId: farequote }
- match: { buckets.0.result_type: bucket}

View File

@ -4,7 +4,7 @@ setup:
index: prelertresults-farequote
body:
mappings:
influencer:
result:
properties:
"jobId":
type: keyword
@ -12,17 +12,20 @@ setup:
type: date
"anomalyScore":
type: float
"result_type":
type: keyword
- do:
index:
index: prelertresults-farequote
type: influencer
type: result
id: 1
body:
{
"jobId": "farequote",
"timestamp": "2016-06-01T00:00:00Z",
"influencerFieldName": "foo",
"influencerFieldValue": "bar"
"influencerFieldValue": "bar",
"result_type" : "influencer"
}
- do:

View File

@ -30,9 +30,9 @@ setup:
- do:
index:
index: prelertresults-farequote
type: record
type: result
id: 2
body: { "jobId": "farequote", "timestamp": "2016-06-01T00:00:00Z", "result_type": "record" }
body: { "jobId": "farequote", "result_type": "record", "timestamp": "2016-06-01T00:00:00Z", "result_type": "record" }
- do:
indices.refresh:

View File

@ -50,23 +50,23 @@ setup:
- do:
index:
index: prelertresults-foo
type: bucket
type: result
id: 1
body: { "jobId": "foo", "timestamp": "2016-06-02T00:00:00Z" }
body: { "jobId": "foo", "result_type": "bucket", "timestamp": "2016-06-02T00:00:00Z" }
- do:
index:
index: prelertresults-foo
type: bucket
type: result
id: 2
body: { "jobId": "foo", "timestamp": "2016-06-01T12:00:00Z" }
body: { "jobId": "foo", "result_type": "bucket", "timestamp": "2016-06-01T12:00:00Z" }
- do:
index:
index: prelertresults-foo
type: bucket
type: result
id: 3
body: { "jobId": "foo", "timestamp": "2016-05-01T00:00:00Z" }
body: { "jobId": "foo", "result_type": "bucket", "timestamp": "2016-05-01T00:00:00Z" }
- do:
indices.refresh:
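These fixtures can be exercised the same way the Java REST tests do it, passing start and end as request parameters. A sketch under stated assumptions: the buckets path is inferred by analogy with the records endpoint used earlier, and date-string parameter handling is assumed to match the epoch handling shown there.

// Sketch only: endpoint path and date-format handling are assumptions.
Map<String, String> params = new HashMap<>();
params.put("start", "2016-05-01T00:00:00Z"); // inclusive
params.put("end", "2016-06-03T00:00:00Z");   // exclusive
Response response = client().performRequest(
        "get", PrelertPlugin.BASE_PATH + "results/foo/buckets", params);
// All three fixture documents fall inside this range.
assertThat(responseEntityToString(response), containsString("\"count\":3"));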