[ML] Do not advance time when datafeed encounters extraction problems (elastic/x-pack-elasticsearch#782)

Original commit: elastic/x-pack-elasticsearch@fcc9af6869
Author: Dimitris Athanasiou
Date:   2017-03-21 14:50:10 +00:00 (committed by GitHub)
parent cb5d44c9c4
commit 58827dd433
2 changed files with 18 additions and 6 deletions

DatafeedJob.java

@@ -132,7 +132,9 @@ class DatafeedJob {
         LOGGER.trace("[{}] Searching data in: [{}, {})", jobId, start, end);
+        // A storage for errors that should only be thrown after advancing time
         RuntimeException error = null;
         long recordCount = 0;
         DataExtractor dataExtractor = dataExtractorFactory.newExtractor(start, end);
         while (dataExtractor.hasNext()) {
@@ -145,8 +147,10 @@
                 extractedData = dataExtractor.next();
             } catch (Exception e) {
                 LOGGER.debug("[" + jobId + "] error while extracting data", e);
-                error = new ExtractionProblemException(e);
-                break;
+                // When extraction problems are encountered, we do not want to advance time.
+                // Instead, it is preferable to retry the given interval next time an extraction
+                // is triggered.
+                throw new ExtractionProblemException(e);
             }
             if (extractedData.isPresent()) {
                 DataCounts counts;
@@ -157,6 +161,10 @@
                     Thread.currentThread().interrupt();
                 }
                 LOGGER.debug("[" + jobId + "] error while posting data", e);
+                // When an analysis problem occurs, it means something catastrophic has
+                // happened to the c++ process. We sent a batch of data to the c++ process
+                // yet we do not know how many of those were processed. It is better to
+                // advance time in order to avoid importing duplicate data.
                 error = new AnalysisProblemException(e);
                 break;
             }
@@ -169,8 +177,7 @@
         lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, end - 1);
-        // Ensure time is always advanced in order to avoid importing duplicate data.
-        // This is the reason we store the error rather than throw inline.
+        // We can now throw any stored error as we have updated time.
         if (error != null) {
             throw error;
         }
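
For orientation, here is a minimal, self-contained sketch of the control flow this change produces. It is not the actual DatafeedJob source: the class name, extractBatch, and postBatch are hypothetical stand-ins. The point it illustrates is the commit's two error policies: extraction failures are rethrown before lastEndTimeMs is touched (so the same interval is retried), while analysis failures are stored and thrown only after time has advanced (so the batch is not re-imported).

import java.util.Optional;

// Sketch only: simplified stand-in for DatafeedJob's run loop.
class DatafeedLoopSketch {

    private Long lastEndTimeMs;

    long run(long start, long end) {
        RuntimeException error = null; // storage for errors thrown only after advancing time
        boolean hasNext = true;
        while (hasNext) {
            Optional<String> batch;
            try {
                batch = extractBatch(start, end);
            } catch (Exception e) {
                // Extraction problem: throw before lastEndTimeMs is updated, so the
                // next trigger retries the same [start, end) interval from scratch.
                throw new RuntimeException("extraction problem", e);
            }
            hasNext = batch.isPresent();
            if (batch.isPresent()) {
                try {
                    postBatch(batch.get());
                } catch (Exception e) {
                    // Analysis problem: the native process state is unknown, so store
                    // the error and fall through to the time update below, avoiding
                    // re-import of data that may already have been processed.
                    error = new RuntimeException("analysis problem", e);
                    break;
                }
            }
        }
        // Time is advanced on normal completion and on analysis problems...
        lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, end - 1);
        // ...and only then is a stored error rethrown.
        if (error != null) {
            throw error;
        }
        return end;
    }

    private Optional<String> extractBatch(long start, long end) { return Optional.empty(); } // stand-in
    private void postBatch(String data) {} // stand-in
}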

DatafeedJobTests.java

@@ -26,6 +26,7 @@ import java.util.Date;
 import java.util.Optional;
 import java.util.function.Supplier;

+import static org.hamcrest.Matchers.is;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
@@ -43,6 +44,7 @@ public class DatafeedJobTests extends ESTestCase {
     private Client client;
     private DataDescription.Builder dataDescription;
     private ActionFuture<FlushJobAction.Response> flushJobFuture;
+    private ArgumentCaptor<FlushJobAction.Request> flushJobRequests;
     private long currentTime;
@@ -69,8 +71,10 @@ public class DatafeedJobTests extends ESTestCase {
         PostDataAction.Request expectedRequest = new PostDataAction.Request("_job_id");
         expectedRequest.setDataDescription(dataDescription.build());
         when(client.execute(same(PostDataAction.INSTANCE), eq(expectedRequest))).thenReturn(jobDataFuture);
-        when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
         when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
+        flushJobRequests = ArgumentCaptor.forClass(FlushJobAction.Request.class);
+        when(client.execute(same(FlushJobAction.INSTANCE), flushJobRequests.capture())).thenReturn(flushJobFuture);
     }

     public void testLookBackRunWithEndTime() throws Exception {
@@ -155,9 +159,10 @@ public class DatafeedJobTests extends ESTestCase {
         ArgumentCaptor<Long> endTimeCaptor = ArgumentCaptor.forClass(Long.class);
         verify(dataExtractorFactory, times(2)).newExtractor(startTimeCaptor.capture(), endTimeCaptor.capture());
         assertEquals(0L, startTimeCaptor.getAllValues().get(0).longValue());
-        assertEquals(1000L, startTimeCaptor.getAllValues().get(1).longValue());
+        assertEquals(0L, startTimeCaptor.getAllValues().get(1).longValue());
         assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue());
         assertEquals(2000L, endTimeCaptor.getAllValues().get(1).longValue());
+        assertThat(flushJobRequests.getAllValues().isEmpty(), is(true));
     }

     public void testAnalysisProblem() throws Exception {
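
The setup change above wires a Mockito ArgumentCaptor into the FlushJobAction stub so the new assertion can check that no flush request was issued when extraction fails. As a standalone illustration of that capture-then-assert pattern (the Client interface below is hypothetical, not the test's real Elasticsearch mocks):

import org.mockito.ArgumentCaptor;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

// Sketch of the capture-then-assert pattern used in the test above.
class CaptorSketch {

    interface Client {
        String execute(String request);
    }

    static void demo() {
        Client client = mock(Client.class);

        // Capturing inside the stub records every request while still returning a
        // canned reply, mirroring the test's
        // when(client.execute(..., flushJobRequests.capture())).thenReturn(...).
        ArgumentCaptor<String> requests = ArgumentCaptor.forClass(String.class);
        when(client.execute(requests.capture())).thenReturn("ok");

        // ... exercise the code under test here ...

        // If the path under test never calls execute(), the captor stays empty;
        // this is exactly how the test asserts that no flush was requested.
        if (!requests.getAllValues().isEmpty()) {
            throw new AssertionError("expected no requests");
        }
    }
}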