[ML] Stop timing stats failure propagation (#49495) (#49501)

This commit is contained in:
Benjamin Trent 2019-11-25 10:09:30 -05:00 committed by GitHub
parent df5afa797e
commit 688c78c589
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 28 additions and 19 deletions

View File

@ -117,13 +117,7 @@ class DatafeedJob {
} }
public void finishReportingTimingStats() { public void finishReportingTimingStats() {
try {
timingStatsReporter.finishReporting(); timingStatsReporter.finishReporting();
} catch (Exception e) {
// We don't want the exception to propagate out of this method as it can leave the datafeed in the "stopping" state forever.
// Since persisting datafeed timing stats is not critical, we just log a warning here.
LOGGER.warn("[{}] Datafeed timing stats could not be reported due to: {}", jobId, e);
}
} }
Long runLookBack(long startTime, Long endTime) throws Exception { Long runLookBack(long startTime, Long endTime) throws Exception {

View File

@ -5,6 +5,9 @@
*/ */
package org.elasticsearch.xpack.ml.datafeed; package org.elasticsearch.xpack.ml.datafeed;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
@ -20,6 +23,7 @@ import java.util.Objects;
*/ */
public class DatafeedTimingStatsReporter { public class DatafeedTimingStatsReporter {
private static final Logger LOGGER = LogManager.getLogger(DatafeedTimingStatsReporter.class);
/** Interface used for persisting current timing stats to the results index. */ /** Interface used for persisting current timing stats to the results index. */
@FunctionalInterface @FunctionalInterface
public interface DatafeedTimingStatsPersister { public interface DatafeedTimingStatsPersister {
@ -96,7 +100,14 @@ public class DatafeedTimingStatsReporter {
private void flush(WriteRequest.RefreshPolicy refreshPolicy) { private void flush(WriteRequest.RefreshPolicy refreshPolicy) {
persistedTimingStats = new DatafeedTimingStats(currentTimingStats); persistedTimingStats = new DatafeedTimingStats(currentTimingStats);
if (allowedPersisting) { if (allowedPersisting) {
try {
persister.persistDatafeedTimingStats(persistedTimingStats, refreshPolicy); persister.persistDatafeedTimingStats(persistedTimingStats, refreshPolicy);
} catch (Exception ex) {
// Since persisting datafeed timing stats is not critical, we just log a warning here.
LOGGER.warn(
() -> new ParameterizedMessage("[{}] failed to report datafeed timing stats", currentTimingStats.getJobId()),
ex);
}
} }
} }

View File

@ -12,7 +12,6 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -61,7 +60,6 @@ import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same; import static org.mockito.Matchers.same;
import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -456,15 +454,6 @@ public class DatafeedJobTests extends ESTestCase {
assertThat(analysisProblemException.shouldStop, is(true)); assertThat(analysisProblemException.shouldStop, is(true));
} }
public void testFinishReportingTimingStats() {
doThrow(new EsRejectedExecutionException()).when(timingStatsReporter).finishReporting();
long frequencyMs = 100;
long queryDelayMs = 1000;
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1, randomBoolean());
datafeedJob.finishReportingTimingStats();
}
private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs, private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs,
long latestRecordTimeMs, boolean haveSeenDataPreviously) { long latestRecordTimeMs, boolean haveSeenDataPreviously) {
Supplier<Long> currentTimeSupplier = () -> currentTime; Supplier<Long> currentTimeSupplier = () -> currentTime;

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.ml.datafeed; package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -18,8 +19,10 @@ import org.mockito.InOrder;
import java.sql.Date; import java.sql.Date;
import java.time.Instant; import java.time.Instant;
import static org.elasticsearch.mock.orig.Mockito.doThrow;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -176,6 +179,18 @@ public class DatafeedTimingStatsReporterTests extends ESTestCase {
is(true)); is(true));
} }
public void testFinishReportingTimingStatsException() {
doThrow(new ElasticsearchException("BOOM")).when(timingStatsPersister).persistDatafeedTimingStats(any(), any());
DatafeedTimingStatsReporter reporter = createReporter(new DatafeedTimingStats(JOB_ID));
try {
reporter.reportDataCounts(createDataCounts(0, TIMESTAMP));
reporter.finishReporting();
} catch (ElasticsearchException ex) {
fail("Should not have failed with: " + ex.getDetailedMessage());
}
}
private DatafeedTimingStatsReporter createReporter(DatafeedTimingStats timingStats) { private DatafeedTimingStatsReporter createReporter(DatafeedTimingStats timingStats) {
return new DatafeedTimingStatsReporter(timingStats, timingStatsPersister); return new DatafeedTimingStatsReporter(timingStats, timingStatsPersister);
} }