Make the bulk results deleter non-blocking (elastic/elasticsearch#456)
* Make deleting old results a non blocking operation * Add test for ElasticsearchBulkDeleter Original commit: elastic/x-pack-elasticsearch@9fd9fb0b02
This commit is contained in:
parent
d530edc263
commit
385ec37bc3
|
@ -20,6 +20,7 @@ import org.elasticsearch.index.query.QueryBuilder;
|
|||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.index.query.RangeQueryBuilder;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchHits;
|
||||
import org.elasticsearch.search.sort.SortBuilders;
|
||||
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
|
||||
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
|
||||
|
@ -30,6 +31,7 @@ import org.elasticsearch.xpack.prelert.job.results.Result;
|
|||
|
||||
import java.util.Date;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public class ElasticsearchBulkDeleter implements JobDataDeleter {
|
||||
private static final Logger LOGGER = Loggers.getLogger(ElasticsearchBulkDeleter.class);
|
||||
|
@ -60,33 +62,30 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void deleteResultsFromTime(long cutoffEpochMs) {
|
||||
public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> listener) {
|
||||
String index = JobResultsPersister.getJobIndexName(jobId);
|
||||
|
||||
RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(ElasticsearchMappings.ES_TIMESTAMP);
|
||||
timeRange.gte(cutoffEpochMs);
|
||||
timeRange.lt(new Date().getTime());
|
||||
|
||||
SearchResponse searchResponse = client.prepareSearch(index)
|
||||
RepeatingSearchScrollListener scrollSearchListener = new RepeatingSearchScrollListener(index, listener);
|
||||
|
||||
client.prepareSearch(index)
|
||||
.setTypes(Result.TYPE.getPreferredName())
|
||||
.setFetchSource(false)
|
||||
.setQuery(timeRange)
|
||||
.setScroll(SCROLL_CONTEXT_DURATION)
|
||||
.setSize(SCROLL_SIZE)
|
||||
.get();
|
||||
.execute(scrollSearchListener);
|
||||
}
|
||||
|
||||
String scrollId = searchResponse.getScrollId();
|
||||
long totalHits = searchResponse.getHits().totalHits();
|
||||
long totalDeletedCount = 0;
|
||||
while (totalDeletedCount < totalHits) {
|
||||
for (SearchHit hit : searchResponse.getHits()) {
|
||||
LOGGER.trace("Search hit for result: {}", hit.getId());
|
||||
++totalDeletedCount;
|
||||
addDeleteRequest(hit, index);
|
||||
++deletedResultCount;
|
||||
}
|
||||
searchResponse = client.prepareSearchScroll(scrollId).setScroll(SCROLL_CONTEXT_DURATION).get();
|
||||
private void addDeleteRequestForSearchHits(SearchHits hits, String index) {
|
||||
for (SearchHit hit : hits.hits()) {
|
||||
LOGGER.trace("Search hit for result: {}", hit.getId());
|
||||
addDeleteRequest(hit, index);
|
||||
}
|
||||
deletedResultCount = hits.getTotalHits();
|
||||
}
|
||||
|
||||
private void addDeleteRequest(SearchHit hit, String index) {
|
||||
|
@ -179,4 +178,39 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter {
|
|||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Repeats a scroll search adding the hits a bulk delete request
|
||||
*/
|
||||
private class RepeatingSearchScrollListener implements ActionListener<SearchResponse> {
|
||||
|
||||
private final AtomicLong totalDeletedCount;
|
||||
private final String index;
|
||||
private final ActionListener<Boolean> scrollFinishedListener;
|
||||
|
||||
RepeatingSearchScrollListener(String index, ActionListener<Boolean> scrollFinishedListener) {
|
||||
totalDeletedCount = new AtomicLong(0L);
|
||||
this.index = index;
|
||||
this.scrollFinishedListener = scrollFinishedListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(SearchResponse searchResponse) {
|
||||
addDeleteRequestForSearchHits(searchResponse.getHits(), index);
|
||||
|
||||
totalDeletedCount.addAndGet(searchResponse.getHits().hits().length);
|
||||
if (totalDeletedCount.get() < searchResponse.getHits().totalHits()) {
|
||||
client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(SCROLL_CONTEXT_DURATION)
|
||||
.execute(this);
|
||||
}
|
||||
else {
|
||||
scrollFinishedListener.onResponse(true);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
scrollFinishedListener.onFailure(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -14,11 +14,12 @@ import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
|
|||
public interface JobDataDeleter {
|
||||
|
||||
/**
|
||||
* Delete all result types (Buckets, Records, Influencers) from {@code cutOffTime}
|
||||
* Asynchronously delete all result types (Buckets, Records, Influencers) from {@code cutOffTime}
|
||||
*
|
||||
* @param cutoffEpochMs Results at and after this time will be deleted
|
||||
* @param listener Response listener
|
||||
*/
|
||||
void deleteResultsFromTime(long cutoffEpochMs);
|
||||
void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> listener);
|
||||
|
||||
/**
|
||||
* Delete a {@code ModelSnapshot}
|
||||
|
|
|
@ -28,7 +28,18 @@ public class OldDataRemover {
|
|||
*/
|
||||
public void deleteResultsAfter(ActionListener<BulkResponse> listener, String jobId, long cutoffEpochMs) {
|
||||
JobDataDeleter deleter = dataDeleterFactory.apply(jobId);
|
||||
deleter.deleteResultsFromTime(cutoffEpochMs);
|
||||
deleter.commit(listener);
|
||||
deleter.deleteResultsFromTime(cutoffEpochMs, new ActionListener<Boolean>() {
|
||||
@Override
|
||||
public void onResponse(Boolean success) {
|
||||
if (success) {
|
||||
deleter.commit(listener);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.prelert.job.persistence;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.delete.DeleteRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchHits;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.mock.orig.Mockito.times;
|
||||
import static org.elasticsearch.mock.orig.Mockito.verify;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class ElasticsearchBulkDeleterTests extends ESTestCase {
|
||||
|
||||
public void testDeleteResultsFromTime() {
|
||||
|
||||
final long TOTAL_HIT_COUNT = 100L;
|
||||
final int PER_SCROLL_SEARCH_HIT_COUNT = 20;
|
||||
|
||||
SearchResponse response = createSearchResponseWithHits(TOTAL_HIT_COUNT, PER_SCROLL_SEARCH_HIT_COUNT);
|
||||
BulkResponse bulkResponse = Mockito.mock(BulkResponse.class);
|
||||
|
||||
Client client = new MockClientBuilder("myCluster")
|
||||
.prepareSearchExecuteListener(JobResultsPersister.getJobIndexName("foo"), response)
|
||||
.prepareSearchScrollExecuteListener(response)
|
||||
.prepareBulk(bulkResponse).build();
|
||||
|
||||
ElasticsearchBulkDeleter bulkDeleter = new ElasticsearchBulkDeleter(client, "foo");
|
||||
|
||||
// because of the mocking this runs in the current thread
|
||||
bulkDeleter.deleteResultsFromTime(new Date().getTime(), new ActionListener<Boolean>() {
|
||||
@Override
|
||||
public void onResponse(Boolean aBoolean) {
|
||||
assertTrue(aBoolean);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
fail(e.toString());
|
||||
}
|
||||
});
|
||||
|
||||
verify(client.prepareBulk(), times((int)TOTAL_HIT_COUNT)).add(any(DeleteRequestBuilder.class));
|
||||
|
||||
ActionListener<BulkResponse> bulkListener = new ActionListener<BulkResponse>() {
|
||||
@Override
|
||||
public void onResponse(BulkResponse bulkItemResponses) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
fail(e.toString());
|
||||
}
|
||||
};
|
||||
|
||||
when(client.prepareBulk().numberOfActions()).thenReturn(new Integer((int)TOTAL_HIT_COUNT));
|
||||
bulkDeleter.commit(bulkListener);
|
||||
|
||||
verify(client.prepareBulk(), times(1)).execute(bulkListener);
|
||||
}
|
||||
|
||||
private SearchResponse createSearchResponseWithHits(long totalHitCount, int hitsPerSearchResult) {
|
||||
SearchHits hits = mockSearchHits(totalHitCount, hitsPerSearchResult);
|
||||
SearchResponse searchResponse = Mockito.mock(SearchResponse.class);
|
||||
when(searchResponse.getHits()).thenReturn(hits);
|
||||
when(searchResponse.getScrollId()).thenReturn("scroll1");
|
||||
return searchResponse;
|
||||
}
|
||||
|
||||
private SearchHits mockSearchHits(long totalHitCount, int hitsPerSearchResult) {
|
||||
|
||||
SearchHits hits = Mockito.mock(SearchHits.class);
|
||||
when(hits.totalHits()).thenReturn(totalHitCount);
|
||||
|
||||
List<SearchHit> hitList = new ArrayList<>();
|
||||
for (int i=0; i<20; i++) {
|
||||
SearchHit hit = Mockito.mock(SearchHit.class);
|
||||
when(hit.getType()).thenReturn("mockSearchHit");
|
||||
when(hit.getId()).thenReturn("mockSeachHit-" + i);
|
||||
hitList.add(hit);
|
||||
}
|
||||
when(hits.getHits()).thenReturn(hitList.toArray(new SearchHit[hitList.size()]));
|
||||
when(hits.hits()).thenReturn(hitList.toArray(new SearchHit[hitList.size()]));
|
||||
|
||||
return hits;
|
||||
}
|
||||
}
|
|
@ -7,6 +7,9 @@ package org.elasticsearch.xpack.prelert.job.persistence;
|
|||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
|
@ -41,6 +44,7 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
|
|||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
|
||||
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
|
||||
import org.elasticsearch.action.update.UpdateRequestBuilder;
|
||||
import org.elasticsearch.action.update.UpdateResponse;
|
||||
|
@ -62,6 +66,8 @@ import org.elasticsearch.xpack.prelert.action.DeleteJobAction;
|
|||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
public class MockClientBuilder {
|
||||
@Mock
|
||||
|
@ -245,17 +251,66 @@ public class MockClientBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
public MockClientBuilder prepareSearch(String index, String type, int from, int size, SearchResponse response) {
|
||||
public MockClientBuilder prepareSearch(String index, String type, SearchResponse response) {
|
||||
SearchRequestBuilder searchRequestBuilder = mock(SearchRequestBuilder.class);
|
||||
when(searchRequestBuilder.get()).thenReturn(response);
|
||||
when(searchRequestBuilder.setTypes(eq(type))).thenReturn(searchRequestBuilder);
|
||||
when(searchRequestBuilder.setFrom(eq(from))).thenReturn(searchRequestBuilder);
|
||||
when(searchRequestBuilder.setSize(eq(size))).thenReturn(searchRequestBuilder);
|
||||
when(searchRequestBuilder.setFrom(anyInt())).thenReturn(searchRequestBuilder);
|
||||
when(searchRequestBuilder.setSize(anyInt())).thenReturn(searchRequestBuilder);
|
||||
when(searchRequestBuilder.addSort(any(SortBuilder.class))).thenReturn(searchRequestBuilder);
|
||||
when(searchRequestBuilder.setQuery(any())).thenReturn(searchRequestBuilder);
|
||||
when(searchRequestBuilder.setFetchSource(anyBoolean())).thenReturn(searchRequestBuilder);
|
||||
when(searchRequestBuilder.setScroll(anyString())).thenReturn(searchRequestBuilder);
|
||||
when(client.prepareSearch(eq(index))).thenReturn(searchRequestBuilder);
|
||||
return this;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public MockClientBuilder prepareSearchExecuteListener(String index, SearchResponse response) {
|
||||
SearchRequestBuilder builder = mock(SearchRequestBuilder.class);
|
||||
when(builder.setTypes(anyString())).thenReturn(builder);
|
||||
when(builder.addSort(any(SortBuilder.class))).thenReturn(builder);
|
||||
when(builder.setFetchSource(anyBoolean())).thenReturn(builder);
|
||||
when(builder.setScroll(anyString())).thenReturn(builder);
|
||||
when(builder.addDocValueField(any(String.class))).thenReturn(builder);
|
||||
when(builder.addSort(any(String.class), any(SortOrder.class))).thenReturn(builder);
|
||||
when(builder.setQuery(any())).thenReturn(builder);
|
||||
when(builder.setSize(anyInt())).thenReturn(builder);
|
||||
|
||||
doAnswer(new Answer<Void>() {
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocationOnMock.getArguments()[0];
|
||||
listener.onResponse(response);
|
||||
return null;
|
||||
}
|
||||
}).when(builder).execute(any());
|
||||
|
||||
when(client.prepareSearch(eq(index))).thenReturn(builder);
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public MockClientBuilder prepareSearchScrollExecuteListener(SearchResponse response) {
|
||||
SearchScrollRequestBuilder builder = mock(SearchScrollRequestBuilder.class);
|
||||
when(builder.setScroll(anyString())).thenReturn(builder);
|
||||
when(builder.setScrollId(anyString())).thenReturn(builder);
|
||||
|
||||
doAnswer(new Answer<Void>() {
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocationOnMock.getArguments()[0];
|
||||
listener.onResponse(response);
|
||||
return null;
|
||||
}
|
||||
}).when(builder).execute(any());
|
||||
|
||||
when(client.prepareSearchScroll(anyString())).thenReturn(builder);
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public MockClientBuilder prepareSearch(String index, String type, int from, int size, SearchResponse response,
|
||||
ArgumentCaptor<QueryBuilder> filter) {
|
||||
SearchRequestBuilder builder = mock(SearchRequestBuilder.class);
|
||||
|
@ -341,6 +396,23 @@ public class MockClientBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public MockClientBuilder prepareBulkExecuteListener(BulkResponse response) {
|
||||
ListenableActionFuture<BulkResponse> actionFuture = mock(ListenableActionFuture.class);
|
||||
BulkRequestBuilder builder = mock(BulkRequestBuilder.class);
|
||||
when(client.prepareBulk()).thenReturn(builder);
|
||||
|
||||
doAnswer(new Answer<Void>() {
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||
ActionListener<BulkResponse> listener = (ActionListener<BulkResponse>) invocationOnMock.getArguments()[0];
|
||||
listener.onResponse(response);
|
||||
return null;
|
||||
}
|
||||
}).when(builder).execute(any());
|
||||
return this;
|
||||
}
|
||||
|
||||
public MockClientBuilder prepareUpdate(String index, String type, String id, ArgumentCaptor<Map<String, Object>> getSource) {
|
||||
UpdateRequestBuilder builder = mock(UpdateRequestBuilder.class);
|
||||
when(client.prepareUpdate(index, type, id)).thenReturn(builder);
|
||||
|
|
Loading…
Reference in New Issue