code review points

This commit is contained in:
leif stawnyczy 2024-03-08 15:09:29 -05:00
parent 7c1538fcc1
commit 3f190009b3
10 changed files with 66 additions and 31 deletions

View File

@ -1,11 +1,9 @@
---
type: add
issue: 5745
title: "Added another work chunk state to Batch2 jobs: READY.
This work chunk state will be the initial state work chunks
are created. After which, they will be picked up by a maintenance
pass and have their states updated to QUEUED before sending them to
the queue.
title: "Added another state to the Batch2 work chunk state machine: READY.
This work chunk state will be the initial state on creation.
Once queued for deliver, they will transition to QUEUED.
The exception is for ReductionStep chunks (because reduction steps
are not read off of the queue, but executed by the maintenance job
inline.

View File

@ -47,7 +47,6 @@ stateDiagram-v2
title: Batch2 Job Work Chunk state transitions
---
stateDiagram-v2
[*]:
state READY
state QUEUED
state on_receive <<choice>>

View File

@ -24,12 +24,12 @@ The Batch Job Coordinator will then store two records in the database:
### The Maintenance Job
A Scheduled Job runs every so often (default once a minute), and does the following for each Job Instance in the database:
A Scheduled Job runs periodically (once a minute). For each Job Instance in the database, it:
1. Moves all `READY` work chunks into the `QUEUED` state and publishes a message to the Batch Notification Message Channel to inform worker threads that a work chunk is now ready for processing. \*
1. Calculates job progress (% of work chunks in `COMPLETE` status). If the job is finished, purges any left over work chunks still in the database.
1. Cleans up any complete, failed, or cancelled jobs that need to be removed.
1. Moves any gated jobs onto their next step.
1. Moves any gated jobs onto their next step when complete.
1. If the final step of a gated job is a reduction step, a reduction step execution will be triggered.
\* An exception is for the final reduction step, where work chunks are not published to the Batch Notification Message Channel,

View File

@ -298,10 +298,6 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
int updated = myWorkChunkRepository.updateChunkStatus(
theChunkId, WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.READY);
theCallback.accept(updated);
if (updated == 1) {
myEntityManager.flush();
myEntityManager.unwrap(Session.class).doWork(Connection::commit);
}
}
@Override
@ -401,7 +397,8 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId) {
if (getRunningJob(theInstanceId) == null) {
Batch2JobInstanceEntity jobInstanceEntity = getRunningJob(theInstanceId);
if (jobInstanceEntity == null) {
return false;
}
@ -413,7 +410,9 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
theInstanceId,
theCurrentStepId,
statusesForStep);
return statusesForStep.isEmpty() || statusesForStep.equals(Set.of(WorkChunkStatusEnum.COMPLETED));
return statusesForStep.isEmpty() || statusesForStep.equals(Set.of(WorkChunkStatusEnum.COMPLETED))
|| statusesForStep.equals(Set.of(WorkChunkStatusEnum.READY));
}
@Override

View File

@ -251,6 +251,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myCompletionHandler = (params) -> {
// ensure our completion handler fires
assertEquals(StatusEnum.COMPLETED, params.getInstance().getStatus());
completionBool.getAndSet(true);
};

View File

@ -63,7 +63,6 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(chunkId).getStatus()));
sleepUntilTimeChanges();
WorkChunk chunk = getSvc().onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());

View File

