diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java index 298b71326df..47ee0365bed 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java @@ -24,6 +24,7 @@ import ca.uhn.fhir.batch2.api.IReductionStepExecutorService; import ca.uhn.fhir.batch2.channel.BatchJobSender; import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry; import ca.uhn.fhir.batch2.model.JobDefinition; +import ca.uhn.fhir.batch2.model.JobDefinitionStep; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobWorkCursor; import ca.uhn.fhir.batch2.model.JobWorkNotification; @@ -193,7 +194,7 @@ public class JobInstanceProcessor { String instanceId = theInstance.getInstanceId(); String currentStepId = jobWorkCursor.getCurrentStepId(); - boolean canAdvance = canAdvanceGatedJob(theJobDefinition, theInstance, currentStepId); + boolean canAdvance = canAdvanceGatedJob(theJobDefinition, theInstance, jobWorkCursor); if (canAdvance) { if (jobWorkCursor.isReductionStep()) { // current step is the reduction step (all reduction steps are final) @@ -228,15 +229,17 @@ public class JobInstanceProcessor { } } - private boolean canAdvanceGatedJob(JobDefinition theJobDefinition, JobInstance theInstance, String theStepId) { + private boolean canAdvanceGatedJob( + JobDefinition theJobDefinition, JobInstance theInstance, JobWorkCursor theWorkCursor) { // make sure our instance still exists if (myJobPersistence.fetchInstance(theInstance.getInstanceId()).isEmpty()) { // no more job return false; } + String currentGatedStepId = theInstance.getCurrentGatedStepId(); - Set workChunkStatuses = - myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(theInstance.getInstanceId(), theStepId); + Set workChunkStatuses = myJobPersistence.getDistinctWorkChunkStatesForJobAndStep( + theInstance.getInstanceId(), currentGatedStepId); if (workChunkStatuses.isEmpty()) { // no work chunks = no output @@ -251,8 +254,30 @@ public class JobInstanceProcessor { } if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.READY))) { - // all workchunks ready -> proceed - return true; + if (theWorkCursor.isFirstStep()) { + // first step - all ready means we're ready to proceed to the next step + return true; + } else { + // it's a future step; + // make sure previous step's workchunks are completed + JobDefinitionStep previousStep = + theJobDefinition.getSteps().get(0); + for (JobDefinitionStep step : theJobDefinition.getSteps()) { + if (step.getStepId().equalsIgnoreCase(currentGatedStepId)) { + break; + } + previousStep = step; + } + Set previousStepWorkChunkStates = + myJobPersistence.getDistinctWorkChunkStatesForJobAndStep( + theInstance.getInstanceId(), previousStep.getStepId()); + + // completed means "all in COMPLETE state" or no previous chunks (they're cleaned up or never existed) + if (previousStepWorkChunkStates.isEmpty() + || previousStepWorkChunkStates.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) { + return true; + } + } } // anything else @@ -385,6 +410,8 @@ public class JobInstanceProcessor { return; } + ourLog.debug("Moving gated instance {} to next step.", theInstance.getInstanceId()); + // because we now have all gated job chunks in READY state, // we can enqueue them enqueueReadyChunks(theInstance, theJobDefinition, true); diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobWorkCursor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobWorkCursor.java index 94db85defaf..687127caad1 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobWorkCursor.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobWorkCursor.java @@ -104,6 +104,10 @@ public class JobWorkCursor