This commit is contained in:
leif stawnyczy 2024-03-15 12:08:04 -04:00
parent 7a8f551bd2
commit 3fafd51314
2 changed files with 37 additions and 6 deletions

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
import ca.uhn.fhir.batch2.channel.BatchJobSender; import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry; import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor; import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification; import ca.uhn.fhir.batch2.model.JobWorkNotification;
@ -193,7 +194,7 @@ public class JobInstanceProcessor {
String instanceId = theInstance.getInstanceId(); String instanceId = theInstance.getInstanceId();
String currentStepId = jobWorkCursor.getCurrentStepId(); String currentStepId = jobWorkCursor.getCurrentStepId();
boolean canAdvance = canAdvanceGatedJob(theJobDefinition, theInstance, currentStepId); boolean canAdvance = canAdvanceGatedJob(theJobDefinition, theInstance, jobWorkCursor);
if (canAdvance) { if (canAdvance) {
if (jobWorkCursor.isReductionStep()) { if (jobWorkCursor.isReductionStep()) {
// current step is the reduction step (all reduction steps are final) // current step is the reduction step (all reduction steps are final)
@ -228,15 +229,17 @@ public class JobInstanceProcessor {
} }
} }
private boolean canAdvanceGatedJob(JobDefinition<?> theJobDefinition, JobInstance theInstance, String theStepId) { private boolean canAdvanceGatedJob(
JobDefinition<?> theJobDefinition, JobInstance theInstance, JobWorkCursor<?, ?, ?> theWorkCursor) {
// make sure our instance still exists // make sure our instance still exists
if (myJobPersistence.fetchInstance(theInstance.getInstanceId()).isEmpty()) { if (myJobPersistence.fetchInstance(theInstance.getInstanceId()).isEmpty()) {
// no more job // no more job
return false; return false;
} }
String currentGatedStepId = theInstance.getCurrentGatedStepId();
Set<WorkChunkStatusEnum> workChunkStatuses = Set<WorkChunkStatusEnum> workChunkStatuses = myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(
myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(theInstance.getInstanceId(), theStepId); theInstance.getInstanceId(), currentGatedStepId);
if (workChunkStatuses.isEmpty()) { if (workChunkStatuses.isEmpty()) {
// no work chunks = no output // no work chunks = no output
@ -251,8 +254,30 @@ public class JobInstanceProcessor {
} }
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.READY))) { if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.READY))) {
// all workchunks ready -> proceed if (theWorkCursor.isFirstStep()) {
return true; // first step - all ready means we're ready to proceed to the next step
return true;
} else {
// it's a future step;
// make sure previous step's workchunks are completed
JobDefinitionStep<?, ?, ?> previousStep =
theJobDefinition.getSteps().get(0);
for (JobDefinitionStep<?, ?, ?> step : theJobDefinition.getSteps()) {
if (step.getStepId().equalsIgnoreCase(currentGatedStepId)) {
break;
}
previousStep = step;
}
Set<WorkChunkStatusEnum> previousStepWorkChunkStates =
myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(
theInstance.getInstanceId(), previousStep.getStepId());
// completed means "all in COMPLETE state" or no previous chunks (they're cleaned up or never existed)
if (previousStepWorkChunkStates.isEmpty()
|| previousStepWorkChunkStates.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) {
return true;
}
}
} }
// anything else // anything else
@ -385,6 +410,8 @@ public class JobInstanceProcessor {
return; return;
} }
ourLog.debug("Moving gated instance {} to next step.", theInstance.getInstanceId());
// because we now have all gated job chunks in READY state, // because we now have all gated job chunks in READY state,
// we can enqueue them // we can enqueue them
enqueueReadyChunks(theInstance, theJobDefinition, true); enqueueReadyChunks(theInstance, theJobDefinition, true);

View File

@ -104,6 +104,10 @@ public class JobWorkCursor<PT extends IModelJson, IT extends IModelJson, OT exte
return currentStep.getStepId(); return currentStep.getStepId();
} }
public boolean isFirstStep() {
return isFirstStep;
}
public boolean isFinalStep() { public boolean isFinalStep() {
return nextStep == null; return nextStep == null;
} }