[ML] Add document type to ID (elastic/x-pack-elasticsearch#1525)

* Add document type to ID

* Delete v5.4 quantiles

Original commit: elastic/x-pack-elasticsearch@d1f383b972
This commit is contained in:
David Kyle 2017-05-24 11:43:25 +01:00 committed by GitHub
parent fc2d1266f4
commit b083062689
14 changed files with 89 additions and 22 deletions

View File

@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
@ -94,7 +95,7 @@ public class JobStorageDeletionTask extends Task {
failureHandler);
// Step 3. Delete quantiles done, delete the categorizer state
ActionListener<DeleteResponse> deleteQuantilesHandler = ActionListener.wrap(
ActionListener<Boolean> deleteQuantilesHandler = ActionListener.wrap(
response -> deleteCategorizerState(jobId, client, deleteCategorizerStateHandler),
failureHandler);
@ -107,14 +108,19 @@ public class JobStorageDeletionTask extends Task {
deleteModelState(jobId, client, deleteStateHandler);
}
private void deleteQuantiles(String jobId, Client client, ActionListener<DeleteResponse> finishedHandler) {
client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), Quantiles.TYPE.getPreferredName(), Quantiles.documentId(jobId))
.execute(ActionListener.wrap(
finishedHandler::onResponse,
private void deleteQuantiles(String jobId, Client client, ActionListener<Boolean> finishedHandler) {
// The quantiles doc Id changed in v5.5 so delete both the old and new format
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), Quantiles.TYPE.getPreferredName(),
Quantiles.documentId(jobId)));
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), Quantiles.TYPE.getPreferredName(),
jobId + "-" + Quantiles.TYPE.getPreferredName()));
bulkRequestBuilder.execute(ActionListener.wrap(
response -> finishedHandler.onResponse(true),
e -> {
// It's not a problem for us if the index wasn't found - it's equivalent to document not found
if (e instanceof IndexNotFoundException) {
finishedHandler.onResponse(new DeleteResponse());
finishedHandler.onResponse(true);
} else {
finishedHandler.onFailure(e);
}

View File

@ -36,7 +36,7 @@ import java.util.Objects;
public class DataCounts extends ToXContentToBytes implements Writeable {
private static final String DOCUMENT_SUFFIX = "-data-counts";
private static final String DOCUMENT_SUFFIX = "_data_counts";
public static final String PROCESSED_RECORD_COUNT_STR = "processed_record_count";
public static final String PROCESSED_FIELD_COUNT_STR = "processed_field_count";
public static final String INPUT_BYTES_STR = "input_bytes";

View File

@ -260,8 +260,12 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
&& this.retain == that.retain;
}
private String stateDocumentPrefix() {
return jobId + "-" + snapshotId;
}
public List<String> stateDocumentIds() {
String prefix = documentId(this);
String prefix = stateDocumentPrefix();
List<String> stateDocumentIds = new ArrayList<>(snapshotDocCount);
// The state documents count suffices are 1-based
for (int i = 1; i <= snapshotDocCount; i++) {
@ -275,7 +279,7 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
}
public static String documentId(String jobId, String snapshotId) {
return jobId + "-" + snapshotId;
return jobId + "_model_snapshot_" + snapshotId;
}
public static ModelSnapshot fromJson(BytesReference bytesReference) {

View File

@ -45,7 +45,7 @@ public class Quantiles extends ToXContentToBytes implements Writeable {
}
public static String documentId(String jobId) {
return jobId + "-" + TYPE.getPreferredName();
return jobId + "_" + TYPE.getPreferredName();
}
private final String jobId;

View File

@ -109,7 +109,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
assertThat(getRecords(job.getId()).size(), equalTo(1));
List<ModelSnapshot> modelSnapshots = getModelSnapshots(job.getId());
assertThat(modelSnapshots.size(), equalTo(1));
String snapshotDocId = job.getId() + "-" + modelSnapshots.get(0).getSnapshotId();
String snapshotDocId = ModelSnapshot.documentId(modelSnapshots.get(0));
// Update snapshot timestamp to force it out of snapshot retention window
String snapshotUpdate = "{ \"timestamp\": " + oneDayAgo + "}";

View File

@ -172,7 +172,7 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
private static DataCounts getDataCountsFromIndex(String jobId) {
SearchResponse searchResponse = client().prepareSearch()
.setTypes(DataCounts.TYPE.getPreferredName())
.setQuery(QueryBuilders.idsQuery().addIds(jobId + "-data-counts"))
.setQuery(QueryBuilders.idsQuery().addIds(DataCounts.documentId(jobId)))
.get();
if (searchResponse.getHits().getTotalHits() != 1) {
return new DataCounts(jobId);

View File

@ -134,6 +134,12 @@ public class DataCountsTests extends AbstractSerializingTestCase<DataCounts> {
assertEquals(new Date(100L), counts.getEarliestRecordTimeStamp());
}
public void testDocumentId() {
DataCounts dataCounts = createTestInstance();
String jobId = dataCounts.getJobid();
assertEquals(jobId + "_data_counts", DataCounts.documentId(jobId));
}
private void assertAllFieldsEqualZero(DataCounts stats) throws Exception {
assertEquals(0L, stats.getProcessedRecordCount());
assertEquals(0L, stats.getProcessedFieldCount());

View File

@ -176,9 +176,9 @@ public class ModelSnapshotTests extends AbstractSerializingTestCase<ModelSnapsho
ModelSnapshot snapshot2 = new ModelSnapshot.Builder("foo").setSnapshotId("2").build();
ModelSnapshot snapshot3 = new ModelSnapshot.Builder("bar").setSnapshotId("1").build();
assertEquals("foo-1", ModelSnapshot.documentId(snapshot1));
assertEquals("foo-2", ModelSnapshot.documentId(snapshot2));
assertEquals("bar-1", ModelSnapshot.documentId(snapshot3));
assertEquals("foo_model_snapshot_1", ModelSnapshot.documentId(snapshot1));
assertEquals("foo_model_snapshot_2", ModelSnapshot.documentId(snapshot2));
assertEquals("bar_model_snapshot_1", ModelSnapshot.documentId(snapshot3));
}
public void testStateDocumentIds_GivenDocCountIsOne() {

View File

@ -54,6 +54,11 @@ public class QuantilesTests extends AbstractSerializingTestCase<Quantiles> {
assertEquals(quantiles1.hashCode(), quantiles2.hashCode());
}
public void testDocumentId() {
Quantiles quantiles = createTestInstance();
String jobId = quantiles.getJobId();
assertEquals(jobId + "_quantiles", Quantiles.documentId(jobId));
}
@Override
protected Quantiles createTestInstance() {

View File

@ -29,7 +29,7 @@ setup:
index:
index: .ml-anomalies-delete-model-snapshot
type: model_snapshot
id: "delete-model-snapshot-inactive-snapshot"
id: "delete-model-snapshot_model_snapshot_inactive-snapshot"
body: >
{
"job_id": "delete-model-snapshot",
@ -65,7 +65,7 @@ setup:
index:
index: .ml-anomalies-delete-model-snapshot
type: model_snapshot
id: "delete-model-snapshot-active-snapshot"
id: "delete-model-snapshot_model_snapshot_active-snapshot"
body: >
{
"job_id": "delete-model-snapshot",

View File

@ -402,3 +402,49 @@
index: .ml-anomalies-foo
- match: {count: 2}
---
"Test delete removes 5.4 and 5.5 quantiles":
- do:
xpack.ml.put_job:
job_id: index-layout-quantiles-job
body: >
{
"analysis_config" : {
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"xcontent"
}
}
- match: { job_id: "index-layout-quantiles-job" }
- do:
index:
index: .ml-state
type: quantiles
id: index-layout-quantiles-job_quantiles
body:
state: quantile-state
- do:
index:
index: .ml-state
type: quantiles
id: index-layout-quantiles-job-quantiles
body:
state: quantile-state
- do:
xpack.ml.delete_job:
job_id: "index-layout-quantiles-job"
- match: { acknowledged: true }
- do:
indices.refresh: {}
- do:
count:
index: .ml-state
- match: {count: 0}

View File

@ -81,7 +81,7 @@ setup:
get:
index: .ml-anomalies-post-data-job
type: data_counts
id: post-data-job-data-counts
id: post-data-job_data_counts
- match: { _source.processed_record_count: 2 }
- match: { _source.processed_field_count: 4}

View File

@ -29,7 +29,7 @@ setup:
index:
index: .ml-anomalies-revert-model-snapshot
type: model_snapshot
id: "revert-model-snapshot-first"
id: "revert-model-snapshot_model_snapshot_first"
body: >
{
"job_id": "revert-model-snapshot",
@ -54,7 +54,7 @@ setup:
index:
index: .ml-anomalies-revert-model-snapshot
type: model_snapshot
id: "revert-model-snapshot-second"
id: "revert-model-snapshot_model_snapshot_second"
body: >
{
"job_id": "revert-model-snapshot",

View File

@ -17,7 +17,7 @@ setup:
index:
index: .ml-anomalies-update-model-snapshot
type: model_snapshot
id: "update-model-snapshot-snapshot-1"
id: "update-model-snapshot_model_snapshot_snapshot-1"
body: >
{
"job_id" : "update-model-snapshot",
@ -31,7 +31,7 @@ setup:
index:
index: .ml-anomalies-update-model-snapshot
type: model_snapshot
id: "update-model-snapshot-snapshot-2"
id: "update-model-snapshot_model_snapshot_snapshot-2"
body: >
{
"job_id": "update-model-snapshot",