[ML] Switch state to use _type "doc" (elastic/x-pack-elasticsearch#1552)

This commit means that newly created ML state indices will have a single
type named "doc", and newly persisted state documents will have type
"doc" too.

Retrieving state is only supported for type "doc".

When deleting state, documents with the old types are deleted in addition
to those with type "doc".  This means jobs created by the beta can be fully
deleted.
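
A minimal sketch of the resulting two-generation delete, mirroring JobStorageDeletionTask.deleteQuantiles() in the diff below (client, jobId and finishedHandler are assumed to come from the surrounding class):

BulkRequestBuilder bulk = client.prepareBulk();
// 5.5 onwards: mapping type "doc" and the new document ID scheme
bulk.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(),
        ElasticsearchMappings.DOC_TYPE, Quantiles.documentId(jobId)));
// Beta-era documents: legacy type and legacy ID (TODO: remove in 7.0)
bulk.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(),
        Quantiles.TYPE.getPreferredName(), Quantiles.legacyDocumentId(jobId)));
bulk.execute(ActionListener.wrap(response -> finishedHandler.onResponse(true), finishedHandler::onFailure));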

Relates elastic/x-pack-elasticsearch#668

Original commit: elastic/x-pack-elasticsearch@29c07d40f1
David Roberts 2017-05-26 10:51:29 +01:00 committed by GitHub
parent bb71839b85
commit fffe424625
16 changed files with 328 additions and 189 deletions


@@ -230,16 +230,11 @@ public class MachineLearningTemplateRegistry extends AbstractComponent implemen
}
void putJobStateIndexTemplate(BiConsumer<Boolean, Exception> listener) {
try (XContentBuilder categorizerStateMapping = ElasticsearchMappings.categorizerStateMapping();
XContentBuilder quantilesMapping = ElasticsearchMappings.quantilesMapping();
XContentBuilder modelStateMapping = ElasticsearchMappings.modelStateMapping()) {
try (XContentBuilder stateMapping = ElasticsearchMappings.stateMapping()) {
PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(AnomalyDetectorsIndex.jobStateIndexName());
templateRequest.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName()));
templateRequest.settings(mlStateIndexSettings());
templateRequest.mapping(CategorizerState.TYPE, categorizerStateMapping);
templateRequest.mapping(Quantiles.TYPE.getPreferredName(), quantilesMapping);
templateRequest.mapping(ModelState.TYPE.getPreferredName(), modelStateMapping);
templateRequest.mapping(ElasticsearchMappings.DOC_TYPE, stateMapping);
templateRequest.version(Version.CURRENT.id);
client.admin().indices().putTemplate(templateRequest,


@@ -7,12 +7,9 @@ package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.results.AnomalyCause;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.Bucket;
@@ -494,39 +491,6 @@ public class ElasticsearchMappings {
.endObject();
}
/**
* {@link CategorizerState} mapping.
* The type is disabled so {@link CategorizerState} is not searchable and
* the '_all' field is disabled
*
* @return The builder
* @throws IOException On builder write error
*/
public static XContentBuilder categorizerStateMapping() throws IOException {
return jsonBuilder()
.startObject()
.startObject(CategorizerState.TYPE)
.field(ENABLED, false)
.endObject()
.endObject();
}
/**
* Create the Elasticsearch mapping for {@linkplain Quantiles}.
* The type is disabled as is the '_all' field as the document isn't meant to be searched.
* <p>
* The quantile state string is not searchable (enabled = false) as it could be
* very large.
*/
public static XContentBuilder quantilesMapping() throws IOException {
return jsonBuilder()
.startObject()
.startObject(Quantiles.TYPE.getPreferredName())
.field(ENABLED, false)
.endObject()
.endObject();
}
/**
* Create the Elasticsearch mapping for {@linkplain CategoryDefinition}.
* The '_all' field is disabled as the document isn't meant to be searched.
@@ -552,16 +516,15 @@ public class ElasticsearchMappings {
}
/**
* Create the Elasticsearch mapping for {@linkplain ModelState}.
* The model state could potentially be huge (over a gigabyte in size)
* so all analysis by Elasticsearch is disabled. The only way to
* retrieve the model state is by knowing the ID of a particular
* document or by searching for all documents of this type.
* Create the Elasticsearch mapping for state. State could potentially be
* huge (target document size is 16MB and there can be many documents) so all
* analysis by Elasticsearch is disabled. The only way to retrieve state is
* by knowing the ID of a particular document.
*/
public static XContentBuilder modelStateMapping() throws IOException {
public static XContentBuilder stateMapping() throws IOException {
return jsonBuilder()
.startObject()
.startObject(ModelState.TYPE.getPreferredName())
.startObject(DOC_TYPE)
.field(ENABLED, false)
.endObject()
.endObject();
@@ -603,7 +566,7 @@ public class ElasticsearchMappings {
// end model size stats mapping
builder.endObject();
builder.startObject(Quantiles.TYPE.getPreferredName())
builder.startObject(ModelSnapshot.QUANTILES.getPreferredName())
.field(ENABLED, false)
.endObject()
.startObject(ModelSnapshot.LATEST_RECORD_TIME.getPreferredName())
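
The whole of the new state mapping is tiny: stateMapping() simply disables the single "doc" type so Elasticsearch performs no analysis on state documents. A minimal sketch of rendering it (XContentBuilder.string() is the 5.x accessor for the built JSON):

try (XContentBuilder stateMapping = ElasticsearchMappings.stateMapping()) {
    // Prints: {"doc":{"enabled":false}}
    System.out.println(stateMapping.string());
}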


@@ -56,11 +56,18 @@ public class JobDataDeleter {
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
for (ModelSnapshot modelSnapshot : modelSnapshots) {
for (String stateDocId : modelSnapshot.stateDocumentIds()) {
bulkRequestBuilder.add(client.prepareDelete(stateIndexName, ModelState.TYPE.getPreferredName(), stateDocId));
bulkRequestBuilder.add(client.prepareDelete(stateIndexName, ElasticsearchMappings.DOC_TYPE, stateDocId));
}
// TODO: remove in 7.0
for (String stateDocId : modelSnapshot.legacyStateDocumentIds()) {
bulkRequestBuilder.add(client.prepareDelete(stateIndexName, ModelState.TYPE, stateDocId));
}
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()),
ElasticsearchMappings.DOC_TYPE, ModelSnapshot.documentId(modelSnapshot)));
// TODO: remove in 7.0
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()),
ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.legacyDocumentId(modelSnapshot)));
}
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);


@@ -65,7 +65,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.Bucket;
@@ -253,6 +252,8 @@ public class JobProvider {
MultiSearchRequestBuilder msearch = client.prepareMultiSearch()
.add(createLatestDataCountsSearch(resultsIndex, jobId))
.add(createLatestModelSizeStatsSearch(resultsIndex))
// These next two document IDs never need to be the legacy ones due to the rule
// that you cannot open a 5.4 job in a subsequent version of the product
.add(createDocIdSearch(resultsIndex, ModelSnapshot.documentId(jobId, job.getModelSnapshotId())))
.add(createDocIdSearch(stateIndex, Quantiles.documentId(jobId)));
@@ -523,8 +524,8 @@ public class JobProvider {
}
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} from {} size {}",
CategoryDefinition.TYPE.getPreferredName(), indexName, CategoryDefinition.CATEGORY_ID.getPreferredName(), from, size);
LOGGER.trace("ES API CALL: search all of category definitions from index {} sort ascending {} from {} size {}",
indexName, CategoryDefinition.CATEGORY_ID.getPreferredName(), from, size);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.indicesOptions(addIgnoreUnavailable(searchRequest.indicesOptions()));
@@ -605,8 +606,8 @@ public class JobProvider {
searchRequest.source().sort(sortField, descending ? SortOrder.DESC : SortOrder.ASC);
}
LOGGER.trace("ES API CALL: search all of result type {} from index {}{}{} with filter after sort from {} size {}",
AnomalyRecord.RESULT_TYPE_VALUE, indexName, (sb != null) ? " with sort" : "",
LOGGER.trace("ES API CALL: search all of records from index {}{}{} with filter after sort from {} size {}",
indexName, (sb != null) ? " with sort" : "",
secondarySort.isEmpty() ? "" : " with secondary sort", from, size);
client.search(searchRequest, ActionListener.wrap(searchResponse -> {
List<AnomalyRecord> results = new ArrayList<>();
@@ -639,8 +640,7 @@ public class JobProvider {
.build();
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
LOGGER.trace("ES API CALL: search all of result type {} from index {}{} with filter from {} size {}",
() -> Influencer.RESULT_TYPE_VALUE, () -> indexName,
LOGGER.trace("ES API CALL: search all of influencers from index {}{} with filter from {} size {}", () -> indexName,
() -> (query.getSortField() != null) ?
" with sort " + (query.isSortDescending() ? "descending" : "ascending") + " on field " + query.getSortField() : "",
query::getFrom, query::getSize);
@@ -760,8 +760,8 @@ public class JobProvider {
.order(sortDescending ? SortOrder.DESC : SortOrder.ASC);
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
LOGGER.trace("ES API CALL: search all {}s from index {} sort ascending {} with filter after sort from {} size {}",
ModelSnapshot.TYPE, indexName, sortField, from, size);
LOGGER.trace("ES API CALL: search all model snapshots from index {} sort ascending {} with filter after sort from {} size {}",
indexName, sortField, from, size);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.indicesOptions(addIgnoreUnavailable(searchRequest.indicesOptions()));
@@ -788,6 +788,9 @@ public class JobProvider {
* stream. If there are multiple state documents they are separated using <code>'\0'</code>
* when written to the stream.
*
* Because we have a rule that we will not open a legacy job in the current product version
* we don't have to worry about legacy document IDs here.
*
* @param jobId the job id
* @param modelSnapshot the model snapshot to be restored
* @param restoreStream the stream to write the state to
@@ -797,9 +800,9 @@ public class JobProvider {
// First try to restore model state.
for (String stateDocId : modelSnapshot.stateDocumentIds()) {
LOGGER.trace("ES API CALL: get ID {} type {} from index {}", stateDocId, ModelState.TYPE, indexName);
LOGGER.trace("ES API CALL: get ID {} from index {}", stateDocId, indexName);
GetResponse stateResponse = client.prepareGet(indexName, ModelState.TYPE.getPreferredName(), stateDocId).get();
GetResponse stateResponse = client.prepareGet(indexName, ElasticsearchMappings.DOC_TYPE, stateDocId).get();
if (!stateResponse.isExists()) {
LOGGER.error("Expected {} documents for model state for {} snapshot {} but failed to find {}",
modelSnapshot.getSnapshotDocCount(), jobId, modelSnapshot.getSnapshotId(), stateDocId);
@@ -809,16 +812,15 @@ public class JobProvider {
}
// Secondly try to restore categorizer state. This must come after model state because that's
// the order the C++ process expects.
// There are no snapshots for this, so the IDs simply
// the order the C++ process expects. There are no snapshots for this, so the IDs simply
// count up until a document is not found. It's NOT an error to have no categorizer state.
int docNum = 0;
while (true) {
String docId = CategorizerState.categorizerStateDocId(jobId, ++docNum);
String docId = CategorizerState.documentId(jobId, ++docNum);
LOGGER.trace("ES API CALL: get ID {} type {} from index {}", docId, CategorizerState.TYPE, indexName);
LOGGER.trace("ES API CALL: get ID {} from index {}", docId, indexName);
GetResponse stateResponse = client.prepareGet(indexName, CategorizerState.TYPE, docId).get();
GetResponse stateResponse = client.prepareGet(indexName, ElasticsearchMappings.DOC_TYPE, docId).get();
if (!stateResponse.isExists()) {
break;
}
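
On the reading side (the C++ process consumes this stream) the '\0' separators make the framing easy to undo. A minimal sketch, assuming an InputStream view of the restore stream; handleStateDocument is a hypothetical callback:

ByteArrayOutputStream current = new ByteArrayOutputStream();
int b;
while ((b = restoreStream.read()) != -1) {
    if (b == 0) {
        // '\0' marks the end of one state document
        handleStateDocument(current.toByteArray());
        current.reset();
    } else {
        current.write(b);
    }
}
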
@@ -850,8 +852,7 @@ public class JobProvider {
public QueryPage<ModelPlot> modelPlot(String jobId, int from, int size) {
SearchResponse searchResponse;
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
LOGGER.trace("ES API CALL: search result type {} from index {} from {}, size {}",
ModelPlot.RESULT_TYPE_VALUE, indexName, from, size);
LOGGER.trace("ES API CALL: search model plots from index {} from {} size {}", indexName, from, size);
searchResponse = client.prepareSearch(indexName)
.setIndicesOptions(addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS))


@@ -30,7 +30,6 @@ import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.ml.job.results.Result;
import java.io.IOException;
import java.util.Collections;
@@ -96,28 +95,26 @@ public class JobResultsPersister extends AbstractComponent {
bucketWithoutRecords.setRecords(Collections.emptyList());
}
try (XContentBuilder content = toXContentBuilder(bucketWithoutRecords)) {
logger.trace("[{}] ES API CALL: index result type {} to index {} at epoch {}",
jobId, Bucket.RESULT_TYPE_VALUE, indexName, bucketWithoutRecords.getEpoch());
String id = bucketWithoutRecords.getId();
logger.trace("[{}] ES API CALL: index bucket to index [{}] with ID [{}]", jobId, indexName, id);
bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, bucketWithoutRecords.getId()).source(content));
bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content));
persistBucketInfluencersStandalone(jobId, bucketWithoutRecords.getBucketInfluencers());
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error serialising bucket", new Object[] {jobId}), e);
logger.error(new ParameterizedMessage("[{}] Error serialising bucket", jobId), e);
}
return this;
}
private void persistBucketInfluencersStandalone(String jobId, List<BucketInfluencer> bucketInfluencers)
throws IOException {
private void persistBucketInfluencersStandalone(String jobId, List<BucketInfluencer> bucketInfluencers) throws IOException {
if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) {
for (BucketInfluencer bucketInfluencer : bucketInfluencers) {
try (XContentBuilder content = toXContentBuilder(bucketInfluencer)) {
// Need consistent IDs to ensure overwriting on renormalization
String id = bucketInfluencer.getId();
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
jobId, BucketInfluencer.RESULT_TYPE_VALUE, indexName, id);
logger.trace("[{}] ES BULK ACTION: index bucket influencer to index [{}] with ID [{}]", jobId, indexName, id);
bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content));
}
}
@@ -135,9 +132,9 @@ public class JobResultsPersister extends AbstractComponent {
try {
for (AnomalyRecord record : records) {
try (XContentBuilder content = toXContentBuilder(record)) {
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName, record.getId());
bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, record.getId()).source(content));
String id = record.getId();
logger.trace("[{}] ES BULK ACTION: index record to index [{}] with ID [{}]", jobId, indexName, id);
bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content));
}
}
} catch (IOException e) {
@@ -158,9 +155,9 @@ public class JobResultsPersister extends AbstractComponent {
try {
for (Influencer influencer : influencers) {
try (XContentBuilder content = toXContentBuilder(influencer)) {
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
jobId, Influencer.RESULT_TYPE_VALUE, indexName, influencer.getId());
bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, influencer.getId()).source(content));
String id = influencer.getId();
logger.trace("[{}] ES BULK ACTION: index influencer to index [{}] with ID [{}]", jobId, indexName, id);
bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content));
}
}
} catch (IOException e) {
@@ -199,8 +196,7 @@ public class JobResultsPersister extends AbstractComponent {
* @param category The category to be persisted
*/
public void persistCategoryDefinition(CategoryDefinition category) {
Persistable persistable = new Persistable(category.getJobId(), category, CategoryDefinition.TYPE.getPreferredName(),
category.getId());
Persistable persistable = new Persistable(category.getJobId(), category, category.getId());
persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(category.getJobId())).actionGet();
// Don't commit as we expect masses of these updates and they're not
@@ -211,8 +207,7 @@ public class JobResultsPersister extends AbstractComponent {
* Persist the quantiles (blocking)
*/
public void persistQuantiles(Quantiles quantiles) {
Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE.getPreferredName(),
Quantiles.documentId(quantiles.getJobId()));
Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.documentId(quantiles.getJobId()));
persistable.persist(AnomalyDetectorsIndex.jobStateIndexName()).actionGet();
}
@@ -220,8 +215,7 @@ public class JobResultsPersister extends AbstractComponent {
* Persist the quantiles (async)
*/
public void persistQuantiles(Quantiles quantiles, WriteRequest.RefreshPolicy refreshPolicy, ActionListener<IndexResponse> listener) {
Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE.getPreferredName(),
Quantiles.documentId(quantiles.getJobId()));
Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.documentId(quantiles.getJobId()));
persistable.setRefreshPolicy(refreshPolicy);
persistable.persist(AnomalyDetectorsIndex.jobStateIndexName(), listener);
}
@@ -230,8 +224,7 @@ public class JobResultsPersister extends AbstractComponent {
* Persist a model snapshot description
*/
public void persistModelSnapshot(ModelSnapshot modelSnapshot) {
Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.TYPE.getPreferredName(),
ModelSnapshot.documentId(modelSnapshot));
Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.documentId(modelSnapshot));
persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId())).actionGet();
}
@@ -241,7 +234,7 @@ public class JobResultsPersister extends AbstractComponent {
public void persistModelSizeStats(ModelSizeStats modelSizeStats) {
String jobId = modelSizeStats.getJobId();
logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes());
Persistable persistable = new Persistable(jobId, modelSizeStats, Result.TYPE.getPreferredName(), modelSizeStats.getId());
Persistable persistable = new Persistable(jobId, modelSizeStats, modelSizeStats.getId());
persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId)).actionGet();
// Don't commit as we expect masses of these updates and they're only
// for information at the API level
@@ -254,7 +247,7 @@ public class JobResultsPersister extends AbstractComponent {
ActionListener<IndexResponse> listener) {
String jobId = modelSizeStats.getJobId();
logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes());
Persistable persistable = new Persistable(jobId, modelSizeStats, Result.TYPE.getPreferredName(), modelSizeStats.getId());
Persistable persistable = new Persistable(jobId, modelSizeStats, modelSizeStats.getId());
persistable.setRefreshPolicy(refreshPolicy);
persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId), listener);
// Don't commit as we expect masses of these updates and they're only
@@ -265,7 +258,7 @@ public class JobResultsPersister extends AbstractComponent {
* Persist model plot output
*/
public void persistModelPlot(ModelPlot modelPlot) {
Persistable persistable = new Persistable(modelPlot.getJobId(), modelPlot, Result.TYPE.getPreferredName(), modelPlot.getId());
Persistable persistable = new Persistable(modelPlot.getJobId(), modelPlot, modelPlot.getId());
persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelPlot.getJobId())).actionGet();
// Don't commit as we expect masses of these updates and they're not
// read again by this process
@@ -283,9 +276,8 @@ public class JobResultsPersister extends AbstractComponent {
* called to commit the writes to the datastore.
*
* @param jobId The job Id
* @return True if successful
*/
public boolean commitResultWrites(String jobId) {
public void commitResultWrites(String jobId) {
// We refresh using the read alias in order to ensure all indices will
// be refreshed even if a rollover occurs in between.
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
@@ -295,7 +287,6 @@ public class JobResultsPersister extends AbstractComponent {
RefreshRequest refreshRequest = new RefreshRequest(indexName);
refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
client.admin().indices().refresh(refreshRequest).actionGet();
return true;
}
/**
@@ -303,19 +294,17 @@ public class JobResultsPersister extends AbstractComponent {
* immediately searchable.
*
* @param jobId The job Id
* @return True if successful
* */
public boolean commitStateWrites(String jobId) {
public void commitStateWrites(String jobId) {
String indexName = AnomalyDetectorsIndex.jobStateIndexName();
// Refresh should wait for Lucene to make the data searchable
logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName);
RefreshRequest refreshRequest = new RefreshRequest(indexName);
refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
client.admin().indices().refresh(refreshRequest).actionGet();
return true;
}
XContentBuilder toXContentBuilder(ToXContent obj) throws IOException {
private XContentBuilder toXContentBuilder(ToXContent obj) throws IOException {
XContentBuilder builder = jsonBuilder();
obj.toXContent(builder, ToXContent.EMPTY_PARAMS);
return builder;
@@ -325,14 +314,12 @@ public class JobResultsPersister extends AbstractComponent {
private final String jobId;
private final ToXContent object;
private final String description;
private final String id;
private WriteRequest.RefreshPolicy refreshPolicy;
Persistable(String jobId, ToXContent object, String description, String id) {
Persistable(String jobId, ToXContent object, String id) {
this.jobId = jobId;
this.object = object;
this.description = description;
this.id = id;
this.refreshPolicy = WriteRequest.RefreshPolicy.NONE;
}
@@ -349,15 +336,12 @@ public class JobResultsPersister extends AbstractComponent {
void persist(String indexName, ActionListener<IndexResponse> listener) {
logCall(indexName);
// TODO no_release: this is a temporary hack until we also switch state index to have doc type in which case
// we can remove this line and use DOC_TYPE directly in the index request
String type = AnomalyDetectorsIndex.jobStateIndexName().equals(indexName) ? description : DOC_TYPE;
try (XContentBuilder content = toXContentBuilder(object)) {
IndexRequest indexRequest = new IndexRequest(indexName, type, id).source(content).setRefreshPolicy(refreshPolicy);
IndexRequest indexRequest = new IndexRequest(indexName, DOC_TYPE, id).source(content).setRefreshPolicy(refreshPolicy);
client.index(indexRequest, listener);
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, description}), e);
logger.error(new ParameterizedMessage("[{}] Error writing [{}]", jobId, (id == null) ? "auto-generated ID" : id), e);
IndexResponse.Builder notCreatedResponse = new IndexResponse.Builder();
notCreatedResponse.setCreated(false);
listener.onResponse(notCreatedResponse.build());
@@ -366,9 +350,9 @@ public class JobResultsPersister extends AbstractComponent {
private void logCall(String indexName) {
if (id != null) {
logger.trace("[{}] ES API CALL: index {} to index {} with ID {}", jobId, description, indexName, id);
logger.trace("[{}] ES API CALL: to index {} with ID [{}]", jobId, indexName, id);
} else {
logger.trace("[{}] ES API CALL: index {} to index {} with auto-generated ID", jobId, description, indexName);
logger.trace("[{}] ES API CALL: to index {} with auto-generated ID", jobId, indexName);
}
}
}


@@ -14,7 +14,6 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
@@ -23,11 +22,9 @@ import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.WildcardQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
@@ -96,7 +93,7 @@ public class JobStorageDeletionTask extends Task {
// Step 3. Delete quantiles done, delete the categorizer state
ActionListener<Boolean> deleteQuantilesHandler = ActionListener.wrap(
response -> deleteCategorizerState(jobId, client, deleteCategorizerStateHandler),
response -> deleteCategorizerState(jobId, client, 1, deleteCategorizerStateHandler),
failureHandler);
// Step 2. Delete state done, delete the quantiles
@@ -109,12 +106,12 @@ public class JobStorageDeletionTask extends Task {
}
private void deleteQuantiles(String jobId, Client client, ActionListener<Boolean> finishedHandler) {
// The quantiles doc Id changed in v5.5 so delete both the old and new format
// The quantiles type and doc Id changed in v5.5 so delete both the old and new format
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), Quantiles.TYPE.getPreferredName(),
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE,
Quantiles.documentId(jobId)));
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), Quantiles.TYPE.getPreferredName(),
jobId + "-" + Quantiles.TYPE.getPreferredName()));
Quantiles.legacyDocumentId(jobId)));
bulkRequestBuilder.execute(ActionListener.wrap(
response -> finishedHandler.onResponse(true),
e -> {
@@ -138,26 +135,35 @@ public class JobStorageDeletionTask extends Task {
listener::onFailure);
}
private void deleteCategorizerState(String jobId, Client client, ActionListener<Boolean> finishedHandler) {
SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexName());
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
request.setSlices(5);
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
WildcardQueryBuilder query = new WildcardQueryBuilder(UidFieldMapper.NAME, Uid.createUid(CategorizerState.TYPE, jobId + "#*"));
searchRequest.source(new SearchSourceBuilder().query(query));
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
finishedHandler.onResponse(true);
private void deleteCategorizerState(String jobId, Client client, int docNum, ActionListener<Boolean> finishedHandler) {
// The categorizer state type and doc Id changed in v5.5 so delete both the old and new format
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE,
CategorizerState.documentId(jobId, docNum)));
// TODO: remove in 7.0
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), CategorizerState.TYPE,
CategorizerState.legacyDocumentId(jobId, docNum)));
bulkRequestBuilder.execute(ActionListener.wrap(
response -> {
// If we successfully deleted either document try the next one; if not we're done
for (BulkItemResponse item : response.getItems()) {
if (item.status() == RestStatus.OK) {
// There's an assumption here that there won't be very many categorizer
// state documents, so the recursion won't go more than, say, 5 levels deep
deleteCategorizerState(jobId, client, docNum + 1, finishedHandler);
return;
}
@Override
public void onFailure(Exception e) {
logger.error("[" + jobId + "] Failed to delete categorizer state for job.", e);
}
finishedHandler.onResponse(true);
},
e -> {
// It's not a problem for us if the index wasn't found - it's equivalent to document not found
if (e instanceof IndexNotFoundException) {
finishedHandler.onResponse(true);
} else {
finishedHandler.onFailure(e);
}
});
}));
}
private void deleteAlias(String jobId, String aliasName, String indexName, Client client, ActionListener<Boolean> finishedHandler ) {


@@ -14,6 +14,8 @@ import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings;
import java.io.IOException;
import java.io.InputStream;
@@ -85,7 +87,7 @@ public class StateProcessor extends AbstractComponent {
void persist(String jobId, BytesReference bytes) throws IOException {
logger.trace("[{}] ES API CALL: bulk index", jobId);
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(bytes, null, null, XContentType.JSON);
bulkRequest.add(bytes, AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, XContentType.JSON);
if (bulkRequest.numberOfActions() > 0) {
client.bulk(bulkRequest).actionGet();
}
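
This works because the autodetect process already writes its state as bulk action/source pairs; the defaults supplied to BulkRequest.add() fill in whatever _index and _type the action lines omit. A sketch of the shape this implies (the action line presumably carries only _id; the source layout here is illustrative, not the real model state format):

String payload =
        "{\"index\":{\"_id\":\"myjob_model_state_123#1\"}}\n" +
        "{\"state\":\"...opaque state...\"}\n";
BulkRequest bulkRequest = new BulkRequest();
// _index defaults to the state index and _type to "doc" for every action
bulkRequest.add(new BytesArray(payload), AnomalyDetectorsIndex.jobStateIndexName(),
        ElasticsearchMappings.DOC_TYPE, XContentType.JSON);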


@@ -7,18 +7,25 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.state;
/**
* The categorizer state does not need to be loaded on the Java side.
* However, the Java process DOES set up a mapping on the Elasticsearch
* index to tell Elasticsearch not to analyse the categorizer state documents
* in any way.
* The categorizer state does not need to be understood on the Java side.
* The Java code only needs to know how to form the document IDs so that
* it can retrieve and delete the correct documents.
*/
public class CategorizerState {
/**
* The type of this class used when persisting the data
* Legacy type, now used only as a discriminant in the document ID
*/
public static final String TYPE = "categorizer_state";
public static final String categorizerStateDocId(String jobId, int docNum) {
public static final String documentId(String jobId, int docNum) {
return jobId + "_" + TYPE + "#" + docNum;
}
/**
* This is how the IDs were formed in v5.4
*/
public static final String legacyDocumentId(String jobId, int docNum) {
return jobId + "#" + docNum;
}
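
Concretely, for a job named "foo" the two ID generations come out as follows (the REST tests below index and delete exactly these shapes):

CategorizerState.documentId("foo", 1);        // "foo_categorizer_state#1" (5.5 onwards)
CategorizerState.legacyDocumentId("foo", 1);  // "foo#1"                   (5.4)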


@@ -42,13 +42,14 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
public static final ParseField SNAPSHOT_DOC_COUNT = new ParseField("snapshot_doc_count");
public static final ParseField LATEST_RECORD_TIME = new ParseField("latest_record_time_stamp");
public static final ParseField LATEST_RESULT_TIME = new ParseField("latest_result_time_stamp");
public static final ParseField QUANTILES = new ParseField("quantiles");
public static final ParseField RETAIN = new ParseField("retain");
// Used for QueryPage
public static final ParseField RESULTS_FIELD = new ParseField("model_snapshots");
/**
* Elasticsearch type
* Legacy type, now used only as a discriminant in the document ID
*/
public static final ParseField TYPE = new ParseField("model_snapshot");
@@ -86,7 +87,7 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
throw new IllegalArgumentException(
"unexpected token [" + p.currentToken() + "] for [" + LATEST_RESULT_TIME.getPreferredName() + "]");
}, LATEST_RESULT_TIME, ValueType.VALUE);
PARSER.declareObject(Builder::setQuantiles, Quantiles.PARSER, Quantiles.TYPE);
PARSER.declareObject(Builder::setQuantiles, Quantiles.PARSER, QUANTILES);
PARSER.declareBoolean(Builder::setRetain, RETAIN);
}
@@ -184,7 +185,7 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
latestResultTimeStamp.getTime());
}
if (quantiles != null) {
builder.field(Quantiles.TYPE.getPreferredName(), quantiles);
builder.field(QUANTILES.getPreferredName(), quantiles);
}
builder.field(RETAIN.getPreferredName(), retain);
builder.endObject();
@@ -260,32 +261,53 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
&& this.retain == that.retain;
}
private String stateDocumentPrefix() {
return jobId + "-" + snapshotId;
}
public List<String> stateDocumentIds() {
String prefix = stateDocumentPrefix();
List<String> stateDocumentIds = new ArrayList<>(snapshotDocCount);
// The state document count suffixes are 1-based
for (int i = 1; i <= snapshotDocCount; i++) {
stateDocumentIds.add(prefix + '#' + i);
stateDocumentIds.add(ModelState.documentId(jobId, snapshotId, i));
}
return stateDocumentIds;
}
/**
* This is how the IDs were formed in v5.4
*/
public List<String> legacyStateDocumentIds() {
List<String> stateDocumentIds = new ArrayList<>(snapshotDocCount);
// The state document count suffixes are 1-based
for (int i = 1; i <= snapshotDocCount; i++) {
stateDocumentIds.add(ModelState.legacyDocumentId(jobId, snapshotId, i));
}
return stateDocumentIds;
}
public static String documentIdPrefix(String jobId) {
return jobId + "_model_snapshot_";
return jobId + "_" + TYPE + "_";
}
public static String documentId(ModelSnapshot snapshot) {
return documentId(snapshot.getJobId(), snapshot.getSnapshotId());
}
/**
* This is how the IDs were formed in v5.4
*/
public static String legacyDocumentId(ModelSnapshot snapshot) {
return legacyDocumentId(snapshot.getJobId(), snapshot.getSnapshotId());
}
public static String documentId(String jobId, String snapshotId) {
return documentIdPrefix(jobId) + snapshotId;
}
/**
* This is how the IDs were formed in v5.4
*/
public static String legacyDocumentId(String jobId, String snapshotId) {
return jobId + "-" + snapshotId;
}
public static ModelSnapshot fromJson(BytesReference bytesReference) {
try (XContentParser parser = XContentFactory.xContent(bytesReference).createParser(NamedXContentRegistry.EMPTY, bytesReference)) {
return PARSER.apply(parser, null).build();


@@ -6,22 +6,28 @@
package org.elasticsearch.xpack.ml.job.process.autodetect.state;
import org.elasticsearch.common.ParseField;
/**
* The serialised models can get very large and only the C++ code
* understands how to decode them, hence there is no reason to load
* them into the Java process.
* However, the Java process DOES set up a mapping on the Elasticsearch
* index to tell Elasticsearch not to analyse the model state documents
* in any way. (Otherwise Elasticsearch would go into a spin trying to
* make sense of such large JSON documents.)
* The model state does not need to be understood on the Java side.
* The Java code only needs to know how to form the document IDs so that
* it can retrieve and delete the correct documents.
*/
public class ModelState {
/**
* The type of this class used when persisting the data
* Legacy type, now used only as a discriminant in the document ID
*/
public static final ParseField TYPE = new ParseField("model_state");
public static final String TYPE = "model_state";
public static final String documentId(String jobId, String snapshotId, int docNum) {
return jobId + "_" + TYPE + "_" + snapshotId + "#" + docNum;
}
/**
* This is how the IDs were formed in v5.4
*/
public static final String legacyDocumentId(String jobId, String snapshotId, int docNum) {
return jobId + "-" + snapshotId + "#" + docNum;
}
private ModelState() {
}


@@ -31,7 +31,7 @@ public class Quantiles extends ToXContentToBytes implements Writeable {
public static final ParseField QUANTILE_STATE = new ParseField("quantile_state");
/**
* Elasticsearch type
* Legacy type, now used only as a discriminant in the document ID
*/
public static final ParseField TYPE = new ParseField("quantiles");
@@ -45,7 +45,14 @@ public class Quantiles extends ToXContentToBytes implements Writeable {
}
public static String documentId(String jobId) {
return jobId + "_" + TYPE.getPreferredName();
return jobId + "_" + TYPE;
}
/**
* This is how the IDs were formed in v5.4
*/
public static String legacyDocumentId(String jobId) {
return jobId + "-" + TYPE;
}
private final String jobId;
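
The same two-generation ID scheme runs through the other state classes; as a worked example (matching the documents the YAML tests below create and expect the delete to remove):

ModelState.documentId("foo", "123", 1);        // "foo_model_state_123#1"
ModelState.legacyDocumentId("foo", "123", 1);  // "foo-123#1"
Quantiles.documentId("foo");                   // "foo_quantiles"
Quantiles.legacyDocumentId("foo");             // "foo-quantiles"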


@@ -24,7 +24,11 @@ import java.util.TreeSet;
public class CategoryDefinition extends ToXContentToBytes implements Writeable {
/**
* Legacy type, now used only as a discriminant in the document ID
*/
public static final ParseField TYPE = new ParseField("category_definition");
public static final ParseField CATEGORY_ID = new ParseField("category_id");
public static final ParseField TERMS = new ParseField("terms");
public static final ParseField REGEX = new ParseField("regex");
@@ -82,7 +86,7 @@ public class CategoryDefinition extends ToXContentToBytes implements Writeable {
}
public String getId() {
return jobId + "_category_definition_" + categoryId;
return jobId + "_" + TYPE + "_" + categoryId;
}
public long getCategoryId() {


@@ -25,10 +25,8 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.notifications.AuditMessage;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.Before;
@@ -236,10 +234,8 @@ public class MachineLearningTemplateRegistryTests extends ESTestCase {
PutIndexTemplateRequest request = captor.getValue();
assertNotNull(request);
assertEquals(templateRegistry.mlStateIndexSettings().build(), request.settings());
assertTrue(request.mappings().containsKey(CategorizerState.TYPE));
assertTrue(request.mappings().containsKey(Quantiles.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(ModelState.TYPE.getPreferredName()));
assertEquals(3, request.mappings().size());
assertTrue(request.mappings().containsKey(ElasticsearchMappings.DOC_TYPE));
assertEquals(1, request.mappings().size());
assertEquals(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName()), request.patterns());
assertEquals(new Integer(Version.CURRENT.id), request.version());
});


@@ -801,6 +801,7 @@ public class JobProviderTests extends ESTestCase {
}
public void testRestoreStateToStream() throws Exception {
String snapshotId = "123";
Map<String, Object> categorizerState = new HashMap<>();
categorizerState.put("catName", "catVal");
GetResponse categorizerStateGetResponse1 = createGetResponse(true, categorizerState);
@@ -812,16 +813,18 @@ public class JobProviderTests extends ESTestCase {
GetResponse modelStateGetResponse2 = createGetResponse(true, modelState);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), CategorizerState.TYPE, JOB_ID + "#1", categorizerStateGetResponse1)
.prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), CategorizerState.TYPE, JOB_ID + "#2", categorizerStateGetResponse2)
.prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ModelState.TYPE.getPreferredName(), JOB_ID + "-123#1",
modelStateGetResponse1)
.prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ModelState.TYPE.getPreferredName(), JOB_ID + "-123#2",
modelStateGetResponse2);
.prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE,
CategorizerState.documentId(JOB_ID, 1), categorizerStateGetResponse1)
.prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE,
CategorizerState.documentId(JOB_ID, 2), categorizerStateGetResponse2)
.prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE,
ModelState.documentId(JOB_ID, snapshotId, 1), modelStateGetResponse1)
.prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE,
ModelState.documentId(JOB_ID, snapshotId, 2), modelStateGetResponse2);
JobProvider provider = createProvider(clientBuilder.build());
ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID).setSnapshotId("123").setSnapshotDocCount(2).build();
ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID).setSnapshotId(snapshotId).setSnapshotDocCount(2).build();
ByteArrayOutputStream stream = new ByteArrayOutputStream();


@@ -183,11 +183,12 @@ public class ModelSnapshotTests extends AbstractSerializingTestCase<ModelSnapsho
public void testStateDocumentIds_GivenDocCountIsOne() {
ModelSnapshot snapshot = new ModelSnapshot.Builder("foo").setSnapshotId("1").setSnapshotDocCount(1).build();
assertThat(snapshot.stateDocumentIds(), equalTo(Arrays.asList("foo-1#1")));
assertThat(snapshot.stateDocumentIds(), equalTo(Arrays.asList("foo_model_state_1#1")));
}
public void testStateDocumentIds_GivenDocCountIsThree() {
ModelSnapshot snapshot = new ModelSnapshot.Builder("foo").setSnapshotId("123456789").setSnapshotDocCount(3).build();
assertThat(snapshot.stateDocumentIds(), equalTo(Arrays.asList("foo-123456789#1", "foo-123456789#2", "foo-123456789#3")));
assertThat(snapshot.stateDocumentIds(),
equalTo(Arrays.asList("foo_model_state_123456789#1", "foo_model_state_123456789#2", "foo_model_state_123456789#3")));
}
}


@@ -423,7 +423,7 @@
- do:
index:
index: .ml-state
type: quantiles
type: doc
id: index-layout-quantiles-job_quantiles
body:
state: quantile-state
@@ -436,6 +436,9 @@
body:
state: quantile-state
- do:
indices.refresh: {}
- do:
xpack.ml.delete_job:
job_id: "index-layout-quantiles-job"
@@ -448,3 +451,135 @@
count:
index: .ml-state
- match: {count: 0}
---
"Test delete removes 5.4 and 5.5 state":
- do:
xpack.ml.put_job:
job_id: index-layout-state-job
body: >
{
"analysis_config" : {
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"mlcategory"}],
"categorization_field_name": "message"
},
"data_description" : {
"format":"xcontent"
}
}
- match: { job_id: "index-layout-state-job" }
- do:
index:
index: .ml-anomalies-shared
type: doc
id: "index-layout-state-job_model_snapshot_123"
body: >
{
"job_id" : "index-layout-state-job",
"timestamp": "2017-05-02T00:00:00Z",
"snapshot_id": "123",
"snapshot_doc_count": 2,
"retain": false
}
- do:
index:
index: .ml-anomalies-shared
type: model_snapshot
id: "index-layout-state-job-456"
body: >
{
"job_id" : "index-layout-state-job",
"timestamp": "2017-05-01T00:00:00Z",
"snapshot_id": "456",
"snapshot_doc_count": 2,
"retain": false
}
- do:
index:
index: .ml-state
type: doc
id: index-layout-state-job_model_state_123#1
body:
state: new-model-state
- do:
index:
index: .ml-state
type: doc
id: index-layout-state-job_model_state_123#2
body:
state: more-new-model-state
- do:
index:
index: .ml-state
type: model_state
id: index-layout-state-job-456#1
body:
state: old-model-state
- do:
index:
index: .ml-state
type: model_state
id: index-layout-state-job-456#2
body:
state: more-old-model-state
- do:
index:
index: .ml-state
type: doc
id: index-layout-state-job_categorizer_state#1
body:
state: new-categorizer-state
- do:
index:
index: .ml-state
type: doc
id: index-layout-state-job_categorizer_state#2
body:
state: more-new-categorizer-state
- do:
index:
index: .ml-state
type: categorizer_state
id: index-layout-state-job#1
body:
state: old-categorizer-state
- do:
index:
index: .ml-state
type: categorizer_state
id: index-layout-state-job#2
body:
state: more-old-categorizer-state
- do:
indices.refresh: {}
- do:
xpack.ml.delete_job:
job_id: "index-layout-state-job"
- match: { acknowledged: true }
- do:
indices.refresh: {}
- do:
count:
index: .ml-anomalies-shared
- match: {count: 0}
- do:
count:
index: .ml-state
- match: {count: 0}