resolved review comments

This commit is contained in:
tyner 2024-04-10 09:17:44 -04:00
parent d06337d103
commit fd4fdd7541
7 changed files with 26 additions and 16 deletions

View File

@ -56,10 +56,10 @@ stateDiagram-v2
state FAILED
state COMPLETED
direction LR
[*] --> READY : on create - normal
[*] --> GATE_WAITING : on create - gated
GATE_WAITING --> READY : on gate release - gated (new)
QUEUED --> READY : on gate release - gated (for compatibility with old "fake QUEUED" state)
[*] --> READY : on create - normal or gated first chunk
[*] --> GATE_WAITING : on create - gated non-first chunk
GATE_WAITING --> READY : on gate release - gated
QUEUED --> READY : on gate release - gated (for compatibility with legacy QUEUED state up to 7.1.8-SNAPSHOT)
READY --> QUEUED : placed on kafka (maint.)
%% worker processing states

View File

@ -20,17 +20,17 @@ After a job has been defined, *instances* of that job can be submitted for batch
The Batch Job Coordinator will then store two records in the database:
- Job Instance with status `QUEUED`: that is the parent record for all data concerning this job
- Batch Work Chunk with status `READY`/`GATE_WAITING`: this describes the first "chunk" of work required for this job. The first Batch Work Chunk contains no data. The initial status of the work chunk will be `READY` or `GATE_WAITING` for non-gated and gated batch jobs, respectively. Please refer to [Gated Execution](#gated-execution) for more explanation on gated batch jobs.
- Batch Work Chunk with status `READY`: this describes the first "chunk" of work required for this job. The first Batch Work Chunk contains no data.
### The Maintenance Job
A Scheduled Job runs periodically (once a minute). For each Job Instance in the database, it:
1. Moves all `READY` work chunks into the `QUEUED` state and publishes a message to the Batch Notification Message Channel to inform worker threads that a work chunk is now ready for processing. For gated batch jobs, the maintenance also moves all `GATE_WAITING` work chunks into `READY` when the current batch step is ready to advance. \*
1. Calculates job progress (% of work chunks in `COMPLETE` status). If the job is finished, purges any left over work chunks still in the database.
1. Cleans up any complete, failed, or cancelled jobs that need to be removed.
1. Moves any gated jobs onto their next step when complete.
1. When the current step is complete, moves any gated jobs onto their next step and update all chunks in `GATE_WAITING` to `READY`.
1. If the final step of a gated job is a reduction step, a reduction step execution will be triggered.
1. Moves all `READY` work chunks into the `QUEUED` state and publishes a message to the Batch Notification Message Channel to inform worker threads that a work chunk is now ready for processing. \*
\* An exception is for the final reduction step, where work chunks are not published to the Batch Notification Message Channel,
but instead processed inline.

View File

@ -109,10 +109,10 @@ public interface IBatch2WorkChunkRepository
@Param("newStatus") WorkChunkStatusEnum theNewStatus,
@Param("oldStatus") WorkChunkStatusEnum theOldStatus);
// In the old code, gated jobs' workchunks are created in status QUEUED but not actually queued for the
// Up to 7.1.8-SNAPSHOT, gated jobs' work chunks are created in status QUEUED but not actually queued for the
// workers.
// In order to keep them compatible, turn QUEUED chunks into READY, too.
// TODO: remove QUEUED from the in clause when we are certain that no one is still running the old code.
// TODO: remove QUEUED from the IN clause when we are certain that no one is still running the old code.
@Modifying
@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = ca.uhn.fhir.batch2.model.WorkChunkStatusEnum.READY WHERE "
+ "e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId AND e.myStatus in ("

View File

@ -182,9 +182,10 @@ class JpaJobPersistenceImplTest {
String nextStepId = "nextStep";
// execute
mySvc.updateAllChunksForStepFromGateWaitingToReady(instanceId, nextStepId);
int changed = mySvc.updateAllChunksForStepFromGateWaitingToReady(instanceId, nextStepId);
// verify
assertEquals(0, changed);
verify(myWorkChunkRepository).updateAllChunksForStepFromGateWaitingToReady(instanceId, nextStepId);
}

View File

@ -400,7 +400,10 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
runInTransaction(() -> assertEquals(TARGET_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId()));
// execute
runInTransaction(() -> mySvc.advanceJobStepAndUpdateChunkStatus(instanceId, LAST_STEP_ID));
runInTransaction(() -> {
boolean changed = mySvc.advanceJobStepAndUpdateChunkStatus(instanceId, LAST_STEP_ID);
assertTrue(changed);
});
// verify
runInTransaction(() -> {
@ -421,7 +424,10 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
runInTransaction(() -> assertEquals(TARGET_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId()));
// execute
runInTransaction(() -> mySvc.advanceJobStepAndUpdateChunkStatus(instanceId, TARGET_STEP_ID));
runInTransaction(() -> {
boolean changed = mySvc.advanceJobStepAndUpdateChunkStatus(instanceId, TARGET_STEP_ID);
assertFalse(changed);
});
// verify
runInTransaction(() -> {
@ -447,7 +453,10 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
});
// execute
runInTransaction(() -> mySvc.updateAllChunksForStepFromGateWaitingToReady(instanceId, LAST_STEP_ID));
runInTransaction(() -> {
int numChanged = mySvc.updateAllChunksForStepFromGateWaitingToReady(instanceId, LAST_STEP_ID);
assertEquals(2, numChanged);
});
// verify
runInTransaction(() -> {

View File

@ -166,7 +166,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
instance.setJobDefinitionVersion(JOB_DEF_VER);
instance.setParameters(CHUNK_DATA);
instance.setReport("TEST");
if (jobDefinition.isGatedExecution()) {
if (jobDefinition.isGatedExecution()) {
instance.setCurrentGatedStepId(jobDefinition.getFirstStepId());
}
return instance;

View File

@ -285,7 +285,7 @@ public interface IJobPersistence extends IWorkChunkPersistence {
* Atomically advance the given job to the given step and change the status of all chunks in the next step to READY
* @param theJobInstanceId the id of the job instance to be updated
* @param theNextStepId the id of the next job step
* @return
* @return whether any changes were made
*/
@Transactional(propagation = Propagation.REQUIRES_NEW)
boolean advanceJobStepAndUpdateChunkStatus(String theJobInstanceId, String theNextStepId);
@ -294,7 +294,7 @@ public interface IJobPersistence extends IWorkChunkPersistence {
* Update all chunks of the given step id for the given job from GATE_WAITING to READY
* @param theJobInstanceId the id of the job instance to be updated
* @param theStepId the id of the step which the chunks belong to
* @return
* @return the number of chunks updated
*/
@Transactional(propagation = Propagation.REQUIRES_NEW)
int updateAllChunksForStepFromGateWaitingToReady(String theJobInstanceId, String theStepId);