merging together

This commit is contained in:
leif stawnyczy 2024-04-12 16:27:19 -04:00
commit 3d7910415a
22 changed files with 527 additions and 256 deletions

View File

@ -0,0 +1,7 @@
---
type: add
issue: 5818
title: "Added another state to the Batch2 work chunk state machine: GATE_WAITING.
This work chunk state will be the initial state on creation for gated jobs.
Once all chunks are completed for the previous step, they will transition to READY.
"

View File

@ -57,10 +57,11 @@ stateDiagram-v2
state FAILED
state COMPLETED
direction LR
[*] --> READY : on create - normal
[*] --> GATED : on create - gated
GATED --> READY : on create - gated
READY --> QUEUED : placed on kafka (maint.)
[*] --> READY : on create - normal or gated first chunk
[*] --> GATE_WAITING : on create - gated jobs for all but the first chunk
GATE_WAITING --> READY : on gate release - gated
QUEUED --> READY : on gate release - gated (for compatibility with legacy QUEUED state up to Hapi-fhir version 7.1)
READY --> QUEUED : placed on kafka (maint.)
POLL_WAITING --> READY : after a poll delay on a POLL_WAITING work chunk has elapsed
%% worker processing states

View File

@ -19,19 +19,20 @@ A HAPI-FHIR batch job definition consists of a job name, version, parameter json
After a job has been defined, *instances* of that job can be submitted for batch processing by populating a `JobInstanceStartRequest` with the job name and job parameters json and then submitting that request to the Batch Job Coordinator.
The Batch Job Coordinator will then store two records in the database:
- Job Instance with status QUEUED: that is the parent record for all data concerning this job
- Batch Work Chunk with status READY: this describes the first "chunk" of work required for this job. The first Batch Work Chunk contains no data.
- Job Instance with status `QUEUED`: that is the parent record for all data concerning this job
- Batch Work Chunk with status `READY`: this describes the first "chunk" of work required for this job. The first Batch Work Chunk contains no data.
### The Maintenance Job
A Scheduled Job runs periodically (once a minute). For each Job Instance in the database, it:
1. Calculates job progress (% of work chunks in `COMPLETE` status). If the job is finished, purges any left over work chunks still in the database.
1. Moves all `POLL_WAITING` work chunks to `READY` if their `nextPollTime` has expired.
1. Moves all `READY` work chunks into the `QUEUED` state and publishes a message to the Batch Notification Message Channel to inform worker threads that a work chunk is now ready for processing. \*
1. Calculates job progress (% of work chunks in `COMPLETE` status). If the job is finished, purges any leftover work chunks still in the database.
1. Cleans up any complete, failed, or cancelled jobs that need to be removed.
1. Moves any gated jobs onto their next step when complete.
1. When the current step is complete, moves any gated jobs onto their next step and updates all chunks in `GATE_WAITING` to `READY`.
1. If the final step of a gated job is a reduction step, a reduction step execution will be triggered.
1. Moves all `READY` work chunks into the `QUEUED` state and publishes a message to the Batch Notification Message Channel to inform worker threads that a work chunk is now ready for processing. \*
\* An exception is for the final reduction step, where work chunks are not published to the Batch Notification Message Channel,
but instead processed inline.

View File

@ -129,7 +129,11 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
entity.setSerializedData(theBatchWorkChunk.serializedData);
entity.setCreateTime(new Date());
entity.setStartTime(new Date());
entity.setStatus(WorkChunkStatusEnum.READY);
// set gated job chunks to GATE_WAITING; they will be transitioned to READY during maintenance pass when all
// chunks in the previous step are COMPLETED
entity.setStatus(
theBatchWorkChunk.isGatedExecution ? WorkChunkStatusEnum.GATE_WAITING : WorkChunkStatusEnum.READY);
ourLog.debug("Create work chunk {}/{}/{}", entity.getInstanceId(), entity.getId(), entity.getTargetStepId());
ourLog.trace(
"Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData());
@ -595,4 +599,35 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
myInterceptorBroadcaster.callHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE, params);
}
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public boolean advanceJobStepAndUpdateChunkStatus(String theJobInstanceId, String theNextStepId) {
boolean changed = updateInstance(theJobInstanceId, instance -> {
if (instance.getCurrentGatedStepId().equals(theNextStepId)) {
// someone else beat us here. No changes
return false;
}
ourLog.debug("Moving gated instance {} to the next step {}.", theJobInstanceId, theNextStepId);
instance.setCurrentGatedStepId(theNextStepId);
return true;
});
if (changed) {
ourLog.debug(
"Updating chunk status from GATE_WAITING to READY for gated instance {} in step {}.",
theJobInstanceId,
theNextStepId);
// when we reach here, the current step id is equal to theNextStepId
int numChanged =
myWorkChunkRepository.updateAllChunksForStepFromGateWaitingToReady(theJobInstanceId, theNextStepId);
ourLog.debug(
"Updated {} chunks of gated instance {} for step {} from fake QUEUED to READY.",
numChanged,
theJobInstanceId,
theNextStepId);
}
return changed;
}
}

View File

@ -128,6 +128,19 @@ public interface IBatch2WorkChunkRepository
@Param("newStatus") WorkChunkStatusEnum theNewStatus,
@Param("oldStatus") WorkChunkStatusEnum theOldStatus);
// Up to 7.1, gated jobs' work chunks are created in status QUEUED but not actually queued for the
// workers.
// In order to keep them compatible, turn QUEUED chunks into READY, too.
// TODO: remove QUEUED from the IN clause when we are certain that no one is still running the old code.
@Modifying
@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = ca.uhn.fhir.batch2.model.WorkChunkStatusEnum.READY WHERE "
+ "e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId AND e.myStatus in ("
+ "ca.uhn.fhir.batch2.model.WorkChunkStatusEnum.GATE_WAITING,"
+ "ca.uhn.fhir.batch2.model.WorkChunkStatusEnum.QUEUED"
+ ")")
int updateAllChunksForStepFromGateWaitingToReady(
@Param("instanceId") String theInstanceId, @Param("stepId") String theStepId);
@Modifying
@Query("DELETE FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId")
int deleteAllForInstance(@Param("instanceId") String theInstanceId);

View File

@ -4,6 +4,7 @@ import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
@ -31,6 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)

View File

