resolved review comments

This commit is contained in:
tyner 2024-04-10 16:59:52 -04:00
parent fd4fdd7541
commit decdeafdc1
8 changed files with 42 additions and 27 deletions

View File

@ -57,9 +57,9 @@ stateDiagram-v2
state COMPLETED
direction LR
[*] --> READY : on create - normal or gated first chunk
[*] --> GATE_WAITING : on create - gated non-first chunk
[*] --> GATE_WAITING : on create - gated jobs for all but the first chunk
GATE_WAITING --> READY : on gate release - gated
QUEUED --> READY : on gate release - gated (for compatibility with legacy QUEUED state up to 7.1.8-SNAPSHOT)
QUEUED --> READY : on gate release - gated (for compatibility with legacy QUEUED state up to HAPI FHIR version 7.1)
READY --> QUEUED : placed on kafka (maint.)
%% worker processing states

View File

@ -28,7 +28,7 @@ A Scheduled Job runs periodically (once a minute). For each Job Instance in the
1. Calculates job progress (% of work chunks in `COMPLETE` status). If the job is finished, purges any left over work chunks still in the database.
1. Cleans up any complete, failed, or cancelled jobs that need to be removed.
1. When the current step is complete, moves any gated jobs onto their next step and update all chunks in `GATE_WAITING` to `READY`.
1. When the current step is complete, moves any gated jobs onto their next step and updates all chunks in `GATE_WAITING` to `READY`.
1. If the final step of a gated job is a reduction step, a reduction step execution will be triggered.
1. Moves all `READY` work chunks into the `QUEUED` state and publishes a message to the Batch Notification Message Channel to inform worker threads that a work chunk is now ready for processing. \*

View File

@ -128,7 +128,8 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
entity.setSerializedData(theBatchWorkChunk.serializedData);
entity.setCreateTime(new Date());
entity.setStartTime(new Date());
// set to GATE_WAITING if job is gated, to READY if not
// set gated job chunks to GATE_WAITING; they will be transitioned to READY during maintenance pass when all
// chunks in the previous step are COMPLETED
entity.setStatus(
theBatchWorkChunk.isGatedExecution ? WorkChunkStatusEnum.GATE_WAITING : WorkChunkStatusEnum.READY);

View File

@ -109,7 +109,7 @@ public interface IBatch2WorkChunkRepository
@Param("newStatus") WorkChunkStatusEnum theNewStatus,
@Param("oldStatus") WorkChunkStatusEnum theOldStatus);
// Up to 7.1.8-SNAPSHOT, gated jobs' work chunks are created in status QUEUED but not actually queued for the
// Up to 7.1, gated jobs' work chunks are created in status QUEUED but not actually queued for the
// workers.
// In order to keep them compatible, turn QUEUED chunks into READY, too.
// TODO: remove QUEUED from the IN clause when we are certain that no one is still running the old code.

View File

