- Implemented GATE_WAITING state for the batch2 state machine.

- This will be the initial status for all workchunks of a gated job.
- Made it compatible with the equivalent "fake QUEUED" state in the old batch2 implementation.
- Updated corresponding docs.
- Added corresponding tests and a changelog entry.
tyner 2024-04-03 10:44:08 -04:00
parent 74319a7b6d
commit 32a00f4b81
20 changed files with 438 additions and 127 deletions


@ -0,0 +1,7 @@
---
type: add
issue: 5818
title: "Added another state to the Batch2 work chunk state machine: GATE_WAITING.
This work chunk state will be the initial state on creation for gated jobs.
Once all chunks of the previous step are completed, the gated chunks of the next step will transition to READY.
"


@ -56,10 +56,11 @@ stateDiagram-v2
state FAILED
state COMPLETED
direction LR
[*] --> READY : on create - normal
[*] --> GATED : on create - gated
GATED --> READY : on create - gated
READY --> QUEUED : placed on kafka (maint.)
[*] --> READY : on create - normal
[*] --> GATE_WAITING : on create - gated
GATE_WAITING --> READY : on gate release - gated (new)
QUEUED --> READY : on gate release - gated (for compatibility with old "fake QUEUED" state)
READY --> QUEUED : placed on kafka (maint.)
%% worker processing states
QUEUED --> on_receive : on deque by worker


@ -19,14 +19,14 @@ A HAPI-FHIR batch job definition consists of a job name, version, parameter json
After a job has been defined, *instances* of that job can be submitted for batch processing by populating a `JobInstanceStartRequest` with the job name and job parameters json and then submitting that request to the Batch Job Coordinator.
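For orientation, here is a minimal, hypothetical submission sketch. The job definition id and the parameter class are placeholders, and the exact `startInstance` overload and import locations vary between HAPI-FHIR versions:

// Hypothetical sketch: "MY_JOB", MyJobParameters and the injected myJobCoordinator are assumptions.
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId("MY_JOB");             // the registered job definition id
request.setParameters(new MyJobParameters());     // an IModelJson carrying the job parameters
Batch2JobStartResponse response = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
// The coordinator then persists the QUEUED Job Instance and the first work chunk in READY
// (or GATE_WAITING for a gated job), as described below.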
The Batch Job Coordinator will then store two records in the database:
- Job Instance with status QUEUED: that is the parent record for all data concerning this job
- Batch Work Chunk with status READY: this describes the first "chunk" of work required for this job. The first Batch Work Chunk contains no data.
- Job Instance with status `QUEUED`: that is the parent record for all data concerning this job
- Batch Work Chunk with status `READY`/`GATE_WAITING`: this describes the first "chunk" of work required for this job. The first Batch Work Chunk contains no data. The initial status of the work chunk will be `READY` or `GATE_WAITING` for non-gated and gated batch jobs, respectively. Please refer to [Gated Execution](#gated-execution) for more explanation on gated batch jobs.
### The Maintenance Job
A Scheduled Job runs periodically (once a minute). For each Job Instance in the database, it:
1. Moves all `READY` work chunks into the `QUEUED` state and publishes a message to the Batch Notification Message Channel to inform worker threads that a work chunk is now ready for processing. \*
1. Moves all `READY` work chunks into the `QUEUED` state and publishes a message to the Batch Notification Message Channel to inform worker threads that a work chunk is now ready for processing. For gated batch jobs, the maintenance job also moves `GATE_WAITING` work chunks into `READY` when their step is ready to start: on job start for the first step, or when the gate advances after all chunks of the previous step complete (see the sketch after this list). \*
1. Calculates job progress (% of work chunks in `COMPLETED` status). If the job is finished, purges any leftover work chunks still in the database.
1. Cleans up any complete, failed, or cancelled jobs that need to be removed.
1. Moves any gated jobs onto their next step when complete.
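For gated jobs, the per-instance decision made by each maintenance pass boils down to the sketch below. The method shape and parameters are illustrative only; the three persistence calls (`getDistinctWorkChunkStatesForJobAndStep`, `updateAllChunksForStepWithStatus`, `advanceJobStepAndUpdateChunkStatus`) are the ones used or introduced elsewhere in this commit:

// Simplified sketch of one maintenance pass over a gated instance; not the actual implementation.
void maintainGatedInstance(IJobPersistence persistence, String instanceId,
        String currentStepId, String nextStepId, boolean isFirstStep) {
    Set<WorkChunkStatusEnum> statuses =
            persistence.getDistinctWorkChunkStatesForJobAndStep(instanceId, currentStepId);
    if (statuses.equals(Set.of(WorkChunkStatusEnum.GATE_WAITING)) && isFirstStep) {
        // the job has just started: release the first step's chunks, do not advance the gate
        persistence.updateAllChunksForStepWithStatus(
                instanceId, currentStepId, WorkChunkStatusEnum.READY, WorkChunkStatusEnum.GATE_WAITING);
    } else if (statuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) {
        // the current step is fully completed: advance the gate and flip the next step's chunks to READY
        persistence.advanceJobStepAndUpdateChunkStatus(instanceId, nextStepId);
    }
    // READY chunks are then moved to QUEUED and published to the Batch Notification Message Channel
}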


@ -128,7 +128,10 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
entity.setSerializedData(theBatchWorkChunk.serializedData);
entity.setCreateTime(new Date());
entity.setStartTime(new Date());
entity.setStatus(WorkChunkStatusEnum.READY);
// set to GATE_WAITING if job is gated, to READY if not
entity.setStatus(
theBatchWorkChunk.isGatedExecution ? WorkChunkStatusEnum.GATE_WAITING : WorkChunkStatusEnum.READY);
ourLog.debug("Create work chunk {}/{}/{}", entity.getInstanceId(), entity.getId(), entity.getTargetStepId());
ourLog.trace(
"Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData());
@ -575,4 +578,61 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
myInterceptorBroadcaster.callHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE, params);
}
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public boolean advanceJobStepAndUpdateChunkStatus(String theJobInstanceId, String theNextStepId) {
boolean changed = updateInstance(theJobInstanceId, instance -> {
if (instance.getCurrentGatedStepId().equals(theNextStepId)) {
// someone else beat us here. No changes
return false;
}
ourLog.debug("Moving gated instance {} to the next step {}.", theJobInstanceId, theNextStepId);
instance.setCurrentGatedStepId(theNextStepId);
return true;
});
if (changed) {
ourLog.debug(
"Updating chunk status from GATE_WAITING to READY for gated instance {} in step {}.",
theJobInstanceId,
theNextStepId);
// when we reach here, the current step id is equal to theNextStepId
myWorkChunkRepository.updateAllChunksForStepWithStatus(
theJobInstanceId, theNextStepId, WorkChunkStatusEnum.READY, WorkChunkStatusEnum.GATE_WAITING);
// In the old code, gated jobs' workchunks are created in status QUEUED but are not actually queued for the
// workers.
// For backwards compatibility, turn those QUEUED chunks into READY as well.
// TODO: remove this when we are certain that no one is still running the old code.
int numChanged = myWorkChunkRepository.updateAllChunksForStepWithStatus(
theJobInstanceId, theNextStepId, WorkChunkStatusEnum.READY, WorkChunkStatusEnum.QUEUED);
if (numChanged > 0) {
ourLog.debug(
"Updated {} chunks of gated instance {} for step {} from fake QUEUED to READY.",
numChanged,
theJobInstanceId,
theNextStepId);
}
}
return changed;
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public int updateAllChunksForStepWithStatus(
String theJobInstanceId,
String theStepId,
WorkChunkStatusEnum theNewStatus,
WorkChunkStatusEnum theOldStatus) {
ourLog.debug(
"Updating chunk status from {} to {} for gated instance {} in step {}.",
theOldStatus,
theNewStatus,
theJobInstanceId,
theStepId);
return myWorkChunkRepository.updateAllChunksForStepWithStatus(
theJobInstanceId, theStepId, theNewStatus, theOldStatus);
}
}


@ -109,6 +109,15 @@ public interface IBatch2WorkChunkRepository
@Param("newStatus") WorkChunkStatusEnum theNewStatus,
@Param("oldStatus") WorkChunkStatusEnum theOldStatus);
@Modifying
@Query(
"UPDATE Batch2WorkChunkEntity e SET e.myStatus = :newStatus WHERE e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId AND e.myStatus = :oldStatus")
int updateAllChunksForStepWithStatus(
@Param("instanceId") String theInstanceId,
@Param("stepId") String theStepId,
@Param("newStatus") WorkChunkStatusEnum theNewStatus,
@Param("oldStatus") WorkChunkStatusEnum theOldStatus);
@Modifying
@Query("DELETE FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId")
int deleteAllForInstance(@Param("instanceId") String theInstanceId);


@ -4,6 +4,7 @@ import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
@ -173,6 +174,36 @@ class JpaJobPersistenceImplTest {
assertEquals(instance.getInstanceId(), retInstance.get().getInstanceId());
}
@Test
void advanceJobStepAndUpdateChunkStatus_validRequest_callsPersistenceUpdateAndReturnsChanged() {
// setup
String instanceId = "jobId";
String nextStepId = "nextStep";
// execute
mySvc.advanceJobStepAndUpdateChunkStatus(instanceId, nextStepId);
// verify
verify(mySvc).updateInstance(eq(instanceId), any());
verify(myWorkChunkRepository).updateAllChunksForStepWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.READY, WorkChunkStatusEnum.GATE_WAITING);
verify(myWorkChunkRepository).updateAllChunksForStepWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.READY, WorkChunkStatusEnum.QUEUED);
}
@Test
void updateAllChunksForStepWithStatus_validRequest_callsPersistenceUpdateAndReturnsChanged() {
// setup
String instanceId = "jobId";
String nextStepId = "nextStep";
WorkChunkStatusEnum expectedNewStatus = WorkChunkStatusEnum.READY;
WorkChunkStatusEnum expectedOldStatus = WorkChunkStatusEnum.GATE_WAITING;
// execute
mySvc.updateAllChunksForStepWithStatus(instanceId, nextStepId, expectedNewStatus, expectedOldStatus);
// verify
verify(myWorkChunkRepository).updateAllChunksForStepWithStatus(instanceId, nextStepId, expectedNewStatus, expectedOldStatus);
}
private JobInstance createJobInstanceFromEntity(Batch2JobInstanceEntity theEntity) {
return JobInstanceUtil.fromEntityToInstance(theEntity);
}


@ -38,6 +38,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Import;
@ -79,6 +80,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
public static final String JOB_DEFINITION_ID = "definition-id";
public static final String TARGET_STEP_ID = TestJobDefinitionUtils.FIRST_STEP_ID;
public static final String LAST_STEP_ID = TestJobDefinitionUtils.LAST_STEP_ID;
public static final String DEF_CHUNK_ID = "definition-chunkId";
public static final String STEP_CHUNK_ID = TestJobDefinitionUtils.FIRST_STEP_ID;
public static final int JOB_DEF_VER = 1;
@ -110,7 +112,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
for (int i = 0; i < 10; i++) {
storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, i, JsonUtil.serialize(new NdJsonFileJson().setNdJsonText("{}")));
storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, i, JsonUtil.serialize(new NdJsonFileJson().setNdJsonText("{}")), false);
}
// Execute
@ -125,8 +127,8 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
});
}
private String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) {
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, TestJobDefinitionUtils.TEST_JOB_VERSION, theTargetStepId, theInstanceId, theSequence, theSerializedData);
private String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData, boolean theGatedExecution) {
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, TestJobDefinitionUtils.TEST_JOB_VERSION, theTargetStepId, theInstanceId, theSequence, theSerializedData, theGatedExecution);
return mySvc.onWorkChunkCreate(batchWorkChunk);
}
@ -240,8 +242,8 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
// Setup
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(JOB_DEFINITION_ID, JOB_DEF_VER, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA, false);
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(JOB_DEFINITION_ID, JOB_DEF_VER, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA, false);
String chunkId = mySvc.onWorkChunkCreate(batchWorkChunk);
Optional<Batch2WorkChunkEntity> byId = myWorkChunkRepository.findById(chunkId);
Batch2WorkChunkEntity entity = byId.get();
@ -367,7 +369,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testUpdateTime() {
// Setup
JobInstance instance = createInstance();
JobInstance instance = createInstance(true, false);
String instanceId = mySvc.storeNewInstance(instance);
Date updateTime = runInTransaction(() -> new Date(myJobInstanceRepository.findById(instanceId).orElseThrow().getUpdateTime().getTime()));
@ -382,35 +384,128 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertNotEquals(updateTime, updateTime2);
}
@Test
public void advanceJobStepAndUpdateChunkStatus_forGatedJob_updatesCurrentStepAndChunkStatus() {
// setup
JobInstance instance = createInstance(true, true);
String instanceId = mySvc.storeNewInstance(instance);
String chunkIdFirstStep = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null, true);
String chunkIdSecondStep1 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, true);
String chunkIdSecondStep2 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, true);
runInTransaction(() -> assertEquals(TARGET_STEP_ID, myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalArgumentException::new).getCurrentGatedStepId()));
// execute
runInTransaction(() -> mySvc.advanceJobStepAndUpdateChunkStatus(instanceId, LAST_STEP_ID));
// verify
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.GATE_WAITING, myWorkChunkRepository.findById(chunkIdFirstStep).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkIdSecondStep1).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkIdSecondStep2).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(LAST_STEP_ID, myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalArgumentException::new).getCurrentGatedStepId()));
}
@Test
public void updateAllChunksForStepWithStatus_forGatedJob_updatesChunkStatus() {
// setup
WorkChunkStatusEnum expectedNoChangeStatus = WorkChunkStatusEnum.GATE_WAITING;
WorkChunkStatusEnum expectedChangedStatus = WorkChunkStatusEnum.COMPLETED;
JobInstance instance = createInstance(true, true);
String instanceId = mySvc.storeNewInstance(instance);
String chunkIdFirstStep = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null, true);
String chunkIdSecondStep1 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, true);
String chunkIdSecondStep2 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, true);
runInTransaction(() -> assertEquals(expectedNoChangeStatus, myWorkChunkRepository.findById(chunkIdFirstStep).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(expectedNoChangeStatus, myWorkChunkRepository.findById(chunkIdSecondStep1).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(expectedNoChangeStatus, myWorkChunkRepository.findById(chunkIdSecondStep2).orElseThrow(IllegalArgumentException::new).getStatus()));
// execute
runInTransaction(() -> mySvc.updateAllChunksForStepWithStatus(instanceId, LAST_STEP_ID, expectedChangedStatus, WorkChunkStatusEnum.GATE_WAITING));
// verify
runInTransaction(() -> assertEquals(expectedNoChangeStatus, myWorkChunkRepository.findById(chunkIdFirstStep).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(expectedChangedStatus, myWorkChunkRepository.findById(chunkIdSecondStep1).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(expectedChangedStatus, myWorkChunkRepository.findById(chunkIdSecondStep2).orElseThrow(IllegalArgumentException::new).getStatus()));
}
@Test
public void testFetchUnknownWork() {
assertFalse(myWorkChunkRepository.findById("FOO").isPresent());
}
@Test
public void testStoreAndFetchWorkChunk_NoData() {
JobInstance instance = createInstance(true, false);
@ParameterizedTest
@CsvSource({
"false, READY, QUEUED",
"true, GATE_WAITING, QUEUED"
})
public void testStoreAndFetchWorkChunk_withOrWithoutGatedExecution_createdAndTransitionToExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum theExpectedStatusOnCreate, WorkChunkStatusEnum theExpectedStatusAfterTransition) {
// setup
JobInstance instance = createInstance(true, theGatedExecution);
String instanceId = mySvc.storeNewInstance(instance);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
// execute & verify
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null, theGatedExecution);
runInTransaction(() -> assertEquals(theExpectedStatusOnCreate, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(theExpectedStatusAfterTransition, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
if (WorkChunkStatusEnum.READY.equals(theExpectedStatusAfterTransition)) {
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
}
WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
assertNull(chunk.getData());
}
@Test
public void testStoreAndFetchWorkChunk_withGatedJobMultipleChunk_correctTransitions() {
// setup
boolean isGatedExecution = true;
String expectedFirstChunkData = "IAmChunk1";
String expectedSecondChunkData = "IAmChunk2";
JobInstance instance = createInstance(true, isGatedExecution);
String instanceId = mySvc.storeNewInstance(instance);
// execute & verify
String firstChunkId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, expectedFirstChunkData, isGatedExecution);
String secondChunkId = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, expectedSecondChunkData, isGatedExecution);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.GATE_WAITING, myWorkChunkRepository.findById(firstChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.GATE_WAITING, myWorkChunkRepository.findById(secondChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(firstChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.GATE_WAITING, myWorkChunkRepository.findById(secondChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
WorkChunk actualFirstChunkData = mySvc.onWorkChunkDequeue(firstChunkId).orElseThrow(IllegalArgumentException::new);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(firstChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
assertEquals(expectedFirstChunkData, actualFirstChunkData.getData());
mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(firstChunkId, 50, 0));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.COMPLETED, myWorkChunkRepository.findById(firstChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.GATE_WAITING, myWorkChunkRepository.findById(secondChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.COMPLETED, myWorkChunkRepository.findById(firstChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(secondChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
WorkChunk actualSecondChunkData = mySvc.onWorkChunkDequeue(secondChunkId).orElseThrow(IllegalArgumentException::new);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(secondChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
assertEquals(expectedSecondChunkData, actualSecondChunkData.getData());
}
@Test
void testStoreAndFetchChunksForInstance_NoData() {
// given
boolean isGatedExecution = false;
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String queuedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, "some data");
String erroredId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 1, "some more data");
String completedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 2, "some more data");
String queuedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, "some data", isGatedExecution);
String erroredId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 1, "some more data", isGatedExecution);
String completedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 2, "some more data", isGatedExecution);
mySvc.onWorkChunkDequeue(erroredId);
WorkChunkErrorEvent parameters = new WorkChunkErrorEvent(erroredId, "Our error message");
@ -468,17 +563,26 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
}
@Test
public void testStoreAndFetchWorkChunk_WithData() {
JobInstance instance = createInstance(true, false);
@ParameterizedTest
@CsvSource({
"false, READY, QUEUED",
"true, GATE_WAITING, QUEUED"
})
public void testStoreAndFetchWorkChunk_WithData(boolean theGatedExecution, WorkChunkStatusEnum theExpectedCreatedStatus, WorkChunkStatusEnum theExpectedTransitionStatus) {
JobInstance instance = createInstance(true, theGatedExecution);
String instanceId = mySvc.storeNewInstance(instance);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA, theGatedExecution);
assertNotNull(id);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(theExpectedCreatedStatus, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(theExpectedTransitionStatus, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
if (WorkChunkStatusEnum.READY.equals(theExpectedTransitionStatus)){
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
}
WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
assertEquals(36, chunk.getInstanceId().length());
assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId());
@ -491,9 +595,10 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testMarkChunkAsCompleted_Success() {
JobInstance instance = createInstance(true, false);
boolean isGatedExecution = false;
JobInstance instance = createInstance(true, isGatedExecution);
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA, isGatedExecution);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
@ -528,9 +633,10 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testMarkChunkAsCompleted_Error() {
JobInstance instance = createInstance(true, false);
boolean isGatedExecution = false;
JobInstance instance = createInstance(true, isGatedExecution);
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(JOB_DEFINITION_ID, TestJobDefinitionUtils.FIRST_STEP_ID, instanceId, SEQUENCE_NUMBER, null);
String chunkId = storeWorkChunk(JOB_DEFINITION_ID, TestJobDefinitionUtils.FIRST_STEP_ID, instanceId, SEQUENCE_NUMBER, null, isGatedExecution);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
@ -580,9 +686,10 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testMarkChunkAsCompleted_Fail() {
JobInstance instance = createInstance(true, false);
boolean isGatedExecution = false;
JobInstance instance = createInstance(true, isGatedExecution);
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null, isGatedExecution);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
@ -620,7 +727,8 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
"stepId",
instanceId,
0,
"{}"
"{}",
false
);
String id = mySvc.onWorkChunkCreate(chunk);
chunkIds.add(id);
@ -698,6 +806,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
}
);
instance.setCurrentGatedStepId(jobDef.getFirstStepId());
} else {
jobDef = TestJobDefinitionUtils.buildJobDefinition(
JOB_DEFINITION_ID,


@ -91,6 +91,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
return mySvc;
}
@Nonnull
public JobDefinition<TestJobParameters> withJobDefinition(boolean theIsGatedBoolean) {
JobDefinition.Builder<TestJobParameters, ?> builder = JobDefinition.newBuilder()
.setJobDefinitionId(theIsGatedBoolean ? GATED_JOB_DEFINITION_ID : JOB_DEFINITION_ID)
@ -165,11 +166,14 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
instance.setJobDefinitionVersion(JOB_DEF_VER);
instance.setParameters(CHUNK_DATA);
instance.setReport("TEST");
if (jobDefinition.isGatedExecution()) {
instance.setCurrentGatedStepId(jobDefinition.getFirstStepId());
}
return instance;
}
public String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) {
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData);
public String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData, boolean theGatedExecution) {
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData, theGatedExecution);
return mySvc.onWorkChunkCreate(batchWorkChunk);
}
@ -226,7 +230,11 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
}
public String createChunk(String theInstanceId) {
return storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, theInstanceId, 0, CHUNK_DATA);
return storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, theInstanceId, 0, CHUNK_DATA, false);
}
public String createChunk(String theInstanceId, boolean theGatedExecution) {
return storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, theInstanceId, 0, CHUNK_DATA, theGatedExecution);
}
public void enableMaintenanceRunner(boolean theToEnable) {


@ -47,7 +47,7 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
@ValueSource(strings = {
"""
1|COMPLETED
2|GATED
2|GATE_WAITING
""",
"""
# Chunk already queued -> waiting for complete
@ -69,8 +69,9 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
""",
"""
# Not all steps ready to advance
# Latch Count: 1
1|COMPLETED
2|READY # a single ready chunk
2|READY,2|QUEUED # a single ready chunk
2|IN_PROGRESS
""",
"""
@ -82,32 +83,36 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
3|READY
""",
"""
1|COMPLETED
2|READY
2|QUEUED
2|COMPLETED
2|ERRORED
2|FAILED
2|IN_PROGRESS
3|GATED
3|GATED
# when current step is not all queued, should queue READY chunks
# Latch Count: 1
1|COMPLETED
2|READY,2|QUEUED
2|QUEUED
2|COMPLETED
2|ERRORED
2|FAILED
2|IN_PROGRESS
3|GATE_WAITING
3|QUEUED
""",
"""
1|COMPLETED
2|READY
2|QUEUED
2|COMPLETED
2|ERRORED
2|FAILED
2|IN_PROGRESS
3|QUEUED # a lie
3|GATED
# when current step is all queued but not done, should not proceed
1|COMPLETED
2|COMPLETED
2|QUEUED
2|COMPLETED
2|ERRORED
2|FAILED
2|IN_PROGRESS
3|GATE_WAITING
3|GATE_WAITING
"""
})
default void testGatedStep2NotReady_notAdvance(String theChunkState) throws InterruptedException {
// setup
int expectedLatchCount = getLatchCountFromState(theChunkState);
PointcutLatch sendingLatch = disableWorkChunkMessageHandler();
sendingLatch.setExpectedCount(0);
sendingLatch.setExpectedCount(expectedLatchCount);
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true);
createChunksInStates(result);
@ -117,11 +122,21 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
// verify
// nothing ever queued -> nothing ever sent to queue
verifyWorkChunkMessageHandlerCalled(sendingLatch, 0);
verifyWorkChunkMessageHandlerCalled(sendingLatch, expectedLatchCount);
verifyWorkChunkFinalStates(result);
}
@Disabled
/**
* Returns the expected latch count specified in the state. Defaults to 0 if not found.
* Expected format: # Latch Count: {}
* e.g. # Latch Count: 3
*/
private int getLatchCountFromState(String theState){
String keyStr = "# Latch Count: ";
int index = theState.indexOf(keyStr);
return index == -1 ? 0 : theState.charAt(index + keyStr.length()) - '0';
}
@ParameterizedTest
@ValueSource(strings = {
"""
@ -129,27 +144,30 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
1|COMPLETED
2|COMPLETED
2|COMPLETED
3|GATED|READY
3|GATED|READY
3|GATE_WAITING,3|QUEUED
3|GATE_WAITING,3|QUEUED
""",
"""
# OLD code only
1|COMPLETED
2|QUEUED,2|READY
2|QUEUED,2|READY
2|COMPLETED
2|COMPLETED
3|QUEUED,3|QUEUED
3|QUEUED,3|QUEUED
""",
"""
# mixed code only
# mixed code
1|COMPLETED
2|COMPLETED
2|COMPLETED
3|GATED|READY
3|QUEUED|READY
3|GATE_WAITING,3|QUEUED
3|QUEUED,3|QUEUED
"""
})
default void testGatedStep2ReadyToAdvance_advanceToStep3(String theChunkState) throws InterruptedException {
// setup
PointcutLatch sendingLatch = disableWorkChunkMessageHandler();
sendingLatch.setExpectedCount(2);
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true);
createChunksInStates(result);
@ -157,8 +175,7 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
runMaintenancePass();
// verify
// things are being set to READY; is anything being queued?
verifyWorkChunkMessageHandlerCalled(sendingLatch, 0);
verifyWorkChunkMessageHandlerCalled(sendingLatch, 2);
verifyWorkChunkFinalStates(result);
}


@ -35,8 +35,8 @@ public interface IWorkChunkCommon extends WorkChunkTestConstants {
return getTestManager().createInstance();
}
default String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) {
return getTestManager().storeWorkChunk(theJobDefinitionId, theTargetStepId, theInstanceId, theSequence, theSerializedData);
default String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData, boolean theGatedExecution) {
return getTestManager().storeWorkChunk(theJobDefinitionId, theTargetStepId, theInstanceId, theSequence, theSerializedData, theGatedExecution);
}
default void runInTransaction(Runnable theRunnable) {
@ -80,6 +80,10 @@ public interface IWorkChunkCommon extends WorkChunkTestConstants {
return getTestManager().createChunk(theJobInstanceId);
}
default String createChunk(String theJobInstanceId, boolean theGatedExecution) {
return getTestManager().createChunk(theJobInstanceId, theGatedExecution);
}
/**
* Enable/disable the maintenance runner (So it doesn't run on a scheduler)
*/


@ -6,6 +6,8 @@ import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
import ca.uhn.test.concurrency.PointcutLatch;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -15,21 +17,30 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT
Logger ourLog = LoggerFactory.getLogger(IWorkChunkStateTransitions.class);
@Test
default void chunkCreation_isQueued() {
@ParameterizedTest
@CsvSource({
"false, READY",
"true, GATE_WAITING"
})
default void chunkCreation_isInExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum expectedStatus) {
String jobInstanceId = createAndStoreJobInstance(null);
String myChunkId = createChunk(jobInstanceId);
String myChunkId = createChunk(jobInstanceId, theGatedExecution);
WorkChunk fetchedWorkChunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.READY, fetchedWorkChunk.getStatus(), "New chunks are READY");
assertEquals(expectedStatus, fetchedWorkChunk.getStatus(), "New chunks are " + expectedStatus);
}
@Test
default void chunkReceived_queuedToInProgress() throws InterruptedException {
@ParameterizedTest
@CsvSource({
"false",
"true"
})
default void chunkReceived_queuedToInProgress(boolean theGatedExecution) throws InterruptedException {
PointcutLatch sendLatch = disableWorkChunkMessageHandler();
sendLatch.setExpectedCount(1);
String jobInstanceId = createAndStoreJobInstance(null);
String myChunkId = createChunk(jobInstanceId);
String jobInstanceId = createAndStoreJobInstance(withJobDefinition(theGatedExecution));
String myChunkId = createChunk(jobInstanceId, theGatedExecution);
runMaintenancePass();
// the worker has received the chunk, and marks it started.


@ -2,18 +2,17 @@ package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
import ca.uhn.test.concurrency.PointcutLatch;
import com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
@ -23,8 +22,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestConstants {
@ -33,7 +30,7 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
JobInstance instance = createInstance();
String instanceId = getSvc().storeNewInstance(instance);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null, false);
runInTransaction(() -> {
WorkChunk chunk = freshFetchWorkChunk(id);
@ -41,17 +38,21 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
});
}
@Test
default void testWorkChunkCreate_inReadyState() {
@ParameterizedTest
@CsvSource({
"false, READY",
"true, GATE_WAITING"
})
default void testWorkChunkCreate_inExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum expectedStatus) {
JobInstance instance = createInstance();
String instanceId = getSvc().storeNewInstance(instance);
enableMaintenanceRunner(false);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA, theGatedExecution);
assertNotNull(id);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, freshFetchWorkChunk(id).getStatus()));
runInTransaction(() -> assertEquals(expectedStatus, freshFetchWorkChunk(id).getStatus()));
}
@Test


@ -145,7 +145,8 @@ public class JobMaintenanceStateInformation {
if (jobDef.isGatedExecution()) {
AtomicReference<String> latestStepId = new AtomicReference<>();
int totalSteps = jobDef.getSteps().size();
for (int i = totalSteps - 1; i >= 0; i--) {
// ignore the last step
for (int i = totalSteps - 2; i >= 0; i--) {
JobDefinitionStep<?, ?, ?> step = jobDef.getSteps().get(i);
if (stepIds.contains(step.getStepId())) {
latestStepId.set(step.getStepId());


@ -280,4 +280,28 @@ public interface IJobPersistence extends IWorkChunkPersistence {
@VisibleForTesting
WorkChunk createWorkChunk(WorkChunk theWorkChunk);
/**
* Atomically advance the given job instance to the given step and move all of that step's chunks
* from GATE_WAITING (or the legacy "fake QUEUED" state) to READY. A brief usage sketch follows this interface.
* @param theJobInstanceId the id of the job instance to be updated
* @param theNextStepId the id of the next job step
* @return true if the instance was advanced, false if it was already on the given step (i.e. another maintenance pass advanced it first)
*/
@Transactional(propagation = Propagation.REQUIRES_NEW)
boolean advanceJobStepAndUpdateChunkStatus(String theJobInstanceId, String theNextStepId);
/**
* Update all chunks of the given step of the given job instance from the old status to the new status.
* @param theJobInstanceId the id of the job instance to be updated
* @param theStepId the id of the step which the chunks belong to
* @param theNewStatus the new status
* @param theOldStatus the old status
* @return the number of work chunks updated
*/
@Transactional(propagation = Propagation.REQUIRES_NEW)
int updateAllChunksForStepWithStatus(
String theJobInstanceId,
String theStepId,
WorkChunkStatusEnum theNewStatus,
WorkChunkStatusEnum theOldStatus);
}
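For context, a brief usage sketch of the two methods above, roughly as the maintenance processor in this commit calls them; the surrounding variables (`jobPersistence`, `instanceId`, `currentStepId`, `nextStepId`) are assumed to be in scope:

// Release the first step's chunks when nothing has been queued yet; returns the number of rows updated.
int released = jobPersistence.updateAllChunksForStepWithStatus(
        instanceId, currentStepId, WorkChunkStatusEnum.READY, WorkChunkStatusEnum.GATE_WAITING);

// Advance the gate once the current step is fully COMPLETED; false means another maintenance
// run advanced it first, so this pass skips the gate advance.
boolean advanced = jobPersistence.advanceJobStepAndUpdateChunkStatus(instanceId, nextStepId);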


@ -55,7 +55,8 @@ public interface IWorkChunkPersistence {
* The first state event, as the chunk is created.
* This method should be atomic and should only
* return when the chunk has been successfully stored in the database.
* Chunk should be stored with a status of {@link WorkChunkStatusEnum#QUEUED}
* Chunk should be stored with a status of {@link WorkChunkStatusEnum#READY} or
* {@link WorkChunkStatusEnum#GATE_WAITING} for ungated and gated jobs, respectively.
*
* @param theBatchWorkChunk the batch work chunk to be stored
* @return a globally unique identifier for this chunk.


@ -82,7 +82,13 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
// once finished, create workchunks in READY state (GATE_WAITING for gated jobs)
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(
myJobDefinitionId, myJobDefinitionVersion, targetStepId, instanceId, sequence, dataValueString);
myJobDefinitionId,
myJobDefinitionVersion,
targetStepId,
instanceId,
sequence,
dataValueString,
myGatedExecution);
String chunkId = myHapiTransactionService
.withSystemRequestOnDefaultPartition()
.withPropagation(Propagation.REQUIRES_NEW)


@ -245,15 +245,22 @@ public class JobInstanceProcessor {
return true;
}
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) {
// all previous workchunks are complete;
// none in READY though -> still proceed
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.GATE_WAITING)) && theWorkCursor.isFirstStep()) {
// We are in the first step and all chunks are in GATE_WAITING
// this means that the job has just started, no workchunks have been queued yet -> proceed.
return true;
}
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.READY))) {
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) {
// all workchunks for the current step are in COMPLETED -> proceed.
return true;
}
// all workchunks for gated jobs should be turned to QUEUED immediately after they are set to READY
// but in case we die in between, this conditional ought to catch the READY chunks.
if (workChunkStatuses.contains(WorkChunkStatusEnum.READY)) {
if (theWorkCursor.isFirstStep()) {
// first step - all ready means we're ready to proceed to the next step
// first step - all ready means we're ready to proceed to enqueue
return true;
} else {
// it's a future step;
@ -300,11 +307,11 @@ public class JobInstanceProcessor {
*
* We could block chunks from being moved from QUEUE'd to READY here for gated steps
* but currently, progress is calculated by looking at completed chunks only;
* we'd need a new GATE_WAITING state to move chunks to to prevent jobs from
* we'd need a new GATE_WAITING state to move chunks to prevent jobs from
* completing prematurely.
*/
private void enqueueReadyChunks(
JobInstance theJobInstance, JobDefinition<?> theJobDefinition, boolean theIsGatedExecutionAdvancementBool) {
JobInstance theJobInstance, JobDefinition<?> theJobDefinition, boolean theIsGatedExecutionBool) {
Iterator<WorkChunkMetadata> iter = getReadyChunks();
AtomicInteger counter = new AtomicInteger();
@ -315,8 +322,7 @@ public class JobInstanceProcessor {
return JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, metadata.getTargetStepId());
});
counter.getAndIncrement();
if (!theIsGatedExecutionAdvancementBool
&& (theJobDefinition.isGatedExecution() || jobWorkCursor.isReductionStep())) {
if (!theIsGatedExecutionBool && (theJobDefinition.isGatedExecution() || jobWorkCursor.isReductionStep())) {
/*
* Gated executions are queued later when all work chunks are ready.
*
@ -398,32 +404,36 @@ public class JobInstanceProcessor {
readyChunksForNextStep.size());
}
// TODO
// create a new persistence transition for state advance
// update stepId to next step AND update all chunks in this step to READY (from GATED or QUEUED ;-P)
// so we can queue them safely.
boolean isEnqueue;
String currentStepId = theWorkCursor.getCurrentStepId();
Set<WorkChunkStatusEnum> workChunkStatusesForCurrentStep =
myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(theInstance.getInstanceId(), currentStepId);
if (workChunkStatusesForCurrentStep.equals(Set.of(WorkChunkStatusEnum.GATE_WAITING))
&& theWorkCursor.isFirstStep()) {
// this means that the job has just started, no workchunks have been queued yet
// turn the first chunk to READY, do NOT advance the step.
isEnqueue = myJobPersistence.updateAllChunksForStepWithStatus(
instanceId, currentStepId, WorkChunkStatusEnum.READY, WorkChunkStatusEnum.GATE_WAITING)
!= 0;
} else if (workChunkStatusesForCurrentStep.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) {
// update the job step so the workers will process them.
// if it's the last (gated) step, there will be no change - but we should
// queue up the chunks anyway
isEnqueue = theWorkCursor.isFinalStep()
|| myJobPersistence.advanceJobStepAndUpdateChunkStatus(instanceId, nextStepId);
} else {
// this means that the current step's chunks contain only READY and QUEUED chunks, possibly left over from
// another maintenance job that died in the middle;
// we should enqueue the rest of the READY chunks
isEnqueue = true;
}
// update the job step so the workers will process them.
// if it's the last (gated) step, there will be no change - but we should
// queue up the chunks anyways
boolean changed = theWorkCursor.isFinalStep()
|| myJobPersistence.updateInstance(instanceId, instance -> {
if (instance.getCurrentGatedStepId().equals(nextStepId)) {
// someone else beat us here. No changes
return false;
}
instance.setCurrentGatedStepId(nextStepId);
return true;
});
if (!changed) {
if (!isEnqueue) {
// we collided with another maintenance job.
ourLog.warn("Skipping gate advance to {} for instance {} - already advanced.", nextStepId, instanceId);
return;
}
ourLog.debug("Moving gated instance {} to next step.", theInstance.getInstanceId());
// because we now have all gated job chunks in READY state,
// we can enqueue them
enqueueReadyChunks(theInstance, theJobDefinition, true);


@ -36,6 +36,7 @@ public class WorkChunkCreateEvent {
public final String instanceId;
public final int sequence;
public final String serializedData;
public final boolean isGatedExecution;
/**
* Constructor
@ -52,20 +53,24 @@ public class WorkChunkCreateEvent {
@Nonnull String theTargetStepId,
@Nonnull String theInstanceId,
int theSequence,
@Nullable String theSerializedData) {
@Nullable String theSerializedData,
boolean theGatedExecution) {
jobDefinitionId = theJobDefinitionId;
jobDefinitionVersion = theJobDefinitionVersion;
targetStepId = theTargetStepId;
instanceId = theInstanceId;
sequence = theSequence;
serializedData = theSerializedData;
isGatedExecution = theGatedExecution;
}
public static WorkChunkCreateEvent firstChunk(JobDefinition<?> theJobDefinition, String theInstanceId) {
String firstStepId = theJobDefinition.getFirstStepId();
String jobDefinitionId = theJobDefinition.getJobDefinitionId();
int jobDefinitionVersion = theJobDefinition.getJobDefinitionVersion();
return new WorkChunkCreateEvent(jobDefinitionId, jobDefinitionVersion, firstStepId, theInstanceId, 0, null);
boolean isGatedExecution = theJobDefinition.isGatedExecution();
return new WorkChunkCreateEvent(
jobDefinitionId, jobDefinitionVersion, firstStepId, theInstanceId, 0, null, isGatedExecution);
}
@Override
@ -83,6 +88,7 @@ public class WorkChunkCreateEvent {
.append(instanceId, that.instanceId)
.append(sequence, that.sequence)
.append(serializedData, that.serializedData)
.append(isGatedExecution, that.isGatedExecution)
.isEquals();
}
@ -95,6 +101,7 @@ public class WorkChunkCreateEvent {
.append(instanceId)
.append(sequence)
.append(serializedData)
.append(isGatedExecution)
.toHashCode();
}
}


@ -32,6 +32,7 @@ import java.util.Set;
*/
public enum WorkChunkStatusEnum {
// wipmb For 6.8 Add WAITING for gated, and READY for in db, but not yet sent to channel.
GATE_WAITING,
READY,
QUEUED,
IN_PROGRESS,
@ -59,6 +60,8 @@ public enum WorkChunkStatusEnum {
public Set<WorkChunkStatusEnum> getNextStates() {
switch (this) {
case GATE_WAITING:
return EnumSet.of(READY);
case READY:
return EnumSet.of(QUEUED);
case QUEUED:


@ -73,6 +73,7 @@ public class InstanceProgress {
statusToCountMap.put(theChunk.getStatus(), statusToCountMap.getOrDefault(theChunk.getStatus(), 0) + 1);
switch (theChunk.getStatus()) {
case GATE_WAITING:
case READY:
case QUEUED:
case IN_PROGRESS: