From 3fa43093eb06ba8d4ac342db9b4320fedb778793 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 11 Apr 2017 15:55:05 +0100 Subject: [PATCH] [ML] Improve handling of job errors from datafeed (elastic/x-pack-elasticsearch#1049) - stops the datafeed when post/flush throw a conflict exception. A conflict exception signifies the job state is not opened, thus we are better off stopping the datafeed. - handles flushing the job the same way as posting to the job. relates elastic/x-pack-elasticsearch#855 Original commit: elastic/x-pack-elasticsearch@49a54912c2b438807c6d6440468d99a721bb01d7 --- .../ml/action/TransportJobTaskAction.java | 3 + .../xpack/ml/datafeed/DatafeedJob.java | 38 +++++++++++- .../xpack/ml/datafeed/DatafeedManager.java | 15 +++++ .../xpack/ml/datafeed/DatafeedJobTests.java | 56 +++++++++++++++++- .../ml/datafeed/DatafeedManagerTests.java | 59 +++++++++++++++++++ 5 files changed, 165 insertions(+), 6 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java index 73397d24b0e..ef23b998594 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java @@ -80,6 +80,9 @@ public abstract class TransportJobTaskAction datafeedJob.runLookBack(0L, 1000L)); + DatafeedJob.AnalysisProblemException analysisProblemException = + expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runLookBack(0L, 1000L)); + assertThat(analysisProblemException.shouldStop, is(false)); currentTime = 3001; expectThrows(DatafeedJob.EmptyDataCountException.class, datafeedJob::runRealtime); @@ -192,6 +195,53 @@ public class DatafeedJobTests extends ESTestCase { verify(client, times(0)).execute(same(FlushJobAction.INSTANCE), any()); } + public void testPostAnalysisProblemIsConflict() throws Exception { + client = mock(Client.class); + when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); + when(client.execute(same(PostDataAction.INSTANCE), any())).thenThrow(ExceptionsHelper.conflictStatusException("conflict")); + + DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1); + DatafeedJob.AnalysisProblemException analysisProblemException = + expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runLookBack(0L, 1000L)); + assertThat(analysisProblemException.shouldStop, is(true)); + + currentTime = 3001; + expectThrows(DatafeedJob.EmptyDataCountException.class, datafeedJob::runRealtime); + + ArgumentCaptor startTimeCaptor = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor endTimeCaptor = ArgumentCaptor.forClass(Long.class); + verify(dataExtractorFactory, times(2)).newExtractor(startTimeCaptor.capture(), endTimeCaptor.capture()); + assertEquals(0L, startTimeCaptor.getAllValues().get(0).longValue()); + assertEquals(1000L, startTimeCaptor.getAllValues().get(1).longValue()); + assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue()); + assertEquals(2000L, endTimeCaptor.getAllValues().get(1).longValue()); + verify(client, times(0)).execute(same(FlushJobAction.INSTANCE), any()); + } + + public void testFlushAnalysisProblem() throws Exception { + when(client.execute(same(FlushJobAction.INSTANCE), any())).thenThrow(new RuntimeException()); + + currentTime = 60000L; + long frequencyMs = 100; + long queryDelayMs = 1000; + DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1); + DatafeedJob.AnalysisProblemException analysisProblemException = + expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runRealtime()); + assertThat(analysisProblemException.shouldStop, is(false)); + } + + public void testFlushAnalysisProblemIsConflict() throws Exception { + when(client.execute(same(FlushJobAction.INSTANCE), any())).thenThrow(ExceptionsHelper.conflictStatusException("conflict")); + + currentTime = 60000L; + long frequencyMs = 100; + long queryDelayMs = 1000; + DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1); + DatafeedJob.AnalysisProblemException analysisProblemException = + expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runRealtime()); + assertThat(analysisProblemException.shouldStop, is(true)); + } + private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs, long latestRecordTimeMs) { Supplier currentTimeSupplier = () -> currentTime; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index 2372c3a5cfc..3da00ef5a16 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.notifications.AuditMessage; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksService; @@ -63,6 +64,8 @@ import java.util.concurrent.ScheduledFuture; import java.util.function.Consumer; import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; @@ -263,6 +266,60 @@ public class DatafeedManagerTests extends ESTestCase { verify(client, never()).execute(same(FlushJobAction.INSTANCE), any()); } + public void testRealTime_GivenPostAnalysisProblemIsConflict() throws Exception { + Exception conflictProblem = ExceptionsHelper.conflictStatusException("conflict"); + when(client.execute(same(PostDataAction.INSTANCE), any())).thenThrow(conflictProblem); + + DataExtractor dataExtractor = mock(DataExtractor.class); + when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor); + when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); + byte[] contentBytes = "".getBytes(Charset.forName("utf-8")); + InputStream in = new ByteArrayInputStream(contentBytes); + when(dataExtractor.next()).thenReturn(Optional.of(in)); + + DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, + new Date(0), new Date(0), new Date(0), new Date(0), new Date(0)); + when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts)); + Consumer handler = mockConsumer(); + StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request("datafeed_id", 0L); + DatafeedTask task = StartDatafeedActionTests.createDatafeedTask(1, "type", "action", null, + startDatafeedRequest, datafeedManager); + task = spyDatafeedTask(task); + datafeedManager.run(task, handler); + + ArgumentCaptor analysisProblemCaptor = + ArgumentCaptor.forClass(DatafeedJob.AnalysisProblemException.class); + verify(handler).accept(analysisProblemCaptor.capture()); + assertThat(analysisProblemCaptor.getValue().getCause(), equalTo(conflictProblem)); + verify(auditor).error("job_id", "Datafeed is encountering errors submitting data for analysis: conflict"); + assertThat(datafeedManager.isRunning(task.getDatafeedId()), is(false)); + } + + public void testRealTime_GivenPostAnalysisProblemIsNonConflict() throws Exception { + Exception nonConflictProblem = new RuntimeException("just runtime"); + when(client.execute(same(PostDataAction.INSTANCE), any())).thenThrow(nonConflictProblem); + + DataExtractor dataExtractor = mock(DataExtractor.class); + when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor); + when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); + byte[] contentBytes = "".getBytes(Charset.forName("utf-8")); + InputStream in = new ByteArrayInputStream(contentBytes); + when(dataExtractor.next()).thenReturn(Optional.of(in)); + + DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, + new Date(0), new Date(0), new Date(0), new Date(0), new Date(0)); + when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts)); + Consumer handler = mockConsumer(); + StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request("datafeed_id", 0L); + DatafeedTask task = StartDatafeedActionTests.createDatafeedTask(1, "type", "action", null, + startDatafeedRequest, datafeedManager); + task = spyDatafeedTask(task); + datafeedManager.run(task, handler); + + verify(auditor).error("job_id", "Datafeed is encountering errors submitting data for analysis: just runtime"); + assertThat(datafeedManager.isRunning(task.getDatafeedId()), is(true)); + } + public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception { DataExtractor dataExtractor = mock(DataExtractor.class); when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor); @@ -286,11 +343,13 @@ public class DatafeedManagerTests extends ESTestCase { if (cancelled) { task.stop("test"); verify(handler).accept(null); + assertThat(datafeedManager.isRunning(task.getDatafeedId()), is(false)); } else { verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest("job_id", contentBytes, xContentType))); verify(client).execute(same(FlushJobAction.INSTANCE), any()); verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any()); + assertThat(datafeedManager.isRunning(task.getDatafeedId()), is(true)); } }