@ -118,6 +118,7 @@ public interface IJobPersistence extends IWorkChunkPersistence {
Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest);
// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
@Deprecated
boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId);
/**

View File

@ -309,10 +309,10 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
ReductionStepChunkProcessingResponse theResponseObject,
JobWorkCursor<PT, IT, OT> theJobWorkCursor) {
if (!theChunk.getStatus().isIncomplete()) {
if (theChunk.getStatus() != WorkChunkStatusEnum.READY) {
// This should never happen since jobs with reduction are required to be gated
ourLog.error(
"Unexpected chunk {} with status {} found while reducing {}. No chunks feeding into a reduction step should be complete.",
"Unexpected chunk {} with status {} found while reducing {}. No chunks feeding into a reduction step should be in a state other than READY.",
theChunk.getId(),
theChunk.getStatus(),
theInstance);

View File

@ -42,6 +42,7 @@ import org.slf4j.Logger;
import org.springframework.transaction.annotation.Propagation;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
@ -196,7 +197,7 @@ public class JobInstanceProcessor {
String instanceId = theInstance.getInstanceId();
String currentStepId = jobWorkCursor.getCurrentStepId();
boolean canAdvance = myJobPersistence.canAdvanceInstanceToNextStep(instanceId, currentStepId);
boolean canAdvance = canAdvanceGatedJob(theJobDefinition, theInstance, currentStepId);
if (canAdvance) {
String nextStepId = jobWorkCursor.nextStep.getStepId();
ourLog.info(
@ -222,6 +223,37 @@ public class JobInstanceProcessor {
}
}
private boolean canAdvanceGatedJob(JobDefinition<?> theJobDefinition, JobInstance theInstance, String theStepId) {
// make sure our instance still exists
if (myJobPersistence.fetchInstance(theInstance.getInstanceId()).isEmpty()) {
// no more job
return false;
}
Set<WorkChunkStatusEnum> workChunkStatuses = myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(theInstance.getInstanceId(), theStepId);
if (workChunkStatuses.isEmpty()) {
// no work chunks = no output
// trivial to advance to next step
return true;
}
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) {
// all work chunks complete -> go to next step
return true;
}
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.READY))
&& theJobDefinition.getStepById(theStepId).isReductionStep()) {
// all workchunks ready && last step is reduction step;
// proceed
return true;
}
// anything else
return false;
}
/**
* Chunks are initially created in READY state.
* We will move READY chunks to QUEUE'd and send them to the queue/topic (kafka)
@ -241,6 +273,18 @@ public class JobInstanceProcessor {
theJobInstance.getInstanceId(), Set.of(WorkChunkStatusEnum.READY));
readyChunks.forEach(chunk -> {
JobWorkCursor<?, ?, ?> jobWorkCursor =
JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, chunk.getTargetStepId());
if (theJobDefinition.isGatedExecution()
&& jobWorkCursor.isFinalStep()
&& jobWorkCursor.isReductionStep()) {
// reduction steps are processed by
// ReductionStepExecutorServiceImpl
// which does not wait for steps off the queue.
// so we will not process them here
return;
}
/*
* For each chunk id
* * Move to QUEUE'd
@ -267,19 +311,6 @@ public class JobInstanceProcessor {
String chunkId = theChunk.getId();
myJobPersistence.enqueueWorkChunkForProcessing(chunkId, updated -> {
if (updated == 1) {
JobWorkCursor<?, ?, ?> jobWorkCursor =
JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, theChunk.getTargetStepId());
if (theJobDefinition.isGatedExecution()
&& jobWorkCursor.isFinalStep()
&& jobWorkCursor.isReductionStep()) {
// reduction steps are processed by
// ReductionStepExecutorServiceImpl
// which does not wait for steps off the queue but reads all the
// "QUEUE'd" chunks and processes them inline
return;
}
// send to the queue
// we use current step id because it has not been moved to the next step (yet)
JobWorkNotification workNotification = new JobWorkNotification(

View File

@ -145,6 +145,13 @@ public class JobDefinition<PT extends IModelJson> {
return myGatedExecution;
}
public JobDefinitionStep<?, ?, ?> getStepById(String theId) {
return getSteps().stream()
.filter(s -> s.getStepId().equals(theId))
.findFirst()
.orElse(null);
}
public boolean isLastStepReduction() {
int stepCount = getSteps().size();
return stepCount >= 1 && getSteps().get(stepCount - 1).isReductionStep();