When we index data frame analytics stats docs we do not need to refresh immediately. Backport of #53977
This commit is contained in:
parent
4e462db2ed
commit
be20bb5755
|
@ -46,6 +46,7 @@ import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
|
||||||
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
|
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
@ -178,7 +179,7 @@ public class AnalyticsProcessManager {
|
||||||
processContext.setFailureReason(resultProcessor.getFailure());
|
processContext.setFailureReason(resultProcessor.getFailure());
|
||||||
|
|
||||||
refreshDest(config);
|
refreshDest(config);
|
||||||
refreshStateIndex(config.getId());
|
refreshIndices(config.getId());
|
||||||
LOGGER.info("[{}] Result processor has completed", config.getId());
|
LOGGER.info("[{}] Result processor has completed", config.getId());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (task.isStopping()) {
|
if (task.isStopping()) {
|
||||||
|
@ -316,12 +317,15 @@ public class AnalyticsProcessManager {
|
||||||
() -> client.execute(RefreshAction.INSTANCE, new RefreshRequest(config.getDest().getIndex())).actionGet());
|
() -> client.execute(RefreshAction.INSTANCE, new RefreshRequest(config.getDest().getIndex())).actionGet());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void refreshStateIndex(String jobId) {
|
private void refreshIndices(String jobId) {
|
||||||
String indexName = AnomalyDetectorsIndex.jobStateIndexPattern();
|
RefreshRequest refreshRequest = new RefreshRequest(
|
||||||
LOGGER.debug("[{}] Refresh index {}", jobId, indexName);
|
AnomalyDetectorsIndex.jobStateIndexPattern(),
|
||||||
|
MlStatsIndex.indexPattern()
|
||||||
RefreshRequest refreshRequest = new RefreshRequest(indexName);
|
);
|
||||||
refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
|
refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
|
||||||
|
|
||||||
|
LOGGER.debug("[{}] Refreshing indices {}", jobId, Arrays.toString(refreshRequest.indices()));
|
||||||
|
|
||||||
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
|
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
|
||||||
client.admin().indices().refresh(refreshRequest).actionGet();
|
client.admin().indices().refresh(refreshRequest).actionGet();
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,7 +47,7 @@ public class StatsPersister {
|
||||||
MlStatsIndex.writeAlias(),
|
MlStatsIndex.writeAlias(),
|
||||||
result,
|
result,
|
||||||
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")),
|
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")),
|
||||||
WriteRequest.RefreshPolicy.IMMEDIATE,
|
WriteRequest.RefreshPolicy.NONE,
|
||||||
docIdSupplier.apply(jobId),
|
docIdSupplier.apply(jobId),
|
||||||
() -> isCancelled == false,
|
() -> isCancelled == false,
|
||||||
errorMsg -> auditor.error(jobId,
|
errorMsg -> auditor.error(jobId,
|
||||||
|
|
Loading…
Reference in New Issue