@ -206,7 +206,7 @@ public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
@Nonnull
private JobDefinition<? extends IModelJson> buildGatedJobDefinition(String theJobId, IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep, IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> theLastStep) {
return TestJobDefinitionUtils.buildJobDefinition(
return TestJobDefinitionUtils.buildGatedJobDefinition(
theJobId,
theFirstStep,
theLastStep,

View File

@ -38,6 +38,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Import;
@ -78,7 +79,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
public static final String JOB_DEFINITION_ID = "definition-id";
public static final String TARGET_STEP_ID = TestJobDefinitionUtils.FIRST_STEP_ID;
public static final String FIRST_STEP_ID = TestJobDefinitionUtils.FIRST_STEP_ID;
public static final String LAST_STEP_ID = TestJobDefinitionUtils.LAST_STEP_ID;
public static final String DEF_CHUNK_ID = "definition-chunkId";
public static final String STEP_CHUNK_ID = TestJobDefinitionUtils.FIRST_STEP_ID;
public static final int JOB_DEF_VER = 1;
@ -110,7 +112,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
for (int i = 0; i < 10; i++) {
storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, i, JsonUtil.serialize(new NdJsonFileJson().setNdJsonText("{}")));
storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, i, JsonUtil.serialize(new NdJsonFileJson().setNdJsonText("{}")), false);
}
// Execute
@ -125,8 +127,13 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
});
}
private String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) {
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, TestJobDefinitionUtils.TEST_JOB_VERSION, theTargetStepId, theInstanceId, theSequence, theSerializedData);
private String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData, boolean theGatedExecution) {
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, TestJobDefinitionUtils.TEST_JOB_VERSION, theTargetStepId, theInstanceId, theSequence, theSerializedData, theGatedExecution);
return mySvc.onWorkChunkCreate(batchWorkChunk);
}
private String storeFirstWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) {
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, TestJobDefinitionUtils.TEST_JOB_VERSION, theTargetStepId, theInstanceId, theSequence, theSerializedData, false);
return mySvc.onWorkChunkCreate(batchWorkChunk);
}
@ -136,7 +143,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String instanceId = mySvc.storeNewInstance(instance);
runInTransaction(() -> {
Batch2JobInstanceEntity instanceEntity = myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalStateException::new);
Batch2JobInstanceEntity instanceEntity = findInstanceByIdOrThrow(instanceId);
assertEquals(StatusEnum.QUEUED, instanceEntity.getStatus());
});
@ -149,7 +156,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertEquals(instance.getReport(), foundInstance.getReport());
runInTransaction(() -> {
Batch2JobInstanceEntity instanceEntity = myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalStateException::new);
Batch2JobInstanceEntity instanceEntity = findInstanceByIdOrThrow(instanceId);
assertEquals(StatusEnum.QUEUED, instanceEntity.getStatus());
});
}
@ -240,8 +247,8 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
// Setup
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(JOB_DEFINITION_ID, JOB_DEF_VER, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, CHUNK_DATA, false);
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(JOB_DEFINITION_ID, JOB_DEF_VER, FIRST_STEP_ID, instanceId, 0, CHUNK_DATA, false);
String chunkId = mySvc.onWorkChunkCreate(batchWorkChunk);
Optional<Batch2WorkChunkEntity> byId = myWorkChunkRepository.findById(chunkId);
Batch2WorkChunkEntity entity = byId.get();
@ -367,10 +374,11 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testUpdateTime() {
// Setup
JobInstance instance = createInstance();
boolean isGatedExecution = false;
JobInstance instance = createInstance(true, isGatedExecution);
String instanceId = mySvc.storeNewInstance(instance);
Date updateTime = runInTransaction(() -> new Date(myJobInstanceRepository.findById(instanceId).orElseThrow().getUpdateTime().getTime()));
Date updateTime = runInTransaction(() -> new Date(findInstanceByIdOrThrow(instanceId).getUpdateTime().getTime()));
sleepUntilTimeChange();
@ -378,39 +386,148 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
runInTransaction(() -> mySvc.updateInstanceUpdateTime(instanceId));
// Verify
Date updateTime2 = runInTransaction(() -> new Date(myJobInstanceRepository.findById(instanceId).orElseThrow().getUpdateTime().getTime()));
Date updateTime2 = runInTransaction(() -> new Date(findInstanceByIdOrThrow(instanceId).getUpdateTime().getTime()));
assertNotEquals(updateTime, updateTime2);
}
@Test
public void advanceJobStepAndUpdateChunkStatus_forGatedJob_updatesCurrentStepAndChunkStatus() {
// setup
boolean isGatedExecution = true;
JobInstance instance = createInstance(true, isGatedExecution);
String instanceId = mySvc.storeNewInstance(instance);
String chunkIdSecondStep1 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, isGatedExecution);
String chunkIdSecondStep2 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, isGatedExecution);
runInTransaction(() -> assertEquals(FIRST_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId()));
// execute
runInTransaction(() -> {
boolean changed = mySvc.advanceJobStepAndUpdateChunkStatus(instanceId, LAST_STEP_ID);
assertTrue(changed);
});
// verify
runInTransaction(() -> {
assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkIdSecondStep1).getStatus());
assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkIdSecondStep2).getStatus());
assertEquals(LAST_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId());
});
}
@Test
public void advanceJobStepAndUpdateChunkStatus_whenAlreadyInTargetStep_DoesNotUpdateStepOrChunks() {
// setup
boolean isGatedExecution = true;
JobInstance instance = createInstance(true, isGatedExecution);
String instanceId = mySvc.storeNewInstance(instance);
String chunkIdSecondStep1 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, isGatedExecution);
String chunkIdSecondStep2 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, isGatedExecution);
runInTransaction(() -> assertEquals(FIRST_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId()));
// execute
runInTransaction(() -> {
boolean changed = mySvc.advanceJobStepAndUpdateChunkStatus(instanceId, FIRST_STEP_ID);
assertFalse(changed);
});
// verify
runInTransaction(() -> {
assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(chunkIdSecondStep1).getStatus());
assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(chunkIdSecondStep2).getStatus());
assertEquals(FIRST_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId());
});
}
@Test
public void testFetchUnknownWork() {
assertFalse(myWorkChunkRepository.findById("FOO").isPresent());
}
@Test
public void testStoreAndFetchWorkChunk_NoData() {
JobInstance instance = createInstance(true, false);
@ParameterizedTest
@CsvSource({
"false, READY, QUEUED",
"true, GATE_WAITING, QUEUED"
})
public void testStoreAndFetchWorkChunk_withOrWithoutGatedExecutionNoData_createdAndTransitionToExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum theExpectedStatusOnCreate, WorkChunkStatusEnum theExpectedStatusAfterTransition) {
// setup
JobInstance instance = createInstance(true, theGatedExecution);
String instanceId = mySvc.storeNewInstance(instance);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null);
// execute & verify
String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, null);
// mark the first chunk as COMPLETED to allow step advance
runInTransaction(() -> myWorkChunkRepository.updateChunkStatus(firstChunkId, WorkChunkStatusEnum.COMPLETED, WorkChunkStatusEnum.READY));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
String id = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, theGatedExecution);
runInTransaction(() -> assertEquals(theExpectedStatusOnCreate, findChunkByIdOrThrow(id).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(theExpectedStatusAfterTransition, findChunkByIdOrThrow(id).getStatus()));
WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
// assert null since we did not input any data when creating the chunks
assertNull(chunk.getData());
}
@Test
public void testStoreAndFetchWorkChunk_withGatedJobMultipleChunk_correctTransitions() {
// setup
boolean isGatedExecution = true;
String expectedFirstChunkData = "IAmChunk1";
String expectedSecondChunkData = "IAmChunk2";
JobInstance instance = createInstance(true, isGatedExecution);
String instanceId = mySvc.storeNewInstance(instance);
// execute & verify
String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, expectedFirstChunkData);
String secondChunkId = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, expectedSecondChunkData, isGatedExecution);
runInTransaction(() -> {
// check chunks created in expected states
assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(firstChunkId).getStatus());
assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(secondChunkId).getStatus());
});
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> {
assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(firstChunkId).getStatus());
// maintenance should not affect chunks in step 2
assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(secondChunkId).getStatus());
});
WorkChunk actualFirstChunkData = mySvc.onWorkChunkDequeue(firstChunkId).orElseThrow(IllegalArgumentException::new);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(firstChunkId).getStatus()));
assertEquals(expectedFirstChunkData, actualFirstChunkData.getData());
mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(firstChunkId, 50, 0));
runInTransaction(() -> {
assertEquals(WorkChunkStatusEnum.COMPLETED, findChunkByIdOrThrow(firstChunkId).getStatus());
assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(secondChunkId).getStatus());
});
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> {
assertEquals(WorkChunkStatusEnum.COMPLETED, findChunkByIdOrThrow(firstChunkId).getStatus());
// now that all chunks for step 1 is COMPLETED, should enqueue chunks in step 2
assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(secondChunkId).getStatus());
});
WorkChunk actualSecondChunkData = mySvc.onWorkChunkDequeue(secondChunkId).orElseThrow(IllegalArgumentException::new);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(secondChunkId).getStatus()));
assertEquals(expectedSecondChunkData, actualSecondChunkData.getData());
}
@Test
void testStoreAndFetchChunksForInstance_NoData() {
// given
boolean isGatedExecution = false;
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String queuedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, "some data");
String erroredId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 1, "some more data");
String completedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 2, "some more data");
String queuedId = storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, "some data", isGatedExecution);
String erroredId = storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 1, "some more data", isGatedExecution);
String completedId = storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 2, "some more data", isGatedExecution);
mySvc.onWorkChunkDequeue(erroredId);
WorkChunkErrorEvent parameters = new WorkChunkErrorEvent(erroredId, "Our error message");
@ -434,7 +551,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertEquals(JOB_DEFINITION_ID, workChunk.getJobDefinitionId());
assertEquals(JOB_DEF_VER, workChunk.getJobDefinitionVersion());
assertEquals(instanceId, workChunk.getInstanceId());
assertEquals(TARGET_STEP_ID, workChunk.getTargetStepId());
assertEquals(FIRST_STEP_ID, workChunk.getTargetStepId());
assertEquals(0, workChunk.getSequence());
assertEquals(WorkChunkStatusEnum.READY, workChunk.getStatus());
@ -468,17 +585,27 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
}
@Test
public void testStoreAndFetchWorkChunk_WithData() {
JobInstance instance = createInstance(true, false);
@ParameterizedTest
@CsvSource({
"false, READY, QUEUED",
"true, GATE_WAITING, QUEUED"
})
public void testStoreAndFetchWorkChunk_withOrWithoutGatedExecutionwithData_createdAndTransitionToExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum theExpectedCreatedStatus, WorkChunkStatusEnum theExpectedTransitionStatus) {
// setup
JobInstance instance = createInstance(true, theGatedExecution);
String instanceId = mySvc.storeNewInstance(instance);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
// execute & verify
String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, null);
// mark the first chunk as COMPLETED to allow step advance
runInTransaction(() -> myWorkChunkRepository.updateChunkStatus(firstChunkId, WorkChunkStatusEnum.COMPLETED, WorkChunkStatusEnum.READY));
String id = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, CHUNK_DATA, theGatedExecution);
assertNotNull(id);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(theExpectedCreatedStatus, findChunkByIdOrThrow(id).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(theExpectedTransitionStatus, findChunkByIdOrThrow(id).getStatus()));
WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
assertEquals(36, chunk.getInstanceId().length());
assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId());
@ -486,19 +613,20 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
assertEquals(CHUNK_DATA, chunk.getData());
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(id).getStatus()));
}
@Test
public void testMarkChunkAsCompleted_Success() {
JobInstance instance = createInstance(true, false);
boolean isGatedExecution = false;
JobInstance instance = createInstance(true, isGatedExecution);
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA, isGatedExecution);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkId).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(chunkId).getStatus()));
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
@ -508,13 +636,13 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertNull(chunk.getEndTime());
assertNull(chunk.getRecordsProcessed());
assertNotNull(chunk.getData());
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(chunkId).getStatus()));
sleepUntilTimeChange();
mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 50, 0));
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
Batch2WorkChunkEntity entity = findChunkByIdOrThrow(chunkId);
assertEquals(WorkChunkStatusEnum.COMPLETED, entity.getStatus());
assertEquals(50, entity.getRecordsProcessed());
assertNotNull(entity.getCreateTime());
@ -528,14 +656,15 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testMarkChunkAsCompleted_Error() {
JobInstance instance = createInstance(true, false);
boolean isGatedExecution = false;
JobInstance instance = createInstance(true, isGatedExecution);
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(JOB_DEFINITION_ID, TestJobDefinitionUtils.FIRST_STEP_ID, instanceId, SEQUENCE_NUMBER, null);
String chunkId = storeWorkChunk(JOB_DEFINITION_ID, TestJobDefinitionUtils.FIRST_STEP_ID, instanceId, SEQUENCE_NUMBER, null, isGatedExecution);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkId).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(chunkId).getStatus()));
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
@ -546,7 +675,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
WorkChunkErrorEvent request = new WorkChunkErrorEvent(chunkId).setErrorMsg("This is an error message");
mySvc.onWorkChunkError(request);
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
Batch2WorkChunkEntity entity = findChunkByIdOrThrow(chunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus());
assertEquals("This is an error message", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
@ -562,7 +691,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
WorkChunkErrorEvent request2 = new WorkChunkErrorEvent(chunkId).setErrorMsg("This is an error message 2");
mySvc.onWorkChunkError(request2);
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
Batch2WorkChunkEntity entity = findChunkByIdOrThrow(chunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus());
assertEquals("This is an error message 2", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
@ -580,14 +709,15 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testMarkChunkAsCompleted_Fail() {
JobInstance instance = createInstance(true, false);
boolean isGatedExecution = false;
JobInstance instance = createInstance(true, isGatedExecution);
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null, isGatedExecution);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkId).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(chunkId).getStatus()));
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
@ -597,7 +727,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
mySvc.onWorkChunkFailed(chunkId, "This is an error message");
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
Batch2WorkChunkEntity entity = findChunkByIdOrThrow(chunkId);
assertEquals(WorkChunkStatusEnum.FAILED, entity.getStatus());
assertEquals("This is an error message", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
@ -620,7 +750,8 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
"stepId",
instanceId,
0,
"{}"
"{}",
false
);
String id = mySvc.onWorkChunkCreate(chunk);
chunkIds.add(id);
@ -698,6 +829,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
}
);
instance.setCurrentGatedStepId(jobDef.getFirstStepId());
} else {
jobDef = TestJobDefinitionUtils.buildJobDefinition(
JOB_DEFINITION_ID,
@ -754,4 +886,12 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
Arguments.of(WorkChunkStatusEnum.COMPLETED, false)
);
}
private Batch2JobInstanceEntity findInstanceByIdOrThrow(String instanceId) {
return myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalStateException::new);
}
private Batch2WorkChunkEntity findChunkByIdOrThrow(String secondChunkId) {
return myWorkChunkRepository.findById(secondChunkId).orElseThrow(IllegalArgumentException::new);
}
}

