[ML] Use delete-by-query in JobDataDeleter (elastic/x-pack-elasticsearch#1274)

JobDataDeleter handles the deletion logic for three cases:

1. deleting a model snapshot and its state docs
2. deleting all results after a timestamp
3. deleting all interim results

The last two are currently implemented by manually performing
a search-and-scroll and then adding the matching hits to a bulk
delete request. This operation is exactly what delete-by-query
does (see the sketch below).
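
For orientation, here is a minimal sketch of the delete-by-query pattern
this commit adopts, condensed from the new deleteResultsFromTime shown in
the diff below; the listener body is illustrative:

// The SearchRequest must be created and passed to the DeleteByQueryRequest
// constructor before any further settings are applied to it.
SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
request.setRefresh(true);
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
searchRequest.types(Result.TYPE.getPreferredName());
searchRequest.source(new SearchSourceBuilder()
        .query(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs)));

// A single action call replaces the manual search-scroll-bulk loop.
client.execute(XPackDeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
        bulkByScrollResponse -> listener.onResponse(true),
        listener::onFailure));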

This commit changes JobDataDeleter to use delete-by-query. This
makes the code simpler and less error-prone. The downside is
losing some logging, which seems non-critical. The unit tests for
JobDataDeleter are also removed: they are heavily mocked, add
little value, and carry a high maintenance cost. This functionality
is already covered by integration tests.

relates elastic/x-pack-elasticsearch#821

Original commit: elastic/x-pack-elasticsearch@7da91332bd
Dimitris Athanasiou 2017-05-03 14:51:41 +01:00 committed by GitHub
parent 3e9c36838d
commit 3f73748d14
10 changed files with 133 additions and 341 deletions

View File

@@ -25,17 +25,18 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
public class DeleteModelSnapshotAction extends Action<DeleteModelSnapshotAction.Request,
@@ -180,8 +181,7 @@ public class DeleteModelSnapshotAction extends Action<DeleteModelSnapshotAction.
// Delete the snapshot and any associated state files
JobDataDeleter deleter = new JobDataDeleter(client, request.getJobId());
deleter.deleteModelSnapshot(deleteCandidate);
deleter.commit(new ActionListener<BulkResponse>() {
deleter.deleteModelSnapshots(Collections.singletonList(deleteCandidate), new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOT_DELETED,
@@ -194,8 +194,7 @@ public class DeleteModelSnapshotAction extends Action<DeleteModelSnapshotAction.
public void onFailure(Exception e) {
listener.onFailure(e);
}
},
true);
});
}, listener::onFailure);
}
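
Condensed, the call pattern in this action changes from staging deletes and
then committing to a single listener-based call; a sketch based on the hunks
above (the listener body is illustrative):

// Before: stage the delete, then commit with an explicit refresh flag.
//   deleter.deleteModelSnapshot(deleteCandidate);
//   deleter.commit(listener, true);

// After: one call that builds and executes the bulk request itself.
JobDataDeleter deleter = new JobDataDeleter(client, request.getJobId());
deleter.deleteModelSnapshots(Collections.singletonList(deleteCandidate),
        ActionListener.wrap(
                bulkResponse -> { /* audit the deletion and respond, as above */ },
                listener::onFailure));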

View File

@@ -45,6 +45,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.security.InternalClient;
import java.io.IOException;
import java.util.Date;
@@ -260,8 +261,8 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
@Inject
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager, JobProvider jobProvider,
ClusterService clusterService, Client client, JobDataCountsPersister jobDataCountsPersister) {
IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager, JobProvider jobProvider,
ClusterService clusterService, InternalClient client, JobDataCountsPersister jobDataCountsPersister) {
super(settings, NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, Request::new);
this.client = client;
this.jobManager = jobManager;
@@ -330,16 +331,11 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
logger.info("Deleting results after '" + deleteAfter + "'");
// TODO JobDataDeleter is basically delete-by-query.
// We should replace this whole abstraction with DBQ eventually (See #821)
JobDataDeleter dataDeleter = new JobDataDeleter(client, jobId);
dataDeleter.deleteResultsFromTime(deleteAfter.getTime() + 1, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean success) {
dataDeleter.commit(ActionListener.wrap(
bulkItemResponses -> {listener.onResponse(response);},
listener::onFailure),
true);
listener.onResponse(response);
}
@Override

View File

@@ -5,15 +5,14 @@
*/
package org.elasticsearch.xpack.ml.job.persistence;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
@@ -22,44 +21,56 @@ import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.common.action.XPackDeleteByQueryAction;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Result;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
public class JobDataDeleter {
private static final Logger LOGGER = Loggers.getLogger(JobDataDeleter.class);
private static final int SCROLL_SIZE = 1000;
private static final String SCROLL_CONTEXT_DURATION = "5m";
private final Client client;
private final String jobId;
private final BulkRequestBuilder bulkRequestBuilder;
private long deletedResultCount;
private long deletedModelSnapshotCount;
private long deletedModelStateCount;
private boolean quiet;
public JobDataDeleter(Client client, String jobId) {
this(client, jobId, false);
}
public JobDataDeleter(Client client, String jobId, boolean quiet) {
this.client = Objects.requireNonNull(client);
this.jobId = Objects.requireNonNull(jobId);
bulkRequestBuilder = client.prepareBulk();
deletedResultCount = 0;
deletedModelSnapshotCount = 0;
deletedModelStateCount = 0;
this.quiet = quiet;
}
/**
* Delete a list of model snapshots and their corresponding state documents.
*
* @param modelSnapshots the model snapshots to delete
*/
public void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, ActionListener<BulkResponse> listener) {
if (modelSnapshots.isEmpty()) {
listener.onResponse(new BulkResponse(new BulkItemResponse[0], 0L));
return;
}
String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName();
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
for (ModelSnapshot modelSnapshot : modelSnapshots) {
for (String stateDocId : modelSnapshot.stateDocumentIds()) {
bulkRequestBuilder.add(client.prepareDelete(stateIndexName, ModelState.TYPE.getPreferredName(), stateDocId));
}
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()),
ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.documentId(modelSnapshot)));
}
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try {
bulkRequestBuilder.execute(listener);
} catch (Exception e) {
listener.onFailure(e);
}
}
/**
@@ -69,181 +80,60 @@ public class JobDataDeleter {
* @param listener Response listener
*/
public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> listener) {
String index = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
DeleteByQueryHolder deleteByQueryHolder = new DeleteByQueryHolder(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
deleteByQueryHolder.dbqRequest.setRefresh(true);
RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName());
timeRange.gte(cutoffEpochMs);
deleteByQueryHolder.searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
deleteByQueryHolder.searchRequest.types(Result.TYPE.getPreferredName());
deleteByQueryHolder.searchRequest.source(new SearchSourceBuilder().query(timeRange));
client.execute(XPackDeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest, new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
listener.onResponse(true);
}
RepeatingSearchScrollListener scrollSearchListener = new RepeatingSearchScrollListener(index, listener);
client.prepareSearch(index)
.setTypes(Result.TYPE.getPreferredName())
.setFetchSource(false)
.setQuery(timeRange)
.setScroll(SCROLL_CONTEXT_DURATION)
.setSize(SCROLL_SIZE)
.execute(scrollSearchListener);
}
private void addDeleteRequestForSearchHits(SearchHits hits, String index) {
for (SearchHit hit : hits.getHits()) {
LOGGER.trace("Search hit for result: {}", hit.getId());
addDeleteRequest(hit, index);
}
deletedResultCount = hits.getTotalHits();
}
private void addDeleteRequest(SearchHit hit, String index) {
DeleteRequestBuilder deleteRequest = DeleteAction.INSTANCE.newRequestBuilder(client)
.setIndex(index)
.setType(hit.getType())
.setId(hit.getId());
bulkRequestBuilder.add(deleteRequest);
}
/**
* Delete a {@code ModelSnapshot}
*
* @param modelSnapshot the model snapshot to delete
*/
public void deleteModelSnapshot(ModelSnapshot modelSnapshot) {
String snapshotDocId = ModelSnapshot.documentId(modelSnapshot);
int docCount = modelSnapshot.getSnapshotDocCount();
String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName();
// Deduce the document IDs of the state documents from the information
// in the snapshot document - we cannot query the state itself as it's
// too big and has no mappings.
// Note: state docs are 1-based
for (int i = 1; i <= docCount; ++i) {
String stateId = snapshotDocId + '#' + i;
bulkRequestBuilder.add(client.prepareDelete(stateIndexName, ModelState.TYPE.getPreferredName(), stateId));
++deletedModelStateCount;
}
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()),
ModelSnapshot.TYPE.getPreferredName(), snapshotDocId));
++deletedModelSnapshotCount;
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
/**
* Delete all results marked as interim
*/
public void deleteInterimResults() {
String index = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
request.setRefresh(false);
request.setSlices(5);
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
searchRequest.types(Result.TYPE.getPreferredName());
QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true);
searchRequest.source(new SearchSourceBuilder().query(new ConstantScoreQueryBuilder(qb)));
SearchResponse searchResponse = client.prepareSearch(index)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setTypes(Result.TYPE.getPreferredName())
.setQuery(new ConstantScoreQueryBuilder(qb))
.setFetchSource(false)
.setScroll(SCROLL_CONTEXT_DURATION)
.setSize(SCROLL_SIZE)
.get();
long totalHits = searchResponse.getHits().getTotalHits();
long totalDeletedCount = 0;
while (totalDeletedCount < totalHits) {
for (SearchHit hit : searchResponse.getHits()) {
LOGGER.trace("Search hit for result: {}", hit.getId());
++totalDeletedCount;
addDeleteRequest(hit, index);
++deletedResultCount;
}
searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(SCROLL_CONTEXT_DURATION).get();
}
clearScroll(searchResponse.getScrollId());
}
private void clearScroll(String scrollId) {
try {
client.prepareClearScroll().addScrollId(scrollId).get();
client.execute(XPackDeleteByQueryAction.INSTANCE, request).get();
} catch (Exception e) {
LOGGER.warn("[{}] Error while clearing scroll with id [{}]", jobId, scrollId);
LOGGER.error("[" + jobId + "] An error occurred while deleting interim results", e);
}
}
/**
* Commit the deletions without enforcing the removal of data from disk.
* @param listener Response listener
* @param refresh If true a refresh is forced with request policy
* {@link WriteRequest.RefreshPolicy#IMMEDIATE} else the default
*/
public void commit(ActionListener<BulkResponse> listener, boolean refresh) {
if (bulkRequestBuilder.numberOfActions() == 0) {
listener.onResponse(new BulkResponse(new BulkItemResponse[0], 0L));
return;
}
// Wrapper to ensure safety
private static class DeleteByQueryHolder {
Level logLevel = quiet ? Level.DEBUG : Level.INFO;
LOGGER.log(logLevel, "Requesting deletion of {} results, {} model snapshots and {} model state documents",
deletedResultCount, deletedModelSnapshotCount, deletedModelStateCount);
private final SearchRequest searchRequest;
private final DeleteByQueryRequest dbqRequest;
if (refresh) {
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
try {
bulkRequestBuilder.execute(listener);
} catch (Exception e) {
listener.onFailure(e);
private DeleteByQueryHolder(String index) {
// The search request has to be constructed and passed to the DeleteByQueryRequest before more details are set to it
searchRequest = new SearchRequest(index);
dbqRequest = new DeleteByQueryRequest(searchRequest);
dbqRequest.setSlices(5);
dbqRequest.setAbortOnVersionConflict(false);
}
}
/**
* Blocking version of {@linkplain #commit(ActionListener, boolean)}
*/
public void commit(boolean refresh) {
if (bulkRequestBuilder.numberOfActions() == 0) {
return;
}
Level logLevel = quiet ? Level.DEBUG : Level.INFO;
LOGGER.log(logLevel, "Requesting deletion of {} results, {} model snapshots and {} model state documents",
deletedResultCount, deletedModelSnapshotCount, deletedModelStateCount);
if (refresh) {
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
BulkResponse response = bulkRequestBuilder.get();
if (response.hasFailures()) {
LOGGER.debug("Bulk request has failures. {}", response.buildFailureMessage());
}
}
/**
* Repeats a scroll search adding the hits to the bulk delete request
*/
private class RepeatingSearchScrollListener implements ActionListener<SearchResponse> {
private final AtomicLong totalDeletedCount;
private final String index;
private final ActionListener<Boolean> scrollFinishedListener;
RepeatingSearchScrollListener(String index, ActionListener<Boolean> scrollFinishedListener) {
totalDeletedCount = new AtomicLong(0L);
this.index = index;
this.scrollFinishedListener = scrollFinishedListener;
}
@Override
public void onResponse(SearchResponse searchResponse) {
addDeleteRequestForSearchHits(searchResponse.getHits(), index);
totalDeletedCount.addAndGet(searchResponse.getHits().getHits().length);
if (totalDeletedCount.get() < searchResponse.getHits().getTotalHits()) {
client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(SCROLL_CONTEXT_DURATION).execute(this);
}
else {
clearScroll(searchResponse.getScrollId());
scrollFinishedListener.onResponse(true);
}
}
@Override
public void onFailure(Exception e) {
scrollFinishedListener.onFailure(e);
}
};
}
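
Because the hunks above interleave removed and added lines, here is the new
deleteInterimResults reassembled in one piece. This is a best-effort reading
of the added lines in this diff, not an authoritative copy of the file:

/**
 * Delete all results marked as interim, using delete-by-query.
 */
public void deleteInterimResults() {
    SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
    DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
    request.setRefresh(false);
    request.setSlices(5);
    searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
    searchRequest.types(Result.TYPE.getPreferredName());
    QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true);
    searchRequest.source(new SearchSourceBuilder().query(new ConstantScoreQueryBuilder(qb)));
    try {
        // Blocking call; JobResultsPersister deletes interim results synchronously.
        client.execute(XPackDeleteByQueryAction.INSTANCE, request).get();
    } catch (Exception e) {
        LOGGER.error("[" + jobId + "] An error occurred while deleting interim results", e);
    }
}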

View File

@@ -88,7 +88,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
@@ -868,24 +867,19 @@ public class JobProvider {
public void restoreStateToStream(String jobId, ModelSnapshot modelSnapshot, OutputStream restoreStream) throws IOException {
String indexName = AnomalyDetectorsIndex.jobStateIndexName();
// First try to restore model state.
int numDocs = modelSnapshot.getSnapshotDocCount();
for (int docNum = 1; docNum <= numDocs; ++docNum) {
String docId = String.format(Locale.ROOT, "%s#%d", ModelSnapshot.documentId(modelSnapshot), docNum);
for (String stateDocId : modelSnapshot.stateDocumentIds()) {
LOGGER.trace("ES API CALL: get ID {} type {} from index {}", stateDocId, ModelState.TYPE, indexName);
LOGGER.trace("ES API CALL: get ID {} type {} from index {}", docId, ModelState.TYPE, indexName);
GetResponse stateResponse = client.prepareGet(indexName, ModelState.TYPE.getPreferredName(), docId).get();
GetResponse stateResponse = client.prepareGet(indexName, ModelState.TYPE.getPreferredName(), stateDocId).get();
if (!stateResponse.isExists()) {
LOGGER.error("Expected {} documents for model state for {} snapshot {} but failed to find {}",
numDocs, jobId, modelSnapshot.getSnapshotId(), docId);
modelSnapshot.getSnapshotDocCount(), jobId, modelSnapshot.getSnapshotId(), stateDocId);
break;
}
writeStateToStream(stateResponse.getSourceAsBytesRef(), restoreStream);
}
// Secondly try to restore categorizer state. This must come after model state because that's
// the order the C++ process expects.
// There are no snapshots for this, so the IDs simply

View File

@@ -272,9 +272,7 @@ public class JobResultsPersister extends AbstractComponent {
* Delete any existing interim results synchronously
*/
public void deleteInterimResults(String jobId) {
JobDataDeleter deleter = new JobDataDeleter(client, jobId, true);
deleter.deleteInterimResults();
deleter.commit(false);
new JobDataDeleter(client, jobId).deleteInterimResults();
}
/**

View File

@@ -117,14 +117,8 @@ public class JobStorageDeletionTask extends Task {
jobProvider.modelSnapshots(jobId, 0, 10000,
page -> {
List<ModelSnapshot> deleteCandidates = page.results();
// Delete the snapshot and any associated state files
JobDataDeleter deleter = new JobDataDeleter(client, jobId);
for (ModelSnapshot deleteCandidate : deleteCandidates) {
deleter.deleteModelSnapshot(deleteCandidate);
}
deleter.commit(listener, true);
deleter.deleteModelSnapshots(deleteCandidates, listener);
},
listener::onFailure);
}

View File

@@ -23,7 +23,9 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
@@ -258,6 +260,16 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
&& this.retain == that.retain;
}
public List<String> stateDocumentIds() {
String prefix = documentId(this);
List<String> stateDocumentIds = new ArrayList<>(snapshotDocCount);
// The state document ID suffixes are 1-based
for (int i = 1; i <= snapshotDocCount; i++) {
stateDocumentIds.add(prefix + '#' + i);
}
return stateDocumentIds;
}
public static String documentId(ModelSnapshot snapshot) {
return documentId(snapshot.getJobId(), snapshot.getSnapshotId());
}

View File

@@ -13,8 +13,11 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.XPackSingleNodeTestCase;
import org.elasticsearch.xpack.ml.MachineLearningTemplateRegistry;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.JobTests;
@@ -44,6 +47,7 @@ import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
@@ -61,7 +65,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
private static final String JOB_ID = "autodetect-result-processor-it-job";
private Renormalizer renormalizer;
@@ -70,6 +74,22 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
private List<ModelSnapshot> capturedUpdateModelSnapshotOnJobRequests;
private AutoDetectResultProcessor resultProcessor;
@Override
protected Settings nodeSettings() {
Settings.Builder newSettings = Settings.builder();
newSettings.put(super.nodeSettings());
// Disable security otherwise delete-by-query action fails to get authorized
newSettings.put(XPackSettings.SECURITY_ENABLED.getKey(), false);
newSettings.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
newSettings.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
return newSettings.build();
}
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(XPackPlugin.class);
}
@Before
public void createComponents() {
renormalizer = new NoOpRenormalizer();
@@ -320,7 +340,6 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
private class ResultsBuilder {
private List<AutodetectResult> results = new ArrayList<>();
FlushAcknowledgement flushAcknowledgement;
ResultsBuilder addBucket(Bucket bucket) {
results.add(new AutodetectResult(Objects.requireNonNull(bucket), null, null, null, null, null, null, null, null));

View File

@@ -1,123 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import static org.elasticsearch.mock.orig.Mockito.times;
import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
public class JobDataDeleterTests extends ESTestCase {
public void testDeleteResultsFromTime() {
final long TOTAL_HIT_COUNT = 100L;
SearchResponse response = createSearchResponseWithHits(TOTAL_HIT_COUNT);
BulkResponse bulkResponse = Mockito.mock(BulkResponse.class);
Client client = new MockClientBuilder("myCluster")
.prepareSearchExecuteListener(AnomalyDetectorsIndex.jobResultsAliasedName("foo"), response)
.prepareSearchScrollExecuteListener(response)
.prepareBulk(bulkResponse).build();
JobDataDeleter bulkDeleter = new JobDataDeleter(client, "foo");
// because of the mocking this runs in the current thread
bulkDeleter.deleteResultsFromTime(new Date().getTime(), new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
assertTrue(aBoolean);
}
@Override
public void onFailure(Exception e) {
fail(e.toString());
}
});
verify(client.prepareBulk(), times((int)TOTAL_HIT_COUNT)).add(any(DeleteRequestBuilder.class));
ActionListener<BulkResponse> bulkListener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
}
@Override
public void onFailure(Exception e) {
fail(e.toString());
}
};
when(client.prepareBulk().numberOfActions()).thenReturn(new Integer((int)TOTAL_HIT_COUNT));
bulkDeleter.commit(bulkListener, true);
verify(client.prepareBulk(), times(1)).execute(bulkListener);
verify(client.prepareBulk(), times(1)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
bulkDeleter.commit(bulkListener, false);
verify(client.prepareBulk(), times(2)).execute(bulkListener);
verify(client.prepareBulk(), times(1)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
public void testDeleteModelSnapShot() {
String jobId = "foo";
ModelSnapshot snapshot = new ModelSnapshot.Builder(jobId).setSnapshotDocCount(5)
.setSnapshotId("snap-1").build();
BulkResponse bulkResponse = Mockito.mock(BulkResponse.class);
Client client = new MockClientBuilder("myCluster").prepareBulk(bulkResponse).build();
JobDataDeleter bulkDeleter = new JobDataDeleter(client, jobId);
bulkDeleter.deleteModelSnapshot(snapshot);
verify(client, times(5))
.prepareDelete(eq(AnomalyDetectorsIndex.jobStateIndexName()), eq(ModelState.TYPE.getPreferredName()), anyString());
verify(client, times(1))
.prepareDelete(eq(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)), eq(ModelSnapshot.TYPE.getPreferredName()),
eq("foo-snap-1"));
}
private SearchResponse createSearchResponseWithHits(long totalHitCount) {
SearchHits hits = mockSearchHits(totalHitCount);
SearchResponse searchResponse = Mockito.mock(SearchResponse.class);
when(searchResponse.getHits()).thenReturn(hits);
when(searchResponse.getScrollId()).thenReturn("scroll1");
return searchResponse;
}
private SearchHits mockSearchHits(long totalHitCount) {
List<SearchHit> hitList = new ArrayList<>();
for (int i=0; i<20; i++) {
SearchHit hit = new SearchHit(123, "mockSeachHit-" + i,
new Text("mockSearchHit"), Collections.emptyMap());
hitList.add(hit);
}
return new SearchHits(hitList.toArray(new SearchHit[0]), totalHitCount, 1);
}
}

View File

@@ -10,8 +10,11 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
import java.util.Arrays;
import java.util.Date;
import static org.hamcrest.Matchers.equalTo;
public class ModelSnapshotTests extends AbstractSerializingTestCase<ModelSnapshot> {
private static final Date DEFAULT_TIMESTAMP = new Date();
private static final String DEFAULT_DESCRIPTION = "a snapshot";
@@ -177,4 +180,14 @@ public class ModelSnapshotTests extends AbstractSerializingTestCase<ModelSnapsho
assertEquals("foo-2", ModelSnapshot.documentId(snapshot2));
assertEquals("bar-1", ModelSnapshot.documentId(snapshot3));
}
public void testStateDocumentIds_GivenDocCountIsOne() {
ModelSnapshot snapshot = new ModelSnapshot.Builder("foo").setSnapshotId("1").setSnapshotDocCount(1).build();
assertThat(snapshot.stateDocumentIds(), equalTo(Arrays.asList("foo-1#1")));
}
public void testStateDocumentIds_GivenDocCountIsThree() {
ModelSnapshot snapshot = new ModelSnapshot.Builder("foo").setSnapshotId("123456789").setSnapshotDocCount(3).build();
assertThat(snapshot.stateDocumentIds(), equalTo(Arrays.asList("foo-123456789#1", "foo-123456789#2", "foo-123456789#3")));
}
}