Merge branch '5745-ready-state-batch2' into 5767-add-poll-waiting-step

This commit is contained in:
leif stawnyczy 2024-03-15 13:08:04 -04:00
commit 69fa83a866
2 changed files with 37 additions and 6 deletions

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
@ -193,7 +194,7 @@ public class JobInstanceProcessor {
String instanceId = theInstance.getInstanceId();
String currentStepId = jobWorkCursor.getCurrentStepId();
boolean canAdvance = canAdvanceGatedJob(theJobDefinition, theInstance, currentStepId);
boolean canAdvance = canAdvanceGatedJob(theJobDefinition, theInstance, jobWorkCursor);
if (canAdvance) {
if (jobWorkCursor.isReductionStep()) {
// current step is the reduction step (all reduction steps are final)
@ -228,15 +229,17 @@ public class JobInstanceProcessor {
}
}
private boolean canAdvanceGatedJob(JobDefinition<?> theJobDefinition, JobInstance theInstance, String theStepId) {
private boolean canAdvanceGatedJob(
JobDefinition<?> theJobDefinition, JobInstance theInstance, JobWorkCursor<?, ?, ?> theWorkCursor) {
// make sure our instance still exists
if (myJobPersistence.fetchInstance(theInstance.getInstanceId()).isEmpty()) {
// no more job
return false;
}
String currentGatedStepId = theInstance.getCurrentGatedStepId();
Set<WorkChunkStatusEnum> workChunkStatuses =
myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(theInstance.getInstanceId(), theStepId);
Set<WorkChunkStatusEnum> workChunkStatuses = myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(
theInstance.getInstanceId(), currentGatedStepId);
if (workChunkStatuses.isEmpty()) {
// no work chunks = no output
@ -251,8 +254,30 @@ public class JobInstanceProcessor {
}
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.READY))) {
// all workchunks ready -> proceed
if (theWorkCursor.isFirstStep()) {
// first step - all ready means we're ready to proceed to the next step
return true;
} else {
// it's a future step;
// make sure previous step's workchunks are completed
JobDefinitionStep<?, ?, ?> previousStep =
theJobDefinition.getSteps().get(0);
for (JobDefinitionStep<?, ?, ?> step : theJobDefinition.getSteps()) {
if (step.getStepId().equalsIgnoreCase(currentGatedStepId)) {
break;
}
previousStep = step;
}
Set<WorkChunkStatusEnum> previousStepWorkChunkStates =
myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(
theInstance.getInstanceId(), previousStep.getStepId());
// completed means "all in COMPLETE state" or no previous chunks (they're cleaned up or never existed)
if (previousStepWorkChunkStates.isEmpty()
|| previousStepWorkChunkStates.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) {
return true;
}
}
}
// anything else
@ -385,6 +410,8 @@ public class JobInstanceProcessor {
return;
}
ourLog.debug("Moving gated instance {} to next step.", theInstance.getInstanceId());
// because we now have all gated job chunks in READY state,
// we can enqueue them
enqueueReadyChunks(theInstance, theJobDefinition, true);

View File

@ -104,6 +104,10 @@ public class JobWorkCursor<PT extends IModelJson, IT extends IModelJson, OT exte
return currentStep.getStepId();
}
public boolean isFirstStep() {
return isFirstStep;
}
public boolean isFinalStep() {
return nextStep == null;
}