resolved review comments

This commit is contained in:
tyner 2024-04-12 11:27:05 -04:00
parent 7cf197c875
commit 0569ddc812
2 changed files with 23 additions and 19 deletions

View File

@ -79,7 +79,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
public static final String JOB_DEFINITION_ID = "definition-id";
public static final String TARGET_STEP_ID = TestJobDefinitionUtils.FIRST_STEP_ID;
public static final String FIRST_STEP_ID = TestJobDefinitionUtils.FIRST_STEP_ID;
public static final String LAST_STEP_ID = TestJobDefinitionUtils.LAST_STEP_ID;
public static final String DEF_CHUNK_ID = "definition-chunkId";
public static final String STEP_CHUNK_ID = TestJobDefinitionUtils.FIRST_STEP_ID;
@ -112,7 +112,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
for (int i = 0; i < 10; i++) {
storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, i, JsonUtil.serialize(new NdJsonFileJson().setNdJsonText("{}")), false);
storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, i, JsonUtil.serialize(new NdJsonFileJson().setNdJsonText("{}")), false);
}
// Execute
@ -247,8 +247,8 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
// Setup
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA, false);
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(JOB_DEFINITION_ID, JOB_DEF_VER, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA, false);
storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, CHUNK_DATA, false);
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(JOB_DEFINITION_ID, JOB_DEF_VER, FIRST_STEP_ID, instanceId, 0, CHUNK_DATA, false);
String chunkId = mySvc.onWorkChunkCreate(batchWorkChunk);
Optional<Batch2WorkChunkEntity> byId = myWorkChunkRepository.findById(chunkId);
Batch2WorkChunkEntity entity = byId.get();
@ -399,7 +399,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String chunkIdSecondStep1 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, isGatedExecution);
String chunkIdSecondStep2 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, isGatedExecution);
runInTransaction(() -> assertEquals(TARGET_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId()));
runInTransaction(() -> assertEquals(FIRST_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId()));
// execute
runInTransaction(() -> {
@ -424,11 +424,11 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String chunkIdSecondStep1 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, isGatedExecution);
String chunkIdSecondStep2 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, isGatedExecution);
runInTransaction(() -> assertEquals(TARGET_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId()));
runInTransaction(() -> assertEquals(FIRST_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId()));
// execute
runInTransaction(() -> {
boolean changed = mySvc.advanceJobStepAndUpdateChunkStatus(instanceId, TARGET_STEP_ID);
boolean changed = mySvc.advanceJobStepAndUpdateChunkStatus(instanceId, FIRST_STEP_ID);
assertFalse(changed);
});
@ -436,7 +436,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
runInTransaction(() -> {
assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(chunkIdSecondStep1).getStatus());
assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(chunkIdSecondStep2).getStatus());
assertEquals(TARGET_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId());
assertEquals(FIRST_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId());
});
}
@ -456,6 +456,10 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String instanceId = mySvc.storeNewInstance(instance);
// execute & verify
String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, null);
// mark the first chunk as COMPLETED to allow step advance
runInTransaction(() -> myWorkChunkRepository.updateChunkStatus(firstChunkId, WorkChunkStatusEnum.COMPLETED, WorkChunkStatusEnum.READY));
String id = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, theGatedExecution);
runInTransaction(() -> assertEquals(theExpectedStatusOnCreate, findChunkByIdOrThrow(id).getStatus()));
myBatch2JobHelper.runMaintenancePass();
@ -476,7 +480,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String instanceId = mySvc.storeNewInstance(instance);
// execute & verify
String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, expectedFirstChunkData);
String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, expectedFirstChunkData);
String secondChunkId = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, expectedSecondChunkData, isGatedExecution);
runInTransaction(() -> {
@ -521,9 +525,9 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String queuedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, "some data", isGatedExecution);
String erroredId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 1, "some more data", isGatedExecution);
String completedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 2, "some more data", isGatedExecution);
String queuedId = storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, "some data", isGatedExecution);
String erroredId = storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 1, "some more data", isGatedExecution);
String completedId = storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 2, "some more data", isGatedExecution);
mySvc.onWorkChunkDequeue(erroredId);
WorkChunkErrorEvent parameters = new WorkChunkErrorEvent(erroredId, "Our error message");
@ -547,7 +551,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertEquals(JOB_DEFINITION_ID, workChunk.getJobDefinitionId());
assertEquals(JOB_DEF_VER, workChunk.getJobDefinitionVersion());
assertEquals(instanceId, workChunk.getInstanceId());
assertEquals(TARGET_STEP_ID, workChunk.getTargetStepId());
assertEquals(FIRST_STEP_ID, workChunk.getTargetStepId());
assertEquals(0, workChunk.getSequence());
assertEquals(WorkChunkStatusEnum.READY, workChunk.getStatus());
@ -587,9 +591,15 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
"true, GATE_WAITING, QUEUED"
})
public void testStoreAndFetchWorkChunk_withOrWithoutGatedExecutionwithData_createdAndTransitionToExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum theExpectedCreatedStatus, WorkChunkStatusEnum theExpectedTransitionStatus) {
// setup
JobInstance instance = createInstance(true, theGatedExecution);
String instanceId = mySvc.storeNewInstance(instance);
// execute & verify
String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, null);
// mark the first chunk as COMPLETED to allow step advance
runInTransaction(() -> myWorkChunkRepository.updateChunkStatus(firstChunkId, WorkChunkStatusEnum.COMPLETED, WorkChunkStatusEnum.READY));
String id = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, CHUNK_DATA, theGatedExecution);
assertNotNull(id);
runInTransaction(() -> assertEquals(theExpectedCreatedStatus, findChunkByIdOrThrow(id).getStatus()));

View File

@ -241,12 +241,6 @@ public class JobInstanceProcessor {
Set<WorkChunkStatusEnum> workChunkStatuses = myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(
theInstance.getInstanceId(), currentGatedStepId);
if (workChunkStatuses.isEmpty()) {
// no work chunks = no output
// trivial to advance to next step
return true;
}
// all workchunks for the current step are in COMPLETED -> proceed.
return workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED));
}