[ML] Never stop the datafeeder when no data can be found

Changed `ProblemTracker#updateEmptyDataCount(boolean)` into `ProblemTracker#reportEmptyDataCount()`.
and added `ProblemTracker#reportNoneEmptyCount()` to reset the empty count counter.

Original commit: elastic/x-pack-elasticsearch@c1a44d6fd3
This commit is contained in:
Martijn van Groningen 2017-02-23 14:21:07 +01:00
parent e7d56e92f8
commit d87926ab86
4 changed files with 66 additions and 33 deletions

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
@ -28,7 +29,6 @@ import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DefaultFrequency;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
@ -110,7 +110,8 @@ public class DatafeedJobRunner extends AbstractComponent {
}
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
} catch (DatafeedJob.EmptyDataCountException e) {
if (endTime == null && holder.problemTracker.updateEmptyDataCount(true) == false) {
if (endTime == null) {
holder.problemTracker.reportEmptyDataCount();
next = e.nextDelayInMsSinceEpoch;
}
} catch (Exception e) {
@ -127,7 +128,7 @@ public class DatafeedJobRunner extends AbstractComponent {
});
}
private void doDatafeedRealtime(long delayInMsSinceEpoch, String jobId, Holder holder) {
void doDatafeedRealtime(long delayInMsSinceEpoch, String jobId, Holder holder) {
if (holder.isRunning()) {
TimeValue delay = computeNextDelay(delayInMsSinceEpoch);
logger.debug("Waiting [{}] before executing next realtime import for job [{}]", delay, jobId);
@ -135,6 +136,7 @@ public class DatafeedJobRunner extends AbstractComponent {
long nextDelayInMsSinceEpoch;
try {
nextDelayInMsSinceEpoch = holder.datafeedJob.runRealtime();
holder.problemTracker.reportNoneEmptyCount();
} catch (DatafeedJob.ExtractionProblemException e) {
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
@ -143,11 +145,7 @@ public class DatafeedJobRunner extends AbstractComponent {
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
} catch (DatafeedJob.EmptyDataCountException e) {
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
if (holder.problemTracker.updateEmptyDataCount(true)) {
holder.problemTracker.finishReport();
holder.stop("empty_data", e);
return;
}
holder.problemTracker.reportEmptyDataCount();
} catch (Exception e) {
logger.error("Unexpected datafeed failure for job [" + jobId + "] stopping...", e);
holder.stop("general_realtime_error", e);
@ -159,7 +157,7 @@ public class DatafeedJobRunner extends AbstractComponent {
}
}
private Holder createJobDatafeed(DatafeedConfig datafeed, Job job, long finalBucketEndMs, long latestRecordTimeMs,
Holder createJobDatafeed(DatafeedConfig datafeed, Job job, long finalBucketEndMs, long latestRecordTimeMs,
Consumer<Exception> handler, StartDatafeedAction.DatafeedTask task) {
Auditor auditor = jobProvider.audit(job.getId());
Duration frequency = getFrequencyOrDefault(datafeed, job);
@ -232,7 +230,7 @@ public class DatafeedJobRunner extends AbstractComponent {
private final Consumer<Exception> handler;
volatile Future<?> future;
private Holder(DatafeedConfig datafeed, DatafeedJob datafeedJob, boolean autoCloseJob, ProblemTracker problemTracker,
Holder(DatafeedConfig datafeed, DatafeedJob datafeedJob, boolean autoCloseJob, ProblemTracker problemTracker,
Consumer<Exception> handler) {
this.datafeed = datafeed;
this.datafeedJob = datafeedJob;

View File

@ -5,8 +5,8 @@
*/
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import java.util.Objects;
import java.util.function.Supplier;
@ -72,26 +72,22 @@ class ProblemTracker {
/**
* Updates the tracking of empty data cycles. If the number of consecutive empty data
* cycles reaches {@code EMPTY_DATA_WARN_COUNT}, a warning is reported. If non-empty
* is reported and a warning was issued previously, a recovery info is reported.
*
* @param empty Whether data was seen since last report
* @return {@code true} if an empty data warning was issued, {@code false} otherwise
* cycles reaches {@code EMPTY_DATA_WARN_COUNT}, a warning is reported.
*/
public boolean updateEmptyDataCount(boolean empty) {
if (empty && emptyDataCount < EMPTY_DATA_WARN_COUNT) {
public void reportEmptyDataCount() {
if (emptyDataCount < EMPTY_DATA_WARN_COUNT) {
emptyDataCount++;
if (emptyDataCount == EMPTY_DATA_WARN_COUNT) {
auditor.get().warning(Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_NO_DATA));
return true;
}
} else if (!empty) {
if (emptyDataCount >= EMPTY_DATA_WARN_COUNT) {
auditor.get().info(Messages.getMessage(Messages.JOB_AUDIR_DATAFEED_DATA_SEEN_AGAIN));
}
emptyDataCount = 0;
}
return false;
}
public void reportNoneEmptyCount() {
if (emptyDataCount >= EMPTY_DATA_WARN_COUNT) {
auditor.get().info(Messages.getMessage(Messages.JOB_AUDIR_DATAFEED_DATA_SEEN_AGAIN));
}
emptyDataCount = 0;
}
public boolean hasProblems() {

View File

@ -47,10 +47,12 @@ import java.util.Arrays;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
@ -64,6 +66,7 @@ import static org.mockito.Mockito.when;
public class DatafeedJobRunnerTests extends ESTestCase {
private Client client;
private Auditor auditor;
private ActionFuture<PostDataAction.Response> jobDataFuture;
private ActionFuture<FlushJobAction.Response> flushJobFuture;
private ClusterService clusterService;
@ -89,7 +92,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
return null;
}).when(jobProvider).dataCounts(any(), any(), any());
dataExtractorFactory = mock(DataExtractorFactory.class);
Auditor auditor = mock(Auditor.class);
auditor = mock(Auditor.class);
threadPool = mock(ThreadPool.class);
ExecutorService executorService = mock(ExecutorService.class);
doAnswer(invocation -> {
@ -190,6 +193,42 @@ public class DatafeedJobRunnerTests extends ESTestCase {
verify(client, never()).execute(same(FlushJobAction.INSTANCE), any());
}
public void testStart_emptyDataCountException() throws Exception {
currentTime = 6000000;
Job.Builder jobBuilder = createDatafeedJob();
DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "foo").build();
Job job = jobBuilder.build();
MlMetadata mlMetadata = new MlMetadata.Builder()
.putJob(job, false)
.putDatafeed(datafeedConfig)
.build();
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata))
.build());
int[] counter = new int[] {0};
doAnswer(invocationOnMock -> {
if (counter[0]++ < 10) {
Runnable r = (Runnable) invocationOnMock.getArguments()[2];
currentTime += 600000;
r.run();
}
return mock(ScheduledFuture.class);
}).when(threadPool).schedule(any(), any(), any());
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(false);
Consumer<Exception> handler = mockConsumer();
StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed1", 0L, null);
DatafeedJobRunner.Holder holder = datafeedJobRunner.createJobDatafeed(datafeedConfig, job, 100, 100, handler, task);
datafeedJobRunner.doDatafeedRealtime(10L, "foo", holder);
verify(threadPool, times(11)).schedule(any(), eq(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME), any());
verify(auditor, times(1)).warning(anyString());
verify(client, never()).execute(same(PostDataAction.INSTANCE), any());
verify(client, never()).execute(same(FlushJobAction.INSTANCE), any());
}
public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception {
Job.Builder jobBuilder = createDatafeedJob();
DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "foo").build();

View File

@ -59,7 +59,7 @@ public class ProblemTrackerTests extends ESTestCase {
public void testUpdateEmptyDataCount_GivenEmptyNineTimes() {
for (int i = 0; i < 9; i++) {
problemTracker.updateEmptyDataCount(true);
problemTracker.reportEmptyDataCount();
}
Mockito.verifyNoMoreInteractions(auditor);
@ -67,7 +67,7 @@ public class ProblemTrackerTests extends ESTestCase {
public void testUpdateEmptyDataCount_GivenEmptyTenTimes() {
for (int i = 0; i < 10; i++) {
problemTracker.updateEmptyDataCount(true);
problemTracker.reportEmptyDataCount();
}
verify(auditor).warning("Datafeed has been retrieving no data for a while");
@ -75,7 +75,7 @@ public class ProblemTrackerTests extends ESTestCase {
public void testUpdateEmptyDataCount_GivenEmptyElevenTimes() {
for (int i = 0; i < 11; i++) {
problemTracker.updateEmptyDataCount(true);
problemTracker.reportEmptyDataCount();
}
verify(auditor, times(1)).warning("Datafeed has been retrieving no data for a while");
@ -83,18 +83,18 @@ public class ProblemTrackerTests extends ESTestCase {
public void testUpdateEmptyDataCount_GivenNonEmptyAfterNineEmpty() {
for (int i = 0; i < 9; i++) {
problemTracker.updateEmptyDataCount(true);
problemTracker.reportEmptyDataCount();
}
problemTracker.updateEmptyDataCount(false);
problemTracker.reportNoneEmptyCount();
Mockito.verifyNoMoreInteractions(auditor);
}
public void testUpdateEmptyDataCount_GivenNonEmptyAfterTenEmpty() {
for (int i = 0; i < 10; i++) {
problemTracker.updateEmptyDataCount(true);
problemTracker.reportEmptyDataCount();
}
problemTracker.updateEmptyDataCount(false);
problemTracker.reportNoneEmptyCount();
verify(auditor).warning("Datafeed has been retrieving no data for a while");
verify(auditor).info("Datafeed has started retrieving data again");