[ML] Refresh indices after job deletion (elastic/x-pack-elasticsearch#1174)

* Refresh indices after job deletion

* Make refresh index option explicit

Original commit: elastic/x-pack-elasticsearch@d3c819966b
This commit is contained in:
David Kyle 2017-04-24 13:55:36 +01:00 committed by GitHub
parent 9a4fa90ada
commit 0b267242f1
6 changed files with 29 additions and 10 deletions

View File

@ -194,7 +194,8 @@ public class DeleteModelSnapshotAction extends Action<DeleteModelSnapshotAction.
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
},
true);
}, listener::onFailure);
}

View File

@ -338,7 +338,8 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
public void onResponse(Boolean success) {
dataDeleter.commit(ActionListener.wrap(
bulkItemResponses -> {listener.onResponse(response);},
listener::onFailure));
listener::onFailure),
true);
}
@Override

View File

@ -14,6 +14,7 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
@ -156,9 +157,12 @@ public class JobDataDeleter {
}
/**
* Commit the deletions without enforcing the removal of data from disk
* Commit the deletions without enforcing the removal of data from disk.
* @param listener Response listener
* @param refresh If true a refresh is forced with request policy
* {@link WriteRequest.RefreshPolicy#IMMEDIATE} else the default
*/
public void commit(ActionListener<BulkResponse> listener) {
public void commit(ActionListener<BulkResponse> listener, boolean refresh) {
if (bulkRequestBuilder.numberOfActions() == 0) {
listener.onResponse(new BulkResponse(new BulkItemResponse[0], 0L));
return;
@ -168,6 +172,9 @@ public class JobDataDeleter {
LOGGER.log(logLevel, "Requesting deletion of {} results, {} model snapshots and {} model state documents",
deletedResultCount, deletedModelSnapshotCount, deletedModelStateCount);
if (refresh) {
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
try {
bulkRequestBuilder.execute(listener);
} catch (Exception e) {
@ -176,9 +183,9 @@ public class JobDataDeleter {
}
/**
* Blocking version of {@linkplain #commit(ActionListener)}
* Blocking version of {@linkplain #commit(ActionListener, boolean)}
*/
public void commit() {
public void commit(boolean refresh) {
if (bulkRequestBuilder.numberOfActions() == 0) {
return;
}
@ -186,7 +193,9 @@ public class JobDataDeleter {
Level logLevel = quiet ? Level.DEBUG : Level.INFO;
LOGGER.log(logLevel, "Requesting deletion of {} results, {} model snapshots and {} model state documents",
deletedResultCount, deletedModelSnapshotCount, deletedModelStateCount);
if (refresh) {
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
BulkResponse response = bulkRequestBuilder.get();
if (response.hasFailures()) {
LOGGER.debug("Bulk request has failures. {}", response.buildFailureMessage());

View File

@ -274,7 +274,7 @@ public class JobResultsPersister extends AbstractComponent {
public void deleteInterimResults(String jobId) {
JobDataDeleter deleter = new JobDataDeleter(client, jobId, true);
deleter.deleteInterimResults();
deleter.commit();
deleter.commit(false);
}
/**

View File

@ -87,6 +87,7 @@ public class JobStorageDeletionTask extends Task {
searchRequest.indicesOptions(JobProvider.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
request.setSlices(5);
request.setAbortOnVersionConflict(false);
request.setRefresh(true);
client.execute(XPackDeleteByQueryAction.INSTANCE, request, dbqHandler);
},
@ -123,7 +124,7 @@ public class JobStorageDeletionTask extends Task {
deleter.deleteModelSnapshot(deleteCandidate);
}
deleter.commit(listener);
deleter.commit(listener, true);
},
listener::onFailure);
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.search.SearchHit;
@ -23,6 +24,7 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
import static org.elasticsearch.mock.orig.Mockito.never;
import static org.elasticsearch.mock.orig.Mockito.times;
import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.mockito.Matchers.any;
@ -73,9 +75,14 @@ public class JobDataDeleterTests extends ESTestCase {
};
when(client.prepareBulk().numberOfActions()).thenReturn(new Integer((int)TOTAL_HIT_COUNT));
bulkDeleter.commit(bulkListener);
bulkDeleter.commit(bulkListener, true);
verify(client.prepareBulk(), times(1)).execute(bulkListener);
verify(client.prepareBulk(), times(1)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
bulkDeleter.commit(bulkListener, false);
verify(client.prepareBulk(), times(2)).execute(bulkListener);
verify(client.prepareBulk(), times(1)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
public void testDeleteModelSnapShot() {