[ML] Single doc type for results index (elastic/x-pack-elasticsearch#1528)

This commit changes all results to use the single doc type.
All searches were adjusted to work without the need to specify
type which ensures BWC with 5.4 results.

Additional work is needed to put the new doc mapping in indices
created with 5.4 but it will be done in separate PR.

Relates elastic/x-pack-elasticsearch#668

Original commit: elastic/x-pack-elasticsearch@041c88ac2d
This commit is contained in:
Dimitris Athanasiou 2017-05-24 13:24:32 +01:00 committed by GitHub
parent 3d057991e0
commit 71fe599592
31 changed files with 498 additions and 546 deletions

View File

@ -30,12 +30,8 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.AuditMessage;
import org.elasticsearch.xpack.ml.notifications.Auditor;
@ -220,8 +216,9 @@ public class MachineLearningTemplateRegistry extends AbstractComponent implemen
templateRequest.patterns(Collections.singletonList(MlMetaIndex.INDEX_NAME));
templateRequest.settings(mlNotificationIndexSettings());
templateRequest.version(Version.CURRENT.id);
try (XContentBuilder defaultMapping = ElasticsearchMappings.defaultMapping()) {
templateRequest.mapping(MapperService.DEFAULT_MAPPING, defaultMapping);
try (XContentBuilder docMapping = MlMetaIndex.docMapping()) {
templateRequest.mapping(MlMetaIndex.TYPE, docMapping);
} catch (IOException e) {
String msg = "Error creating template mappings for the " + MlMetaIndex.INDEX_NAME + " index";
logger.error(msg, e);
@ -255,20 +252,12 @@ public class MachineLearningTemplateRegistry extends AbstractComponent implemen
}
void putJobResultsIndexTemplate(BiConsumer<Boolean, Exception> listener) {
try (XContentBuilder defaultMapping = ElasticsearchMappings.defaultMapping();
XContentBuilder resultsMapping = ElasticsearchMappings.resultsMapping();
XContentBuilder categoryDefinitionMapping = ElasticsearchMappings.categoryDefinitionMapping();
XContentBuilder dataCountsMapping = ElasticsearchMappings.dataCountsMapping();
XContentBuilder modelSnapshotMapping = ElasticsearchMappings.modelSnapshotMapping()) {
try (XContentBuilder docMapping = ElasticsearchMappings.docMapping()) {
PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(AnomalyDetectorsIndex.jobResultsIndexPrefix());
templateRequest.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*"));
templateRequest.settings(mlResultsIndexSettings());
templateRequest.mapping(MapperService.DEFAULT_MAPPING, defaultMapping);
templateRequest.mapping(Result.TYPE.getPreferredName(), resultsMapping);
templateRequest.mapping(CategoryDefinition.TYPE.getPreferredName(), categoryDefinitionMapping);
templateRequest.mapping(DataCounts.TYPE.getPreferredName(), dataCountsMapping);
templateRequest.mapping(ModelSnapshot.TYPE.getPreferredName(), modelSnapshotMapping);
templateRequest.mapping(ElasticsearchMappings.DOC_TYPE, docMapping);
templateRequest.version(Version.CURRENT.id);
client.admin().indices().putTemplate(templateRequest,

View File

@ -5,6 +5,13 @@
*/
package org.elasticsearch.xpack.ml;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings;
import java.io.IOException;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public final class MlMetaIndex {
/**
* Where to store the ml info in Elasticsearch - must match what's
@ -15,4 +22,14 @@ public final class MlMetaIndex {
public static final String TYPE = "doc";
private MlMetaIndex() {}
public static XContentBuilder docMapping() throws IOException {
XContentBuilder builder = jsonBuilder();
builder.startObject();
builder.startObject(TYPE);
ElasticsearchMappings.addDefaultMapping(builder);
builder.endObject();
builder.endObject();
return builder;
}
}

View File

@ -34,6 +34,7 @@ import org.elasticsearch.xpack.ml.MlMetaIndex;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
@ -178,7 +179,7 @@ public class DeleteFilterAction extends Action<DeleteFilterAction.Request, Delet
+ currentlyUsedBy);
}
DeleteRequest deleteRequest = new DeleteRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, filterId);
DeleteRequest deleteRequest = new DeleteRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, MlFilter.documentId(filterId));
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(deleteRequest);
transportAction.execute(bulkRequest, new ActionListener<BulkResponse>() {

View File

@ -245,7 +245,7 @@ public class GetFiltersAction extends Action<GetFiltersAction.Request, GetFilter
}
private void getFilter(String filterId, ActionListener<Response> listener) {
GetRequest getRequest = new GetRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, filterId);
GetRequest getRequest = new GetRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, MlFilter.documentId(filterId));
transportGetAction.execute(getRequest, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getDocResponse) {

View File

@ -174,8 +174,7 @@ public class PutFilterAction extends Action<PutFilterAction.Request, PutFilterAc
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
MlFilter filter = request.getFilter();
final String filterId = filter.getId();
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, filterId);
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, filter.documentId());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
Payload.XContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(MlFilter.INCLUDE_TYPE_KEY, "true"));
indexRequest.source(filter.toXContent(builder, params));
@ -192,7 +191,7 @@ public class PutFilterAction extends Action<PutFilterAction.Request, PutFilterAc
@Override
public void onFailure(Exception e) {
listener.onFailure(new ResourceNotFoundException("Could not create filter with ID [" + filterId + "]", e));
listener.onFailure(new ResourceNotFoundException("Could not create filter with ID [" + filter.getId() + "]", e));
}
});
}

View File

@ -12,6 +12,10 @@ import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.ElasticsearchClient;
@ -26,13 +30,14 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.results.Result;
@ -40,6 +45,7 @@ import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Objects;
import java.util.function.Consumer;
public class UpdateModelSnapshotAction extends Action<UpdateModelSnapshotAction.Request,
UpdateModelSnapshotAction.Response, UpdateModelSnapshotAction.RequestBuilder> {
@ -257,15 +263,15 @@ public class UpdateModelSnapshotAction extends Action<UpdateModelSnapshotAction.
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final JobManager jobManager;
private final JobProvider jobProvider;
private final TransportBulkAction transportBulkAction;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager, JobProvider jobProvider) {
IndexNameExpressionResolver indexNameExpressionResolver, JobProvider jobProvider, TransportBulkAction transportBulkAction) {
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
this.jobManager = jobManager;
this.jobProvider = jobProvider;
this.transportBulkAction = transportBulkAction;
}
@Override
@ -277,7 +283,7 @@ public class UpdateModelSnapshotAction extends Action<UpdateModelSnapshotAction.
Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getSnapshotId(), request.getJobId())));
} else {
Result<ModelSnapshot> updatedSnapshot = applyUpdate(request, modelSnapshot);
jobManager.updateModelSnapshot(updatedSnapshot, b -> {
indexModelSnapshot(updatedSnapshot, b -> {
// The quantiles can be large, and totally dominate the output -
// it's clearer to remove them
listener.onResponse(new Response(new ModelSnapshot.Builder(updatedSnapshot.result).setQuantiles(null).build()));
@ -296,5 +302,29 @@ public class UpdateModelSnapshotAction extends Action<UpdateModelSnapshotAction.
}
return new Result(target.index, updatedSnapshotBuilder.build());
}
private void indexModelSnapshot(Result<ModelSnapshot> modelSnapshot, Consumer<Boolean> handler, Consumer<Exception> errorHandler) {
IndexRequest indexRequest = new IndexRequest(modelSnapshot.index, ElasticsearchMappings.DOC_TYPE,
ModelSnapshot.documentId(modelSnapshot.result));
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
modelSnapshot.result.toXContent(builder, ToXContent.EMPTY_PARAMS);
indexRequest.source(builder);
} catch (IOException e) {
errorHandler.accept(e);
return;
}
BulkRequest bulkRequest = new BulkRequest().add(indexRequest);
transportBulkAction.execute(bulkRequest, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse indexResponse) {
handler.accept(true);
}
@Override
public void onFailure(Exception e) {
errorHandler.accept(e);
}
});
}
}
}

View File

@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.job;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
@ -21,9 +20,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.PutJobAction;
@ -38,12 +34,10 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import java.io.IOException;
import java.util.Collections;
import java.util.Date;
import java.util.List;
@ -404,24 +398,6 @@ public class JobManager extends AbstractComponent {
});
}
/**
* Update a persisted model snapshot metadata document to match the
* argument supplied.
*
* @param modelSnapshot the updated model snapshot object to be stored
*/
public void updateModelSnapshot(Result<ModelSnapshot> modelSnapshot, Consumer<Boolean> handler, Consumer<Exception> errorHandler) {
IndexRequest indexRequest = new IndexRequest(modelSnapshot.index, ModelSnapshot.TYPE.getPreferredName(),
ModelSnapshot.documentId(modelSnapshot.result));
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
modelSnapshot.result.toXContent(builder, ToXContent.EMPTY_PARAMS);
indexRequest.source(builder);
} catch (IOException e) {
errorHandler.accept(e);
}
client.index(indexRequest, ActionListener.wrap(r -> handler.accept(true), errorHandler));
}
private static MlMetadata.Builder createMlMetadataBuilder(ClusterState currentState) {
MlMetadata currentMlMetadata = currentState.metaData().custom(MlMetadata.TYPE);
return new MlMetadata.Builder(currentMlMetadata);

View File

@ -23,6 +23,8 @@ import java.util.Objects;
public class MlFilter extends ToXContentToBytes implements Writeable {
public static final String DOCUMENT_ID_PREFIX = "filter_";
public static final String INCLUDE_TYPE_KEY = "include_type";
public static final String FILTER_TYPE = "filter";
@ -99,6 +101,14 @@ public class MlFilter extends ToXContentToBytes implements Writeable {
return Objects.hash(id, items);
}
public String documentId() {
return documentId(id);
}
public static String documentId(String filterId) {
return DOCUMENT_ID_PREFIX + filterId;
}
public static class Builder {
private String id;

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
@ -52,6 +51,9 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
* using whitespace.
*/
public class ElasticsearchMappings {
public static final String DOC_TYPE = "doc";
/**
* String constants used in mappings
*/
@ -97,14 +99,10 @@ public class ElasticsearchMappings {
* so that the per-job term fields will not be automatically added
* as fields of type 'text' to the index mappings of newly rolled indices.
*
* @return The default mapping
* @throws IOException On write error
*/
public static XContentBuilder defaultMapping() throws IOException {
return jsonBuilder()
.startObject()
.startObject(MapperService.DEFAULT_MAPPING)
.startArray("dynamic_templates")
public static void addDefaultMapping(XContentBuilder builder) throws IOException {
builder.startArray("dynamic_templates")
.startObject()
.startObject("strings_as_keywords")
.field("match", "*")
@ -113,9 +111,44 @@ public class ElasticsearchMappings {
.endObject()
.endObject()
.endObject()
.endArray()
.endObject()
.endArray();
}
public static XContentBuilder docMapping() throws IOException {
XContentBuilder builder = jsonBuilder();
builder.startObject();
builder.startObject(DOC_TYPE);
addDefaultMapping(builder);
builder.startObject(PROPERTIES);
// Add result all field for easy searches in kibana
builder.startObject(ALL_FIELD_VALUES)
.field(TYPE, TEXT)
.field(ANALYZER, WHITESPACE)
.endObject();
builder.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD)
.field(COPY_TO, ALL_FIELD_VALUES)
.endObject();
builder.startObject(Result.TIMESTAMP.getPreferredName())
.field(TYPE, DATE)
.endObject();
addResultsMapping(builder);
addCategoryDefinitionMapping(builder);
addDataCountsMapping(builder);
addModelSnapshotMapping(builder);
// end properties
builder.endObject();
// end mapping
builder.endObject();
// end doc
builder.endObject();
return builder;
}
/**
@ -142,28 +175,12 @@ public class ElasticsearchMappings {
* <li>Influencer.influencer_field_value</li>
* </ul>
*
* @return The mapping
* @throws IOException On write error
*/
public static XContentBuilder resultsMapping() throws IOException {
XContentBuilder builder = jsonBuilder()
.startObject()
.startObject(Result.TYPE.getPreferredName())
.startObject(PROPERTIES)
.startObject(ALL_FIELD_VALUES)
.field(TYPE, TEXT)
.field(ANALYZER, WHITESPACE)
.endObject()
.startObject(Result.RESULT_TYPE.getPreferredName())
private static void addResultsMapping(XContentBuilder builder) throws IOException {
builder.startObject(Result.RESULT_TYPE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD)
.field(COPY_TO, ALL_FIELD_VALUES)
.endObject()
.startObject(Result.TIMESTAMP.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(Bucket.ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
@ -272,15 +289,6 @@ public class ElasticsearchMappings {
addAnomalyRecordFieldsToMapping(builder);
addInfluencerFieldsToMapping(builder);
addModelSizeStatsFieldsToMapping(builder);
// End result properties
builder.endObject();
// End result
builder.endObject();
// End mapping
builder.endObject();
return builder;
}
static XContentBuilder termFieldsMapping(String type, Collection<String> termFields) {
@ -312,11 +320,9 @@ public class ElasticsearchMappings {
/**
* AnomalyRecord fields to be added under the 'properties' section of the mapping
* @param builder Add properties to this builder
* @return builder
* @throws IOException On write error
*/
private static XContentBuilder addAnomalyRecordFieldsToMapping(XContentBuilder builder)
throws IOException {
private static void addAnomalyRecordFieldsToMapping(XContentBuilder builder) throws IOException {
builder.startObject(AnomalyRecord.DETECTOR_INDEX.getPreferredName())
.field(TYPE, INTEGER)
.endObject()
@ -426,11 +432,9 @@ public class ElasticsearchMappings {
.endObject()
.endObject()
.endObject();
return builder;
}
private static XContentBuilder addInfluencerFieldsToMapping(XContentBuilder builder) throws IOException {
private static void addInfluencerFieldsToMapping(XContentBuilder builder) throws IOException {
builder.startObject(Influencer.INFLUENCER_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
@ -444,8 +448,6 @@ public class ElasticsearchMappings {
.field(TYPE, KEYWORD)
.field(COPY_TO, ALL_FIELD_VALUES)
.endObject();
return builder;
}
/**
@ -453,18 +455,10 @@ public class ElasticsearchMappings {
* The type is disabled so {@link DataCounts} aren't searchable and
* the '_all' field is disabled
*
* @return The builder
* @throws IOException On builder write error
*/
public static XContentBuilder dataCountsMapping() throws IOException {
return jsonBuilder()
.startObject()
.startObject(DataCounts.TYPE.getPreferredName())
.startObject(PROPERTIES)
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(DataCounts.PROCESSED_RECORD_COUNT.getPreferredName())
private static void addDataCountsMapping(XContentBuilder builder) throws IOException {
builder.startObject(DataCounts.PROCESSED_RECORD_COUNT.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataCounts.PROCESSED_FIELD_COUNT.getPreferredName())
@ -511,9 +505,6 @@ public class ElasticsearchMappings {
.endObject()
.startObject(DataCounts.LAST_DATA_TIME.getPreferredName())
.field(TYPE, DATE)
.endObject()
.endObject()
.endObject()
.endObject();
}
@ -554,20 +545,12 @@ public class ElasticsearchMappings {
* Create the Elasticsearch mapping for {@linkplain CategoryDefinition}.
* The '_all' field is disabled as the document isn't meant to be searched.
*
* @return The builder
* @throws IOException On builder error
*/
public static XContentBuilder categoryDefinitionMapping() throws IOException {
return jsonBuilder()
.startObject()
.startObject(CategoryDefinition.TYPE.getPreferredName())
.startObject(PROPERTIES)
.startObject(CategoryDefinition.CATEGORY_ID.getPreferredName())
private static void addCategoryDefinitionMapping(XContentBuilder builder) throws IOException {
builder.startObject(CategoryDefinition.CATEGORY_ID.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(CategoryDefinition.TERMS.getPreferredName())
.field(TYPE, TEXT)
.endObject()
@ -579,9 +562,6 @@ public class ElasticsearchMappings {
.endObject()
.startObject(CategoryDefinition.EXAMPLES.getPreferredName())
.field(TYPE, TEXT)
.endObject()
.endObject()
.endObject()
.endObject();
}
@ -605,18 +585,8 @@ public class ElasticsearchMappings {
* Create the Elasticsearch mapping for {@linkplain ModelSnapshot}.
* The '_all' field is disabled but the type is searchable
*/
public static XContentBuilder modelSnapshotMapping() throws IOException {
XContentBuilder builder = jsonBuilder()
.startObject()
.startObject(ModelSnapshot.TYPE.getPreferredName())
.startObject(PROPERTIES)
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(ModelSnapshot.TIMESTAMP.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(ModelSnapshot.DESCRIPTION.getPreferredName())
private static void addModelSnapshotMapping(XContentBuilder builder) throws IOException {
builder.startObject(ModelSnapshot.DESCRIPTION.getPreferredName())
.field(TYPE, TEXT)
.endObject()
.startObject(ModelSnapshot.SNAPSHOT_ID.getPreferredName())
@ -642,9 +612,12 @@ public class ElasticsearchMappings {
addModelSizeStatsFieldsToMapping(builder);
builder.endObject()
.endObject()
.startObject(Quantiles.TYPE.getPreferredName())
// end model size stats properties
builder.endObject();
// end model size stats mapping
builder.endObject();
builder.startObject(Quantiles.TYPE.getPreferredName())
.field(ENABLED, false)
.endObject()
.startObject(ModelSnapshot.LATEST_RECORD_TIME.getPreferredName())
@ -652,21 +625,15 @@ public class ElasticsearchMappings {
.endObject()
.startObject(ModelSnapshot.LATEST_RESULT_TIME.getPreferredName())
.field(TYPE, DATE)
.endObject()
.endObject()
.endObject()
.endObject();
return builder;
}
/**
* {@link ModelSizeStats} fields to be added under the 'properties' section of the mapping
* @param builder Add properties to this builder
* @return builder
* @throws IOException On write error
*/
private static XContentBuilder addModelSizeStatsFieldsToMapping(XContentBuilder builder) throws IOException {
private static void addModelSizeStatsFieldsToMapping(XContentBuilder builder) throws IOException {
builder.startObject(ModelSizeStats.MODEL_BYTES_FIELD.getPreferredName())
.field(TYPE, LONG)
.endObject()
@ -688,8 +655,6 @@ public class ElasticsearchMappings {
.startObject(ModelSizeStats.LOG_TIME_FIELD.getPreferredName())
.field(TYPE, DATE)
.endObject();
return builder;
}
public static XContentBuilder auditMessageMapping() throws IOException {

View File

@ -47,7 +47,7 @@ public class JobDataCountsPersister extends AbstractComponent {
*/
public void persistDataCounts(String jobId, DataCounts counts, ActionListener<Boolean> listener) {
try (XContentBuilder content = serialiseCounts(counts)) {
client.prepareIndex(AnomalyDetectorsIndex.resultsWriteAlias(jobId), DataCounts.TYPE.getPreferredName(),
client.prepareIndex(AnomalyDetectorsIndex.resultsWriteAlias(jobId), ElasticsearchMappings.DOC_TYPE,
DataCounts.documentId(jobId))
.setSource(content).execute(new ActionListener<IndexResponse>() {
@Override

View File

@ -10,9 +10,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
@ -21,7 +18,9 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState;
@ -61,7 +60,7 @@ public class JobDataDeleter {
}
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()),
ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.documentId(modelSnapshot)));
ElasticsearchMappings.DOC_TYPE, ModelSnapshot.documentId(modelSnapshot)));
}
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
@ -82,11 +81,11 @@ public class JobDataDeleter {
DeleteByQueryHolder deleteByQueryHolder = new DeleteByQueryHolder(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
deleteByQueryHolder.dbqRequest.setRefresh(true);
RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName());
timeRange.gte(cutoffEpochMs);
QueryBuilder query = QueryBuilders.boolQuery()
.filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName()))
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs));
deleteByQueryHolder.searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
deleteByQueryHolder.searchRequest.types(Result.TYPE.getPreferredName());
deleteByQueryHolder.searchRequest.source(new SearchSourceBuilder().query(timeRange));
deleteByQueryHolder.searchRequest.source(new SearchSourceBuilder().query(query));
client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest, new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
@ -108,7 +107,6 @@ public class JobDataDeleter {
deleteByQueryHolder.dbqRequest.setRefresh(false);
deleteByQueryHolder.searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
deleteByQueryHolder.searchRequest.types(Result.TYPE.getPreferredName());
QueryBuilder qb = QueryBuilders.termQuery(Result.IS_INTERIM.getPreferredName(), true);
deleteByQueryHolder.searchRequest.source(new SearchSourceBuilder().query(new ConstantScoreQueryBuilder(qb)));

View File

@ -35,6 +35,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.MapperService;
@ -138,8 +139,9 @@ public class JobProvider {
if (!state.getMetaData().hasIndex(indexName)) {
LOGGER.trace("ES API CALL: create index {}", indexName);
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
String type = Result.TYPE.getPreferredName();
createIndexRequest.mapping(type, ElasticsearchMappings.termFieldsMapping(type, termFields));
try (XContentBuilder termFieldsMapping = ElasticsearchMappings.termFieldsMapping(ElasticsearchMappings.DOC_TYPE, termFields)) {
createIndexRequest.mapping(ElasticsearchMappings.DOC_TYPE, termFieldsMapping);
}
client.admin().indices().create(createIndexRequest,
ActionListener.wrap(
r -> createAliasListener.onResponse(r.isAcknowledged()),
@ -209,8 +211,9 @@ public class JobProvider {
}
private void updateIndexMappingWithTermFields(String indexName, Collection<String> termFields, ActionListener<Boolean> listener) {
client.admin().indices().preparePutMapping(indexName).setType(Result.TYPE.getPreferredName())
.setSource(ElasticsearchMappings.termFieldsMapping(null, termFields))
try (XContentBuilder termFieldsMapping = ElasticsearchMappings.termFieldsMapping(null, termFields)) {
client.admin().indices().preparePutMapping(indexName).setType(ElasticsearchMappings.DOC_TYPE)
.setSource(termFieldsMapping)
.execute(new ActionListener<PutMappingResponse>() {
@Override
public void onResponse(PutMappingResponse putMappingResponse) {
@ -223,6 +226,7 @@ public class JobProvider {
}
});
}
}
/**
* Get the job's data counts
@ -251,12 +255,11 @@ public class JobProvider {
MultiSearchRequestBuilder msearch = client.prepareMultiSearch()
.add(createLatestDataCountsSearch(resultsIndex, jobId))
.add(createLatestModelSizeStatsSearch(resultsIndex))
.add(createDocIdSearch(resultsIndex, ModelSnapshot.TYPE.getPreferredName(),
ModelSnapshot.documentId(jobId, job.getModelSnapshotId())))
.add(createDocIdSearch(stateIndex, Quantiles.TYPE.getPreferredName(), Quantiles.documentId(jobId)));
.add(createDocIdSearch(resultsIndex, ModelSnapshot.documentId(jobId, job.getModelSnapshotId())))
.add(createDocIdSearch(stateIndex, Quantiles.documentId(jobId)));
for (String filterId : job.getAnalysisConfig().extractReferencedFilters()) {
msearch.add(createDocIdSearch(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, filterId));
msearch.add(createDocIdSearch(MlMetaIndex.INDEX_NAME, filterId));
}
msearch.execute(ActionListener.wrap(
@ -284,7 +287,7 @@ public class JobProvider {
SearchRequest searchRequest = msearch.request().requests().get(i);
LOGGER.debug("Found 0 hits for [{}/{}]", searchRequest.indices(), searchRequest.types());
} else if (hitsCount == 1) {
parseAutodetectParamSearchHit(paramsBuilder, hits.getAt(0), errorHandler);
parseAutodetectParamSearchHit(jobId, paramsBuilder, hits.getAt(0), errorHandler);
} else if (hitsCount > 1) {
errorHandler.accept(new IllegalStateException("Expected hits count to be 0 or 1, but got ["
+ hitsCount + "]"));
@ -298,30 +301,30 @@ public class JobProvider {
));
}
private SearchRequestBuilder createDocIdSearch(String index, String type, String id) {
private SearchRequestBuilder createDocIdSearch(String index, String id) {
return client.prepareSearch(index).setSize(1)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(QueryBuilders.idsQuery(type).addIds(id))
.setQuery(QueryBuilders.idsQuery().addIds(id))
.setRouting(id);
}
private void parseAutodetectParamSearchHit(AutodetectParams.Builder paramsBuilder,
SearchHit hit, Consumer<Exception> errorHandler) {
String type = hit.getType();
if (DataCounts.TYPE.getPreferredName().equals(type)) {
private void parseAutodetectParamSearchHit(String jobId, AutodetectParams.Builder paramsBuilder, SearchHit hit,
Consumer<Exception> errorHandler) {
String hitId = hit.getId();
if (DataCounts.documentId(jobId).equals(hitId)) {
paramsBuilder.setDataCounts(parseSearchHit(hit, DataCounts.PARSER, errorHandler));
} else if (Result.TYPE.getPreferredName().equals(type)) {
} else if (hitId.startsWith(ModelSizeStats.documentIdPrefix(jobId))) {
ModelSizeStats.Builder modelSizeStats = parseSearchHit(hit, ModelSizeStats.PARSER, errorHandler);
paramsBuilder.setModelSizeStats(modelSizeStats == null ? null : modelSizeStats.build());
} else if (ModelSnapshot.TYPE.getPreferredName().equals(type)) {
} else if (hitId.startsWith(ModelSnapshot.documentIdPrefix(jobId))) {
ModelSnapshot.Builder modelSnapshot = parseSearchHit(hit, ModelSnapshot.PARSER, errorHandler);
paramsBuilder.setModelSnapshot(modelSnapshot == null ? null : modelSnapshot.build());
} else if (Quantiles.TYPE.getPreferredName().equals(type)) {
} else if (Quantiles.documentId(jobId).equals(hit.getId())) {
paramsBuilder.setQuantiles(parseSearchHit(hit, Quantiles.PARSER, errorHandler));
} else if (MlFilter.TYPE.getPreferredName().equals(type)) {
} else if (hitId.startsWith(MlFilter.DOCUMENT_ID_PREFIX)) {
paramsBuilder.addFilter(parseSearchHit(hit, MlFilter.PARSER, errorHandler).build());
} else {
errorHandler.accept(new IllegalStateException("Unexpected type [" + type + "]"));
errorHandler.accept(new IllegalStateException("Unexpected type [" + hit.getType() + "]"));
}
}
@ -644,7 +647,6 @@ public class JobProvider {
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.indicesOptions(addIgnoreUnavailable(searchRequest.indicesOptions()));
searchRequest.types(Result.TYPE.getPreferredName());
searchRequest.source(new SearchSourceBuilder()
.from(from)
.size(size)
@ -703,7 +705,6 @@ public class JobProvider {
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.indicesOptions(addIgnoreUnavailable(searchRequest.indicesOptions()));
searchRequest.types(Result.TYPE.getPreferredName());
FieldSortBuilder sb = query.getSortField() == null ? SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)
: new FieldSortBuilder(query.getSortField()).order(query.isSortDescending() ? SortOrder.DESC : SortOrder.ASC);
searchRequest.source(new SearchSourceBuilder().query(qb).from(query.getFrom()).size(query.getSize()).sort(sb));
@ -744,8 +745,7 @@ public class JobProvider {
return;
}
String resultsIndex = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
SearchRequestBuilder search = createDocIdSearch(resultsIndex, ModelSnapshot.TYPE.getPreferredName(),
ModelSnapshot.documentId(jobId, modelSnapshotId));
SearchRequestBuilder search = createDocIdSearch(resultsIndex, ModelSnapshot.documentId(jobId, modelSnapshotId));
searchSingleResult(jobId, ModelSnapshot.TYPE.getPreferredName(), search, ModelSnapshot.PARSER,
result -> handler.accept(result.result == null ? null : new Result(result.index, result.result.build())),
errorHandler, () -> null);
@ -814,7 +814,7 @@ public class JobProvider {
.order(sortDescending ? SortOrder.DESC : SortOrder.ASC);
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} with filter after sort from {} size {}",
LOGGER.trace("ES API CALL: search all {}s from index {} sort ascending {} with filter after sort from {} size {}",
ModelSnapshot.TYPE, indexName, sortField, from, size);
SearchRequest searchRequest = new SearchRequest(indexName);
@ -909,7 +909,6 @@ public class JobProvider {
searchResponse = client.prepareSearch(indexName)
.setIndicesOptions(addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS))
.setTypes(Result.TYPE.getPreferredName())
.setQuery(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), ModelPlot.RESULT_TYPE_VALUE))
.setFrom(from).setSize(size)
.get();

View File

@ -17,12 +17,12 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.BucketNormalizable;
import org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;
import org.elasticsearch.xpack.ml.job.results.Result;
import java.io.IOException;
import java.util.List;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings.DOC_TYPE;
/**
@ -66,7 +66,7 @@ public class JobRenormalizedResultsPersister extends AbstractComponent {
public void updateResult(String id, String index, ToXContent resultDoc) {
try (XContentBuilder content = toXContentBuilder(resultDoc)) {
bulkRequest.add(new IndexRequest(index, Result.TYPE.getPreferredName(), id).source(content));
bulkRequest.add(new IndexRequest(index, DOC_TYPE, id).source(content));
} catch (IOException e) {
logger.error("Error serialising result", e);
}

View File

@ -39,6 +39,7 @@ import java.util.List;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings.DOC_TYPE;
/**
* Persists result types, Quantiles etc to Elasticsearch<br>
@ -60,7 +61,6 @@ public class JobResultsPersister extends AbstractComponent {
private final Client client;
public JobResultsPersister(Settings settings, Client client) {
super(settings);
this.client = client;
@ -100,8 +100,7 @@ public class JobResultsPersister extends AbstractComponent {
logger.trace("[{}] ES API CALL: index result type {} to index {} at epoch {}",
jobId, Bucket.RESULT_TYPE_VALUE, indexName, bucketWithoutRecords.getEpoch());
bulkRequest.add(new IndexRequest(indexName, Result.TYPE.getPreferredName(),
bucketWithoutRecords.getId()).source(content));
bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, bucketWithoutRecords.getId()).source(content));
persistBucketInfluencersStandalone(jobId, bucketWithoutRecords.getBucketInfluencers());
} catch (IOException e) {
@ -120,7 +119,7 @@ public class JobResultsPersister extends AbstractComponent {
String id = bucketInfluencer.getId();
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
jobId, BucketInfluencer.RESULT_TYPE_VALUE, indexName, id);
bulkRequest.add(new IndexRequest(indexName, Result.TYPE.getPreferredName(), id).source(content));
bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content));
}
}
}
@ -139,7 +138,7 @@ public class JobResultsPersister extends AbstractComponent {
try (XContentBuilder content = toXContentBuilder(record)) {
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName, record.getId());
bulkRequest.add(new IndexRequest(indexName, Result.TYPE.getPreferredName(), record.getId()).source(content));
bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, record.getId()).source(content));
}
}
} catch (IOException e) {
@ -162,7 +161,7 @@ public class JobResultsPersister extends AbstractComponent {
try (XContentBuilder content = toXContentBuilder(influencer)) {
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
jobId, Influencer.RESULT_TYPE_VALUE, indexName, influencer.getId());
bulkRequest.add(new IndexRequest(indexName, Result.TYPE.getPreferredName(), influencer.getId()).source(content));
bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, influencer.getId()).source(content));
}
}
} catch (IOException e) {
@ -184,7 +183,7 @@ public class JobResultsPersister extends AbstractComponent {
jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp(),
partitionProbabilities.getId());
bulkRequest.add(
new IndexRequest(indexName, Result.TYPE.getPreferredName(), partitionProbabilities.getId()).source(builder));
new IndexRequest(indexName, DOC_TYPE, partitionProbabilities.getId()).source(builder));
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] error serialising bucket per partition max normalized scores",
@ -349,14 +348,14 @@ public class JobResultsPersister extends AbstractComponent {
private final String jobId;
private final ToXContent object;
private final String type;
private final String description;
private final String id;
private WriteRequest.RefreshPolicy refreshPolicy;
Persistable(String jobId, ToXContent object, String type, String id) {
Persistable(String jobId, ToXContent object, String description, String id) {
this.jobId = jobId;
this.object = object;
this.type = type;
this.description = description;
this.id = id;
this.refreshPolicy = WriteRequest.RefreshPolicy.NONE;
}
@ -373,12 +372,15 @@ public class JobResultsPersister extends AbstractComponent {
void persist(String indexName, ActionListener<IndexResponse> listener) {
logCall(indexName);
// TODO no_release: this is a temporary hack until we also switch state index to have doc type in which case
// we can remove this line and use DOC_TYPE directly in the index request
String type = AnomalyDetectorsIndex.jobStateIndexName().equals(indexName) ? description : DOC_TYPE;
try (XContentBuilder content = toXContentBuilder(object)) {
IndexRequest indexRequest = new IndexRequest(indexName, type, id).source(content).setRefreshPolicy(refreshPolicy);
client.index(indexRequest, listener);
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, type}), e);
logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, description}), e);
IndexResponse.Builder notCreatedResponse = new IndexResponse.Builder();
notCreatedResponse.setCreated(false);
listener.onResponse(notCreatedResponse.build());
@ -387,9 +389,9 @@ public class JobResultsPersister extends AbstractComponent {
private void logCall(String indexName) {
if (id != null) {
logger.trace("[{}] ES API CALL: index type {} to index {} with ID {}", jobId, type, indexName, id);
logger.trace("[{}] ES API CALL: index {} to index {} with ID {}", jobId, description, indexName, id);
} else {
logger.trace("[{}] ES API CALL: index type {} to index {} with auto-generated ID", jobId, type, indexName);
logger.trace("[{}] ES API CALL: index {} to index {} with auto-generated ID", jobId, description, indexName);
}
}
}

View File

@ -143,7 +143,11 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable {
}
public String getId() {
return jobId + "_model_size_stats_" + logTime.getTime();
return documentIdPrefix(jobId) + logTime.getTime();
}
public static String documentIdPrefix(String jobId) {
return jobId + "_model_size_stats_";
}
@Override

View File

@ -274,12 +274,16 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
return stateDocumentIds;
}
public static String documentIdPrefix(String jobId) {
return jobId + "_model_snapshot_";
}
public static String documentId(ModelSnapshot snapshot) {
return documentId(snapshot.getJobId(), snapshot.getSnapshotId());
}
public static String documentId(String jobId, String snapshotId) {
return jobId + "_model_snapshot_" + snapshotId;
return documentIdPrefix(jobId) + snapshotId;
}
public static ModelSnapshot fromJson(BytesReference bytesReference) {

View File

@ -67,12 +67,12 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
searchRequest.types(ModelSnapshot.TYPE.getPreferredName());
QueryBuilder activeSnapshotFilter = QueryBuilders.termQuery(
ModelSnapshot.SNAPSHOT_ID.getPreferredName(), job.getModelSnapshotId());
QueryBuilder retainFilter = QueryBuilders.termQuery(ModelSnapshot.RETAIN.getPreferredName(), true);
QueryBuilder query = createQuery(job.getId(), cutoffEpochMs)
.filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName()))
.mustNot(activeSnapshotFilter)
.mustNot(retainFilter);

View File

@ -86,9 +86,10 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
request.setSlices(5);
searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
searchRequest.types(Result.TYPE.getPreferredName());
QueryBuilder excludeFilter = QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_VALUE);
QueryBuilder query = createQuery(job.getId(), cutoffEpochMs).mustNot(excludeFilter);
QueryBuilder query = createQuery(job.getId(), cutoffEpochMs)
.filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName()))
.mustNot(excludeFilter);
searchRequest.source(new SearchSourceBuilder().query(query));
return request;
}

View File

@ -27,12 +27,8 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.AuditMessage;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.Before;
@ -221,7 +217,7 @@ public class MachineLearningTemplateRegistryTests extends ESTestCase {
assertNotNull(request);
assertEquals(templateRegistry.mlNotificationIndexSettings().build(), request.settings());
assertEquals(1, request.mappings().size());
assertThat(request.mappings().containsKey(MapperService.DEFAULT_MAPPING), is(true));
assertThat(request.mappings().containsKey("doc"), is(true));
assertEquals(Collections.singletonList(MlMetaIndex.INDEX_NAME), request.patterns());
assertEquals(new Integer(Version.CURRENT.id), request.version());
});
@ -262,12 +258,8 @@ public class MachineLearningTemplateRegistryTests extends ESTestCase {
PutIndexTemplateRequest request = captor.getValue();
assertNotNull(request);
assertEquals(templateRegistry.mlResultsIndexSettings().build(), request.settings());
assertTrue(request.mappings().containsKey("_default_"));
assertTrue(request.mappings().containsKey(Result.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(CategoryDefinition.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(DataCounts.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(ModelSnapshot.TYPE.getPreferredName()));
assertEquals(5, request.mappings().size());
assertTrue(request.mappings().containsKey("doc"));
assertEquals(1, request.mappings().size());
assertEquals(Collections.singletonList(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*"), request.patterns());
assertEquals(new Integer(Version.CURRENT.id), request.version());
});

View File

@ -150,9 +150,9 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
assertEquals(1, persistedDefinition.count());
assertEquals(categoryDefinition, persistedDefinition.results().get(0));
QueryPage<ModelPlot> persistedmodelPlot = jobProvider.modelPlot(JOB_ID, 0, 100);
assertEquals(1, persistedmodelPlot.count());
assertEquals(modelPlot, persistedmodelPlot.results().get(0));
QueryPage<ModelPlot> persistedModelPlot = jobProvider.modelPlot(JOB_ID, 0, 100);
assertEquals(1, persistedModelPlot.count());
assertEquals(modelPlot, persistedModelPlot.results().get(0));
ModelSizeStats persistedModelSizeStats = getModelSizeStats();
assertEquals(modelSizeStats, persistedModelSizeStats);

View File

@ -113,7 +113,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
// Update snapshot timestamp to force it out of snapshot retention window
String snapshotUpdate = "{ \"timestamp\": " + oneDayAgo + "}";
UpdateRequest updateSnapshotRequest = new UpdateRequest(".ml-anomalies-" + job.getId(), "model_snapshot", snapshotDocId);
UpdateRequest updateSnapshotRequest = new UpdateRequest(".ml-anomalies-" + job.getId(), "doc", snapshotDocId);
updateSnapshotRequest.doc(snapshotUpdate.getBytes(StandardCharsets.UTF_8), XContentType.JSON);
client().execute(UpdateAction.INSTANCE, updateSnapshotRequest).get();
}
@ -136,7 +136,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
retainAllSnapshots("snapshots-retention-with-retain");
long totalModelSizeStatsBeforeDelete = client().prepareSearch("*").setTypes("result")
long totalModelSizeStatsBeforeDelete = client().prepareSearch("*")
.setQuery(QueryBuilders.termQuery("result_type", "model_size_stats"))
.get().getHits().totalHits;
long totalNotificationsCountBeforeDelete = client().prepareSearch(".ml-notifications").get().getHits().totalHits;
@ -175,7 +175,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
assertThat(getRecords("results-and-snapshots-retention").size(), equalTo(0));
assertThat(getModelSnapshots("results-and-snapshots-retention").size(), equalTo(1));
long totalModelSizeStatsAfterDelete = client().prepareSearch("*").setTypes("result")
long totalModelSizeStatsAfterDelete = client().prepareSearch("*")
.setQuery(QueryBuilders.termQuery("result_type", "model_size_stats"))
.get().getHits().totalHits;
long totalNotificationsCountAfterDelete = client().prepareSearch(".ml-notifications").get().getHits().totalHits;
@ -199,8 +199,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
private void retainAllSnapshots(String jobId) throws Exception {
List<ModelSnapshot> modelSnapshots = getModelSnapshots(jobId);
for (ModelSnapshot modelSnapshot : modelSnapshots) {
UpdateModelSnapshotAction.Request request = new UpdateModelSnapshotAction.Request(
jobId, modelSnapshot.getSnapshotId());
UpdateModelSnapshotAction.Request request = new UpdateModelSnapshotAction.Request(jobId, modelSnapshot.getSnapshotId());
request.setRetain(true);
client().execute(UpdateModelSnapshotAction.INSTANCE, request).get();
}

View File

@ -171,7 +171,6 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
// are what we expect them to be:
private static DataCounts getDataCountsFromIndex(String jobId) {
SearchResponse searchResponse = client().prepareSearch()
.setTypes(DataCounts.TYPE.getPreferredName())
.setQuery(QueryBuilders.idsQuery().addIds(DataCounts.documentId(jobId)))
.get();
if (searchResponse.getHits().getTotalHits() != 1) {

View File

@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.job;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterName;
@ -15,9 +14,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MlMetadata;
@ -28,15 +24,11 @@ import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshotTests;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import java.io.IOException;
import java.util.Collections;
import java.util.Date;
@ -163,28 +155,6 @@ public class JobManagerTests extends ESTestCase {
});
}
public void testUpdateModelSnapshot() {
ArgumentCaptor<IndexRequest> indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class);
doAnswer(invocationOnMock -> null).when(client).index(indexRequestCaptor.capture(), any());
ModelSnapshot modelSnapshot = ModelSnapshotTests.createRandomized();
JobManager jobManager = createJobManager();
jobManager.updateModelSnapshot(new Result("snapshot-index", modelSnapshot), response -> {}, error -> {});
IndexRequest indexRequest = indexRequestCaptor.getValue();
assertThat(indexRequest.index(), equalTo("snapshot-index"));
// Assert snapshot was correctly serialised in the request by parsing it back and comparing to original
try (XContentParser parser = XContentFactory.xContent(indexRequest.source()).createParser(NamedXContentRegistry.EMPTY,
indexRequest.source())) {
ModelSnapshot requestSnapshot = ModelSnapshot.PARSER.apply(parser, null).build();
assertThat(requestSnapshot, equalTo(modelSnapshot));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private Job.Builder createJob() {
Detector.Builder d1 = new Detector.Builder("info_content", "domain");
d1.setOverFieldName("client");

View File

@ -13,6 +13,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
public class MlFilterTests extends AbstractSerializingTestCase<MlFilter> {
@Override
@ -46,4 +48,7 @@ public class MlFilterTests extends AbstractSerializingTestCase<MlFilter> {
assertEquals(MlFilter.ITEMS.getPreferredName() + " must not be null", ex.getMessage());
}
public void testDocumentId() {
assertThat(MlFilter.documentId("foo"), equalTo("filter_foo"));
}
}

View File

@ -33,30 +33,8 @@ import java.util.Set;
public class ElasticsearchMappingsTests extends ESTestCase {
private void parseJson(JsonParser parser, Set<String> expected) throws IOException {
try {
JsonToken token = parser.nextToken();
while (token != null && token != JsonToken.END_OBJECT) {
switch (token) {
case START_OBJECT:
parseJson(parser, expected);
break;
case FIELD_NAME:
String fieldName = parser.getCurrentName();
expected.add(fieldName);
break;
default:
break;
}
token = parser.nextToken();
}
} catch (JsonParseException e) {
fail("Cannot parse JSON: " + e);
}
}
public void testReservedFields()
throws IOException, ClassNotFoundException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
public void testReservedFields() throws Exception {
Set<String> overridden = new HashSet<>();
// These are not reserved because they're Elasticsearch keywords, not
@ -78,30 +56,7 @@ public class ElasticsearchMappingsTests extends ESTestCase {
overridden.add(ModelSnapshot.TYPE.getPreferredName());
overridden.add(Quantiles.TYPE.getPreferredName());
Set<String> expected = new HashSet<>();
// Only the mappings for the results index should be added below. Do NOT add mappings for other indexes here.
XContentBuilder builder = ElasticsearchMappings.resultsMapping();
BufferedInputStream inputStream =
new BufferedInputStream(new ByteArrayInputStream(builder.string().getBytes(StandardCharsets.UTF_8)));
JsonParser parser = new JsonFactory().createParser(inputStream);
parseJson(parser, expected);
builder = ElasticsearchMappings.categoryDefinitionMapping();
inputStream = new BufferedInputStream(new ByteArrayInputStream(builder.string().getBytes(StandardCharsets.UTF_8)));
parser = new JsonFactory().createParser(inputStream);
parseJson(parser, expected);
builder = ElasticsearchMappings.dataCountsMapping();
inputStream = new BufferedInputStream(new ByteArrayInputStream(builder.string().getBytes(StandardCharsets.UTF_8)));
parser = new JsonFactory().createParser(inputStream);
parseJson(parser, expected);
builder = ElasticsearchMappings.modelSnapshotMapping();
inputStream = new BufferedInputStream(new ByteArrayInputStream(builder.string().getBytes(StandardCharsets.UTF_8)));
parser = new JsonFactory().createParser(inputStream);
parseJson(parser, expected);
Set<String> expected = collectResultsDocFieldNames();
expected.removeAll(overridden);
@ -147,4 +102,41 @@ public class ElasticsearchMappingsTests extends ESTestCase {
instanceMapping = (Map<String, Object>) properties.get(AnomalyRecord.BUCKET_SPAN.getPreferredName());
assertNull(instanceMapping);
}
private Set<String> collectResultsDocFieldNames() throws IOException {
// Only the mappings for the results index should be added below. Do NOT add mappings for other indexes here.
XContentBuilder builder = ElasticsearchMappings.docMapping();
BufferedInputStream inputStream =
new BufferedInputStream(new ByteArrayInputStream(builder.string().getBytes(StandardCharsets.UTF_8)));
JsonParser parser = new JsonFactory().createParser(inputStream);
Set<String> fieldNames = new HashSet<>();
boolean isAfterPropertiesStart = false;
try {
JsonToken token = parser.nextToken();
while (token != null) {
switch (token) {
case START_OBJECT:
break;
case FIELD_NAME:
String fieldName = parser.getCurrentName();
if (isAfterPropertiesStart) {
fieldNames.add(fieldName);
} else {
if (ElasticsearchMappings.PROPERTIES.equals(fieldName)) {
isAfterPropertiesStart = true;
}
}
break;
default:
break;
}
token = parser.nextToken();
}
} catch (JsonParseException e) {
fail("Cannot parse JSON: " + e);
}
return fieldNames;
}
}

View File

@ -28,7 +28,7 @@ setup:
- do:
index:
index: .ml-anomalies-delete-model-snapshot
type: model_snapshot
type: doc
id: "delete-model-snapshot_model_snapshot_inactive-snapshot"
body: >
{
@ -64,7 +64,7 @@ setup:
- do:
index:
index: .ml-anomalies-delete-model-snapshot
type: model_snapshot
type: doc
id: "delete-model-snapshot_model_snapshot_active-snapshot"
body: >
{

View File

@ -5,7 +5,7 @@ setup:
index:
index: .ml-meta
type: doc
id: imposter-filter
id: filter_imposter-filter
body: >
{
"filter_id": "imposter",

View File

@ -19,7 +19,7 @@
- do:
index:
index: .ml-anomalies-shared
type: result
type: doc
id: "new_doc"
body: >
{
@ -33,6 +33,6 @@
- do:
indices.get_field_mapping:
index: .ml-anomalies-shared
type: result
type: doc
fields: new_field
- match: {\.ml-anomalies-shared.mappings.result.new_field.mapping.new_field.type: keyword}
- match: {\.ml-anomalies-shared.mappings.doc.new_field.mapping.new_field.type: keyword}

View File

@ -80,7 +80,7 @@ setup:
- do:
get:
index: .ml-anomalies-post-data-job
type: data_counts
type: doc
id: post-data-job_data_counts
- match: { _source.processed_record_count: 2 }

View File

@ -28,7 +28,7 @@ setup:
- do:
index:
index: .ml-anomalies-revert-model-snapshot
type: model_snapshot
type: doc
id: "revert-model-snapshot_model_snapshot_first"
body: >
{
@ -53,7 +53,7 @@ setup:
- do:
index:
index: .ml-anomalies-revert-model-snapshot
type: model_snapshot
type: doc
id: "revert-model-snapshot_model_snapshot_second"
body: >
{
@ -78,7 +78,7 @@ setup:
- do:
index:
index: .ml-anomalies-revert-model-snapshot
type: result
type: doc
id: "revert-model-snapshot_1464825600000_1"
body: >
{
@ -91,7 +91,7 @@ setup:
- do:
index:
index: .ml-anomalies-revert-model-snapshot
type: result
type: doc
id: "revert-model-snapshot_1464782400000_1"
body: >
{
@ -104,7 +104,7 @@ setup:
- do:
index:
index: .ml-anomalies-revert-model-snapshot
type: result
type: doc
id: "revert-model-snapshot_1462060800000_1"
body: >
{
@ -117,7 +117,7 @@ setup:
- do:
index:
index: .ml-anomalies-revert-model-snapshot
type: result
type: doc
id: "revert-model-snapshot_1464825600000_1_1"
body: >
{
@ -130,7 +130,7 @@ setup:
- do:
index:
index: .ml-anomalies-revert-model-snapshot
type: result
type: doc
id: "revert-model-snapshot_1462060800000_1_2"
body: >
{
@ -143,7 +143,7 @@ setup:
- do:
index:
index: .ml-anomalies-revert-model-snapshot
type: result
type: doc
id: "revert-model-snapshot_1464825600000_1_3"
body: {
"job_id": "revert-model-snapshot",
@ -158,7 +158,7 @@ setup:
- do:
index:
index: .ml-anomalies-revert-model-snapshot
type: result
type: doc
id: "revert-model-snapshot_1462060800000_1_4"
body:
{

View File

@ -16,7 +16,7 @@ setup:
- do:
index:
index: .ml-anomalies-update-model-snapshot
type: model_snapshot
type: doc
id: "update-model-snapshot_model_snapshot_snapshot-1"
body: >
{
@ -30,7 +30,7 @@ setup:
- do:
index:
index: .ml-anomalies-update-model-snapshot
type: model_snapshot
type: doc
id: "update-model-snapshot_model_snapshot_snapshot-2"
body: >
{