This commit is contained in:
leif stawnyczy 2024-03-06 16:05:43 -05:00
parent 358009f025
commit 7d5794e1fb
3 changed files with 13 additions and 17 deletions

View File

@ -76,7 +76,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -301,8 +300,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
theCallback.accept(updated);
if (updated == 1) {
myEntityManager.flush();
myEntityManager.unwrap(Session.class)
.doWork(Connection::commit);
myEntityManager.unwrap(Session.class).doWork(Connection::commit);
}
}

View File

@ -43,7 +43,6 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
/**

View File

@ -37,13 +37,10 @@ import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
import jakarta.persistence.EntityManager;
import org.apache.commons.lang3.time.DateUtils;
import org.hibernate.Session;
import org.slf4j.Logger;
import org.springframework.transaction.annotation.Propagation;
import java.sql.Connection;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
@ -271,9 +268,11 @@ public class JobInstanceProcessor {
myJobPersistence.enqueueWorkChunkForProcessing(chunkId, updated -> {
if (updated == 1) {
JobWorkCursor<?, ?, ?> jobWorkCursor =
JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, theChunk.getTargetStepId());
JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, theChunk.getTargetStepId());
if (theJobDefinition.isGatedExecution() && jobWorkCursor.isFinalStep() && jobWorkCursor.isReductionStep()) {
if (theJobDefinition.isGatedExecution()
&& jobWorkCursor.isFinalStep()
&& jobWorkCursor.isReductionStep()) {
// reduction steps are processed by
// ReductionStepExecutorServiceImpl
// which does not wait for steps off the queue but reads all the
@ -284,20 +283,20 @@ public class JobInstanceProcessor {
// send to the queue
// we use current step id because it has not been moved to the next step (yet)
JobWorkNotification workNotification = new JobWorkNotification(
theJobDefinition.getJobDefinitionId(),
theJobDefinition.getJobDefinitionVersion(),
theInstance.getInstanceId(),
theChunk.getTargetStepId(),
chunkId);
theJobDefinition.getJobDefinitionId(),
theJobDefinition.getJobDefinitionVersion(),
theInstance.getInstanceId(),
theChunk.getTargetStepId(),
chunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
} else {
// means the work chunk is likely already gone...
// we'll log and skip it. If it's still in the DB, the next pass
// will pick it up. Otherwise, it's no longer important
ourLog.error(
"Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; skipping work chunk.",
theInstance.getInstanceId(),
theChunk.getId());
"Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; skipping work chunk.",
theInstance.getInstanceId(),
theChunk.getId());
}
});
}