[ML] Improve handling of job errors from datafeed (elastic/x-pack-elasticsearch#1049)

- stops the datafeed when post/flush throw a conflict exception.
A conflict exception signifies the job state is not open, thus
we are better off stopping the datafeed.
- handles flushing the job the same way as posting to the job.

relates elastic/x-pack-elasticsearch#855
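
In essence, the fix keys off the HTTP status of the failure: a 409 CONFLICT from post or flush means the job state is no longer open, so retrying is pointless and the datafeed should stop itself. Below is a minimal, self-contained sketch of that decision rule; it mirrors the isConflictException helper added to DatafeedJob in this commit, but the class and method names here are illustrative only.

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.rest.RestStatus;

final class ConflictCheckSketch {
    // A 409 CONFLICT signals a job-state problem rather than a transient failure.
    static boolean shouldStopDatafeed(Exception e) {
        return e instanceof ElasticsearchStatusException
                && ((ElasticsearchStatusException) e).status() == RestStatus.CONFLICT;
    }

    public static void main(String[] args) {
        Exception conflict = new ElasticsearchStatusException("job [foo] is not open", RestStatus.CONFLICT);
        Exception transientFailure = new RuntimeException("network blip");
        System.out.println(shouldStopDatafeed(conflict));         // true  -> stop the datafeed
        System.out.println(shouldStopDatafeed(transientFailure)); // false -> retry on the next interval
    }
}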

Original commit: elastic/x-pack-elasticsearch@49a54912c2
Dimitris Athanasiou 2017-04-11 15:55:05 +01:00 committed by GitHub
parent 9525cb4784
commit 3fa43093eb
5 changed files with 165 additions and 6 deletions

TransportJobTaskAction.java

@@ -80,6 +80,9 @@ public abstract class TransportJobTaskAction<OperationTask extends OpenJobAction
innerTaskOperation(request, task, listener, state);
} else {
logger.warn("Unexpected job state based on cluster state version [{}]", state.getVersion());
// Note that DatafeedJob relies on this exception being thrown to detect the state
// conflict and stop the datafeed. If this exception type/status changes, DatafeedJob
// also needs to change.
listener.onFailure(ExceptionsHelper.conflictStatusException("Cannot perform requested action because job [" +
request.getJobId() + "] is not open"));
}
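
For context, ExceptionsHelper.conflictStatusException is the ml-specific helper used above. Its body is not part of this diff; presumably it just pairs the message with RestStatus.CONFLICT, along these lines (a sketch of the assumed shape, not the verbatim helper):

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.rest.RestStatus;

// Assumed shape: wrap the message in a status exception carrying 409 CONFLICT.
// DatafeedJob.isConflictException (below) checks for exactly this combination.
public static ElasticsearchStatusException conflictStatusException(String msg, Object... args) {
    return new ElasticsearchStatusException(msg, RestStatus.CONFLICT, args);
}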

DatafeedJob.java

@@ -6,12 +6,14 @@
package org.elasticsearch.xpack.ml.datafeed;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ml.action.FlushJobAction;
import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
@@ -168,11 +170,16 @@ class DatafeedJob {
Thread.currentThread().interrupt();
}
LOGGER.debug("[" + jobId + "] error while posting data", e);
// a conflict exception means the job state is not open any more.
// we should therefore stop the datafeed.
boolean shouldStop = isConflictException(e);
// When an analysis problem occurs, it means something catastrophic has
// happened to the c++ process. We sent a batch of data to the c++ process
// yet we do not know how many of those were processed. It is better to
// advance time in order to avoid importing duplicate data.
error = new AnalysisProblemException(e);
error = new AnalysisProblemException(shouldStop, e);
break;
}
recordCount += counts.getProcessedRecordCount();
@@ -197,7 +204,7 @@
// we call flush the job is closed. Thus, we don't flush unless the
// datafeed is still running.
if (isRunning()) {
client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet();
flushJob(flushRequest);
}
}
@@ -212,6 +219,11 @@
return response.getDataCounts();
}
private boolean isConflictException(Exception e) {
return e instanceof ElasticsearchStatusException
&& ((ElasticsearchStatusException) e).status() == RestStatus.CONFLICT;
}
private long nextRealtimeTimestamp() {
long epochMs = currentTimeSupplier.get() + frequencyMs;
return toIntervalStartEpochMs(epochMs) + NEXT_TASK_DELAY_MS;
@@ -221,12 +233,32 @@
return (epochMs / frequencyMs) * frequencyMs;
}
private void flushJob(FlushJobAction.Request flushRequest) {
try {
client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet();
} catch (Exception e) {
LOGGER.debug("[" + jobId + "] error while flushing job", e);
// a conflict exception means the job state is not open any more.
// we should therefore stop the datafeed.
boolean shouldStop = isConflictException(e);
// When an analysis problem occurs, it means something catastrophic has
// happened to the c++ process. We sent a batch of data to the c++ process
// yet we do not know how many of those were processed. It is better to
// advance time in order to avoid importing duplicate data.
throw new AnalysisProblemException(shouldStop, e);
}
}
class AnalysisProblemException extends RuntimeException {
final boolean shouldStop;
final long nextDelayInMsSinceEpoch = nextRealtimeTimestamp();
AnalysisProblemException(Throwable cause) {
AnalysisProblemException(boolean shouldStop, Throwable cause) {
super(cause);
this.shouldStop = shouldStop;
}
}
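
The nextDelayInMsSinceEpoch carried by AnalysisProblemException comes from nextRealtimeTimestamp() above, which rounds the next run down to a frequency-interval boundary and adds a small fixed delay. A worked example follows; NEXT_TASK_DELAY_MS is a DatafeedJob constant not shown in this diff, so the 100 ms used here is an assumption (the trailing 100 in the TimeValue(480100) expectation in DatafeedManagerTests below is at least consistent with it).

public class NextTimestampExample {
    public static void main(String[] args) {
        long frequencyMs = 1000;
        long nextTaskDelayMs = 100; // assumed value of NEXT_TASK_DELAY_MS
        long now = 3001;            // currentTimeSupplier.get()

        long epochMs = now + frequencyMs;                           // 4001
        long intervalStart = (epochMs / frequencyMs) * frequencyMs; // 4000: long division floors
        long next = intervalStart + nextTaskDelayMs;                // 4100

        System.out.println(next); // the datafeed retries at epoch ms 4100
    }
}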

DatafeedManager.java

@@ -157,6 +157,10 @@ public class DatafeedManager extends AbstractComponent {
next = e.nextDelayInMsSinceEpoch;
}
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
if (e.shouldStop) {
holder.stop("lookback_analysis_error", TimeValue.timeValueSeconds(20), e);
return;
}
} catch (DatafeedJob.EmptyDataCountException e) {
if (endTime == null) {
holder.problemTracker.reportEmptyDataCount();
@@ -206,6 +210,10 @@
} catch (DatafeedJob.AnalysisProblemException e) {
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
if (e.shouldStop) {
holder.stop("realtime_analysis_error", TimeValue.timeValueSeconds(20), e);
return;
}
} catch (DatafeedJob.EmptyDataCountException e) {
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
holder.problemTracker.reportEmptyDataCount();
@@ -276,6 +284,13 @@
return new TimeValue(Math.max(1, next - currentTimeSupplier.get()));
}
/**
* Visible for testing
*/
boolean isRunning(String datafeedId) {
return runningDatafeeds.containsKey(datafeedId);
}
public class Holder {
private final long taskId;

DatafeedJobTests.java

@@ -17,6 +17,7 @@ import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
@@ -171,13 +172,15 @@ public class DatafeedJobTests extends ESTestCase {
assertThat(flushJobRequests.getAllValues().isEmpty(), is(true));
}
public void testAnalysisProblem() throws Exception {
public void testPostAnalysisProblem() throws Exception {
client = mock(Client.class);
when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
when(client.execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("_job_id")))).thenThrow(new RuntimeException());
when(client.execute(same(PostDataAction.INSTANCE), any())).thenThrow(new RuntimeException());
DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1);
DatafeedJob.AnalysisProblemException analysisProblemException =
expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runLookBack(0L, 1000L));
assertThat(analysisProblemException.shouldStop, is(false));
currentTime = 3001;
expectThrows(DatafeedJob.EmptyDataCountException.class, datafeedJob::runRealtime);
@@ -192,6 +195,53 @@
verify(client, times(0)).execute(same(FlushJobAction.INSTANCE), any());
}
public void testPostAnalysisProblemIsConflict() throws Exception {
client = mock(Client.class);
when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
when(client.execute(same(PostDataAction.INSTANCE), any())).thenThrow(ExceptionsHelper.conflictStatusException("conflict"));
DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1);
DatafeedJob.AnalysisProblemException analysisProblemException =
expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runLookBack(0L, 1000L));
assertThat(analysisProblemException.shouldStop, is(true));
currentTime = 3001;
expectThrows(DatafeedJob.EmptyDataCountException.class, datafeedJob::runRealtime);
ArgumentCaptor<Long> startTimeCaptor = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Long> endTimeCaptor = ArgumentCaptor.forClass(Long.class);
verify(dataExtractorFactory, times(2)).newExtractor(startTimeCaptor.capture(), endTimeCaptor.capture());
assertEquals(0L, startTimeCaptor.getAllValues().get(0).longValue());
assertEquals(1000L, startTimeCaptor.getAllValues().get(1).longValue());
assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue());
assertEquals(2000L, endTimeCaptor.getAllValues().get(1).longValue());
verify(client, times(0)).execute(same(FlushJobAction.INSTANCE), any());
}
public void testFlushAnalysisProblem() throws Exception {
when(client.execute(same(FlushJobAction.INSTANCE), any())).thenThrow(new RuntimeException());
currentTime = 60000L;
long frequencyMs = 100;
long queryDelayMs = 1000;
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1);
DatafeedJob.AnalysisProblemException analysisProblemException =
expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runRealtime());
assertThat(analysisProblemException.shouldStop, is(false));
}
public void testFlushAnalysisProblemIsConflict() throws Exception {
when(client.execute(same(FlushJobAction.INSTANCE), any())).thenThrow(ExceptionsHelper.conflictStatusException("conflict"));
currentTime = 60000L;
long frequencyMs = 100;
long queryDelayMs = 1000;
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1);
DatafeedJob.AnalysisProblemException analysisProblemException =
expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runRealtime());
assertThat(analysisProblemException.shouldStop, is(true));
}
private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs,
long latestRecordTimeMs) {
Supplier<Long> currentTimeSupplier = () -> currentTime;

DatafeedManagerTests.java

@@ -44,6 +44,7 @@ import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.notifications.AuditMessage;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
@@ -63,6 +64,8 @@ import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
@@ -263,6 +266,60 @@ public class DatafeedManagerTests extends ESTestCase {
verify(client, never()).execute(same(FlushJobAction.INSTANCE), any());
}
public void testRealTime_GivenPostAnalysisProblemIsConflict() throws Exception {
Exception conflictProblem = ExceptionsHelper.conflictStatusException("conflict");
when(client.execute(same(PostDataAction.INSTANCE), any())).thenThrow(conflictProblem);
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
byte[] contentBytes = "".getBytes(Charset.forName("utf-8"));
InputStream in = new ByteArrayInputStream(contentBytes);
when(dataExtractor.next()).thenReturn(Optional.of(in));
DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
new Date(0), new Date(0), new Date(0), new Date(0), new Date(0));
when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer();
StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request("datafeed_id", 0L);
DatafeedTask task = StartDatafeedActionTests.createDatafeedTask(1, "type", "action", null,
startDatafeedRequest, datafeedManager);
task = spyDatafeedTask(task);
datafeedManager.run(task, handler);
ArgumentCaptor<DatafeedJob.AnalysisProblemException> analysisProblemCaptor =
ArgumentCaptor.forClass(DatafeedJob.AnalysisProblemException.class);
verify(handler).accept(analysisProblemCaptor.capture());
assertThat(analysisProblemCaptor.getValue().getCause(), equalTo(conflictProblem));
verify(auditor).error("job_id", "Datafeed is encountering errors submitting data for analysis: conflict");
assertThat(datafeedManager.isRunning(task.getDatafeedId()), is(false));
}
public void testRealTime_GivenPostAnalysisProblemIsNonConflict() throws Exception {
Exception nonConflictProblem = new RuntimeException("just runtime");
when(client.execute(same(PostDataAction.INSTANCE), any())).thenThrow(nonConflictProblem);
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
byte[] contentBytes = "".getBytes(Charset.forName("utf-8"));
InputStream in = new ByteArrayInputStream(contentBytes);
when(dataExtractor.next()).thenReturn(Optional.of(in));
DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
new Date(0), new Date(0), new Date(0), new Date(0), new Date(0));
when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer();
StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request("datafeed_id", 0L);
DatafeedTask task = StartDatafeedActionTests.createDatafeedTask(1, "type", "action", null,
startDatafeedRequest, datafeedManager);
task = spyDatafeedTask(task);
datafeedManager.run(task, handler);
verify(auditor).error("job_id", "Datafeed is encountering errors submitting data for analysis: just runtime");
assertThat(datafeedManager.isRunning(task.getDatafeedId()), is(true));
}
public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception {
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
@@ -286,11 +343,13 @@
if (cancelled) {
task.stop("test");
verify(handler).accept(null);
assertThat(datafeedManager.isRunning(task.getDatafeedId()), is(false));
} else {
verify(client).execute(same(PostDataAction.INSTANCE),
eq(createExpectedPostDataRequest("job_id", contentBytes, xContentType)));
verify(client).execute(same(FlushJobAction.INSTANCE), any());
verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any());
assertThat(datafeedManager.isRunning(task.getDatafeedId()), is(true));
}
}