@ -4,7 +4,6 @@ import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
import ca.uhn.test.concurrency.PointcutLatch;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@ -21,7 +20,7 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
}
@Test
default void test_gatedJob_stepReady_advances() throws InterruptedException {
default void test_gatedJob_stepReady_stepAdvances() throws InterruptedException {
// setup
String initialState = """
# chunks ready - move to queued
@ -108,7 +107,7 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
3|GATE_WAITING
"""
})
default void testGatedStep2NotReady_notAdvance(String theChunkState) throws InterruptedException {
default void testGatedStep2NotReady_stepNotAdvance(String theChunkState) throws InterruptedException {
// setup
int expectedLatchCount = getLatchCountFromState(theChunkState);
PointcutLatch sendingLatch = getTestManager().disableWorkChunkMessageHandler();

View File

@ -145,7 +145,7 @@ public class JobMaintenanceStateInformation {
if (jobDef.isGatedExecution()) {
AtomicReference<String> latestStepId = new AtomicReference<>();
int totalSteps = jobDef.getSteps().size();
// ignore the last step
// ignore the last step, since tests for gated jobs need the current step to be the second-last step
for (int i = totalSteps - 2; i >= 0; i--) {
JobDefinitionStep<?, ?, ?> step = jobDef.getSteps().get(i);
if (stepIds.contains(step.getStepId())) {

View File

@ -282,7 +282,8 @@ public interface IJobPersistence extends IWorkChunkPersistence {
WorkChunk createWorkChunk(WorkChunk theWorkChunk);
/**
* Atomically advance the given job to the given step and change the status of all chunks in the next step to READY
* Atomically advance the given job to the given step and change the status of all QUEUED and GATE_WAITING chunks
* in the next step to READY
* @param theJobInstanceId the id of the job instance to be updated
* @param theNextStepId the id of the next job step
* @return whether any changes were made

View File

@ -44,7 +44,6 @@ import org.springframework.data.domain.Pageable;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
public class JobInstanceProcessor {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@ -99,6 +98,18 @@ public class JobInstanceProcessor {
cleanupInstance(theInstance);
triggerGatedExecutions(theInstance, jobDefinition);
if (theInstance.hasGatedStep()) {
JobWorkCursor<?, ?, ?> jobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
jobDefinition, theInstance.getCurrentGatedStepId());
if (jobWorkCursor.isReductionStep()) {
// Reduction step work chunks should never be sent to the queue but to their specific service instead.
triggerReductionStep(theInstance, jobWorkCursor);
return;
}
}
// enqueue the chunks as normal
enqueueReadyChunks(theInstance, jobDefinition);
ourLog.debug("Finished job processing: {} - {}", myInstanceId, stopWatch);
@ -187,18 +198,12 @@ public class JobInstanceProcessor {
JobWorkCursor<?, ?, ?> jobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
theJobDefinition, theInstance.getCurrentGatedStepId());
String instanceId = theInstance.getInstanceId();
String currentStepId = jobWorkCursor.getCurrentStepId();
boolean canAdvance = canAdvanceGatedJob(theInstance);
if (canAdvance) {
if (jobWorkCursor.isReductionStep()) {
// current step is the reduction step (all reduction steps are final)
JobWorkCursor<?, ?, ?> nextJobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
jobWorkCursor.getJobDefinition(), jobWorkCursor.getCurrentStepId());
myReductionStepExecutorService.triggerReductionStep(instanceId, nextJobWorkCursor);
} else if (!jobWorkCursor.isFinalStep()) {
// all other gated job steps except for final steps
if (!jobWorkCursor.isFinalStep()) {
// all other gated job steps except for final steps - final steps do not need to be advanced
String nextStepId = jobWorkCursor.nextStep.getStepId();
ourLog.info(
"All processing is complete for gated execution of instance {} step {}. Proceeding to step {}",
@ -206,8 +211,12 @@ public class JobInstanceProcessor {
currentStepId,
nextStepId);
// otherwise, continue processing as expected
processChunksForNextGatedSteps(theInstance, nextStepId);
} else {
ourLog.info(
"Ready to advance gated execution of instance {} but already at the final step {}. Not proceeding to advance steps.",
instanceId,
jobWorkCursor.getCurrentStepId());
}
} else {
String stepId = jobWorkCursor.nextStep != null
@ -253,20 +262,24 @@ public class JobInstanceProcessor {
});
}
/**
 * Hands the current step of the given job instance over to the reduction step executor service.
 * Reduction step chunks should never be queued, so this is used in place of the normal enqueueing.
 *
 * @param theInstance the job instance whose reduction step is being triggered
 * @param jobWorkCursor the work cursor positioned on the reduction step; its current step id is
 *                      logged and the cursor itself is passed through to the executor service
 */
private void triggerReductionStep(JobInstance theInstance, JobWorkCursor<?, ?, ?> jobWorkCursor) {
	final String jobInstanceId = theInstance.getInstanceId();
	final String reductionStepId = jobWorkCursor.getCurrentStepId();

	ourLog.debug("Triggering Reduction step {} of instance {}.", reductionStepId, jobInstanceId);

	// delegate the actual execution; nothing is placed on the work chunk queue here
	myReductionStepExecutorService.triggerReductionStep(jobInstanceId, jobWorkCursor);
}
/**
* Chunks are initially created in READY state.
* We will move READY chunks to QUEUED and send them to the queue/topic (kafka)
* for processing.
*
* We could block chunks from being moved from QUEUE'd to READY here for gated steps
* but currently, progress is calculated by looking at completed chunks only;
* we'd need a new GATE_WAITING state to move chunks to prevent jobs from
* completing prematurely.
*/
private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition<?> theJobDefinition) {
Iterator<WorkChunkMetadata> iter = getReadyChunks();
AtomicInteger counter = new AtomicInteger();
int counter = 0;
while (iter.hasNext()) {
WorkChunkMetadata metadata = iter.next();
@ -278,10 +291,11 @@ public class JobInstanceProcessor {
* * commit
*/
updateChunkAndSendToQueue(metadata);
counter++;
}
ourLog.debug(
"Encountered {} READY work chunks for job {} of type {}",
counter.get(),
counter,
theJobInstance.getInstanceId(),
theJobDefinition.getJobDefinitionId());
}