resolved review comments

This commit is contained in:
tyner 2024-04-11 15:03:24 -04:00
parent decdeafdc1
commit 292a4d3858
7 changed files with 24 additions and 96 deletions

View File

@ -610,14 +610,4 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
return changed;
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public int updateAllChunksForStepFromGateWaitingToReady(String theJobInstanceId, String theStepId) {
ourLog.debug(
"Updating chunk status from GATE_WAITING to READY for gated instance {} in step {}.",
theJobInstanceId,
theStepId);
return myWorkChunkRepository.updateAllChunksForStepFromGateWaitingToReady(theJobInstanceId, theStepId);
}
}

View File

@ -175,20 +175,6 @@ class JpaJobPersistenceImplTest {
assertEquals(instance.getInstanceId(), retInstance.get().getInstanceId());
}
@Test
void updateAllChunksForStepWithStatus_validRequest_callsPersistenceUpdateAndReturnsChanged() {
// setup
String instanceId = "jobId";
String nextStepId = "nextStep";
// execute
int changed = mySvc.updateAllChunksForStepFromGateWaitingToReady(instanceId, nextStepId);
// verify
assertEquals(0, changed);
verify(myWorkChunkRepository).updateAllChunksForStepFromGateWaitingToReady(instanceId, nextStepId);
}
private JobInstance createJobInstanceFromEntity(Batch2JobInstanceEntity theEntity) {
return JobInstanceUtil.fromEntityToInstance(theEntity);
}

View File

@ -437,35 +437,6 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
});
}
@Test
public void updateAllChunksForStepWithStatus_forGatedJob_updatesChunkStatus() {
// setup
JobInstance instance = createInstance(true, true);
String instanceId = mySvc.storeNewInstance(instance);
String chunkIdFirstStep = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null, true);
String chunkIdSecondStep1 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, true);
String chunkIdSecondStep2 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, true);
runInTransaction(() -> {
assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(chunkIdFirstStep).getStatus());
assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(chunkIdSecondStep1).getStatus());
assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(chunkIdSecondStep2).getStatus());
});
// execute
runInTransaction(() -> {
int numChanged = mySvc.updateAllChunksForStepFromGateWaitingToReady(instanceId, LAST_STEP_ID);
assertEquals(2, numChanged);
});
// verify
runInTransaction(() -> {
assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(chunkIdFirstStep).getStatus());
assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkIdSecondStep1).getStatus());
assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkIdSecondStep2).getStatus());
});
}
@Test
public void testFetchUnknownWork() {
assertFalse(myWorkChunkRepository.findById("FOO").isPresent());

View File

@ -245,10 +245,6 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
return storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, theInstanceId, 0, CHUNK_DATA, theGatedExecution);
}
public String createChunkInStep(String theInstanceId, String theStepId, boolean theGatedExecution) {
return storeWorkChunk(JOB_DEFINITION_ID, theStepId, theInstanceId, 0, CHUNK_DATA, theGatedExecution);
}
public String createFirstChunk(JobDefinition<TestJobParameters> theJobDefinition, String theJobInstanceId){
return storeFirstWorkChunk(theJobDefinition, theJobInstanceId);
}

View File

