This commit is contained in:
leif stawnyczy 2024-03-06 15:26:31 -05:00
parent d7633a80a4
commit a3a864c4f6
7 changed files with 66 additions and 77 deletions

View File

@ -293,8 +293,8 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
public int enqueueWorkChunkForProcessing(String theChunkId) {
return myWorkChunkRepository.updateChunkStatus(theChunkId,
WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.READY);
return myWorkChunkRepository.updateChunkStatus(
theChunkId, WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.READY);
}
@Override
@ -410,7 +410,8 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
}
@Override
public Set<WorkChunkStatusEnum> getDistinctWorkChunkStatesForJobAndStep(String theInstanceId, String theCurrentStepId) {
public Set<WorkChunkStatusEnum> getDistinctWorkChunkStatesForJobAndStep(
String theInstanceId, String theCurrentStepId) {
if (getRunningJob(theInstanceId) == null) {
return Collections.unmodifiableSet(new HashSet<>());
}
@ -484,9 +485,11 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
}
@Override
public Stream<WorkChunk> fetchAllWorkChunksForJobInStates(String theInstanceId, Set<WorkChunkStatusEnum> theWorkChunkStatuses) {
return myWorkChunkRepository.fetchChunksForJobInStates(theInstanceId, theWorkChunkStatuses)
.map(this::toChunk);
public Stream<WorkChunk> fetchAllWorkChunksForJobInStates(
String theInstanceId, Set<WorkChunkStatusEnum> theWorkChunkStatuses) {
return myWorkChunkRepository
.fetchChunksForJobInStates(theInstanceId, theWorkChunkStatuses)
.map(this::toChunk);
}
@Override

View File

@ -63,11 +63,9 @@ public interface IBatch2WorkChunkRepository
Stream<Batch2WorkChunkEntity> fetchChunksForStep(
@Param("instanceId") String theInstanceId, @Param("targetStepId") String theTargetStepId);
@Query(
"SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId AND e.myStatus IN :states"
)
@Query("SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId AND e.myStatus IN :states")
Stream<Batch2WorkChunkEntity> fetchChunksForJobInStates(
@Param("instanceId") String theInstanceId, @Param("states") Collection<WorkChunkStatusEnum> theStates);
@Param("instanceId") String theInstanceId, @Param("states") Collection<WorkChunkStatusEnum> theStates);
@Modifying
@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, "
@ -109,14 +107,11 @@ public interface IBatch2WorkChunkRepository
@Param("startStatuses") Collection<WorkChunkStatusEnum> theStartStatuses);
@Modifying
@Query(
"UPDATE Batch2WorkChunkEntity e SET e.myStatus = :newStatus WHERE e.myId = :id AND e.myStatus = :oldStatus"
)
@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :newStatus WHERE e.myId = :id AND e.myStatus = :oldStatus")
int updateChunkStatus(
@Param("id") String theChunkId,
@Param("newStatus") WorkChunkStatusEnum theNewStatus,
@Param("oldStatus") WorkChunkStatusEnum theOldStatus
);
@Param("id") String theChunkId,
@Param("newStatus") WorkChunkStatusEnum theNewStatus,
@Param("oldStatus") WorkChunkStatusEnum theOldStatus);
@Modifying
@Query("DELETE FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId")

View File

