diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_2_0/5828-reduction-jobs-failing-in-reduction-fails-jobs.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_2_0/5828-reduction-jobs-failing-in-reduction-fails-jobs.yaml new file mode 100644 index 00000000000..978d0e3e233 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_2_0/5828-reduction-jobs-failing-in-reduction-fails-jobs.yaml @@ -0,0 +1,8 @@ +--- +type: fix +issue: 5828 +title: "When batch 2 jobs with Reduction steps fail in the final part + of the reduction step, this would often leave the job + stuck in the FINALIZE state. + This has been fixed; the job will now FAIL. +" diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java index 130cf6951a3..2e304e08373 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java @@ -234,6 +234,61 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test { assertEquals(1.0, jobInstance.getProgress()); } + @Test + public void reductionStepFailing_willFailJob() throws InterruptedException { + // setup + String jobId = new Exception().getStackTrace()[0].getMethodName(); + int totalChunks = 3; + AtomicInteger chunkCounter = new AtomicInteger(); + String error = "this is an error"; + + buildAndDefine3StepReductionJob(jobId, new IReductionStepHandler() { + + @Override + public void firstStep(StepExecutionDetails theStep, IJobDataSink theDataSink) { + for (int i = 0; i < totalChunks; i++) { + theDataSink.accept(new FirstStepOutput()); + } + } + + @Override + public void secondStep(StepExecutionDetails theStep, IJobDataSink theDataSink) { + SecondStepOutput output = new SecondStepOutput(); + theDataSink.accept(output); + } + + 
@Override + public void reductionStepConsume(ChunkExecutionDetails theChunkDetails, IJobDataSink theDataSink) { + chunkCounter.getAndIncrement(); + } + + @Override + public void reductionStepRun(StepExecutionDetails theStepExecutionDetails, IJobDataSink theDataSink) { + // always throw + throw new RuntimeException(error); + } + }); + + // test + JobInstanceStartRequest request = buildRequest(jobId); + myFirstStepLatch.setExpectedCount(1); + Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request); + String instanceId = startResponse.getInstanceId(); + assertNotNull(instanceId); + + // waiting for job to end (any status - but we'll verify failed later) + myBatch2JobHelper.awaitJobHasStatus(instanceId, StatusEnum.getEndedStatuses().toArray(new StatusEnum[0])); + + // verify + Optional instanceOp = myJobPersistence.fetchInstance(instanceId); + assertTrue(instanceOp.isPresent()); + JobInstance jobInstance = instanceOp.get(); + + assertEquals(totalChunks, chunkCounter.get()); + + assertEquals(StatusEnum.FAILED, jobInstance.getStatus()); + } + @Test public void testJobWithReductionStepFiresCompletionHandler() throws InterruptedException { // setup diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl.java index 48c20840784..febd36805bb 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl.java @@ -30,6 +30,7 @@ import ca.uhn.fhir.batch2.model.ChunkOutcome; import ca.uhn.fhir.batch2.model.JobDefinitionStep; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobWorkCursor; +import ca.uhn.fhir.batch2.model.StatusEnum; import 
ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; @@ -137,7 +138,6 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS public void reducerPass() { if (myCurrentlyExecuting.tryAcquire()) { try { - String[] instanceIds = myInstanceIdToJobWorkCursor.keySet().toArray(new String[0]); if (instanceIds.length > 0) { String instanceId = instanceIds[0]; @@ -214,6 +214,36 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS boolean defaultSuccessValue = true; ReductionStepChunkProcessingResponse response = new ReductionStepChunkProcessingResponse(defaultSuccessValue); + try { + processChunksAndCompleteJob(theJobWorkCursor, step, instance, parameters, reductionStepWorker, response); + } catch (Exception ex) { + ourLog.error("Job completion failed for Job {}", instance.getInstanceId(), ex); + + executeInTransactionWithSynchronization(() -> { + myJobPersistence.updateInstance(instance.getInstanceId(), theInstance -> { + theInstance.setStatus(StatusEnum.FAILED); + return true; + }); + return null; + }); + response.setSuccessful(false); + } + + // if no successful chunks, return false + if (!response.hasSuccessfulChunksIds()) { + response.setSuccessful(false); + } + + return response; + } + + private void processChunksAndCompleteJob( + JobWorkCursor theJobWorkCursor, + JobDefinitionStep step, + JobInstance instance, + PT parameters, + IReductionStepWorker reductionStepWorker, + ReductionStepChunkProcessingResponse response) { try { executeInTransactionWithSynchronization(() -> { try (Stream chunkIterator = @@ -277,13 +307,6 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS return null; }); } - - // if no successful chunks, return false - if (!response.hasSuccessfulChunksIds()) { - response.setSuccessful(false); - } - - return response; } private T executeInTransactionWithSynchronization(Callable 
runnable) {