[7.x][ML] Always refresh dest index before starting analytics process (#48090) (#48196)

If a job stops right after reindexing is finished but before
we refresh the destination index, we don't refresh at all.
If the job is started again right after, it jumps into the analyzing state.
However, the data is still not searchable.
This is why we were seeing test failures that we start the process
expecting X rows (where X is lower than the expected number of docs)
and we end up getting X+.

We fix this by moving the refresh of the dest index right before
we start the process so it always ensures the data is searchable.

Closes #47612

Backport of #48090
This commit is contained in:
Dimitris Athanasiou 2019-10-17 17:20:19 +01:00 committed by GitHub
parent eb7969e8cc
commit e0489fc328
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 15 additions and 28 deletions

View File

@ -238,7 +238,6 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
"Finished analysis"); "Finished analysis");
} }
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47612")
public void testStopAndRestart() throws Exception { public void testStopAndRestart() throws Exception {
initialize("regression_stop_and_restart"); initialize("regression_stop_and_restart");

View File

@ -550,7 +550,6 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
"Stopped analytics"); "Stopped analytics");
} }
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47612")
public void testOutlierDetectionStopAndRestart() throws Exception { public void testOutlierDetectionStopAndRestart() throws Exception {
String sourceIndex = "test-outlier-detection-stop-and-restart"; String sourceIndex = "test-outlier-detection-stop-and-restart";

View File

@ -15,9 +15,6 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexAction; import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -160,34 +157,18 @@ public class DataFrameAnalyticsManager {
} }
// Reindexing is complete; start analytics // Reindexing is complete; start analytics
ActionListener<RefreshResponse> refreshListener = ActionListener.wrap( ActionListener<BulkByScrollResponse> reindexCompletedListener = ActionListener.wrap(
refreshResponse -> { refreshResponse -> {
if (task.isStopping()) { if (task.isStopping()) {
LOGGER.debug("[{}] Stopping before starting analytics process", config.getId()); LOGGER.debug("[{}] Stopping before starting analytics process", config.getId());
return; return;
} }
task.setReindexingTaskId(null); task.setReindexingTaskId(null);
startAnalytics(task, config, false);
},
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
);
// Refresh to ensure copied index is fully searchable
ActionListener<BulkByScrollResponse> reindexCompletedListener = ActionListener.wrap(
bulkResponse -> {
if (task.isStopping()) {
LOGGER.debug("[{}] Stopping before refreshing destination index", config.getId());
return;
}
task.setReindexingFinished(); task.setReindexingFinished();
auditor.info( auditor.info(
config.getId(), config.getId(),
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex())); Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex()));
ClientHelper.executeAsyncWithOrigin(client, startAnalytics(task, config, false);
ClientHelper.ML_ORIGIN,
RefreshAction.INSTANCE,
new RefreshRequest(config.getDest().getIndex()),
refreshListener);
}, },
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
); );

View File

@ -79,6 +79,7 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
} }
public void setReindexingTaskId(Long reindexingTaskId) { public void setReindexingTaskId(Long reindexingTaskId) {
LOGGER.debug("[{}] Setting reindexing task id to [{}] from [{}]", taskParams.getId(), reindexingTaskId, this.reindexingTaskId);
this.reindexingTaskId = reindexingTaskId; this.reindexingTaskId = reindexingTaskId;
} }

View File

@ -237,7 +237,9 @@ public class DataFrameDataExtractor {
public DataSummary collectDataSummary() { public DataSummary collectDataSummary() {
SearchRequestBuilder searchRequestBuilder = buildDataSummarySearchRequestBuilder(); SearchRequestBuilder searchRequestBuilder = buildDataSummarySearchRequestBuilder();
SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder); SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
return new DataSummary(searchResponse.getHits().getTotalHits().value, context.extractedFields.getAllFields().size()); long rows = searchResponse.getHits().getTotalHits().value;
LOGGER.debug("[{}] Data summary rows [{}]", context.jobId, rows);
return new DataSummary(rows, context.extractedFields.getAllFields().size());
} }
public void collectDataSummaryAsync(ActionListener<DataSummary> dataSummaryActionListener) { public void collectDataSummaryAsync(ActionListener<DataSummary> dataSummaryActionListener) {

View File

@ -81,6 +81,9 @@ public class AnalyticsProcessManager {
return; return;
} }
// First we refresh the dest index to ensure data is searchable
refreshDest(config);
ProcessContext processContext = new ProcessContext(config.getId()); ProcessContext processContext = new ProcessContext(config.getId());
if (processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) { if (processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) {
finishHandler.accept(ExceptionsHelper.serverError("[" + processContext.id finishHandler.accept(ExceptionsHelper.serverError("[" + processContext.id
@ -147,10 +150,12 @@ public class AnalyticsProcessManager {
refreshDest(config); refreshDest(config);
LOGGER.info("[{}] Result processor has completed", config.getId()); LOGGER.info("[{}] Result processor has completed", config.getId());
} catch (Exception e) { } catch (Exception e) {
String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage()) if (task.isStopping() == false) {
.getFormattedMessage(); String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage())
LOGGER.error(errorMsg, e); .getFormattedMessage();
processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg); LOGGER.error(errorMsg, e);
processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg);
}
} finally { } finally {
closeProcess(task); closeProcess(task);