add implementation

This commit is contained in:
Tadgh 2022-11-14 18:06:44 -08:00
parent a2ea7ffe48
commit 138be3dd4e
5 changed files with 52 additions and 5 deletions

View File

@ -35,7 +35,6 @@ import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.util.JobInstanceUtil;
import ca.uhn.fhir.model.api.PagingIterator;
import ca.uhn.fhir.narrative.BaseThymeleafNarrativeGenerator;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
@ -270,6 +269,15 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
myWorkChunkRepository.incrementWorkChunkErrorCount(theChunkId, theIncrementBy);
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId) {
List<StatusEnum> statusesForStep = myWorkChunkRepository.getDistinctStatusesForStep(theInstanceId, theCurrentStepId);
ourLog.debug("Checking whether gated job can advanced to next step. [instanceId={}, currentStepId={}, statusesForStep={}]", theInstanceId, theCurrentStepId, statusesForStep);
boolean canAdvance = statusesForStep.contains(StatusEnum.COMPLETED) && !statusesForStep.contains(StatusEnum.QUEUED) && !statusesForStep.contains(StatusEnum.IN_PROGRESS);
return canAdvance;
}
/**
* Note: Not @Transactional because {@link #fetchChunks(String, boolean, int, int, Consumer)} starts a transaction
*/

View File

@ -36,6 +36,9 @@ public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChun
@Query("SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC")
List<Batch2WorkChunkEntity> fetchChunks(Pageable thePageRequest, @Param("instanceId") String theInstanceId);
@Query("SELECT DISTINCT e.myStatus from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId")
List<StatusEnum> getDistinctStatusesForStep(@Param("instanceId") String theInstanceId, @Param("stepId") String theStepId);
@Query("SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId AND e.myTargetStepId = :targetStepId ORDER BY e.mySequence ASC")
List<Batch2WorkChunkEntity> fetchChunksForStep(Pageable thePageRequest, @Param("instanceId") String theInstanceId, @Param("targetStepId") String theTargetStepId);

View File

@ -305,6 +305,25 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertEquals(1, chunks.size());
assertEquals(5, chunks.get(0).getErrorCount());
}
@Test
public void testGatedAdvancementByStatus() {
// Setup
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
// Execute
mySvc.canAdvanceInstanceToNextStep(instanceId, STEP_CHUNK_ID);
// Verify
List<WorkChunk> chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 100, 0);
assertEquals(1, chunks.size());
assertEquals(5, chunks.get(0).getErrorCount());
}
@Test
public void testMarkChunkAsCompleted_Error() {

View File

@ -28,6 +28,8 @@ import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import org.springframework.data.domain.Page;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.util.Iterator;
import java.util.List;
@ -163,6 +165,9 @@ public interface IJobPersistence {
*/
void incrementWorkChunkErrorCount(String theChunkId, int theIncrementBy);
@Transactional(propagation = Propagation.REQUIRES_NEW)
boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId);
/**
* Fetches all chunks for a given instance, without loading the data
*

View File

@ -144,7 +144,15 @@ public class JobInstanceProcessor {
int incompleteChunks = myProgressAccumulator.countChunksWithStatus(instanceId, currentStepId, StatusEnum.getIncompleteStatuses());
ourLog.debug("Considering whether to advance gated execution. [totalChunks={},incompleteChunks={},instanceId={},stepId={}", totalChunks, incompleteChunks, instanceId, currentStepId);
if (incompleteChunks == 0) {
boolean shouldAdvance = myJobPersistence.canAdvanceInstanceToNextStep(instanceId, currentStepId);
if (incompleteChunks == 0 && !shouldAdvance) {
ourLog.debug("Hello! If you see this, it means the old method decided to advance, and the new one didnt!");
} else if (incompleteChunks == 0 && shouldAdvance) {
ourLog.debug("If you see this message, it means both our advancement algorithms agreed!");
} else if (incompleteChunks != 0 && shouldAdvance) {
ourLog.debug("If you see this message, it means our advancement algorithms disagreed, but the newer one thinks we should advance!");
}
if (shouldAdvance) {
String nextStepId = jobWorkCursor.nextStep.getStepId();
ourLog.info("All processing is complete for gated execution of instance {} step {}. Proceeding to step {}", instanceId, currentStepId, nextStepId);
@ -162,12 +170,16 @@ public class JobInstanceProcessor {
}
private void processChunksForNextSteps(String instanceId, String nextStepId) {
List<String> chunksForNextStep = myProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, StatusEnum.QUEUED);
for (String nextChunkId : chunksForNextStep) {
List<String> queuedChunksForNextStep = myProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, StatusEnum.QUEUED);
int totalChunksForNextStep = myProgressAccumulator.getTotalChunkCountForInstanceAndStep(instanceId, nextStepId);
if (totalChunksForNextStep != queuedChunksForNextStep.size()) {
ourLog.error("Total chunk size to submit for next step does not match QUEUED chunk size! [instanceId={}, stepId={}, totalChunks={}, queuedChunks={}]", instanceId, nextStepId, totalChunksForNextStep, queuedChunksForNextStep.size());
}
for (String nextChunkId : queuedChunksForNextStep) {
JobWorkNotification workNotification = new JobWorkNotification(myInstance, nextStepId, nextChunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
}
ourLog.debug("Submitted a batch of chunks for processing. [chunkCount={}, instanceId={}, stepId={}]", chunksForNextStep.size(), instanceId, nextStepId);
ourLog.debug("Submitted a batch of chunks for processing. [chunkCount={}, instanceId={}, stepId={}]", queuedChunksForNextStep.size(), instanceId, nextStepId);
myInstance.setCurrentGatedStepId(nextStepId);
myJobPersistence.updateInstance(myInstance);
}