[7.x] Delete empty .ml-state* indices during nightly maintenance task. (#53587) (#53849)

This commit is contained in:
Przemysław Witek 2020-03-20 13:08:36 +01:00 committed by GitHub
parent d23112f441
commit a68071dbba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 338 additions and 3 deletions

View File

@ -6,6 +6,8 @@
package org.elasticsearch.xpack.ml.integration; package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
@ -14,6 +16,7 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateAction; import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
@ -42,6 +45,7 @@ import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -279,6 +283,44 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
nonExistingJobDocsCount, equalTo(0)); nonExistingJobDocsCount, equalTo(0));
} }
/**
* Verifies empty state indices deletion. Here is the summary of indices used by the test:
*
* +------------------+--------+----------+-------------------------+
* | index name | empty? | current? | expected to be removed? |
* +------------------+--------+----------+-------------------------+
* | .ml-state | yes | no | yes |
* | .ml-state-000001 | no | no | no |
* | .ml-state-000003 | yes | no | yes |
* | .ml-state-000005 | no | no | no |
* | .ml-state-000007 | yes | yes | no |
* +------------------+--------+----------+-------------------------+
*/
public void testDeleteExpiredDataActionDeletesEmptyStateIndices() throws Exception {
client().admin().indices().prepareCreate(".ml-state").get();
client().admin().indices().prepareCreate(".ml-state-000001").get();
client().prepareIndex(".ml-state-000001", SINGLE_MAPPING_NAME).setSource("field_1", "value_1").get();
client().admin().indices().prepareCreate(".ml-state-000003").get();
client().admin().indices().prepareCreate(".ml-state-000005").get();
client().prepareIndex(".ml-state-000005", SINGLE_MAPPING_NAME).setSource("field_5", "value_5").get();
client().admin().indices().prepareCreate(".ml-state-000007").addAlias(new Alias(".ml-state-write").isHidden(true)).get();
refresh();
GetIndexResponse getIndexResponse = client().admin().indices().prepareGetIndex().setIndices(".ml-state*").get();
assertThat(Strings.toString(getIndexResponse),
getIndexResponse.getIndices(),
is(arrayContaining(".ml-state", ".ml-state-000001", ".ml-state-000003", ".ml-state-000005", ".ml-state-000007")));
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
refresh();
getIndexResponse = client().admin().indices().prepareGetIndex().setIndices(".ml-state*").get();
assertThat(Strings.toString(getIndexResponse),
getIndexResponse.getIndices(),
// Only non-empty or current indices should survive deletion process
is(arrayContaining(".ml-state-000001", ".ml-state-000005", ".ml-state-000007")));
}
private static Job.Builder newJobBuilder(String id) { private static Job.Builder newJobBuilder(String id) {
Detector.Builder detector = new Detector.Builder(); Detector.Builder detector = new Detector.Builder();
detector.setFunction("count"); detector.setFunction("count");

View File

@ -21,6 +21,7 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.retention.EmptyStateIndexRemover;
import org.elasticsearch.xpack.ml.job.retention.ExpiredForecastsRemover; import org.elasticsearch.xpack.ml.job.retention.ExpiredForecastsRemover;
import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover; import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover;
import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover; import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover;
@ -84,7 +85,8 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
new ExpiredResultsRemover(client, auditor, threadPool), new ExpiredResultsRemover(client, auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool), new ExpiredForecastsRemover(client, threadPool),
new ExpiredModelSnapshotsRemover(client, threadPool), new ExpiredModelSnapshotsRemover(client, threadPool),
new UnusedStateRemover(client, clusterService) new UnusedStateRemover(client, clusterService),
new EmptyStateIndexRemover(client)
); );
Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers); Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
deleteExpiredData(dataRemoversIterator, listener, isTimedOutSupplier, true); deleteExpiredData(dataRemoversIterator, listener, isTimedOutSupplier, true);

View File

