review fixes round 2

This commit is contained in:
leif stawnyczy 2024-04-02 10:57:14 -04:00
parent ba67d6fa82
commit 7f6d1aa9c7
8 changed files with 119 additions and 41 deletions

View File

@ -61,6 +61,7 @@ stateDiagram-v2
[*] --> GATED : on create - gated
GATED --> READY : on create - gated
READY --> QUEUED : placed on kafka (maint.)
POLL_WAITING --> READY : after a poll delay on a POLL_WAITING work chunk has elapsed
%% worker processing states
QUEUED --> on_receive : on dequeue by worker
@ -70,6 +71,7 @@ stateDiagram-v2
execute --> ERROR : on re-triable error
execute --> COMPLETED : success\n maybe trigger instance first_step_finished
execute --> FAILED : on unrecoverable \n or too many errors
execute --> POLL_WAITING : job step has thrown a RetryChunkLaterException and must be tried again after the provided poll delay
%% temporary error state until retry
ERROR --> on_receive : exception rollback\n triggers redelivery

View File

@ -28,7 +28,7 @@ A Scheduled Job runs periodically (once a minute). For each Job Instance in the
1. Moves all `POLL_WAITING` work chunks to `READY` if their `nextPollTime` has expired.
1. Moves all `READY` work chunks into the `QUEUED` state and publishes a message to the Batch Notification Message Channel to inform worker threads that a work chunk is now ready for processing. \*
1. Calculates job progress (% of work chunks in `COMPLETE` status). If the job is finished, purges any left over work chunks still in the database.
1. Calculates job progress (% of work chunks in `COMPLETE` status). If the job is finished, purges any leftover work chunks still in the database.
1. Cleans up any complete, failed, or cancelled jobs that need to be removed.
1. Moves any gated jobs onto their next step when complete.
1. If the final step of a gated job is a reduction step, a reduction step execution will be triggered.

View File

@ -123,6 +123,8 @@ class JobInstanceUtil {
retVal.setErrorMessage(theEntity.getErrorMessage());
retVal.setErrorCount(theEntity.getErrorCount());
retVal.setRecordsProcessed(theEntity.getRecordsProcessed());
retVal.setNextPollTime(theEntity.getNextPollTime());
retVal.setPollAttempts(theEntity.getPollAttempts());
// note: may be nulled out if queried NoData
retVal.setData(theEntity.getSerializedData());
retVal.setWarningMessage(theEntity.getWarningMessage());

View File

@ -68,7 +68,6 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
@ -364,8 +363,10 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
public void onWorkChunkPollDelay(String theChunkId, Date theDeadline) {
int updated = myWorkChunkRepository.updateWorkChunkNextPollTime(
theChunkId,
WorkChunkStatusEnum.POLL_WAITING, theDeadline);
theChunkId,
WorkChunkStatusEnum.POLL_WAITING,
Set.of(WorkChunkStatusEnum.IN_PROGRESS),
theDeadline);
if (updated != 1) {
ourLog.warn("Expected to update 1 work chunk's poll delay; but found {}", updated);
@ -488,16 +489,6 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
return toChunk(myWorkChunkRepository.save(Batch2WorkChunkEntity.fromWorkChunk(theWorkChunk)));
}
@Override
public void updateWorkChunkToStatus(
String theChunkId, WorkChunkStatusEnum theOldStatus, WorkChunkStatusEnum theNewStatus) {
int updated = myWorkChunkRepository.updateChunkStatus(theChunkId, theNewStatus, theOldStatus);
if (updated != 1) {
ourLog.warn("Expected to update 1 work chunk's status; instead updated {}", updated);
}
}
/**
* Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
*/

View File

@ -78,19 +78,20 @@ public interface IBatch2WorkChunkRepository
@Modifying
@Query(
"UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myNextPollTime = :nextPollTime, e.myPollAttempts = e.myPollAttempts + 1 WHERE e.myId = :id")
"UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myNextPollTime = :nextPollTime, e.myPollAttempts = e.myPollAttempts + 1 WHERE e.myId = :id AND e.myStatus IN(:states)")
int updateWorkChunkNextPollTime(
@Param("id") String theChunkId,
@Param("status") WorkChunkStatusEnum theStatus,
@Param("states") Set<WorkChunkStatusEnum> theInitialStates,
@Param("nextPollTime") Date theNextPollTime);
@Modifying
@Query(
"UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myNextPollTime = null WHERE e.myInstanceId = :instanceId AND e.myStatus IN(:states) AND e.myNextPollTime IS NOT NULL AND e.myNextPollTime <= :pollTime")
"UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myNextPollTime = null WHERE e.myInstanceId = :instanceId AND e.myStatus IN(:states) AND e.myNextPollTime <= :pollTime")
int updateWorkChunksForPollWaiting(
@Param("instanceId") String theInstanceId,
@Param("status") WorkChunkStatusEnum theNewStatus,
@Param("states") Set<WorkChunkStatusEnum> theStates,
@Param("states") Set<WorkChunkStatusEnum> theInitialStates,
@Param("pollTime") Date theTime);
@Modifying