@ -27,7 +27,6 @@ import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.slf4j.Logger;
@ -150,7 +149,8 @@ public interface IJobPersistence extends IWorkChunkPersistence {
* @return a stream of work chunks
*/
@Transactional
Stream<WorkChunk> fetchAllWorkChunksForJobInStates(String theInstanceId, Set<WorkChunkStatusEnum> theWorkChunkStatuses);
Stream<WorkChunk> fetchAllWorkChunksForJobInStates(
String theInstanceId, Set<WorkChunkStatusEnum> theWorkChunkStatuses);
/**
* Callback to update a JobInstance within a locked transaction.

View File

@ -106,19 +106,17 @@ public abstract class BaseBatch2Config {
WorkChunkProcessor theExecutor,
IReductionStepExecutorService theReductionStepExecutorService,
IHapiTransactionService theTransactionService,
EntityManager theEntityManager
) {
EntityManager theEntityManager) {
return new JobMaintenanceServiceImpl(
theSchedulerService,
myPersistence,
theStorageSettings,
theJobDefinitionRegistry,
theBatchJobSender,
theExecutor,
theReductionStepExecutorService,
theTransactionService,
theEntityManager
);
theSchedulerService,
myPersistence,
theStorageSettings,
theJobDefinitionRegistry,
theBatchJobSender,
theExecutor,
theReductionStepExecutorService,
theTransactionService,
theEntityManager);
}
@Bean

View File

@ -28,7 +28,6 @@ import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.i18n.Msg;
@ -48,8 +47,6 @@ import org.slf4j.Logger;
import org.springframework.data.domain.Page;
import org.springframework.messaging.MessageHandler;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.util.Arrays;
import java.util.HashSet;

View File

@ -32,11 +32,9 @@ import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
import jakarta.persistence.EntityManager;
@ -74,8 +72,7 @@ public class JobInstanceProcessor {
IReductionStepExecutorService theReductionStepExecutorService,
JobDefinitionRegistry theJobDefinitionRegistry,
EntityManager theEntityManager,
IHapiTransactionService theTransactionService
) {
IHapiTransactionService theTransactionService) {
myJobPersistence = theJobPersistence;
myBatchJobSender = theBatchJobSender;
myInstanceId = theInstanceId;
@ -106,7 +103,7 @@ public class JobInstanceProcessor {
}
JobDefinition<? extends IModelJson> jobDefinition =
myJobDefinitionegistry.getJobDefinitionOrThrowException(theInstance);
myJobDefinitionegistry.getJobDefinitionOrThrowException(theInstance);
enqueueReadyChunks(theInstance, jobDefinition);
cleanupInstance(theInstance);
@ -194,8 +191,8 @@ public class JobInstanceProcessor {
return;
}
JobWorkCursor<?, ?, ?> jobWorkCursor =
JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, theInstance.getCurrentGatedStepId());
JobWorkCursor<?, ?, ?> jobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
theJobDefinition, theInstance.getCurrentGatedStepId());
// final step
if (jobWorkCursor.isFinalStep() && !jobWorkCursor.isReductionStep()) {
@ -216,7 +213,7 @@ public class JobInstanceProcessor {
if (jobWorkCursor.nextStep.isReductionStep()) {
JobWorkCursor<?, ?, ?> nextJobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
jobWorkCursor.getJobDefinition(), jobWorkCursor.nextStep.getStepId());
jobWorkCursor.getJobDefinition(), jobWorkCursor.nextStep.getStepId());
myReductionStepExecutorService.triggerReductionStep(instanceId, nextJobWorkCursor);
} else {
// otherwise, continue processing as expected
@ -227,7 +224,7 @@ public class JobInstanceProcessor {
"Not ready to advance gated execution of instance {} from step {} to {}.",
instanceId,
currentStepId,
jobWorkCursor.nextStep.getStepId());
jobWorkCursor.nextStep.getStepId());
}
}
@ -245,30 +242,27 @@ public class JobInstanceProcessor {
// we need a transaction to access the stream of workchunks
// because workchunks are created in READY state, there's an unknown
// number of them (and so we could be reading many from the db)
getTxBuilder()
.withPropagation(Propagation.REQUIRES_NEW)
.execute(() -> {
Stream<WorkChunk> readyChunks = myJobPersistence.fetchAllWorkChunksForJobInStates(theJobInstance.getInstanceId(),
Set.of(WorkChunkStatusEnum.READY));
getTxBuilder().withPropagation(Propagation.REQUIRES_NEW).execute(() -> {
Stream<WorkChunk> readyChunks = myJobPersistence.fetchAllWorkChunksForJobInStates(
theJobInstance.getInstanceId(), Set.of(WorkChunkStatusEnum.READY));
readyChunks.forEach(chunk -> {
/*
* For each chunk id
* * Move to QUEUE'd
* * Send to topic
* * flush changes
* * commit
*/
getTxBuilder().execute(() -> {
if (updateChunkAndSendToQueue(chunk, theJobInstance, theJobDefinition)) {
// flush this transaction
myEntityManager.flush();
myEntityManager.unwrap(Session.class)
.doWork(Connection::commit);
}
});
readyChunks.forEach(chunk -> {
/*
* For each chunk id
* * Move to QUEUE'd
* * Send to topic
* * flush changes
* * commit
*/
getTxBuilder().execute(() -> {
if (updateChunkAndSendToQueue(chunk, theJobInstance, theJobDefinition)) {
// flush this transaction
myEntityManager.flush();
myEntityManager.unwrap(Session.class).doWork(Connection::commit);
}
});
});
});
}
/**
@ -280,13 +274,14 @@ public class JobInstanceProcessor {
*
* Returns true after processing.
*/
private boolean updateChunkAndSendToQueue(WorkChunk theChunk, JobInstance theInstance, JobDefinition<?> theJobDefinition) {
private boolean updateChunkAndSendToQueue(
WorkChunk theChunk, JobInstance theInstance, JobDefinition<?> theJobDefinition) {
String chunkId = theChunk.getId();
int updated = myJobPersistence.enqueueWorkChunkForProcessing(chunkId);
if (updated == 1) {
JobWorkCursor<?, ?, ?> jobWorkCursor =
JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, theChunk.getTargetStepId());
JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, theChunk.getTargetStepId());
if (theJobDefinition.isGatedExecution() && jobWorkCursor.isFinalStep() && jobWorkCursor.isReductionStep()) {
// reduction steps are processed by
@ -299,16 +294,21 @@ public class JobInstanceProcessor {
// send to the queue
// we use current step id because it has not been moved to the next step (yet)
JobWorkNotification workNotification = new JobWorkNotification(
theJobDefinition.getJobDefinitionId(), theJobDefinition.getJobDefinitionVersion(),
theInstance.getInstanceId(), theChunk.getTargetStepId(), chunkId);
theJobDefinition.getJobDefinitionId(),
theJobDefinition.getJobDefinitionVersion(),
theInstance.getInstanceId(),
theChunk.getTargetStepId(),
chunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
return true;
} else {
// means the work chunk is likely already gone...
// we'll log and skip it. If it's still in the DB, the next pass
// will pick it up. Otherwise, it's no longer important
ourLog.error("Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; skipping work chunk.",
theInstance.getInstanceId(), theChunk.getId());
ourLog.error(
"Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; skipping work chunk.",
theInstance.getInstanceId(),
theChunk.getId());
// nothing changed, nothing to commit
return false;
@ -316,8 +316,7 @@ public class JobInstanceProcessor {
}
private IHapiTransactionService.IExecutionBuilder getTxBuilder() {
return myTransactionService.withSystemRequest()
.withRequestPartitionId(RequestPartitionId.allPartitions());
return myTransactionService.withSystemRequest().withRequestPartitionId(RequestPartitionId.allPartitions());
}
private void processChunksForNextSteps(JobInstance theInstance, String nextStepId) {

View File

@ -27,7 +27,6 @@ import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
@ -114,8 +113,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
@Nonnull WorkChunkProcessor theExecutor,
@Nonnull IReductionStepExecutorService theReductionStepExecutorService,
IHapiTransactionService theTransactionService,
EntityManager theEntityManager
) {
EntityManager theEntityManager) {
myStorageSettings = theStorageSettings;
myReductionStepExecutorService = theReductionStepExecutorService;
Validate.notNull(theSchedulerService);
@ -245,8 +243,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
myReductionStepExecutorService,
myJobDefinitionRegistry,
myEntityManager,
myTransactionService
);
myTransactionService);
ourLog.debug(
"Triggering maintenance process for instance {} in status {}",
instanceId,