@ -64,8 +64,10 @@ public interface ITestFixture {
String createChunk(String theJobInstanceId, boolean theGatedExecution);
String createChunkInStep(String theJobInstanceId, String theStepId, boolean theGatedExecution);
/**
* Create chunk as the first chunk of a job.
* @return the id of the created chunk
*/
String createFirstChunk(JobDefinition<TestJobParameters> theJobDefinition, String theJobInstanceId);
/**

View File

@ -21,7 +21,6 @@ package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters;
@ -33,8 +32,6 @@ import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import static org.junit.jupiter.api.Assertions.assertEquals;
public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkTestConstants {
@ -62,6 +59,7 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT
String myChunkId = getTestManager().createFirstChunk(jobDef, jobInstanceId);
WorkChunk fetchedWorkChunk = getTestManager().freshFetchWorkChunk(myChunkId);
// the first chunk of both gated and non-gated job should start in READY
assertEquals(WorkChunkStatusEnum.READY, fetchedWorkChunk.getStatus(), "New chunks are " + WorkChunkStatusEnum.READY);
}
@ -88,37 +86,31 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT
}
@Test
default void chunkReceived_forGatedJob_queuedToInProgress() throws InterruptedException {
PointcutLatch sendLatch = getTestManager().disableWorkChunkMessageHandler();
sendLatch.setExpectedCount(2);
default void advanceJobStepAndUpdateChunkStatus_forGatedJob_updatesBothREADYAndQUEUEDChunks() {
// setup
getTestManager().disableWorkChunkMessageHandler();
getTestManager().enableMaintenanceRunner(false);
String state = """
1|COMPLETED
2|COMPLETED
3|GATE_WAITING,3|READY
3|QUEUED,3|READY
""";
JobDefinition<TestJobParameters> jobDef = getTestManager().withJobDefinition(true);
String jobInstanceId = getTestManager().createAndStoreJobInstance(jobDef);
String chunkInStep1 = getTestManager().createFirstChunk(jobDef, jobInstanceId);
String chunkInStep2 = getTestManager().createChunkInStep(jobInstanceId, SECOND_STEP_ID, true);
// dequeue and completes the first chunk
getTestManager().runMaintenancePass();
// the worker has received the chunk, and marks it started.
WorkChunk chunk1 = getTestManager().getSvc().onWorkChunkDequeue(chunkInStep1).orElseThrow(IllegalArgumentException::new);
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk1.getStatus());
WorkChunk fetchedWorkChunk = getTestManager().freshFetchWorkChunk(chunkInStep1);
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, fetchedWorkChunk.getStatus());
JobMaintenanceStateInformation info = new JobMaintenanceStateInformation(jobInstanceId, jobDef, state);
getTestManager().createChunksInStates(info);
assertEquals(SECOND_STEP_ID, getTestManager().freshFetchJobInstance(jobInstanceId).getCurrentGatedStepId());
getTestManager().getSvc().onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkInStep1, 50, 0));
fetchedWorkChunk = getTestManager().freshFetchWorkChunk(chunkInStep1);
assertEquals(WorkChunkStatusEnum.COMPLETED, fetchedWorkChunk.getStatus());
// execute
getTestManager().runInTransaction(() -> getTestManager().getSvc().advanceJobStepAndUpdateChunkStatus(jobInstanceId, LAST_STEP_ID));
// dequeue the second chunk
getTestManager().runMaintenancePass();
WorkChunk chunk2 = getTestManager().getSvc().onWorkChunkDequeue(chunkInStep2).orElseThrow(IllegalArgumentException::new);
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk2.getStatus());
assertEquals(CHUNK_DATA, chunk2.getData());
WorkChunk fetchedWorkChunk2 = getTestManager().freshFetchWorkChunk(chunkInStep2);
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, fetchedWorkChunk2.getStatus());
getTestManager().verifyWorkChunkMessageHandlerCalled(sendLatch, 2);
// verify
assertEquals(LAST_STEP_ID, getTestManager().freshFetchJobInstance(jobInstanceId).getCurrentGatedStepId());
info.verifyFinalStates(getTestManager().getSvc());
}
@Test

View File

@ -290,13 +290,4 @@ public interface IJobPersistence extends IWorkChunkPersistence {
*/
@Transactional(propagation = Propagation.REQUIRES_NEW)
boolean advanceJobStepAndUpdateChunkStatus(String theJobInstanceId, String theNextStepId);
/**
* Update all chunks of the given step id for the given job from GATE_WAITING to READY
* @param theJobInstanceId the id of the job instance to be updated
* @param theStepId the id of the step which the chunks belong to
* @return the number of chunks updated
*/
@Transactional(propagation = Propagation.REQUIRES_NEW)
int updateAllChunksForStepFromGateWaitingToReady(String theJobInstanceId, String theStepId);
}