Make document Ids unique if in a shared index (elastic/elasticsearch#749)
Original commit: elastic/x-pack-elasticsearch@ecc7e876ce
This commit is contained in:
parent
d3c589c33d
commit
e826a56212
|
@ -35,7 +35,7 @@ import java.util.Objects;
|
|||
|
||||
public class DataCounts extends ToXContentToBytes implements Writeable {
|
||||
|
||||
public static final String DOCUMENT_SUFFIX = "-data-counts";
|
||||
private static final String DOCUMENT_SUFFIX = "-data-counts";
|
||||
public static final String PROCESSED_RECORD_COUNT_STR = "processed_record_count";
|
||||
public static final String PROCESSED_FIELD_COUNT_STR = "processed_field_count";
|
||||
public static final String INPUT_BYTES_STR = "input_bytes";
|
||||
|
@ -94,6 +94,10 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
|
|||
PARSER.declareLong((t, u) -> {;}, INPUT_RECORD_COUNT);
|
||||
}
|
||||
|
||||
public static String documentId(String jobId) {
|
||||
return jobId + DOCUMENT_SUFFIX;
|
||||
}
|
||||
|
||||
private final String jobId;
|
||||
private long processedRecordCount;
|
||||
private long processedFieldCount;
|
||||
|
|
|
@ -76,6 +76,10 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable {
|
|||
PARSER.declareField(Builder::setMemoryStatus, p -> MemoryStatus.fromString(p.text()), MEMORY_STATUS_FIELD, ValueType.STRING);
|
||||
}
|
||||
|
||||
public static String documentId(String jobId) {
|
||||
return jobId + "-" + RESULT_TYPE_VALUE;
|
||||
}
|
||||
|
||||
/**
|
||||
* The status of the memory monitored by the ResourceMonitor. OK is default,
|
||||
* SOFT_LIMIT means that the models have done some aggressive pruning to
|
||||
|
@ -119,7 +123,6 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable {
|
|||
}
|
||||
|
||||
private final String jobId;
|
||||
private final String id;
|
||||
private final long modelBytes;
|
||||
private final long totalByFieldCount;
|
||||
private final long totalOverFieldCount;
|
||||
|
@ -133,7 +136,6 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable {
|
|||
long totalPartitionFieldCount, long bucketAllocationFailuresCount, MemoryStatus memoryStatus,
|
||||
Date timestamp, Date logTime) {
|
||||
this.jobId = jobId;
|
||||
this.id = id;
|
||||
this.modelBytes = modelBytes;
|
||||
this.totalByFieldCount = totalByFieldCount;
|
||||
this.totalOverFieldCount = totalOverFieldCount;
|
||||
|
@ -146,7 +148,6 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable {
|
|||
|
||||
public ModelSizeStats(StreamInput in) throws IOException {
|
||||
jobId = in.readString();
|
||||
id = null;
|
||||
modelBytes = in.readVLong();
|
||||
totalByFieldCount = in.readVLong();
|
||||
totalOverFieldCount = in.readVLong();
|
||||
|
@ -203,10 +204,6 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable {
|
|||
return jobId;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public long getModelBytes() {
|
||||
return modelBytes;
|
||||
}
|
||||
|
|
|
@ -191,6 +191,10 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
|
|||
return jobId;
|
||||
}
|
||||
|
||||
public String documentId() {
|
||||
return jobId + "-" + snapshotId;
|
||||
}
|
||||
|
||||
public Date getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
|
|
|
@ -49,7 +49,7 @@ public class JobDataCountsPersister extends AbstractComponent {
|
|||
try {
|
||||
XContentBuilder content = serialiseCounts(counts);
|
||||
client.prepareIndex(AnomalyDetectorsIndex.jobResultsIndexName(jobId), DataCounts.TYPE.getPreferredName(),
|
||||
jobId + DataCounts.DOCUMENT_SUFFIX)
|
||||
DataCounts.documentId(jobId))
|
||||
.setSource(content).execute(new ActionListener<IndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(IndexResponse indexResponse) {
|
||||
|
|
|
@ -287,7 +287,7 @@ public class JobProvider {
|
|||
* @param jobId The job id
|
||||
*/
|
||||
public void dataCounts(String jobId, Consumer<DataCounts> handler, Consumer<Exception> errorHandler) {
|
||||
get(jobId, DataCounts.TYPE.getPreferredName(), jobId + DataCounts.DOCUMENT_SUFFIX, handler, errorHandler,
|
||||
get(jobId, DataCounts.TYPE.getPreferredName(), DataCounts.documentId(jobId), handler, errorHandler,
|
||||
DataCounts.PARSER, () -> new DataCounts(jobId));
|
||||
}
|
||||
|
||||
|
@ -626,9 +626,10 @@ public class JobProvider {
|
|||
SearchRequest searchRequest = new SearchRequest(indexName);
|
||||
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
|
||||
if (categoryId != null) {
|
||||
String uid = Uid.createUid(CategoryDefinition.TYPE.getPreferredName(), categoryId);
|
||||
String documentId = CategoryDefinition.documentId(jobId, categoryId);
|
||||
String uid = Uid.createUid(CategoryDefinition.TYPE.getPreferredName(), documentId);
|
||||
sourceBuilder.query(QueryBuilders.termQuery(UidFieldMapper.NAME, uid));
|
||||
searchRequest.routing(categoryId);
|
||||
searchRequest.routing(documentId);
|
||||
} else if (from != null && size != null) {
|
||||
searchRequest.types(CategoryDefinition.TYPE.getPreferredName());
|
||||
sourceBuilder.from(from).size(size)
|
||||
|
@ -798,7 +799,7 @@ public class JobProvider {
|
|||
*/
|
||||
public Optional<Quantiles> getQuantiles(String jobId) {
|
||||
String indexName = AnomalyDetectorsIndex.jobStateIndexName();
|
||||
String quantilesId = Quantiles.quantilesId(jobId);
|
||||
String quantilesId = Quantiles.documentId(jobId);
|
||||
LOGGER.trace("ES API CALL: get ID {} type {} from index {}", quantilesId, Quantiles.TYPE.getPreferredName(), indexName);
|
||||
|
||||
Optional<Quantiles> quantiles = getBlocking(indexName, Quantiles.TYPE.getPreferredName(), quantilesId, Quantiles.PARSER);
|
||||
|
@ -1015,7 +1016,7 @@ public class JobProvider {
|
|||
LOGGER.trace("ES API CALL: get result type {} ID {} for job {}",
|
||||
ModelSizeStats.RESULT_TYPE_VALUE, ModelSizeStats.RESULT_TYPE_FIELD, jobId);
|
||||
|
||||
get(jobId, Result.TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName(),
|
||||
get(jobId, Result.TYPE.getPreferredName(), ModelSizeStats.documentId(jobId),
|
||||
handler, errorHandler, (parser, context) -> ModelSizeStats.PARSER.apply(parser, null).build(),
|
||||
() -> {
|
||||
LOGGER.warn("No memory usage details for job with id {}", jobId);
|
||||
|
|
|
@ -212,7 +212,7 @@ public class JobResultsPersister extends AbstractComponent {
|
|||
*/
|
||||
public void persistCategoryDefinition(CategoryDefinition category) {
|
||||
Persistable persistable = new Persistable(category.getJobId(), category, CategoryDefinition.TYPE.getPreferredName(),
|
||||
String.valueOf(category.getCategoryId()));
|
||||
CategoryDefinition.documentId(category.getJobId(), Long.toString(category.getCategoryId())));
|
||||
persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(category.getJobId()));
|
||||
// Don't commit as we expect masses of these updates and they're not
|
||||
// read again by this process
|
||||
|
@ -223,7 +223,7 @@ public class JobResultsPersister extends AbstractComponent {
|
|||
*/
|
||||
public void persistQuantiles(Quantiles quantiles) {
|
||||
Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE.getPreferredName(),
|
||||
Quantiles.quantilesId(quantiles.getJobId()));
|
||||
Quantiles.documentId(quantiles.getJobId()));
|
||||
if (persistable.persist(AnomalyDetectorsIndex.jobStateIndexName())) {
|
||||
// Refresh the index when persisting quantiles so that previously
|
||||
// persisted results will be available for searching. Do this using the
|
||||
|
@ -239,13 +239,13 @@ public class JobResultsPersister extends AbstractComponent {
|
|||
*/
|
||||
public void persistModelSnapshot(ModelSnapshot modelSnapshot) {
|
||||
Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.TYPE.getPreferredName(),
|
||||
modelSnapshot.getSnapshotId());
|
||||
modelSnapshot.documentId());
|
||||
persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(modelSnapshot.getJobId()));
|
||||
}
|
||||
|
||||
public void updateModelSnapshot(ModelSnapshot modelSnapshot, Consumer<Boolean> handler, Consumer<Exception> errorHandler) {
|
||||
String index = AnomalyDetectorsIndex.jobResultsIndexName(modelSnapshot.getJobId());
|
||||
IndexRequest indexRequest = new IndexRequest(index, ModelSnapshot.TYPE.getPreferredName(), modelSnapshot.getSnapshotId());
|
||||
IndexRequest indexRequest = new IndexRequest(index, ModelSnapshot.TYPE.getPreferredName(), modelSnapshot.documentId());
|
||||
try {
|
||||
indexRequest.source(toXContentBuilder(modelSnapshot));
|
||||
} catch (IOException e) {
|
||||
|
@ -261,7 +261,7 @@ public class JobResultsPersister extends AbstractComponent {
|
|||
String jobId = modelSizeStats.getJobId();
|
||||
logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes());
|
||||
Persistable persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE.getPreferredName(),
|
||||
ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName());
|
||||
ModelSizeStats.documentId(jobId));
|
||||
persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(jobId));
|
||||
persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE.getPreferredName(), null);
|
||||
persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(jobId));
|
||||
|
|
|
@ -44,7 +44,7 @@ public class Quantiles extends ToXContentToBytes implements Writeable {
|
|||
PARSER.declareString(ConstructingObjectParser.constructorArg(), QUANTILE_STATE);
|
||||
}
|
||||
|
||||
public static String quantilesId(String jobId) {
|
||||
public static String documentId(String jobId) {
|
||||
return jobId + "-" + TYPE.getPreferredName();
|
||||
}
|
||||
|
||||
|
|
|
@ -46,6 +46,10 @@ public class CategoryDefinition extends ToXContentToBytes implements Writeable {
|
|||
PARSER.declareStringArray(CategoryDefinition::setExamples, EXAMPLES);
|
||||
}
|
||||
|
||||
public static String documentId(String jobId, String categoryId) {
|
||||
return jobId + "-" + categoryId;
|
||||
}
|
||||
|
||||
private final String jobId;
|
||||
private long id = 0L;
|
||||
private String terms = "";
|
||||
|
|
|
@ -16,7 +16,6 @@ public class ModelSizeStatsTests extends AbstractSerializingTestCase<ModelSizeSt
|
|||
|
||||
public void testDefaultConstructor() {
|
||||
ModelSizeStats stats = new ModelSizeStats.Builder("foo").build();
|
||||
assertEquals("model_size_stats", stats.getId());
|
||||
assertEquals(0, stats.getModelBytes());
|
||||
assertEquals(0, stats.getTotalByFieldCount());
|
||||
assertEquals(0, stats.getTotalOverFieldCount());
|
||||
|
@ -25,16 +24,10 @@ public class ModelSizeStatsTests extends AbstractSerializingTestCase<ModelSizeSt
|
|||
assertEquals(MemoryStatus.OK, stats.getMemoryStatus());
|
||||
}
|
||||
|
||||
|
||||
public void testSetId() {
|
||||
ModelSizeStats.Builder stats = new ModelSizeStats.Builder("foo");
|
||||
|
||||
stats.setId("bar");
|
||||
|
||||
assertEquals("bar", stats.build().getId());
|
||||
public void testDocumentId() {
|
||||
assertEquals("foo-model_size_stats", ModelSizeStats.documentId("foo"));
|
||||
}
|
||||
|
||||
|
||||
public void testSetMemoryStatus_GivenNull() {
|
||||
ModelSizeStats.Builder stats = new ModelSizeStats.Builder("foo");
|
||||
|
||||
|
@ -43,7 +36,6 @@ public class ModelSizeStatsTests extends AbstractSerializingTestCase<ModelSizeSt
|
|||
assertEquals("[memory_status] must not be null", ex.getMessage());
|
||||
}
|
||||
|
||||
|
||||
public void testSetMemoryStatus_GivenSoftLimit() {
|
||||
ModelSizeStats.Builder stats = new ModelSizeStats.Builder("foo");
|
||||
|
||||
|
|
|
@ -3,19 +3,19 @@ setup:
|
|||
index:
|
||||
index: .ml-anomalies-farequote
|
||||
type: category_definition
|
||||
id: 1
|
||||
id: farequote-1
|
||||
body: { "job_id": "farequote", "category_id": 1 }
|
||||
- do:
|
||||
index:
|
||||
index: .ml-anomalies-farequote
|
||||
type: category_definition
|
||||
id: 2
|
||||
id: farequote-2
|
||||
body: { "job_id": "farequote", "category_id": 2 }
|
||||
- do:
|
||||
index:
|
||||
index: .ml-anomalies-unrelated
|
||||
type: category_definition
|
||||
id: 3
|
||||
id: farequote-3
|
||||
body: { "job_id": "unrelated", "category_id": 1 }
|
||||
|
||||
- do:
|
||||
|
|
|
@ -56,7 +56,7 @@ setup:
|
|||
index:
|
||||
index: .ml-anomalies-job-stats-test
|
||||
type: result
|
||||
id: model_size_stats
|
||||
id: job-stats-test-model_size_stats
|
||||
body: {
|
||||
"job_id": "job-stats-test",
|
||||
"result_type": "model_size_stats",
|
||||
|
|
Loading…
Reference in New Issue