ML: Add reason field in JobTaskState (#38029)

* ML: adding reason to job failure status

* marking reason as nullable

* Update AutodetectProcessManager.java
This commit is contained in:
Benjamin Trent 2019-01-30 11:56:24 -06:00 committed by GitHub
parent ed460c2815
commit 9782aaa1b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 90 additions and 54 deletions

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.core.ml.job.config;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -20,6 +22,7 @@ import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class JobTaskState implements PersistentTaskState {
@ -27,9 +30,11 @@ public class JobTaskState implements PersistentTaskState {
private static ParseField STATE = new ParseField("state");
private static ParseField ALLOCATION_ID = new ParseField("allocation_id");
private static ParseField REASON = new ParseField("reason");
private static final ConstructingObjectParser<JobTaskState, Void> PARSER =
new ConstructingObjectParser<>(NAME, true, args -> new JobTaskState((JobState) args[0], (Long) args[1]));
new ConstructingObjectParser<>(NAME, true,
args -> new JobTaskState((JobState) args[0], (Long) args[1], (String) args[2]));
static {
PARSER.declareField(constructorArg(), p -> {
@ -39,6 +44,7 @@ public class JobTaskState implements PersistentTaskState {
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, STATE, ObjectParser.ValueType.STRING);
PARSER.declareLong(constructorArg(), ALLOCATION_ID);
PARSER.declareString(optionalConstructorArg(), REASON);
}
public static JobTaskState fromXContent(XContentParser parser) {
@ -51,21 +57,33 @@ public class JobTaskState implements PersistentTaskState {
private final JobState state;
private final long allocationId;
private final String reason;
public JobTaskState(JobState state, long allocationId) {
public JobTaskState(JobState state, long allocationId, @Nullable String reason) {
this.state = Objects.requireNonNull(state);
this.allocationId = allocationId;
this.reason = reason;
}
public JobTaskState(StreamInput in) throws IOException {
state = JobState.fromStream(in);
allocationId = in.readLong();
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
reason = in.readOptionalString();
} else {
reason = null;
}
}
public JobState getState() {
return state;
}
@Nullable
public String getReason() {
return reason;
}
/**
* The job state stores the allocation ID at the time it was last set.
* This method compares the allocation ID in the state with the allocation
@ -90,6 +108,9 @@ public class JobTaskState implements PersistentTaskState {
public void writeTo(StreamOutput out) throws IOException {
state.writeTo(out);
out.writeLong(allocationId);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeOptionalString(reason);
}
}
@Override
@ -102,6 +123,9 @@ public class JobTaskState implements PersistentTaskState {
builder.startObject();
builder.field(STATE.getPreferredName(), state.value());
builder.field(ALLOCATION_ID.getPreferredName(), allocationId);
if (reason != null) {
builder.field(REASON.getPreferredName(), reason);
}
builder.endObject();
return builder;
}
@ -112,11 +136,12 @@ public class JobTaskState implements PersistentTaskState {
if (o == null || getClass() != o.getClass()) return false;
JobTaskState that = (JobTaskState) o;
return state == that.state &&
Objects.equals(allocationId, that.allocationId);
Objects.equals(allocationId, that.allocationId) &&
Objects.equals(reason, that.reason);
}
@Override
public int hashCode() {
return Objects.hash(state, allocationId);
return Objects.hash(state, allocationId, reason);
}
}

View File

@ -33,7 +33,7 @@ public class MlTasksTests extends ESTestCase {
new PersistentTasksCustomMetaData.Assignment("bar", "test assignment"));
assertEquals(JobState.OPENING, MlTasks.getJobState("foo", tasksBuilder.build()));
tasksBuilder.updateTaskState(MlTasks.jobTaskId("foo"), new JobTaskState(JobState.OPENED, tasksBuilder.getLastAllocationId()));
tasksBuilder.updateTaskState(MlTasks.jobTaskId("foo"), new JobTaskState(JobState.OPENED, tasksBuilder.getLastAllocationId(), null));
assertEquals(JobState.OPENED, MlTasks.getJobState("foo", tasksBuilder.build()));
}

View File

@ -266,7 +266,7 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
@Override
protected void taskOperation(CloseJobAction.Request request, TransportOpenJobAction.JobTask jobTask,
ActionListener<CloseJobAction.Response> listener) {
JobTaskState taskState = new JobTaskState(JobState.CLOSING, jobTask.getAllocationId());
JobTaskState taskState = new JobTaskState(JobState.CLOSING, jobTask.getAllocationId(), "close job (api)");
jobTask.updatePersistentTaskState(taskState, ActionListener.wrap(task -> {
// we need to fork because we are now on a network threadpool and closeJob method may take a while to complete:
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {

View File

@ -9,6 +9,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
/**
* Factory interface for creating implementations of {@link AutodetectProcess}
@ -28,5 +29,5 @@ public interface AutodetectProcessFactory {
AutodetectProcess createAutodetectProcess(Job job,
AutodetectParams autodetectParams,
ExecutorService executorService,
Runnable onProcessCrash);
Consumer<String> onProcessCrash);
}

View File

@ -475,14 +475,14 @@ public class AutodetectProcessManager implements ClusterStateListener {
.kill();
processByAllocation.remove(jobTask.getAllocationId());
} finally {
setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1, true));
setJobState(jobTask, JobState.FAILED, e1.getMessage(), e2 -> closeHandler.accept(e1, true));
}
}
}
});
}, e1 -> {
logger.warn("Failed to gather information required to open job [" + jobId + "]", e1);
setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1, true));
setJobState(jobTask, JobState.FAILED, e1.getMessage(), e2 -> closeHandler.accept(e1, true));
});
},
e -> closeHandler.accept(e, true)
@ -601,8 +601,8 @@ public class AutodetectProcessManager implements ClusterStateListener {
auditor.info(jobId, msg);
}
private Runnable onProcessCrash(JobTask jobTask) {
return () -> {
private Consumer<String> onProcessCrash(JobTask jobTask) {
return (reason) -> {
ProcessContext processContext = processByAllocation.remove(jobTask.getAllocationId());
if (processContext != null) {
AutodetectCommunicator communicator = processContext.getAutodetectCommunicator();
@ -610,7 +610,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
communicator.destroyCategorizationAnalyzer();
}
}
setJobState(jobTask, JobState.FAILED);
setJobState(jobTask, JobState.FAILED, reason);
try {
removeTmpStorage(jobTask.getJobId());
} catch (IOException e) {
@ -666,7 +666,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
throw e;
}
logger.warn("[" + jobId + "] Exception closing autodetect process", e);
setJobState(jobTask, JobState.FAILED);
setJobState(jobTask, JobState.FAILED, e.getMessage());
throw ExceptionsHelper.serverError("Exception closing autodetect process", e);
} finally {
// to ensure the contract that multiple simultaneous close calls for the same job wait until
@ -720,8 +720,8 @@ public class AutodetectProcessManager implements ClusterStateListener {
return Optional.of(Duration.between(communicator.getProcessStartTime(), ZonedDateTime.now()));
}
void setJobState(JobTask jobTask, JobState state) {
JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId());
void setJobState(JobTask jobTask, JobState state, String reason) {
JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason);
jobTask.updatePersistentTaskState(jobTaskState, new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> persistentTask) {
@ -735,27 +735,31 @@ public class AutodetectProcessManager implements ClusterStateListener {
});
}
void setJobState(JobTask jobTask, JobState state, CheckedConsumer<Exception, IOException> handler) {
JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId());
jobTask.updatePersistentTaskState(jobTaskState, new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> persistentTask) {
try {
handler.accept(null);
} catch (IOException e1) {
logger.warn("Error while delegating response", e1);
}
}
void setJobState(JobTask jobTask, JobState state) {
setJobState(jobTask, state, null);
}
@Override
public void onFailure(Exception e) {
try {
handler.accept(e);
} catch (IOException e1) {
logger.warn("Error while delegating exception [" + e.getMessage() + "]", e1);
}
}
});
void setJobState(JobTask jobTask, JobState state, String reason, CheckedConsumer<Exception, IOException> handler) {
JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason);
jobTask.updatePersistentTaskState(jobTaskState, new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> persistentTask) {
try {
handler.accept(null);
} catch (IOException e1) {
logger.warn("Error while delegating response", e1);
}
}
@Override
public void onFailure(Exception e) {
try {
handler.accept(e);
} catch (IOException e1) {
logger.warn("Error while delegating exception [" + e.getMessage() + "]", e1);
}
}
});
}
public Optional<Tuple<DataCounts, ModelSizeStats>> getStatistics(JobTask jobTask) {

View File

@ -28,6 +28,7 @@ import java.io.OutputStream;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
/**
* Autodetect process using native code.
@ -42,7 +43,7 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec
NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
AutodetectResultsParser resultsParser, Runnable onProcessCrash) {
AutodetectResultsParser resultsParser, Consumer<String> onProcessCrash) {
super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash);
this.resultsParser = resultsParser;
}

View File

@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
public class NativeAutodetectProcessFactory implements AutodetectProcessFactory {
@ -56,7 +57,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
public AutodetectProcess createAutodetectProcess(Job job,
AutodetectParams params,
ExecutorService executorService,
Runnable onProcessCrash) {
Consumer<String> onProcessCrash) {
List<Path> filesToDelete = new ArrayList<>();
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AutodetectBuilder.AUTODETECT, job.getId(),
true, false, true, true, params.modelSnapshot() != null,

View File

@ -20,7 +20,7 @@ class NativeNormalizerProcess extends AbstractNativeProcess implements Normalize
private static final String NAME = "normalizer";
NativeNormalizerProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream) {
super(jobId, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), () -> {});
super(jobId, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {});
}
@Override

View File

@ -23,12 +23,14 @@ import java.nio.file.Path;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
/**
* Abstract class for implementing a native process.
@ -48,7 +50,7 @@ public abstract class AbstractNativeProcess implements NativeProcess {
private final ZonedDateTime startTime;
private final int numberOfFields;
private final List<Path> filesToDelete;
private final Runnable onProcessCrash;
private final Consumer<String> onProcessCrash;
private volatile Future<?> logTailFuture;
private volatile Future<?> stateProcessorFuture;
private volatile boolean processCloseInitiated;
@ -57,7 +59,7 @@ public abstract class AbstractNativeProcess implements NativeProcess {
protected AbstractNativeProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
Runnable onProcessCrash) {
Consumer<String> onProcessCrash) {
this.jobId = jobId;
cppLogHandler = new CppLogMessageHandler(jobId, logStream);
this.processInStream = new BufferedOutputStream(processInStream);
@ -90,8 +92,9 @@ public abstract class AbstractNativeProcess implements NativeProcess {
// by a user or other process (e.g. the Linux OOM killer)
String errors = cppLogHandler.getErrors();
LOGGER.error("[{}] {} process stopped unexpectedly: {}", jobId, getName(), errors);
onProcessCrash.run();
String fullError = String.format(Locale.ROOT, "[%s] %s process stopped unexpectedly: %s", jobId, getName(), errors);
LOGGER.error(fullError);
onProcessCrash.accept(fullError);
}
}
});

View File

@ -552,7 +552,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
new Assignment(nodeId, "test assignment"));
if (jobState != null) {
builder.updateTaskState(MlTasks.jobTaskId(jobId),
new JobTaskState(jobState, builder.getLastAllocationId() - (isStale ? 1 : 0)));
new JobTaskState(jobState, builder.getLastAllocationId() - (isStale ? 1 : 0), null));
}
}

View File

@ -240,7 +240,7 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask(job.getId(), nodeId, JobState.OPENED, tasksBuilder);
// Set to lower allocationId, so job task is stale:
tasksBuilder.updateTaskState(MlTasks.jobTaskId(job.getId()), new JobTaskState(JobState.OPENED, 0));
tasksBuilder.updateTaskState(MlTasks.jobTaskId(job.getId()), new JobTaskState(JobState.OPENED, 0, null));
tasks = tasksBuilder.build();
givenClusterState("foo", 1, 0);

View File

@ -15,7 +15,7 @@ public class JobTaskStateTests extends AbstractSerializingTestCase<JobTaskState>
@Override
protected JobTaskState createTestInstance() {
return new JobTaskState(randomFrom(JobState.values()), randomLong());
return new JobTaskState(randomFrom(JobState.values()), randomLong(), randomAlphaOfLength(10));
}
@Override

View File

@ -204,7 +204,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
manager.openJob(jobTask, clusterState, (e, b) -> {});
assertEquals(1, manager.numberOfOpenJobs());
assertTrue(manager.jobHasActiveAutodetectProcess(jobTask));
verify(jobTask).updatePersistentTaskState(eq(new JobTaskState(JobState.OPENED, 1L)), any());
verify(jobTask).updatePersistentTaskState(eq(new JobTaskState(JobState.OPENED, 1L, null)), any());
}
@ -266,10 +266,10 @@ public class AutodetectProcessManagerTests extends ESTestCase {
doReturn(executorService).when(manager).createAutodetectExecutorService(any());
doAnswer(invocationOnMock -> {
CheckedConsumer<Exception, IOException> consumer = (CheckedConsumer<Exception, IOException>) invocationOnMock.getArguments()[2];
CheckedConsumer<Exception, IOException> consumer = (CheckedConsumer<Exception, IOException>) invocationOnMock.getArguments()[3];
consumer.accept(null);
return null;
}).when(manager).setJobState(any(), eq(JobState.FAILED), any());
}).when(manager).setJobState(any(), eq(JobState.FAILED), any(), any());
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
@ -512,7 +512,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
expectThrows(ElasticsearchException.class, () -> manager.closeJob(jobTask, false, null));
assertEquals(0, manager.numberOfOpenJobs());
verify(manager).setJobState(any(), eq(JobState.FAILED));
verify(manager).setJobState(any(), eq(JobState.FAILED), any());
}
public void testWriteUpdateProcessMessage() {

View File

@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@ -58,7 +59,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
mock(OutputStream.class), outputStream, mock(OutputStream.class),
NUMBER_FIELDS, null,
new AutodetectResultsParser(), mock(Runnable.class))) {
new AutodetectResultsParser(), mock(Consumer.class))) {
process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class));
ZonedDateTime startTime = process.getProcessStartTime();
@ -80,7 +81,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new AutodetectResultsParser(), mock(Runnable.class))) {
new AutodetectResultsParser(), mock(Consumer.class))) {
process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class));
process.writeRecord(record);
@ -114,7 +115,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
ByteArrayOutputStream bos = new ByteArrayOutputStream(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH + 1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new AutodetectResultsParser(), mock(Runnable.class))) {
new AutodetectResultsParser(), mock(Consumer.class))) {
process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class));
FlushJobParams params = FlushJobParams.builder().build();
@ -147,7 +148,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
processInStream, processOutStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new AutodetectResultsParser(), mock(Runnable.class))) {
new AutodetectResultsParser(), mock(Consumer.class))) {
process.consumeAndCloseOutputStream();
assertThat(processOutStream.available(), equalTo(0));
@ -162,7 +163,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new AutodetectResultsParser(), mock(Runnable.class))) {
new AutodetectResultsParser(), mock(Consumer.class))) {
process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class));
writeFunction.accept(process);