Failing reduction step now fails the job (#5831)

* added a catch for a failing reduction step

* spotless

* added changelog

---------

Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-mbp.home>
TipzCM 2024-04-08 13:51:41 -04:00 committed by GitHub
parent eafa2aba29
commit 107de2cfba
3 changed files with 94 additions and 8 deletions


@@ -0,0 +1,8 @@
---
type: fix
issue: 5828
title: "When batch 2 jobs with Reduction steps fail in the final part
of the reduction step, this would often leave the job
stuck in the FINALIZE state.
This has been fixed; the job will now FAIL.
"


@@ -234,6 +234,61 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
		assertEquals(1.0, jobInstance.getProgress());
	}

	@Test
	public void reductionStepFailing_willFailJob() throws InterruptedException {
		// setup
		String jobId = new Exception().getStackTrace()[0].getMethodName();
		int totalChunks = 3;
		AtomicInteger chunkCounter = new AtomicInteger();
		String error = "this is an error";

		buildAndDefine3StepReductionJob(jobId, new IReductionStepHandler() {
			@Override
			public void firstStep(StepExecutionDetails<TestJobParameters, VoidModel> theStep, IJobDataSink<FirstStepOutput> theDataSink) {
				for (int i = 0; i < totalChunks; i++) {
					theDataSink.accept(new FirstStepOutput());
				}
			}

			@Override
			public void secondStep(StepExecutionDetails<TestJobParameters, FirstStepOutput> theStep, IJobDataSink<SecondStepOutput> theDataSink) {
				SecondStepOutput output = new SecondStepOutput();
				theDataSink.accept(output);
			}

			@Override
			public void reductionStepConsume(ChunkExecutionDetails<TestJobParameters, SecondStepOutput> theChunkDetails, IJobDataSink<ReductionStepOutput> theDataSink) {
				chunkCounter.getAndIncrement();
			}

			@Override
			public void reductionStepRun(StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails, IJobDataSink<ReductionStepOutput> theDataSink) {
				// always throw
				throw new RuntimeException(error);
			}
		});

		// test
		JobInstanceStartRequest request = buildRequest(jobId);
		myFirstStepLatch.setExpectedCount(1);
		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
		String instanceId = startResponse.getInstanceId();
		assertNotNull(instanceId);

		// waiting for job to end (any status - but we'll verify failed later)
		myBatch2JobHelper.awaitJobHasStatus(instanceId, StatusEnum.getEndedStatuses().toArray(new StatusEnum[0]));

		// verify
		Optional<JobInstance> instanceOp = myJobPersistence.fetchInstance(instanceId);
		assertTrue(instanceOp.isPresent());
		JobInstance jobInstance = instanceOp.get();

		assertEquals(totalChunks, chunkCounter.get());
		assertEquals(StatusEnum.FAILED, jobInstance.getStatus());
	}

	@Test
	public void testJobWithReductionStepFiresCompletionHandler() throws InterruptedException {
		// setup


@@ -30,6 +30,7 @@ import ca.uhn.fhir.batch2.model.ChunkOutcome;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
@@ -137,7 +138,6 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
	public void reducerPass() {
		if (myCurrentlyExecuting.tryAcquire()) {
			try {
				String[] instanceIds = myInstanceIdToJobWorkCursor.keySet().toArray(new String[0]);
				if (instanceIds.length > 0) {
					String instanceId = instanceIds[0];
@@ -214,6 +214,36 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
		boolean defaultSuccessValue = true;
		ReductionStepChunkProcessingResponse response = new ReductionStepChunkProcessingResponse(defaultSuccessValue);

		try {
			processChunksAndCompleteJob(theJobWorkCursor, step, instance, parameters, reductionStepWorker, response);
		} catch (Exception ex) {
			ourLog.error("Job completion failed for Job {}", instance.getInstanceId());

			executeInTransactionWithSynchronization(() -> {
				myJobPersistence.updateInstance(instance.getInstanceId(), theInstance -> {
					theInstance.setStatus(StatusEnum.FAILED);
					return true;
				});
				return null;
			});
			response.setSuccessful(false);
		}

		// if no successful chunks, return false
		if (!response.hasSuccessfulChunksIds()) {
			response.setSuccessful(false);
		}

		return response;
	}

	private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> void processChunksAndCompleteJob(
			JobWorkCursor<PT, IT, OT> theJobWorkCursor,
			JobDefinitionStep<PT, IT, OT> step,
			JobInstance instance,
			PT parameters,
			IReductionStepWorker<PT, IT, OT> reductionStepWorker,
			ReductionStepChunkProcessingResponse response) {
		try {
			executeInTransactionWithSynchronization(() -> {
				try (Stream<WorkChunk> chunkIterator =
@@ -277,13 +307,6 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
				return null;
			});
		}

		// if no successful chunks, return false
		if (!response.hasSuccessfulChunksIds()) {
			response.setSuccessful(false);
		}

		return response;
	}

	private <T> T executeInTransactionWithSynchronization(Callable<T> runnable) {