reduction step failing fails jobs (#5831)
* adding a catch for failing reduction step * spotless * added changelog --------- Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-mbp.home>
This commit is contained in:
parent
56cc596df1
commit
57fdc2d5c1
|
@ -0,0 +1,8 @@
|
||||||
|
---
|
||||||
|
type: fix
|
||||||
|
issue: 5828
|
||||||
|
title: "When batch 2 jobs with Reduction steps fail in the final part
|
||||||
|
of the reduction step, this would often leave the job
|
||||||
|
stuck in the FINALIZE state.
|
||||||
|
This has been fixed; the job will now FAIL.
|
||||||
|
"
|
|
@ -234,6 +234,61 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
|
||||||
assertEquals(1.0, jobInstance.getProgress());
|
assertEquals(1.0, jobInstance.getProgress());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void reductionStepFailing_willFailJob() throws InterruptedException {
|
||||||
|
// setup
|
||||||
|
String jobId = new Exception().getStackTrace()[0].getMethodName();
|
||||||
|
int totalChunks = 3;
|
||||||
|
AtomicInteger chunkCounter = new AtomicInteger();
|
||||||
|
String error = "this is an error";
|
||||||
|
|
||||||
|
buildAndDefine3StepReductionJob(jobId, new IReductionStepHandler() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void firstStep(StepExecutionDetails<TestJobParameters, VoidModel> theStep, IJobDataSink<FirstStepOutput> theDataSink) {
|
||||||
|
for (int i = 0; i < totalChunks; i++) {
|
||||||
|
theDataSink.accept(new FirstStepOutput());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void secondStep(StepExecutionDetails<TestJobParameters, FirstStepOutput> theStep, IJobDataSink<SecondStepOutput> theDataSink) {
|
||||||
|
SecondStepOutput output = new SecondStepOutput();
|
||||||
|
theDataSink.accept(output);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reductionStepConsume(ChunkExecutionDetails<TestJobParameters, SecondStepOutput> theChunkDetails, IJobDataSink<ReductionStepOutput> theDataSink) {
|
||||||
|
chunkCounter.getAndIncrement();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reductionStepRun(StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails, IJobDataSink<ReductionStepOutput> theDataSink) {
|
||||||
|
// always throw
|
||||||
|
throw new RuntimeException(error);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// test
|
||||||
|
JobInstanceStartRequest request = buildRequest(jobId);
|
||||||
|
myFirstStepLatch.setExpectedCount(1);
|
||||||
|
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
|
||||||
|
String instanceId = startResponse.getInstanceId();
|
||||||
|
assertNotNull(instanceId);
|
||||||
|
|
||||||
|
// waiting for job to end (any status - but we'll verify failed later)
|
||||||
|
myBatch2JobHelper.awaitJobHasStatus(instanceId, StatusEnum.getEndedStatuses().toArray(new StatusEnum[0]));
|
||||||
|
|
||||||
|
// verify
|
||||||
|
Optional<JobInstance> instanceOp = myJobPersistence.fetchInstance(instanceId);
|
||||||
|
assertTrue(instanceOp.isPresent());
|
||||||
|
JobInstance jobInstance = instanceOp.get();
|
||||||
|
|
||||||
|
assertEquals(totalChunks, chunkCounter.get());
|
||||||
|
|
||||||
|
assertEquals(StatusEnum.FAILED, jobInstance.getStatus());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testJobWithReductionStepFiresCompletionHandler() throws InterruptedException {
|
public void testJobWithReductionStepFiresCompletionHandler() throws InterruptedException {
|
||||||
// setup
|
// setup
|
||||||
|
|
|
@ -30,6 +30,7 @@ import ca.uhn.fhir.batch2.model.ChunkOutcome;
|
||||||
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
|
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
|
||||||
import ca.uhn.fhir.batch2.model.JobInstance;
|
import ca.uhn.fhir.batch2.model.JobInstance;
|
||||||
import ca.uhn.fhir.batch2.model.JobWorkCursor;
|
import ca.uhn.fhir.batch2.model.JobWorkCursor;
|
||||||
|
import ca.uhn.fhir.batch2.model.StatusEnum;
|
||||||
import ca.uhn.fhir.batch2.model.WorkChunk;
|
import ca.uhn.fhir.batch2.model.WorkChunk;
|
||||||
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
|
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
|
||||||
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
|
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
|
||||||
|
@ -137,7 +138,6 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
|
||||||
public void reducerPass() {
|
public void reducerPass() {
|
||||||
if (myCurrentlyExecuting.tryAcquire()) {
|
if (myCurrentlyExecuting.tryAcquire()) {
|
||||||
try {
|
try {
|
||||||
|
|
||||||
String[] instanceIds = myInstanceIdToJobWorkCursor.keySet().toArray(new String[0]);
|
String[] instanceIds = myInstanceIdToJobWorkCursor.keySet().toArray(new String[0]);
|
||||||
if (instanceIds.length > 0) {
|
if (instanceIds.length > 0) {
|
||||||
String instanceId = instanceIds[0];
|
String instanceId = instanceIds[0];
|
||||||
|
@ -214,6 +214,36 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
|
||||||
boolean defaultSuccessValue = true;
|
boolean defaultSuccessValue = true;
|
||||||
ReductionStepChunkProcessingResponse response = new ReductionStepChunkProcessingResponse(defaultSuccessValue);
|
ReductionStepChunkProcessingResponse response = new ReductionStepChunkProcessingResponse(defaultSuccessValue);
|
||||||
|
|
||||||
|
try {
|
||||||
|
processChunksAndCompleteJob(theJobWorkCursor, step, instance, parameters, reductionStepWorker, response);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
ourLog.error("Job completion failed for Job {}", instance.getInstanceId());
|
||||||
|
|
||||||
|
executeInTransactionWithSynchronization(() -> {
|
||||||
|
myJobPersistence.updateInstance(instance.getInstanceId(), theInstance -> {
|
||||||
|
theInstance.setStatus(StatusEnum.FAILED);
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
response.setSuccessful(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// if no successful chunks, return false
|
||||||
|
if (!response.hasSuccessfulChunksIds()) {
|
||||||
|
response.setSuccessful(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> void processChunksAndCompleteJob(
|
||||||
|
JobWorkCursor<PT, IT, OT> theJobWorkCursor,
|
||||||
|
JobDefinitionStep<PT, IT, OT> step,
|
||||||
|
JobInstance instance,
|
||||||
|
PT parameters,
|
||||||
|
IReductionStepWorker<PT, IT, OT> reductionStepWorker,
|
||||||
|
ReductionStepChunkProcessingResponse response) {
|
||||||
try {
|
try {
|
||||||
executeInTransactionWithSynchronization(() -> {
|
executeInTransactionWithSynchronization(() -> {
|
||||||
try (Stream<WorkChunk> chunkIterator =
|
try (Stream<WorkChunk> chunkIterator =
|
||||||
|
@ -277,13 +307,6 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
|
||||||
return null;
|
return null;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// if no successful chunks, return false
|
|
||||||
if (!response.hasSuccessfulChunksIds()) {
|
|
||||||
response.setSuccessful(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
return response;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> T executeInTransactionWithSynchronization(Callable<T> runnable) {
|
private <T> T executeInTransactionWithSynchronization(Callable<T> runnable) {
|
||||||
|
|
Loading…
Reference in New Issue