View File

@ -39,6 +39,7 @@ import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep3InputType;
import ca.uhn.test.concurrency.PointcutLatch;
import jakarta.annotation.Nonnull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
@ -91,15 +92,16 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
return mySvc;
}
@Nonnull
public JobDefinition<TestJobParameters> withJobDefinition(boolean theIsGatedBoolean) {
JobDefinition.Builder<TestJobParameters, ?> builder = JobDefinition.newBuilder()
.setJobDefinitionId(theIsGatedBoolean ? GATED_JOB_DEFINITION_ID : JOB_DEFINITION_ID)
.setJobDefinitionVersion(JOB_DEF_VER)
.setJobDescription("A job description")
.setParametersType(TestJobParameters.class)
.addFirstStep(TARGET_STEP_ID, "the first step", TestJobStep2InputType.class, (theStepExecutionDetails, theDataSink) -> new RunOutcome(0))
.addIntermediateStep("2nd-step-id", "the second step", TestJobStep3InputType.class, (theStepExecutionDetails, theDataSink) -> new RunOutcome(0))
.addLastStep("last-step-id", "the final step", (theStepExecutionDetails, theDataSink) -> new RunOutcome(0));
.addFirstStep(FIRST_STEP_ID, "the first step", TestJobStep2InputType.class, (theStepExecutionDetails, theDataSink) -> new RunOutcome(0))
.addIntermediateStep(SECOND_STEP_ID, "the second step", TestJobStep3InputType.class, (theStepExecutionDetails, theDataSink) -> new RunOutcome(0))
.addLastStep(LAST_STEP_ID, "the final step", (theStepExecutionDetails, theDataSink) -> new RunOutcome(0));
if (theIsGatedBoolean) {
builder.gatedExecution();
}
@ -165,11 +167,19 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
instance.setJobDefinitionVersion(JOB_DEF_VER);
instance.setParameters(CHUNK_DATA);
instance.setReport("TEST");
if (jobDefinition.isGatedExecution()) {
instance.setCurrentGatedStepId(jobDefinition.getFirstStepId());
}
return instance;
}
public String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) {
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData);
public String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData, boolean theGatedExecution) {
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData, theGatedExecution);
return mySvc.onWorkChunkCreate(batchWorkChunk);
}
public String storeFirstWorkChunk(JobDefinition<TestJobParameters> theJobDefinition, String theInstanceId) {
WorkChunkCreateEvent batchWorkChunk = WorkChunkCreateEvent.firstChunk(theJobDefinition, theInstanceId);
return mySvc.onWorkChunkCreate(batchWorkChunk);
}
@ -229,7 +239,15 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
}
public String createChunk(String theInstanceId) {
return storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, theInstanceId, 0, CHUNK_DATA);
return storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, theInstanceId, 0, CHUNK_DATA, false);
}
public String createChunk(String theInstanceId, boolean theGatedExecution) {
return storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, theInstanceId, 0, CHUNK_DATA, theGatedExecution);
}
public String createFirstChunk(JobDefinition<TestJobParameters> theJobDefinition, String theJobInstanceId){
return storeFirstWorkChunk(theJobDefinition, theJobInstanceId);
}
public void enableMaintenanceRunner(boolean theToEnable) {

View File

@ -1,16 +1,18 @@
package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
import ca.uhn.test.concurrency.PointcutLatch;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestConstants {
Logger ourLog = LoggerFactory.getLogger(IJobMaintenanceActions.class);
@ -21,7 +23,7 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
}
@Test
default void test_gatedJob_stepReady_advances() throws InterruptedException {
default void test_gatedJob_stepReady_stepAdvances() throws InterruptedException {
// setup
String initialState = """
# chunks ready - move to queued
@ -47,7 +49,7 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
@ValueSource(strings = {
"""
1|COMPLETED
2|GATED
2|GATE_WAITING
""",
"""
# Chunk already queued -> waiting for complete
@ -69,8 +71,9 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
""",
"""
# Not all steps ready to advance
# Latch Count: 1
1|COMPLETED
2|READY # a single ready chunk
2|READY,2|QUEUED # a single ready chunk
2|IN_PROGRESS
""",
"""
@ -78,50 +81,65 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
1|COMPLETED
2|COMPLETED
2|IN_PROGRESS
3|READY
3|READY
3|GATE_WAITING
3|GATE_WAITING
""",
"""
1|COMPLETED
2|READY
2|QUEUED
2|COMPLETED
2|ERRORED
2|FAILED
2|IN_PROGRESS
3|GATED
3|GATED
# when current step is not all queued, should queue READY chunks
# Latch Count: 1
1|COMPLETED
2|READY,2|QUEUED
2|QUEUED
2|COMPLETED
2|ERRORED
2|FAILED
2|IN_PROGRESS
3|GATE_WAITING
3|QUEUED
""",
"""
1|COMPLETED
2|READY
2|QUEUED
2|COMPLETED
2|ERRORED
2|FAILED
2|IN_PROGRESS
3|QUEUED # a lie
3|GATED
# when current step is all queued but not done, should not proceed
1|COMPLETED
2|COMPLETED
2|QUEUED
2|COMPLETED
2|ERRORED
2|FAILED
2|IN_PROGRESS
3|GATE_WAITING
3|GATE_WAITING
"""
})
default void testGatedStep2NotReady_notAdvance(String theChunkState) throws InterruptedException {
default void testGatedStep2NotReady_stepNotAdvanceToStep3(String theChunkState) throws InterruptedException {
// setup
int expectedLatchCount = getLatchCountFromState(theChunkState);
PointcutLatch sendingLatch = getTestManager().disableWorkChunkMessageHandler();
sendingLatch.setExpectedCount(0);
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true);
sendingLatch.setExpectedCount(expectedLatchCount);
JobMaintenanceStateInformation state = setupGatedWorkChunkTransitionTest(theChunkState, true);
getTestManager().createChunksInStates(result);
getTestManager().createChunksInStates(state);
// test
getTestManager().runMaintenancePass();
// verify
// nothing ever queued -> nothing ever sent to queue
getTestManager().verifyWorkChunkMessageHandlerCalled(sendingLatch, 0);
verifyWorkChunkFinalStates(result);
getTestManager().verifyWorkChunkMessageHandlerCalled(sendingLatch, expectedLatchCount);
assertEquals(SECOND_STEP_ID, getJobInstanceFromState(state).getCurrentGatedStepId());
verifyWorkChunkFinalStates(state);
}
/**
* Returns the expected latch count specified in the state. Defaults to 0 if not found.
* Expected format: # Latch Count: {}
* e.g. # Latch Count: 3
*/
private int getLatchCountFromState(String theState){
String keyStr = "# Latch Count: ";
int index = theState.indexOf(keyStr);
return index == -1 ? 0 : theState.charAt(index + keyStr.length()) - '0';
}
@Disabled
@ParameterizedTest
@ValueSource(strings = {
"""
@ -129,37 +147,40 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
1|COMPLETED
2|COMPLETED
2|COMPLETED
3|GATED|READY
3|GATED|READY
3|GATE_WAITING,3|QUEUED
3|GATE_WAITING,3|QUEUED
""",
"""
# OLD code only
1|COMPLETED
2|QUEUED,2|READY
2|QUEUED,2|READY
2|COMPLETED
2|COMPLETED
3|QUEUED,3|QUEUED
3|QUEUED,3|QUEUED
""",
"""
# mixed code only
# mixed code
1|COMPLETED
2|COMPLETED
2|COMPLETED
3|GATED|READY
3|QUEUED|READY
3|GATE_WAITING,3|QUEUED
3|QUEUED,3|QUEUED
"""
})
default void testGatedStep2ReadyToAdvance_advanceToStep3(String theChunkState) throws InterruptedException {
// setup
PointcutLatch sendingLatch = getTestManager().disableWorkChunkMessageHandler();
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true);
getTestManager().createChunksInStates(result);
sendingLatch.setExpectedCount(2);
JobMaintenanceStateInformation state = setupGatedWorkChunkTransitionTest(theChunkState, true);
getTestManager().createChunksInStates(state);
// test
getTestManager().runMaintenancePass();
// verify
// things are being set to READY; is anything being queued?
getTestManager().verifyWorkChunkMessageHandlerCalled(sendingLatch, 0);
verifyWorkChunkFinalStates(result);
getTestManager().verifyWorkChunkMessageHandlerCalled(sendingLatch, 2);
assertEquals(LAST_STEP_ID, getJobInstanceFromState(state).getCurrentGatedStepId());
verifyWorkChunkFinalStates(state);
}
@Test
@ -206,4 +227,8 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
private void verifyWorkChunkFinalStates(JobMaintenanceStateInformation theStateInformation) {
theStateInformation.verifyFinalStates(getTestManager().getSvc());
}
private JobInstance getJobInstanceFromState(JobMaintenanceStateInformation state) {
return getTestManager().freshFetchJobInstance(state.getInstanceId());
}
}

View File

@ -37,7 +37,7 @@ public interface ITestFixture {
WorkChunk freshFetchWorkChunk(String theChunkId);
String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData);
String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData, boolean theGatedExecution);
void runInTransaction(Runnable theRunnable);
@ -62,6 +62,14 @@ public interface ITestFixture {
*/
String createChunk(String theJobInstanceId);
String createChunk(String theJobInstanceId, boolean theGatedExecution);
/**
* Create chunk as the first chunk of a job.
* @return the id of the created chunk
*/
String createFirstChunk(JobDefinition<TestJobParameters> theJobDefinition, String theJobInstanceId);
/**
* Enable/disable the maintenance runner (So it doesn't run on a scheduler)
*/

View File

@ -19,16 +19,16 @@
*/
package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters;
import ca.uhn.test.concurrency.PointcutLatch;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -44,21 +44,44 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT
Logger ourLog = LoggerFactory.getLogger(IWorkChunkStateTransitions.class);
@Test
default void chunkCreation_isQueued() {
@BeforeEach
default void before() {
getTestManager().enableMaintenanceRunner(false);
}
@ParameterizedTest
@CsvSource({
"false, READY",
"true, GATE_WAITING"
})
default void chunkCreation_nonFirstChunk_isInExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum expectedStatus) {
String jobInstanceId = getTestManager().createAndStoreJobInstance(null);
String myChunkId = getTestManager().createChunk(jobInstanceId);
String myChunkId = getTestManager().createChunk(jobInstanceId, theGatedExecution);
WorkChunk fetchedWorkChunk = getTestManager().freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.READY, fetchedWorkChunk.getStatus(), "New chunks are READY");
assertEquals(expectedStatus, fetchedWorkChunk.getStatus(), "New chunks are " + expectedStatus);
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
default void chunkCreation_firstChunk_isInReady(boolean theGatedExecution) {
JobDefinition<TestJobParameters> jobDef = getTestManager().withJobDefinition(theGatedExecution);
String jobInstanceId = getTestManager().createAndStoreJobInstance(jobDef);
String myChunkId = getTestManager().createFirstChunk(jobDef, jobInstanceId);
WorkChunk fetchedWorkChunk = getTestManager().freshFetchWorkChunk(myChunkId);
// the first chunk of both gated and non-gated job should start in READY
assertEquals(WorkChunkStatusEnum.READY, fetchedWorkChunk.getStatus(), "New chunks are " + WorkChunkStatusEnum.READY);
}
@Test
default void chunkReceived_queuedToInProgress() throws InterruptedException {
default void chunkReceived_forNongatedJob_queuedToInProgress() throws InterruptedException {
PointcutLatch sendLatch = getTestManager().disableWorkChunkMessageHandler();
sendLatch.setExpectedCount(1);
String jobInstanceId = getTestManager().createAndStoreJobInstance(null);
String myChunkId = getTestManager().createChunk(jobInstanceId);
JobDefinition<TestJobParameters> jobDef = getTestManager().withJobDefinition(false);
String jobInstanceId = getTestManager().createAndStoreJobInstance(jobDef);
String myChunkId = getTestManager().createChunk(jobInstanceId, false);
getTestManager().runMaintenancePass();
// the worker has received the chunk, and marks it started.
@ -73,11 +96,37 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT
getTestManager().verifyWorkChunkMessageHandlerCalled(sendLatch, 1);
}
@Test
default void advanceJobStepAndUpdateChunkStatus_forGatedJob_updatesBothREADYAndQUEUEDChunks() {
// setup
getTestManager().disableWorkChunkMessageHandler();
String state = """
1|COMPLETED
2|COMPLETED
3|GATE_WAITING,3|READY
3|QUEUED,3|READY
""";
JobDefinition<TestJobParameters> jobDef = getTestManager().withJobDefinition(true);
String jobInstanceId = getTestManager().createAndStoreJobInstance(jobDef);
JobMaintenanceStateInformation info = new JobMaintenanceStateInformation(jobInstanceId, jobDef, state);
getTestManager().createChunksInStates(info);
assertEquals(SECOND_STEP_ID, getTestManager().freshFetchJobInstance(jobInstanceId).getCurrentGatedStepId());
// execute
getTestManager().runInTransaction(() -> getTestManager().getSvc().advanceJobStepAndUpdateChunkStatus(jobInstanceId, LAST_STEP_ID));
// verify
assertEquals(LAST_STEP_ID, getTestManager().freshFetchJobInstance(jobInstanceId).getCurrentGatedStepId());
info.verifyFinalStates(getTestManager().getSvc());
}
@Test
default void enqueueWorkChunkForProcessing_enqueuesOnlyREADYChunks() throws InterruptedException {
// setup
getTestManager().disableWorkChunkMessageHandler();
getTestManager().enableMaintenanceRunner(false);
StringBuilder sb = new StringBuilder();
// first step is always complete

View File

@ -21,24 +21,21 @@ package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertNull;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
import ca.uhn.test.concurrency.PointcutLatch;
import com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
@ -48,17 +45,20 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestConstants {
@BeforeEach
default void before() {
getTestManager().enableMaintenanceRunner(false);
}
@Test
default void testStoreAndFetchWorkChunk_NoData() {
JobInstance instance = createInstance();
String instanceId = getTestManager().getSvc().storeNewInstance(instance);
String id = getTestManager().storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null);
String id = getTestManager().storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, null, false);
getTestManager().runInTransaction(() -> {
WorkChunk chunk = getTestManager().freshFetchWorkChunk(id);
@ -66,24 +66,25 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
});
}
@Test
default void testWorkChunkCreate_inReadyState() {
@ParameterizedTest
@CsvSource({
"false, READY",
"true, GATE_WAITING"
})
default void testWorkChunkCreate_inExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum expectedStatus) {
JobInstance instance = createInstance();
String instanceId = getTestManager().getSvc().storeNewInstance(instance);
getTestManager().enableMaintenanceRunner(false);
String id = getTestManager().storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
String id = getTestManager().storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, CHUNK_DATA, theGatedExecution);
assertNotNull(id);
getTestManager().runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, getTestManager().freshFetchWorkChunk(id).getStatus()));
getTestManager().runInTransaction(() -> assertEquals(expectedStatus, getTestManager().freshFetchWorkChunk(id).getStatus()));
}
@Test
default void testNonGatedWorkChunkInReady_IsQueuedDuringMaintenance() throws InterruptedException {
// setup
int expectedCalls = 1;
getTestManager().enableMaintenanceRunner(false);
PointcutLatch sendingLatch = getTestManager().disableWorkChunkMessageHandler();
sendingLatch.setExpectedCount(expectedCalls);
String state = "1|READY,1|QUEUED";
@ -109,7 +110,6 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
default void testStoreAndFetchWorkChunk_WithData() {
// setup
getTestManager().disableWorkChunkMessageHandler();
getTestManager().enableMaintenanceRunner(false);
JobDefinition<?> jobDefinition = getTestManager().withJobDefinition(false);
JobInstance instance = createInstance();
String instanceId = getTestManager().getSvc().storeNewInstance(instance);
@ -144,7 +144,6 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
// setup
String state = "2|IN_PROGRESS,2|COMPLETED";
getTestManager().disableWorkChunkMessageHandler();
getTestManager().enableMaintenanceRunner(false);
JobDefinition<?> jobDefinition = getTestManager().withJobDefinition(false);
String instanceId = getTestManager().createAndStoreJobInstance(jobDefinition);
@ -174,7 +173,6 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
// setup
String state = "1|IN_PROGRESS,1|ERRORED";
getTestManager().disableWorkChunkMessageHandler();
getTestManager().enableMaintenanceRunner(false);
JobDefinition<?> jobDef = getTestManager().withJobDefinition(false);
String instanceId = getTestManager().createAndStoreJobInstance(jobDef);
JobMaintenanceStateInformation info = new JobMaintenanceStateInformation(
@ -216,7 +214,6 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
// setup
String state = "1|IN_PROGRESS,1|FAILED";
getTestManager().disableWorkChunkMessageHandler();
getTestManager().enableMaintenanceRunner(false);
JobDefinition<?> jobDef = getTestManager().withJobDefinition(false);
String instanceId = getTestManager().createAndStoreJobInstance(jobDef);
JobMaintenanceStateInformation info = new JobMaintenanceStateInformation(
@ -248,7 +245,6 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
1|IN_PROGRESS,1|COMPLETED
""";
getTestManager().disableWorkChunkMessageHandler();
getTestManager().enableMaintenanceRunner(false);
JobDefinition<?> jobDef = getTestManager().withJobDefinition(false);
String instanceId = getTestManager().createAndStoreJobInstance(jobDef);
JobMaintenanceStateInformation info = new JobMaintenanceStateInformation(

View File

@ -24,7 +24,9 @@ public interface WorkChunkTestConstants {
// we use a separate id for gated jobs because these job definitions might not
// be cleaned up after any given test run
String GATED_JOB_DEFINITION_ID = "gated_job_def_id";
public static final String TARGET_STEP_ID = "step-id";
public static final String FIRST_STEP_ID = "step-id";
public static final String SECOND_STEP_ID = "2nd-step-id";
public static final String LAST_STEP_ID = "last-step-id";
public static final String DEF_CHUNK_ID = "definition-chunkId";
public static final int JOB_DEF_VER = 1;
public static final int SEQUENCE_NUMBER = 1;

View File

@ -150,7 +150,8 @@ public class JobMaintenanceStateInformation {
if (jobDef.isGatedExecution()) {
AtomicReference<String> latestStepId = new AtomicReference<>();
int totalSteps = jobDef.getSteps().size();
for (int i = totalSteps - 1; i >= 0; i--) {
// ignore the last step since tests in gated jobs needs the current step to be the second-last step
for (int i = totalSteps - 2; i >= 0; i--) {
JobDefinitionStep<?, ?, ?> step = jobDef.getSteps().get(i);
if (stepIds.contains(step.getStepId())) {
latestStepId.set(step.getStepId());

View File

@ -289,4 +289,14 @@ public interface IJobPersistence extends IWorkChunkPersistence {
@VisibleForTesting
WorkChunk createWorkChunk(WorkChunk theWorkChunk);
/**
* Atomically advance the given job to the given step and change the status of all QUEUED and GATE_WAITING chunks
* in the next step to READY
* @param theJobInstanceId the id of the job instance to be updated
* @param theNextStepId the id of the next job step
* @return whether any changes were made
*/
@Transactional(propagation = Propagation.REQUIRES_NEW)
boolean advanceJobStepAndUpdateChunkStatus(String theJobInstanceId, String theNextStepId);
}

View File

@ -56,7 +56,8 @@ public interface IWorkChunkPersistence {
* The first state event, as the chunk is created.
* This method should be atomic and should only
* return when the chunk has been successfully stored in the database.
* Chunk should be stored with a status of {@link WorkChunkStatusEnum#QUEUED}
* Chunk should be stored with a status of {@link WorkChunkStatusEnum#READY} or
* {@link WorkChunkStatusEnum#GATE_WAITING} for ungated and gated jobs, respectively.
*
* @param theBatchWorkChunk the batch work chunk to be stored
* @return a globally unique identifier for this chunk.

View File

@ -82,7 +82,13 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
// once finished, create workchunks in READY state
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(
myJobDefinitionId, myJobDefinitionVersion, targetStepId, instanceId, sequence, dataValueString);
myJobDefinitionId,
myJobDefinitionVersion,
targetStepId,
instanceId,
sequence,
dataValueString,
myGatedExecution);
String chunkId = myHapiTransactionService
.withSystemRequestOnDefaultPartition()
.withPropagation(Propagation.REQUIRES_NEW)

View File

@ -24,7 +24,6 @@ import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
@ -45,8 +44,6 @@ import org.springframework.data.domain.Pageable;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class JobInstanceProcessor {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@ -100,10 +97,24 @@ public class JobInstanceProcessor {
myJobDefinitionegistry.getJobDefinitionOrThrowException(theInstance);
processPollingChunks(theInstance.getInstanceId());
enqueueReadyChunks(theInstance, jobDefinition, false);
cleanupInstance(theInstance);
triggerGatedExecutions(theInstance, jobDefinition);
JobInstance updatedInstance =
myJobPersistence.fetchInstance(theInstance.getInstanceId()).orElseThrow();
if (theInstance.hasGatedStep()) {
JobWorkCursor<?, ?, ?> jobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
jobDefinition, updatedInstance.getCurrentGatedStepId());
if (jobWorkCursor.isReductionStep()) {
// Reduction step work chunks should never be sent to the queue but to its specific service instead.
triggerReductionStep(updatedInstance, jobWorkCursor);
return;
}
}
// enqueue the chunks as normal
enqueueReadyChunks(updatedInstance, jobDefinition);
ourLog.debug("Finished job processing: {} - {}", myInstanceId, stopWatch);
}
@ -190,22 +201,12 @@ public class JobInstanceProcessor {
JobWorkCursor<?, ?, ?> jobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
theJobDefinition, theInstance.getCurrentGatedStepId());
String instanceId = theInstance.getInstanceId();
String currentStepId = jobWorkCursor.getCurrentStepId();
boolean canAdvance = canAdvanceGatedJob(theJobDefinition, theInstance, jobWorkCursor);
boolean canAdvance = canAdvanceGatedJob(theInstance);
if (canAdvance) {
if (jobWorkCursor.isReductionStep()) {
// current step is the reduction step (all reduction steps are final)
JobWorkCursor<?, ?, ?> nextJobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
jobWorkCursor.getJobDefinition(), jobWorkCursor.getCurrentStepId());
myReductionStepExecutorService.triggerReductionStep(instanceId, nextJobWorkCursor);
} else if (jobWorkCursor.isFinalStep()) {
// current step is the final step in a non-reduction gated job
processChunksForNextGatedSteps(
theInstance, theJobDefinition, jobWorkCursor, jobWorkCursor.getCurrentStepId());
} else {
// all other gated job steps
if (!jobWorkCursor.isFinalStep()) {
// all other gated job steps except for final steps - final steps does not need to be advanced
String nextStepId = jobWorkCursor.nextStep.getStepId();
ourLog.info(
"All processing is complete for gated execution of instance {} step {}. Proceeding to step {}",
@ -213,8 +214,12 @@ public class JobInstanceProcessor {
currentStepId,
nextStepId);
// otherwise, continue processing as expected
processChunksForNextGatedSteps(theInstance, theJobDefinition, jobWorkCursor, nextStepId);
processChunksForNextGatedSteps(theInstance, nextStepId);
} else {
ourLog.info(
"Ready to advance gated execution of instance {} but already at the final step {}. Not proceeding to advance steps.",
instanceId,
jobWorkCursor.getCurrentStepId());
}
} else {
String stepId = jobWorkCursor.nextStep != null
@ -228,8 +233,7 @@ public class JobInstanceProcessor {
}
}
private boolean canAdvanceGatedJob(
JobDefinition<?> theJobDefinition, JobInstance theInstance, JobWorkCursor<?, ?, ?> theWorkCursor) {
private boolean canAdvanceGatedJob(JobInstance theInstance) {
// make sure our instance still exists
if (myJobPersistence.fetchInstance(theInstance.getInstanceId()).isEmpty()) {
// no more job
@ -240,36 +244,14 @@ public class JobInstanceProcessor {
Set<WorkChunkStatusEnum> workChunkStatuses = myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(
theInstance.getInstanceId(), currentGatedStepId);
// we only advance if all of the current steps workchunks are READY
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.READY))) {
if (theWorkCursor.isFirstStep()) {
// first step - all ready means we're ready to proceed to the next step
return true;
} else {
// it's a future step;
// make sure previous step's workchunks are completed
JobDefinitionStep<?, ?, ?> previousStep =
theJobDefinition.getSteps().get(0);
for (JobDefinitionStep<?, ?, ?> step : theJobDefinition.getSteps()) {
if (step.getStepId().equalsIgnoreCase(currentGatedStepId)) {
break;
}
previousStep = step;
}
Set<WorkChunkStatusEnum> previousStepWorkChunkStates =
myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(
theInstance.getInstanceId(), previousStep.getStepId());
// completed means "all in COMPLETE state" or no previous chunks (they're cleaned up or never existed)
if (previousStepWorkChunkStates.isEmpty()
|| previousStepWorkChunkStates.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) {
return true;
}
}
if (workChunkStatuses.isEmpty()) {
// no work chunks = no output
// trivial to advance to next step
return true;
}
// anything else
return false;
// all workchunks for the current step are in COMPLETED -> proceed.
return workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED));
}
protected PagingIterator<WorkChunkMetadata> getReadyChunks() {
@ -283,39 +265,26 @@ public class JobInstanceProcessor {
});
}
/**
* Trigger the reduction step for the given job instance. Reduction step chunks should never be queued.
*/
private void triggerReductionStep(JobInstance theInstance, JobWorkCursor<?, ?, ?> jobWorkCursor) {
String instanceId = theInstance.getInstanceId();
ourLog.debug("Triggering Reduction step {} of instance {}.", jobWorkCursor.getCurrentStepId(), instanceId);
myReductionStepExecutorService.triggerReductionStep(instanceId, jobWorkCursor);
}
/**
* Chunks are initially created in READY state.
* We will move READY chunks to QUEUE'd and send them to the queue/topic (kafka)
* for processing.
*
* We could block chunks from being moved from QUEUE'd to READY here for gated steps
* but currently, progress is calculated by looking at completed chunks only;
* we'd need a new GATE_WAITING state to move chunks to to prevent jobs from
* completing prematurely.
*/
private void enqueueReadyChunks(
JobInstance theJobInstance, JobDefinition<?> theJobDefinition, boolean theIsGatedExecutionAdvancementBool) {
private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition<?> theJobDefinition) {
Iterator<WorkChunkMetadata> iter = getReadyChunks();
AtomicInteger counter = new AtomicInteger();
ConcurrentHashMap<String, JobWorkCursor<?, ?, ?>> stepToWorkCursor = new ConcurrentHashMap<>();
int counter = 0;
while (iter.hasNext()) {
WorkChunkMetadata metadata = iter.next();
JobWorkCursor<?, ?, ?> jobWorkCursor = stepToWorkCursor.computeIfAbsent(metadata.getTargetStepId(), (e) -> {
return JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, metadata.getTargetStepId());
});
counter.getAndIncrement();
if (!theIsGatedExecutionAdvancementBool
&& (theJobDefinition.isGatedExecution() || jobWorkCursor.isReductionStep())) {
/*
* Gated executions are queued later when all work chunks are ready.
*
* Reduction steps are not submitted to the queue at all, but processed inline.
* Currently all reduction steps are also gated, but this might not always
* be true.
*/
return;
}
/*
* For each chunk id
@ -325,9 +294,13 @@ public class JobInstanceProcessor {
* * commit
*/
updateChunkAndSendToQueue(metadata);
counter++;
}
ourLog.debug(
"Encountered {} READY work chunks for job {}", counter.get(), theJobDefinition.getJobDefinitionId());
"Encountered {} READY work chunks for job {} of type {}",
counter,
theJobInstance.getInstanceId(),
theJobDefinition.getJobDefinitionId());
}
/**
@ -370,53 +343,22 @@ public class JobInstanceProcessor {
myBatchJobSender.sendWorkChannelMessage(workNotification);
}
private void processChunksForNextGatedSteps(
JobInstance theInstance,
JobDefinition<?> theJobDefinition,
JobWorkCursor<?, ?, ?> theWorkCursor,
String nextStepId) {
private void processChunksForNextGatedSteps(JobInstance theInstance, String nextStepId) {
String instanceId = theInstance.getInstanceId();
List<String> readyChunksForNextStep =
myProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.READY);
myProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.GATE_WAITING);
int totalChunksForNextStep = myProgressAccumulator.getTotalChunkCountForInstanceAndStep(instanceId, nextStepId);
if (totalChunksForNextStep != readyChunksForNextStep.size()) {
ourLog.debug(
"Total ProgressAccumulator READY chunk count does not match READY chunk size! [instanceId={}, stepId={}, totalChunks={}, queuedChunks={}]",
"Total ProgressAccumulator GATE_WAITING chunk count does not match GATE_WAITING chunk size! [instanceId={}, stepId={}, totalChunks={}, queuedChunks={}]",
instanceId,
nextStepId,
totalChunksForNextStep,
readyChunksForNextStep.size());
}
// TODO
// create a new persistence transition for state advance
// update stepId to next step AND update all chunks in this step to READY (from GATED or QUEUED ;-P)
// so we can queue them safely.
// update the job step so the workers will process them.
// if it's the last (gated) step, there will be no change - but we should
// queue up the chunks anyways
boolean changed = theWorkCursor.isFinalStep()
|| myJobPersistence.updateInstance(instanceId, instance -> {
if (instance.getCurrentGatedStepId().equals(nextStepId)) {
// someone else beat us here. No changes
return false;
}
instance.setCurrentGatedStepId(nextStepId);
return true;
});
if (!changed) {
// we collided with another maintenance job.
ourLog.warn("Skipping gate advance to {} for instance {} - already advanced.", nextStepId, instanceId);
return;
}
ourLog.debug("Moving gated instance {} to next step.", theInstance.getInstanceId());
// because we now have all gated job chunks in READY state,
// we can enqueue them
enqueueReadyChunks(theInstance, theJobDefinition, true);
myJobPersistence.advanceJobStepAndUpdateChunkStatus(instanceId, nextStepId);
}
/**

View File

@ -36,6 +36,7 @@ public class WorkChunkCreateEvent {
public final String instanceId;
public final int sequence;
public final String serializedData;
public final boolean isGatedExecution;
/**
* Constructor
@ -52,20 +53,28 @@ public class WorkChunkCreateEvent {
@Nonnull String theTargetStepId,
@Nonnull String theInstanceId,
int theSequence,
@Nullable String theSerializedData) {
@Nullable String theSerializedData,
boolean theGatedExecution) {
jobDefinitionId = theJobDefinitionId;
jobDefinitionVersion = theJobDefinitionVersion;
targetStepId = theTargetStepId;
instanceId = theInstanceId;
sequence = theSequence;
serializedData = theSerializedData;
isGatedExecution = theGatedExecution;
}
/**
* Creates the WorkChunkCreateEvent for the first chunk of a job.
*/
public static WorkChunkCreateEvent firstChunk(JobDefinition<?> theJobDefinition, String theInstanceId) {
String firstStepId = theJobDefinition.getFirstStepId();
String jobDefinitionId = theJobDefinition.getJobDefinitionId();
int jobDefinitionVersion = theJobDefinition.getJobDefinitionVersion();
return new WorkChunkCreateEvent(jobDefinitionId, jobDefinitionVersion, firstStepId, theInstanceId, 0, null);
// the first chunk of a job is always READY, no matter whether the job is gated
boolean isGatedExecution = false;
return new WorkChunkCreateEvent(
jobDefinitionId, jobDefinitionVersion, firstStepId, theInstanceId, 0, null, isGatedExecution);
}
@Override
@ -83,6 +92,7 @@ public class WorkChunkCreateEvent {
.append(instanceId, that.instanceId)
.append(sequence, that.sequence)
.append(serializedData, that.serializedData)
.append(isGatedExecution, that.isGatedExecution)
.isEquals();
}
@ -95,6 +105,7 @@ public class WorkChunkCreateEvent {
.append(instanceId)
.append(sequence)
.append(serializedData)
.append(isGatedExecution)
.toHashCode();
}
}

View File

@ -31,17 +31,17 @@ import java.util.Set;
* @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md
*/
public enum WorkChunkStatusEnum {
// wipmb For 6.8 Add WAITING for gated, and READY for in db, but not yet sent to channel.
/**
* The initial state all workchunks start in
*/
READY,
GATE_WAITING,
QUEUED,
/**
* The state of workchunks that have been sent to the queue;
* or of workchunks that are about to be processed in a final
* reduction step (these workchunks are never queued)
*/
QUEUED,
/**
* The state of workchunks that are doing work.
*/
@ -84,6 +84,8 @@ public enum WorkChunkStatusEnum {
public Set<WorkChunkStatusEnum> getNextStates() {
switch (this) {
case GATE_WAITING:
return EnumSet.of(READY);
case READY:
return EnumSet.of(QUEUED);
case QUEUED:

View File

@ -73,6 +73,7 @@ public class InstanceProgress {
statusToCountMap.put(theChunk.getStatus(), statusToCountMap.getOrDefault(theChunk.getStatus(), 0) + 1);
switch (theChunk.getStatus()) {
case GATE_WAITING:
case READY:
case QUEUED:
case POLL_WAITING: