From e826a5621283cd8a24e0d6243bd6b8e78bce6730 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 19 Jan 2017 09:31:03 +0000 Subject: [PATCH] Make document Ids unique if in a shared index (elastic/elasticsearch#749) Original commit: elastic/x-pack-elasticsearch@ecc7e876ce40f7f2c0afc3d3cd6404346717c5bc --- .../org/elasticsearch/xpack/ml/job/DataCounts.java | 6 +++++- .../elasticsearch/xpack/ml/job/ModelSizeStats.java | 11 ++++------- .../elasticsearch/xpack/ml/job/ModelSnapshot.java | 4 ++++ .../ml/job/persistence/JobDataCountsPersister.java | 2 +- .../xpack/ml/job/persistence/JobProvider.java | 11 ++++++----- .../ml/job/persistence/JobResultsPersister.java | 10 +++++----- .../xpack/ml/job/quantiles/Quantiles.java | 2 +- .../xpack/ml/job/results/CategoryDefinition.java | 4 ++++ .../xpack/ml/job/ModelSizeStatsTests.java | 12 ++---------- .../test/jobs_get_result_categories.yaml | 6 +++--- .../resources/rest-api-spec/test/jobs_get_stats.yaml | 2 +- 11 files changed, 36 insertions(+), 34 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/DataCounts.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/DataCounts.java index a276115509c..68273e6dbcc 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/DataCounts.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/DataCounts.java @@ -35,7 +35,7 @@ import java.util.Objects; public class DataCounts extends ToXContentToBytes implements Writeable { - public static final String DOCUMENT_SUFFIX = "-data-counts"; + private static final String DOCUMENT_SUFFIX = "-data-counts"; public static final String PROCESSED_RECORD_COUNT_STR = "processed_record_count"; public static final String PROCESSED_FIELD_COUNT_STR = "processed_field_count"; public static final String INPUT_BYTES_STR = "input_bytes"; @@ -94,6 +94,10 @@ public class DataCounts extends ToXContentToBytes implements Writeable { PARSER.declareLong((t, u) -> {;}, INPUT_RECORD_COUNT); } + public static String documentId(String jobId) { + return jobId + DOCUMENT_SUFFIX; + } + private final String jobId; private long processedRecordCount; private long processedFieldCount; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/ModelSizeStats.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/ModelSizeStats.java index 8bd5a8463ca..f2a5f98c10e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/ModelSizeStats.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/ModelSizeStats.java @@ -76,6 +76,10 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable { PARSER.declareField(Builder::setMemoryStatus, p -> MemoryStatus.fromString(p.text()), MEMORY_STATUS_FIELD, ValueType.STRING); } + public static String documentId(String jobId) { + return jobId + "-" + RESULT_TYPE_VALUE; + } + /** * The status of the memory monitored by the ResourceMonitor. OK is default, * SOFT_LIMIT means that the models have done some aggressive pruning to @@ -119,7 +123,6 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable { } private final String jobId; - private final String id; private final long modelBytes; private final long totalByFieldCount; private final long totalOverFieldCount; @@ -133,7 +136,6 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable { long totalPartitionFieldCount, long bucketAllocationFailuresCount, MemoryStatus memoryStatus, Date timestamp, Date logTime) { this.jobId = jobId; - this.id = id; this.modelBytes = modelBytes; this.totalByFieldCount = totalByFieldCount; this.totalOverFieldCount = totalOverFieldCount; @@ -146,7 +148,6 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable { public ModelSizeStats(StreamInput in) throws IOException { jobId = in.readString(); - id = null; modelBytes = in.readVLong(); totalByFieldCount = in.readVLong(); totalOverFieldCount = in.readVLong(); @@ -203,10 +204,6 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable { return jobId; } - public String getId() { - return id; - } - public long getModelBytes() { return modelBytes; } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/ModelSnapshot.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/ModelSnapshot.java index 3ad78e8d61e..7a41e476d39 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/ModelSnapshot.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/ModelSnapshot.java @@ -191,6 +191,10 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable { return jobId; } + public String documentId() { + return jobId + "-" + snapshotId; + } + public Date getTimestamp() { return timestamp; } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java index 6aaf889bd46..5fdedc72e05 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java @@ -49,7 +49,7 @@ public class JobDataCountsPersister extends AbstractComponent { try { XContentBuilder content = serialiseCounts(counts); client.prepareIndex(AnomalyDetectorsIndex.jobResultsIndexName(jobId), DataCounts.TYPE.getPreferredName(), - jobId + DataCounts.DOCUMENT_SUFFIX) + DataCounts.documentId(jobId)) .setSource(content).execute(new ActionListener() { @Override public void onResponse(IndexResponse indexResponse) { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index afb9bde1b58..9066c123027 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -287,7 +287,7 @@ public class JobProvider { * @param jobId The job id */ public void dataCounts(String jobId, Consumer handler, Consumer errorHandler) { - get(jobId, DataCounts.TYPE.getPreferredName(), jobId + DataCounts.DOCUMENT_SUFFIX, handler, errorHandler, + get(jobId, DataCounts.TYPE.getPreferredName(), DataCounts.documentId(jobId), handler, errorHandler, DataCounts.PARSER, () -> new DataCounts(jobId)); } @@ -626,9 +626,10 @@ public class JobProvider { SearchRequest searchRequest = new SearchRequest(indexName); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); if (categoryId != null) { - String uid = Uid.createUid(CategoryDefinition.TYPE.getPreferredName(), categoryId); + String documentId = CategoryDefinition.documentId(jobId, categoryId); + String uid = Uid.createUid(CategoryDefinition.TYPE.getPreferredName(), documentId); sourceBuilder.query(QueryBuilders.termQuery(UidFieldMapper.NAME, uid)); - searchRequest.routing(categoryId); + searchRequest.routing(documentId); } else if (from != null && size != null) { searchRequest.types(CategoryDefinition.TYPE.getPreferredName()); sourceBuilder.from(from).size(size) @@ -798,7 +799,7 @@ public class JobProvider { */ public Optional getQuantiles(String jobId) { String indexName = AnomalyDetectorsIndex.jobStateIndexName(); - String quantilesId = Quantiles.quantilesId(jobId); + String quantilesId = Quantiles.documentId(jobId); LOGGER.trace("ES API CALL: get ID {} type {} from index {}", quantilesId, Quantiles.TYPE.getPreferredName(), indexName); Optional quantiles = getBlocking(indexName, Quantiles.TYPE.getPreferredName(), quantilesId, Quantiles.PARSER); @@ -1015,7 +1016,7 @@ public class JobProvider { LOGGER.trace("ES API CALL: get result type {} ID {} for job {}", ModelSizeStats.RESULT_TYPE_VALUE, ModelSizeStats.RESULT_TYPE_FIELD, jobId); - get(jobId, Result.TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName(), + get(jobId, Result.TYPE.getPreferredName(), ModelSizeStats.documentId(jobId), handler, errorHandler, (parser, context) -> ModelSizeStats.PARSER.apply(parser, null).build(), () -> { LOGGER.warn("No memory usage details for job with id {}", jobId); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index 710737b9c58..759a506fcf7 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -212,7 +212,7 @@ public class JobResultsPersister extends AbstractComponent { */ public void persistCategoryDefinition(CategoryDefinition category) { Persistable persistable = new Persistable(category.getJobId(), category, CategoryDefinition.TYPE.getPreferredName(), - String.valueOf(category.getCategoryId())); + CategoryDefinition.documentId(category.getJobId(), Long.toString(category.getCategoryId()))); persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(category.getJobId())); // Don't commit as we expect masses of these updates and they're not // read again by this process @@ -223,7 +223,7 @@ public class JobResultsPersister extends AbstractComponent { */ public void persistQuantiles(Quantiles quantiles) { Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE.getPreferredName(), - Quantiles.quantilesId(quantiles.getJobId())); + Quantiles.documentId(quantiles.getJobId())); if (persistable.persist(AnomalyDetectorsIndex.jobStateIndexName())) { // Refresh the index when persisting quantiles so that previously // persisted results will be available for searching. Do this using the @@ -239,13 +239,13 @@ public class JobResultsPersister extends AbstractComponent { */ public void persistModelSnapshot(ModelSnapshot modelSnapshot) { Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.TYPE.getPreferredName(), - modelSnapshot.getSnapshotId()); + modelSnapshot.documentId()); persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(modelSnapshot.getJobId())); } public void updateModelSnapshot(ModelSnapshot modelSnapshot, Consumer handler, Consumer errorHandler) { String index = AnomalyDetectorsIndex.jobResultsIndexName(modelSnapshot.getJobId()); - IndexRequest indexRequest = new IndexRequest(index, ModelSnapshot.TYPE.getPreferredName(), modelSnapshot.getSnapshotId()); + IndexRequest indexRequest = new IndexRequest(index, ModelSnapshot.TYPE.getPreferredName(), modelSnapshot.documentId()); try { indexRequest.source(toXContentBuilder(modelSnapshot)); } catch (IOException e) { @@ -261,7 +261,7 @@ public class JobResultsPersister extends AbstractComponent { String jobId = modelSizeStats.getJobId(); logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes()); Persistable persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE.getPreferredName(), - ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName()); + ModelSizeStats.documentId(jobId)); persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(jobId)); persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE.getPreferredName(), null); persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(jobId)); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/quantiles/Quantiles.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/quantiles/Quantiles.java index 2da5e55d7e7..52822463703 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/quantiles/Quantiles.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/quantiles/Quantiles.java @@ -44,7 +44,7 @@ public class Quantiles extends ToXContentToBytes implements Writeable { PARSER.declareString(ConstructingObjectParser.constructorArg(), QUANTILE_STATE); } - public static String quantilesId(String jobId) { + public static String documentId(String jobId) { return jobId + "-" + TYPE.getPreferredName(); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/results/CategoryDefinition.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/results/CategoryDefinition.java index f1c2fc8c349..9beb1dbb28f 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/results/CategoryDefinition.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/results/CategoryDefinition.java @@ -46,6 +46,10 @@ public class CategoryDefinition extends ToXContentToBytes implements Writeable { PARSER.declareStringArray(CategoryDefinition::setExamples, EXAMPLES); } + public static String documentId(String jobId, String categoryId) { + return jobId + "-" + categoryId; + } + private final String jobId; private long id = 0L; private String terms = ""; diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/ModelSizeStatsTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/ModelSizeStatsTests.java index ead627dd331..ab776e49cc6 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/ModelSizeStatsTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/ModelSizeStatsTests.java @@ -16,7 +16,6 @@ public class ModelSizeStatsTests extends AbstractSerializingTestCase