[ML][Data Frame] only complete task after state persistence (#43230) (#43294)

* [ML][Data Frame] only complete task after state persistence

There is a race condition where the task could be marked complete while a
document write is still pending. This change moves the task cancellation
into the ActionListener of the state persistence call, so the task only
completes once its state has been written.
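
In essence: instead of shutting the task down and then firing off the state
write, the shutdown now runs in the listener that is invoked once the write
has finished. A minimal, self-contained sketch of that pattern (plain Java
with a CompletableFuture stand-in, not the actual Elasticsearch
ActionListener or persistent-task classes):

    import java.util.concurrent.CompletableFuture;

    public class StopAfterPersistExample {

        // Stand-in for the async state-persistence call (an index request in the real code).
        static CompletableFuture<Void> persistState(String state) {
            return CompletableFuture.runAsync(() -> System.out.println("persisted state: " + state));
        }

        // Stand-in for task cancellation (shutdown / markAsCompleted in the real task).
        static void shutdownTask() {
            System.out.println("task completed");
        }

        public static void main(String[] args) {
            // Before: shutdownTask(); persistState("STOPPED");  -- the write may still be pending.
            // After:  complete the task only from the persistence callback.
            persistState("STOPPED")
                .whenComplete((result, error) -> shutdownTask())
                .join();
        }
    }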


* removing unused import

* removing unused const

* refreshing internal index after waiting for task to complete

* adjusting test data generation
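
The last item above covers the timestamps the test bulk-indexes: the day
counter now wraps and is zero-padded so the generated strings stay valid
dates. A small standalone illustration of the padding expression (the values
here are made up; only the string construction matches the diff below):

    public class ReviewDateExample {
        public static void main(String[] args) {
            int day = 9, hour = 10, min = 10, sec = 10;

            // Unpadded: "2017-01-9T10:10:10Z" -- a single-digit day is likely
            // rejected by the index's default strict date format.
            String unpadded = "2017-01-" + day + "T" + hour + ":" + min + ":" + sec + "Z";

            // Zero-padded, as in the updated test: "2017-01-09T10:10:10Z".
            String padded = "2017-01-" + (day < 10 ? "0" + day : day) + "T" + hour + ":" + min + ":" + sec + "Z";

            System.out.println(unpadded);
            System.out.println(padded);
        }
    }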
Benjamin Trent 2019-06-17 16:49:00 -05:00 committed by GitHub
parent 04a7c84e8b
commit 365f87c622
4 changed files with 40 additions and 19 deletions

DataFrameIntegTestCase.java

@@ -65,8 +65,6 @@ import static org.hamcrest.core.Is.is;
abstract class DataFrameIntegTestCase extends ESRestTestCase {
- protected static final String REVIEWS_INDEX_NAME = "data_frame_reviews";
private Map<String, DataFrameTransformConfig> transformConfigs = new HashMap<>();
protected void cleanUp() throws IOException {
@@ -213,8 +211,7 @@ abstract class DataFrameIntegTestCase extends ESRestTestCase {
.build();
}
- protected void createReviewsIndex() throws Exception {
- final int numDocs = 1000;
+ protected void createReviewsIndex(String indexName, int numDocs) throws Exception {
RestHighLevelClient restClient = new TestRestHighLevelClient();
// create mapping
@@ -241,12 +238,12 @@ abstract class DataFrameIntegTestCase extends ESRestTestCase {
}
builder.endObject();
CreateIndexResponse response =
- restClient.indices().create(new CreateIndexRequest(REVIEWS_INDEX_NAME).mapping(builder), RequestOptions.DEFAULT);
+ restClient.indices().create(new CreateIndexRequest(indexName).mapping(builder), RequestOptions.DEFAULT);
assertThat(response.isAcknowledged(), is(true));
}
// create index
- BulkRequest bulk = new BulkRequest(REVIEWS_INDEX_NAME);
+ BulkRequest bulk = new BulkRequest(indexName);
int day = 10;
for (int i = 0; i < numDocs; i++) {
long user = i % 28;
@@ -256,7 +253,7 @@ abstract class DataFrameIntegTestCase extends ESRestTestCase {
int min = 10 + (i % 49);
int sec = 10 + (i % 49);
String date_string = "2017-01-" + day + "T" + hour + ":" + min + ":" + sec + "Z";
String date_string = "2017-01-" + (day < 10 ? "0" + day : day) + "T" + hour + ":" + min + ":" + sec + "Z";
StringBuilder sourceBuilder = new StringBuilder();
sourceBuilder.append("{\"user_id\":\"")
@@ -277,13 +274,13 @@ abstract class DataFrameIntegTestCase extends ESRestTestCase {
if (i % 50 == 0) {
BulkResponse response = restClient.bulk(bulk, RequestOptions.DEFAULT);
assertThat(response.buildFailureMessage(), response.hasFailures(), is(false));
- bulk = new BulkRequest(REVIEWS_INDEX_NAME);
- day += 1;
+ bulk = new BulkRequest(indexName);
+ day = (day + 1) % 28;
}
}
BulkResponse response = restClient.bulk(bulk, RequestOptions.DEFAULT);
assertThat(response.buildFailureMessage(), response.hasFailures(), is(false));
- restClient.indices().refresh(new RefreshRequest(REVIEWS_INDEX_NAME), RequestOptions.DEFAULT);
+ restClient.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
}
protected Map<String, Object> toLazy(ToXContent parsedObject) throws Exception {

DataFrameTransformIT.java

@@ -30,7 +30,8 @@ public class DataFrameTransformIT extends DataFrameIntegTestCase {
}
public void testDataFrameTransformCrud() throws Exception {
- createReviewsIndex();
+ String indexName = "basic-crud-reviews";
+ createReviewsIndex(indexName, 100);
Map<String, SingleGroupSource> groups = new HashMap<>();
groups.put("by-day", createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null, null));
@@ -45,7 +46,7 @@ public class DataFrameTransformIT extends DataFrameIntegTestCase {
groups,
aggs,
"reviews-by-user-business-day",
- REVIEWS_INDEX_NAME);
+ indexName);
assertTrue(putDataFrameTransform(config, RequestOptions.DEFAULT).isAcknowledged());
assertTrue(startDataFrameTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged());
@@ -56,7 +57,8 @@ public class DataFrameTransformIT extends DataFrameIntegTestCase {
assertBusy(() ->
assertThat(getDataFrameTransformStats(config.getId()).getTransformsStateAndStats().get(0).getTransformState().getIndexerState(),
equalTo(IndexerState.STOPPED)));
+ stopDataFrameTransform(config.getId());
deleteDataFrameTransform(config.getId());
}
}

TransportStopDataFrameTransformAction.java

@@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.dataframe.action;
+ import org.apache.logging.log4j.LogManager;
+ import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
@@ -12,6 +14,7 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
+ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
@@ -26,6 +29,7 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
+ import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
@@ -38,20 +42,25 @@ public class TransportStopDataFrameTransformAction extends
TransportTasksAction<DataFrameTransformTask, StopDataFrameTransformAction.Request,
StopDataFrameTransformAction.Response, StopDataFrameTransformAction.Response> {
+ private static final Logger logger = LogManager.getLogger(TransportStopDataFrameTransformAction.class);
private final ThreadPool threadPool;
private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
private final PersistentTasksService persistentTasksService;
+ private final Client client;
@Inject
public TransportStopDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters,
ClusterService clusterService, ThreadPool threadPool,
PersistentTasksService persistentTasksService,
- DataFrameTransformsConfigManager dataFrameTransformsConfigManager) {
+ DataFrameTransformsConfigManager dataFrameTransformsConfigManager,
+ Client client) {
super(StopDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, StopDataFrameTransformAction.Request::new,
StopDataFrameTransformAction.Response::new, StopDataFrameTransformAction.Response::new, ThreadPool.Names.SAME);
this.threadPool = threadPool;
this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
this.persistentTasksService = persistentTasksService;
+ this.client = client;
}
@Override
@@ -132,12 +141,26 @@ public class TransportStopDataFrameTransformAction extends
waitForStopListener(StopDataFrameTransformAction.Request request,
ActionListener<StopDataFrameTransformAction.Response> listener) {
+ ActionListener<StopDataFrameTransformAction.Response> onStopListener = ActionListener.wrap(
+ waitResponse ->
+ client.admin()
+ .indices()
+ .prepareRefresh(DataFrameInternalIndex.INDEX_NAME)
+ .execute(ActionListener.wrap(
+ r -> listener.onResponse(waitResponse),
+ e -> {
+ logger.info("Failed to refresh internal index after delete", e);
+ listener.onResponse(waitResponse);
+ })
+ ),
+ listener::onFailure
+ );
return ActionListener.wrap(
response -> {
// Wait until the persistent task is stopped
// Switch over to Generic threadpool so we don't block the network thread
threadPool.generic().execute(() ->
- waitForDataFrameStopped(request.getExpandedIds(), request.getTimeout(), listener));
+ waitForDataFrameStopped(request.getExpandedIds(), request.getTimeout(), onStopListener));
},
listener::onFailure
);

DataFrameTransformTask.java

@@ -248,7 +248,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
IndexerState state = getIndexer().stop();
if (state == IndexerState.STOPPED) {
- getIndexer().onStop();
+ //doSaveState calls `onStop` when the task state is `STOPPED`
getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {});
}
}
@@ -610,7 +610,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
r -> {
// for auto stop shutdown the task
if (state.getTaskState().equals(DataFrameTransformTaskState.STOPPED)) {
- onStop();
+ transformTask.shutdown();
}
next.run();
},
@@ -620,7 +620,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
"Failure updating stats of transform: " + statsExc.getMessage());
// for auto stop shutdown the task
if (state.getTaskState().equals(DataFrameTransformTaskState.STOPPED)) {
- onStop();
+ transformTask.shutdown();
}
next.run();
}
@@ -666,7 +666,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
protected void onStop() {
auditor.info(transformConfig.getId(), "Data frame transform has stopped.");
logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId());
- transformTask.shutdown();
}
@Override