@ -0,0 +1,113 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.retention;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import static java.util.stream.Collectors.toSet;
/**
* This class deletes empty indices matching .ml-state* pattern that are not pointed at by the .ml-state-write alias.
*/
public class EmptyStateIndexRemover implements MlDataRemover {
private final OriginSettingClient client;
public EmptyStateIndexRemover(OriginSettingClient client) {
this.client = Objects.requireNonNull(client);
}
@Override
public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
try {
if (isTimedOutSupplier.get()) {
listener.onResponse(false);
return;
}
getEmptyStateIndices(
ActionListener.wrap(
emptyStateIndices -> {
if (emptyStateIndices.isEmpty()) {
listener.onResponse(true);
return;
}
getCurrentStateIndices(
ActionListener.wrap(
currentStateIndices -> {
Set<String> stateIndicesToRemove = Sets.difference(emptyStateIndices, currentStateIndices);
if (stateIndicesToRemove.isEmpty()) {
listener.onResponse(true);
return;
}
executeDeleteEmptyStateIndices(stateIndicesToRemove, listener);
},
listener::onFailure
)
);
},
listener::onFailure
)
);
} catch (Exception e) {
listener.onFailure(e);
}
}
private void getEmptyStateIndices(ActionListener<Set<String>> listener) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest().indices(AnomalyDetectorsIndex.jobStateIndexPattern());
client.admin().indices().stats(
indicesStatsRequest,
ActionListener.wrap(
indicesStatsResponse -> {
Set<String> emptyStateIndices =
indicesStatsResponse.getIndices().values().stream()
.filter(stats -> stats.getTotal().getDocs().getCount() == 0)
.map(IndexStats::getIndex)
.collect(toSet());
listener.onResponse(emptyStateIndices);
},
listener::onFailure
)
);
}
private void getCurrentStateIndices(ActionListener<Set<String>> listener) {
GetIndexRequest getIndexRequest = new GetIndexRequest().indices(AnomalyDetectorsIndex.jobStateIndexWriteAlias());
client.admin().indices().getIndex(
getIndexRequest,
ActionListener.wrap(
getIndexResponse -> {
Set<String> currentStateIndices = Arrays.stream(getIndexResponse.getIndices()).collect(toSet());
listener.onResponse(currentStateIndices);
},
listener::onFailure
)
);
}
private void executeDeleteEmptyStateIndices(Set<String> emptyStateIndices, ActionListener<Boolean> listener) {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(emptyStateIndices.toArray(new String[0]));
client.admin().indices().delete(
deleteIndexRequest,
ActionListener.wrap(
deleteResponse -> listener.onResponse(deleteResponse.isAcknowledged()),
listener::onFailure
)
);
}
}

View File

