diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md index 503153c7374..192444d9478 100644 --- a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md @@ -61,6 +61,7 @@ stateDiagram-v2 [*] --> GATED : on create - gated GATED --> READY : on create - gated READY --> QUEUED : placed on kafka (maint.) + POLL_WAITING --> READY : after a poll delay on a POLL_WAITING work chunk has elapsed %% worker processing states QUEUED --> on_receive : on deque by worker @@ -70,6 +71,7 @@ stateDiagram-v2 execute --> ERROR : on re-triable error execute --> COMPLETED : success\n maybe trigger instance first_step_finished execute --> FAILED : on unrecoverable \n or too many errors + execute --> POLL_WAITING : job step has thrown a RetryChunkLaterException and must be tried again after the provided poll delay %% temporary error state until retry ERROR --> on_receive : exception rollback\n triggers redelivery diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/introduction.md b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/introduction.md index 1663ddae3de..efb5ff42a39 100644 --- a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/introduction.md +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/introduction.md @@ -28,7 +28,7 @@ A Scheduled Job runs periodically (once a minute). For each Job Instance in the 1. Moves all `POLL_WAITING` work chunks to `READY` if their `nextPollTime` has expired. 1. Moves all `READY` work chunks into the `QUEUED` state and publishes a message to the Batch Notification Message Channel to inform worker threads that a work chunk is now ready for processing. \* -1. 
Calculates job progress (% of work chunks in `COMPLETE` status). If the job is finished, purges any left over work chunks still in the database. +1. Calculates job progress (% of work chunks in `COMPLETE` status). If the job is finished, purges any leftover work chunks still in the database. 1. Cleans up any complete, failed, or cancelled jobs that need to be removed. 1. Moves any gated jobs onto their next step when complete. 1. If the final step of a gated job is a reduction step, a reduction step execution will be triggered. diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JobInstanceUtil.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JobInstanceUtil.java index c2db708638a..51698299f79 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JobInstanceUtil.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JobInstanceUtil.java @@ -123,6 +123,8 @@ class JobInstanceUtil { retVal.setErrorMessage(theEntity.getErrorMessage()); retVal.setErrorCount(theEntity.getErrorCount()); retVal.setRecordsProcessed(theEntity.getRecordsProcessed()); + retVal.setNextPollTime(theEntity.getNextPollTime()); + retVal.setPollAttempts(theEntity.getPollAttempts()); // note: may be null out if queried NoData retVal.setData(theEntity.getSerializedData()); retVal.setWarningMessage(theEntity.getWarningMessage()); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java index 814cf8a32fb..33fbd5b1edf 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java @@ -68,7 +68,6 @@ import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.TransactionSynchronizationManager; import 
java.time.Instant; -import java.time.temporal.ChronoUnit; import java.util.Collections; import java.util.Date; import java.util.HashSet; @@ -364,8 +363,10 @@ public class JpaJobPersistenceImpl implements IJobPersistence { @Override public void onWorkChunkPollDelay(String theChunkId, Date theDeadline) { int updated = myWorkChunkRepository.updateWorkChunkNextPollTime( - theChunkId, - WorkChunkStatusEnum.POLL_WAITING, theDeadline); + theChunkId, + WorkChunkStatusEnum.POLL_WAITING, + Set.of(WorkChunkStatusEnum.IN_PROGRESS), + theDeadline); if (updated != 1) { ourLog.warn("Expected to update 1 work chunk's poll delay; but found {}", updated); @@ -488,16 +489,6 @@ public class JpaJobPersistenceImpl implements IJobPersistence { return toChunk(myWorkChunkRepository.save(Batch2WorkChunkEntity.fromWorkChunk(theWorkChunk))); } - @Override - public void updateWorkChunkToStatus( - String theChunkId, WorkChunkStatusEnum theOldStatus, WorkChunkStatusEnum theNewStatus) { - int updated = myWorkChunkRepository.updateChunkStatus(theChunkId, theNewStatus, theOldStatus); - - if (updated != 1) { - ourLog.warn("Expected to update 1 work chunk's status; instead updated {}", updated); - } - } - /** * Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope */ diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java index f04e88ce512..667ff18f667 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java @@ -78,19 +78,20 @@ public interface IBatch2WorkChunkRepository @Modifying @Query( - "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myNextPollTime = :nextPollTime, e.myPollAttempts = e.myPollAttempts + 1 WHERE e.myId = :id") 
+ "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myNextPollTime = :nextPollTime, e.myPollAttempts = e.myPollAttempts + 1 WHERE e.myId = :id AND e.myStatus IN(:states)") int updateWorkChunkNextPollTime( @Param("id") String theChunkId, @Param("status") WorkChunkStatusEnum theStatus, + @Param("states") Set theInitialStates, @Param("nextPollTime") Date theNextPollTime); @Modifying @Query( - "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myNextPollTime = null WHERE e.myInstanceId = :instanceId AND e.myStatus IN(:states) AND e.myNextPollTime IS NOT NULL AND e.myNextPollTime <= :pollTime") + "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myNextPollTime = null WHERE e.myInstanceId = :instanceId AND e.myStatus IN(:states) AND e.myNextPollTime <= :pollTime") int updateWorkChunksForPollWaiting( @Param("instanceId") String theInstanceId, @Param("status") WorkChunkStatusEnum theNewStatus, - @Param("states") Set theStates, + @Param("states") Set theInitialStates, @Param("pollTime") Date theTime); @Modifying diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStateTransitions.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStateTransitions.java index d00f241b8b1..df551948e04 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStateTransitions.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStateTransitions.java @@ -16,6 +16,7 @@ import java.time.Instant; import java.util.Date; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkTestConstants { @@ -94,29 +95,62 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT @ParameterizedTest @ValueSource(strings = 
{ - "2|IN_PROGRESS,2|POLL_WAITING", - "2|IN_PROGRESS" + "2|READY", + "2|QUEUED", + //"2|GATED,", // TODO - update/enable when gated status is available + "2|POLL_WAITING", + "2|ERRORED", + "2|FAILED", + "2|COMPLETED" }) - default void onWorkChunkPollDelay_workChunkIN_PROGRESSdeadlineExpired_transitionsToPOLL_WAITINGandUpdatesTime() { + default void onWorkChunkPollDelay_withNoInProgressChunks_doNotTransitionNorSetTime(String theState) { // setup disableWorkChunkMessageHandler(); enableMaintenanceRunner(false); JobDefinition jobDef = withJobDefinition(false); String jobInstanceId = createAndStoreJobInstance(jobDef); + // the time we set it to Date newTime = Date.from( Instant.now().plus(Duration.ofSeconds(100)) ); + JobMaintenanceStateInformation stateInformation = new JobMaintenanceStateInformation( + jobInstanceId, + jobDef, + theState + ); + stateInformation.initialize(getSvc()); + + String chunkId = stateInformation.getInitialWorkChunks() + .stream().findFirst().orElseThrow().getId(); + + // test + getSvc().onWorkChunkPollDelay(chunkId, newTime); + + // verify + stateInformation.verifyFinalStates(getSvc(), chunk -> { + assertNull(chunk.getNextPollTime()); + }); + } + + @Test + default void onWorkChunkPollDelay_withInProgressChunks_transitionsAndSetsNewTime() { + // setup + disableWorkChunkMessageHandler(); + enableMaintenanceRunner(false); + JobDefinition jobDef = withJobDefinition(false); + String jobInstanceId = createAndStoreJobInstance(jobDef); + + // the time we set it to + Date newTime = Date.from( + Instant.now().plus(Duration.ofSeconds(100)) + ); + String state = "2|IN_PROGRESS,2|POLL_WAITING"; JobMaintenanceStateInformation stateInformation = new JobMaintenanceStateInformation( jobInstanceId, jobDef, state ); - stateInformation.addWorkChunkModifier(chunk -> { - chunk.setNextPollTime(Date.from( - Instant.now().minus(Duration.ofSeconds(100)) - )); - }); stateInformation.initialize(getSvc()); String chunkId = stateInformation.getInitialWorkChunks() @@ 
-127,18 +161,72 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT // verify stateInformation.verifyFinalStates(getSvc(), (chunk) -> { + // verify the time has been set assertEquals(newTime, chunk.getNextPollTime()); + assertEquals(1, chunk.getPollAttempts()); }); } + @Test + default void updatePollWaitingChunksForJobIfReady_pollWaitingChunkWithExpiredTime_transition() { + updatePollWaitingChunksForJobIfReady_POLL_WAITING_chunksTest(true); + } + + @Test + default void updatePollWaitingChunksForJobIfReady_pollWaitingChunkWithNonExpiredTime_doesNotTransition() { + updatePollWaitingChunksForJobIfReady_POLL_WAITING_chunksTest(false); + } + + private void updatePollWaitingChunksForJobIfReady_POLL_WAITING_chunksTest(boolean theDeadlineIsExpired) { + // setup + disableWorkChunkMessageHandler(); + enableMaintenanceRunner(false); + String state = "1|POLL_WAITING"; + if (theDeadlineIsExpired) { + state += ",1|READY"; + } + + JobDefinition jobDef = withJobDefinition(false); + String jobInstanceId = createAndStoreJobInstance(jobDef); + JobMaintenanceStateInformation stateInformation = new JobMaintenanceStateInformation( + jobInstanceId, + jobDef, + state + ); + Date nextPollTime = theDeadlineIsExpired ? 
+ Date.from(Instant.now().minus(Duration.ofSeconds(10))) : Date.from(Instant.now().plus(Duration.ofSeconds(10))); + stateInformation.addWorkChunkModifier(chunk -> { + chunk.setNextPollTime(nextPollTime); + }); + stateInformation.initialize(getSvc()); + + // test + int updateCount = getSvc().updatePollWaitingChunksForJobIfReady(jobInstanceId); + + // verify + if (theDeadlineIsExpired) { + assertEquals(1, updateCount); + } else { + assertEquals(0, updateCount); + } + stateInformation.verifyFinalStates(getSvc()); + } + + /** + * Only POLL_WAITING chunks should be able to transition to READY via + * updatePollWaitingChunksForJobIfReady + */ @ParameterizedTest @ValueSource(strings = { "2|READY", + // "2|GATED", // TODO - update/enable whenever gated status is ready "2|QUEUED", + "2|IN_PROGRESS", + "2|ERRORED", "2|FAILED", - "2|COMPLETE" + "2|COMPLETED" }) - default void updatePollWaitingChunksForJobIfReady_nonApplicableStates_doNotTransitionToPollWaiting() { + default void updatePollWaitingChunksForJobIfReady_withNoPollWaitingChunks_doNotTransitionNorUpdateTime(String theState) { // setup disableWorkChunkMessageHandler(); enableMaintenanceRunner(false); @@ -146,21 +234,12 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT JobDefinition jobDef = withJobDefinition(false); String jobInstanceId = createAndStoreJobInstance(jobDef); - StringBuilder sb = new StringBuilder(); - for (WorkChunkStatusEnum status : WorkChunkStatusEnum.values()) { - if (status != WorkChunkStatusEnum.POLL_WAITING && status != WorkChunkStatusEnum.IN_PROGRESS - && status != WorkChunkStatusEnum.ERRORED) { - sb.append("2|") - .append(status.name()) - .append("\n"); - } - } - String state = sb.toString(); - JobMaintenanceStateInformation stateInformation = new JobMaintenanceStateInformation(jobInstanceId, jobDef, - state); + theState); stateInformation.addWorkChunkModifier((chunk) -> { + // make sure time is in the past, so we aren't testing the + // time <= now aspect 
chunk.setNextPollTime( Date.from(Instant.now().minus(Duration.ofSeconds(10))) ); diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/StepExecutor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/StepExecutor.java index 52eb916c60e..06fa1d28a76 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/StepExecutor.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/StepExecutor.java @@ -66,7 +66,7 @@ public class StepExecutor { Date nextPollTime = Date.from( Instant.now().plus(ex.getNextPollDuration()) ); - ourLog.debug("Polling job encountered; will retry after {}ms", ex.getNextPollDuration().get(ChronoUnit.MILLIS)); + ourLog.debug("Polling job encountered; will retry after {}s", ex.getNextPollDuration().get(ChronoUnit.SECONDS)); myJobPersistence.onWorkChunkPollDelay(theStepExecutionDetails.getChunkId(), nextPollTime); return false; } catch (JobExecutionFailedException e) { diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkStatusEnum.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkStatusEnum.java index f752bbc1055..b898399a0f3 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkStatusEnum.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkStatusEnum.java @@ -52,9 +52,12 @@ public enum WorkChunkStatusEnum { */ POLL_WAITING, /** - * ERRORED is a transient state on retry when a chunk throws an error, but hasn't FAILED yet. Will move back to IN_PROGRESS on retry. + * A transient state on retry when a chunk throws an error, but hasn't FAILED yet. Will move back to IN_PROGRESS on retry. */ ERRORED, + /** + * Chunk has failed with a non-retriable error, or has run out of retry attempts. + */ FAILED, /** * The state of workchunks that have finished their job's step.