This commit is contained in:
tyner 2024-04-10 08:32:28 -04:00
parent 2518531417
commit d06337d103
4 changed files with 18 additions and 23 deletions

View File

@ -598,7 +598,8 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
theJobInstanceId,
theNextStepId);
// when we reach here, the current step id is equal to theNextStepId
int numChanged = myWorkChunkRepository.updateAllChunksForStepFromGateWaitingToReady(theJobInstanceId, theNextStepId);
int numChanged =
myWorkChunkRepository.updateAllChunksForStepFromGateWaitingToReady(theJobInstanceId, theNextStepId);
ourLog.debug(
"Updated {} chunks of gated instance {} for step {} from fake QUEUED to READY.",
numChanged,

View File

@ -114,15 +114,13 @@ public interface IBatch2WorkChunkRepository
// In order to keep them compatible, turn QUEUED chunks into READY, too.
// TODO: remove QUEUED from the in clause when we are certain that no one is still running the old code.
@Modifying
@Query(
"UPDATE Batch2WorkChunkEntity e SET e.myStatus = ca.uhn.fhir.batch2.model.WorkChunkStatusEnum.READY WHERE "
+ "e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId AND e.myStatus in ("
+ "ca.uhn.fhir.batch2.model.WorkChunkStatusEnum.GATE_WAITING,"
+ "ca.uhn.fhir.batch2.model.WorkChunkStatusEnum.QUEUED"
+ ")")
@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = ca.uhn.fhir.batch2.model.WorkChunkStatusEnum.READY WHERE "
+ "e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId AND e.myStatus in ("
+ "ca.uhn.fhir.batch2.model.WorkChunkStatusEnum.GATE_WAITING,"
+ "ca.uhn.fhir.batch2.model.WorkChunkStatusEnum.QUEUED"
+ ")")
int updateAllChunksForStepFromGateWaitingToReady(
@Param("instanceId") String theInstanceId,
@Param("stepId") String theStepId);
@Param("instanceId") String theInstanceId, @Param("stepId") String theStepId);
@Modifying
@Query("DELETE FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId")

View File

@ -297,7 +297,5 @@ public interface IJobPersistence extends IWorkChunkPersistence {
* @return
*/
@Transactional(propagation = Propagation.REQUIRES_NEW)
int updateAllChunksForStepFromGateWaitingToReady(
String theJobInstanceId,
String theStepId);
int updateAllChunksForStepFromGateWaitingToReady(String theJobInstanceId, String theStepId);
}

View File

@ -24,7 +24,6 @@ import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
@ -45,7 +44,6 @@ import org.springframework.data.domain.Pageable;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class JobInstanceProcessor {
@ -240,9 +238,9 @@ public class JobInstanceProcessor {
return true;
}
// all workchunks for the current step are in COMPLETED -> proceed.
return workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED));
}
// all workchunks for the current step are in COMPLETED -> proceed.
return workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED));
}
protected PagingIterator<WorkChunkMetadata> getReadyChunks() {
return new PagingIterator<>(WORK_CHUNK_METADATA_BATCH_SIZE, (index, batchsize, consumer) -> {
@ -265,8 +263,7 @@ public class JobInstanceProcessor {
* we'd need a new GATE_WAITING state to move chunks to prevent jobs from
* completing prematurely.
*/
private void enqueueReadyChunks(
JobInstance theJobInstance, JobDefinition<?> theJobDefinition) {
private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition<?> theJobDefinition) {
Iterator<WorkChunkMetadata> iter = getReadyChunks();
AtomicInteger counter = new AtomicInteger();
@ -283,7 +280,10 @@ public class JobInstanceProcessor {
updateChunkAndSendToQueue(metadata);
}
ourLog.debug(
"Encountered {} READY work chunks for job {} of type {}", counter.get(), theJobInstance.getInstanceId(), theJobDefinition.getJobDefinitionId());
"Encountered {} READY work chunks for job {} of type {}",
counter.get(),
theJobInstance.getInstanceId(),
theJobDefinition.getJobDefinitionId());
}
/**
@ -326,9 +326,7 @@ public class JobInstanceProcessor {
myBatchJobSender.sendWorkChannelMessage(workNotification);
}
private void processChunksForNextGatedSteps(
JobInstance theInstance,
String nextStepId) {
private void processChunksForNextGatedSteps(JobInstance theInstance, String nextStepId) {
String instanceId = theInstance.getInstanceId();
List<String> readyChunksForNextStep =
myProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.READY);