@ -41,9 +41,9 @@ import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
/** /**
* If for any reason a job is deleted by some of its state documents * If for any reason a job is deleted but some of its state documents
* are left behind, this class deletes any unused documents stored * are left behind, this class deletes any unused documents stored
* in the .ml-state index. * in the .ml-state* indices.
*/ */
public class UnusedStateRemover implements MlDataRemover { public class UnusedStateRemover implements MlDataRemover {

View File

@ -0,0 +1,178 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.retention;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.ml.test.MockOriginSettingClient;
import org.junit.After;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.stubbing.Answer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class EmptyStateIndexRemoverTests extends ESTestCase {
private Client client;
private ActionListener<Boolean> listener;
private ArgumentCaptor<DeleteIndexRequest> deleteIndexRequestCaptor;
private EmptyStateIndexRemover remover;
@SuppressWarnings("unchecked")
@Before
public void setUpTests() {
client = mock(Client.class);
OriginSettingClient originSettingClient = MockOriginSettingClient.mockOriginSettingClient(client, ClientHelper.ML_ORIGIN);
listener = mock(ActionListener.class);
deleteIndexRequestCaptor = ArgumentCaptor.forClass(DeleteIndexRequest.class);
remover = new EmptyStateIndexRemover(originSettingClient);
}
@After
public void verifyNoOtherInteractionsWithMocks() {
verify(client).settings();
verify(client, atLeastOnce()).threadPool();
verifyNoMoreInteractions(client, listener);
}
public void testRemove_TimedOut() {
remover.remove(listener, () -> true);
InOrder inOrder = inOrder(client, listener);
inOrder.verify(listener).onResponse(false);
}
public void testRemove_NoStateIndices() {
IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class);
when(indicesStatsResponse.getIndices()).thenReturn(Collections.emptyMap());
doAnswer(withResponse(indicesStatsResponse)).when(client).execute(any(), any(), any());
remover.remove(listener, () -> false);
InOrder inOrder = inOrder(client, listener);
inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
inOrder.verify(listener).onResponse(true);
}
public void testRemove_NoEmptyStateIndices() {
IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class);
Map<String, IndexStats> indexStatsMap = new HashMap<String, IndexStats>();
indexStatsMap.put(".ml-state-a", indexStats(".ml-state-a", 1));
indexStatsMap.put(".ml-state-b", indexStats(".ml-state-b", 2));
indexStatsMap.put(".ml-state-c", indexStats(".ml-state-c", 1));
indexStatsMap.put(".ml-state-d", indexStats(".ml-state-d", 2));
doReturn(indexStatsMap).when(indicesStatsResponse).getIndices();
doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
remover.remove(listener, () -> false);
InOrder inOrder = inOrder(client, listener);
inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
inOrder.verify(listener).onResponse(true);
}
private void assertDeleteActionExecuted(boolean acknowledged) {
IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class);
Map<String, IndexStats> indexStatsMap = new HashMap<String, IndexStats>();
indexStatsMap.put(".ml-state-a", indexStats(".ml-state-a", 1));
indexStatsMap.put(".ml-state-b", indexStats(".ml-state-b", 0));
indexStatsMap.put(".ml-state-c", indexStats(".ml-state-c", 2));
indexStatsMap.put(".ml-state-d", indexStats(".ml-state-d", 0));
indexStatsMap.put(".ml-state-e", indexStats(".ml-state-e", 0));
doReturn(indexStatsMap).when(indicesStatsResponse).getIndices();
doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
GetIndexResponse getIndexResponse = new GetIndexResponse(new String[] { ".ml-state-e" }, null, null, null, null);
doAnswer(withResponse(getIndexResponse)).when(client).execute(eq(GetIndexAction.INSTANCE), any(), any());
AcknowledgedResponse deleteIndexResponse = new AcknowledgedResponse(acknowledged);
doAnswer(withResponse(deleteIndexResponse)).when(client).execute(eq(DeleteIndexAction.INSTANCE), any(), any());
remover.remove(listener, () -> false);
InOrder inOrder = inOrder(client, listener);
inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
inOrder.verify(client).execute(eq(GetIndexAction.INSTANCE), any(), any());
inOrder.verify(client).execute(eq(DeleteIndexAction.INSTANCE), deleteIndexRequestCaptor.capture(), any());
inOrder.verify(listener).onResponse(acknowledged);
DeleteIndexRequest deleteIndexRequest = deleteIndexRequestCaptor.getValue();
assertThat(deleteIndexRequest.indices(), arrayContainingInAnyOrder(".ml-state-b", ".ml-state-d"));
}
public void testRemove_DeleteAcknowledged() {
assertDeleteActionExecuted(true);
}
public void testRemove_DeleteNotAcknowledged() {
assertDeleteActionExecuted(false);
}
public void testRemove_NoIndicesToRemove() {
IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class);
doReturn(Collections.singletonMap(".ml-state-a", indexStats(".ml-state-a", 0))).when(indicesStatsResponse).getIndices();
doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
GetIndexResponse getIndexResponse = new GetIndexResponse(new String[] { ".ml-state-a" }, null, null, null, null);
doAnswer(withResponse(getIndexResponse)).when(client).execute(eq(GetIndexAction.INSTANCE), any(), any());
remover.remove(listener, () -> false);
InOrder inOrder = inOrder(client, listener);
inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
inOrder.verify(client).execute(eq(GetIndexAction.INSTANCE), any(), any());
inOrder.verify(listener).onResponse(true);
}
@SuppressWarnings("unchecked")
private static <Response> Answer<Response> withResponse(Response response) {
return invocationOnMock -> {
ActionListener<Response> listener = (ActionListener<Response>) invocationOnMock.getArguments()[2];
listener.onResponse(response);
return null;
};
}
private static IndexStats indexStats(String index, int docCount) {
CommonStats indexTotalStats = mock(CommonStats.class);
when(indexTotalStats.getDocs()).thenReturn(new DocsStats(docCount, 0, 0));
IndexStats indexStats = mock(IndexStats.class);
when(indexStats.getIndex()).thenReturn(index);
when(indexStats.getTotal()).thenReturn(indexTotalStats);
return indexStats;
}
}