last-second-fetch

This commit is contained in:
Tadgh 2022-11-14 20:11:24 -08:00
parent 8c896fce53
commit 92b927020a
3 changed files with 17 additions and 9 deletions

View File

@ -297,7 +297,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
});
}
private void fetchChunksForStep(String theInstanceId, String theStepId, int thePageSize, int thePageIndex, Consumer<WorkChunk> theConsumer) {
public void fetchChunksForStep(String theInstanceId, String theStepId, int thePageSize, int thePageIndex, Consumer<WorkChunk> theConsumer) {
myTxTemplate.executeWithoutResult(tx -> {
List<Batch2WorkChunkEntity> chunks = myWorkChunkRepository.fetchChunksForStep(PageRequest.of(thePageIndex, thePageSize), theInstanceId, theStepId);
for (Batch2WorkChunkEntity chunk : chunks) {

View File

@ -35,6 +35,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
public interface IJobPersistence {
@ -177,6 +178,9 @@ public interface IJobPersistence {
*/
List<WorkChunk> fetchWorkChunksWithoutData(String theInstanceId, int thePageSize, int thePageIndex);
void fetchChunksForStep(String theInstanceId, String theStepId, int thePageSize, int thePageIndex, Consumer<WorkChunk> theConsumer);
/**
* Fetch all chunks for a given instance.
* @param theInstanceId - instance id

View File

@ -34,6 +34,7 @@ import ca.uhn.fhir.jpa.batch.log.Logs;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@ -175,11 +176,14 @@ public class JobInstanceProcessor {
if (totalChunksForNextStep != queuedChunksForNextStep.size()) {
ourLog.error("Total chunk size to submit for next step does not match QUEUED chunk size! [instanceId={}, stepId={}, totalChunks={}, queuedChunks={}]", instanceId, nextStepId, totalChunksForNextStep, queuedChunksForNextStep.size());
}
for (String nextChunkId : queuedChunksForNextStep) {
List<String> chunksToSubmit = new ArrayList<>();
myJobPersistence.fetchChunksForStep(instanceId, nextStepId, 10000, 0, chunk -> chunksToSubmit.add(chunk.getId()));
// for (String nextChunkId : queuedChunksForNextStep) {
for (String nextChunkId : chunksToSubmit) {
JobWorkNotification workNotification = new JobWorkNotification(myInstance, nextStepId, nextChunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
}
ourLog.debug("Submitted a batch of chunks for processing. [chunkCount={}, instanceId={}, stepId={}]", queuedChunksForNextStep.size(), instanceId, nextStepId);
ourLog.debug("Submitted a batch of chunks for processing. [chunkCount={}, instanceId={}, stepId={}]", chunksToSubmit.size(), instanceId, nextStepId);
myInstance.setCurrentGatedStepId(nextStepId);
myJobPersistence.updateInstance(myInstance);
}