View File

@ -16,6 +16,7 @@ import java.time.Instant;
import java.util.Date;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkTestConstants {
@ -94,29 +95,62 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT
@ParameterizedTest
@ValueSource(strings = {
"2|IN_PROGRESS,2|POLL_WAITING",
"2|IN_PROGRESS"
"2|READY",
"2|QUEUED",
//"2|GATED,", // TODO - update/enable when gated status is available
"2|POLL_WAITING",
"2|ERRORED",
"2|FAILED",
"2|COMPLETED"
})
default void onWorkChunkPollDelay_workChunkIN_PROGRESSdeadlineExpired_transitionsToPOLL_WAITINGandUpdatesTime() {
default void onWorkChunkPollDelay_withNoInProgressChunks_doNotTransitionNorSetTime(String theState) {
// setup
disableWorkChunkMessageHandler();
enableMaintenanceRunner(false);
JobDefinition<?> jobDef = withJobDefinition(false);
String jobInstanceId = createAndStoreJobInstance(jobDef);
// the time we set it to
Date newTime = Date.from(
Instant.now().plus(Duration.ofSeconds(100))
);
JobMaintenanceStateInformation stateInformation = new JobMaintenanceStateInformation(
jobInstanceId,
jobDef,
theState
);
stateInformation.initialize(getSvc());
String chunkId = stateInformation.getInitialWorkChunks()
.stream().findFirst().orElseThrow().getId();
// test
getSvc().onWorkChunkPollDelay(chunkId, newTime);
// verify
stateInformation.verifyFinalStates(getSvc(), chunk -> {
assertNull(chunk.getNextPollTime());
});
}
@Test
default void onWorkChunkPollDelay_withInProgressChunks_transitionsAndSetsNewTime() {
// setup
disableWorkChunkMessageHandler();
enableMaintenanceRunner(false);
JobDefinition<?> jobDef = withJobDefinition(false);
String jobInstanceId = createAndStoreJobInstance(jobDef);
// the time we set it to
Date newTime = Date.from(
Instant.now().plus(Duration.ofSeconds(100))
);
String state = "2|IN_PROGRESS,2|POLL_WAITING";
JobMaintenanceStateInformation stateInformation = new JobMaintenanceStateInformation(
jobInstanceId, jobDef,
state
);
stateInformation.addWorkChunkModifier(chunk -> {
chunk.setNextPollTime(Date.from(
Instant.now().minus(Duration.ofSeconds(100))
));
});
stateInformation.initialize(getSvc());
String chunkId = stateInformation.getInitialWorkChunks()
@ -127,18 +161,72 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT
// verify
stateInformation.verifyFinalStates(getSvc(), (chunk) -> {
// verify the time has been set
assertEquals(newTime, chunk.getNextPollTime());
assertEquals(1, chunk.getPollAttempts());
});
}
@Test
default void updatePollWaitingChunksForJobIfReady_pollWaitingChunkWithExpiredTime_transition() {
updatePollWaitingChunksForJobIfReady_POLL_WAITING_chunksTest(true);
}
@Test
default void updatePollWaitingChunksForJobIfReady_pollWaitingChunkWithNonExpiredTime_doesNotTransition() {
updatePollWaitingChunksForJobIfReady_POLL_WAITING_chunksTest(false);
}
private void updatePollWaitingChunksForJobIfReady_POLL_WAITING_chunksTest(boolean theDeadlineIsExpired) {
// setup
disableWorkChunkMessageHandler();
enableMaintenanceRunner(false);
String state = "1|POLL_WAITING";
if (theDeadlineIsExpired) {
state += ",1|READY";
}
JobDefinition<?> jobDef = withJobDefinition(false);
String jobInstanceId = createAndStoreJobInstance(jobDef);
JobMaintenanceStateInformation stateInformation = new JobMaintenanceStateInformation(
jobInstanceId,
jobDef,
state
);
Date nextPollTime = theDeadlineIsExpired ?
Date.from(Instant.now().minus(Duration.ofSeconds(10))) : Date.from(Instant.now().plus(Duration.ofSeconds(10)));
stateInformation.addWorkChunkModifier(chunk -> {
chunk.setNextPollTime(nextPollTime);
});
stateInformation.initialize(getSvc());
// test
int updateCount = getSvc().updatePollWaitingChunksForJobIfReady(jobInstanceId);
// verify
if (theDeadlineIsExpired) {
assertEquals(1, updateCount);
} else {
assertEquals(0, updateCount);
}
stateInformation.verifyFinalStates(getSvc());
}
/**
* Only POLL_WAITING chunks should be able to transition to READY via
* updatePollWaitingChunksForJobIfReady
*/
@ParameterizedTest
@ValueSource(strings = {
"2|READY",
// "2|GATED", // TODO - update/enable whenever gated status is ready
"2|QUEUED",
"2|IN_PROGRESS",
"2|ERRORED",
"2|FAILED",
"2|COMPLETE"
"2|COMPLETED"
})
default void updatePollWaitingChunksForJobIfReady_nonApplicableStates_doNotTransitionToPollWaiting() {
default void updatePollWaitingChunksForJobIfReady_withNoPollWaitingChunks_doNotTransitionNorUpdateTime(String theState) {
// setup
disableWorkChunkMessageHandler();
enableMaintenanceRunner(false);
@ -146,21 +234,12 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT
JobDefinition<?> jobDef = withJobDefinition(false);
String jobInstanceId = createAndStoreJobInstance(jobDef);
StringBuilder sb = new StringBuilder();
for (WorkChunkStatusEnum status : WorkChunkStatusEnum.values()) {
if (status != WorkChunkStatusEnum.POLL_WAITING && status != WorkChunkStatusEnum.IN_PROGRESS
&& status != WorkChunkStatusEnum.ERRORED) {
sb.append("2|")
.append(status.name())
.append("\n");
}
}
String state = sb.toString();
JobMaintenanceStateInformation stateInformation = new JobMaintenanceStateInformation(jobInstanceId,
jobDef,
state);
theState);
stateInformation.addWorkChunkModifier((chunk) -> {
// make sure time is in the past, so we aren't testing the
// time <= now aspect
chunk.setNextPollTime(
Date.from(Instant.now().minus(Duration.ofSeconds(10)))
);

View File

@ -66,7 +66,7 @@ public class StepExecutor {
Date nextPollTime = Date.from(
Instant.now().plus(ex.getNextPollDuration())
);
ourLog.debug("Polling job encountered; will retry after {}ms", ex.getNextPollDuration().get(ChronoUnit.MILLIS));
ourLog.debug("Polling job encountered; will retry after {}s", ex.getNextPollDuration().get(ChronoUnit.SECONDS));
myJobPersistence.onWorkChunkPollDelay(theStepExecutionDetails.getChunkId(), nextPollTime);
return false;
} catch (JobExecutionFailedException e) {

View File

@ -52,9 +52,12 @@ public enum WorkChunkStatusEnum {
*/
POLL_WAITING,
/**
* ERRORED is a transient state on retry when a chunk throws an error, but hasn't FAILED yet. Will move back to IN_PROGRESS on retry.
* A transient state on retry when a chunk throws an error, but hasn't FAILED yet. Will move back to IN_PROGRESS on retry.
*/
ERRORED,
/**
* Chunk has failed with a non-retriable error, or has run out of retry attempts.
*/
FAILED,
/**
* The state of workchunks that have finished their job's step.