diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java index ba0c4ac7213..6de2a2feb5e 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java @@ -76,7 +76,6 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.function.Consumer; -import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -301,8 +300,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence { theCallback.accept(updated); if (updated == 1) { myEntityManager.flush(); - myEntityManager.unwrap(Session.class) - .doWork(Connection::commit); + myEntityManager.unwrap(Session.class).doWork(Connection::commit); } } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java index e3ec7bbcf4d..5ea7550cf54 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java @@ -43,7 +43,6 @@ import java.util.List; import java.util.Optional; import java.util.Set; import java.util.function.Consumer; -import java.util.function.Supplier; import java.util.stream.Stream; /** diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java index 6f6f91d9f6b..b49e611ce08 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java @@ -37,13 +37,10 @@ import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.util.Logs; import ca.uhn.fhir.util.StopWatch; -import jakarta.persistence.EntityManager; import org.apache.commons.lang3.time.DateUtils; -import org.hibernate.Session; import org.slf4j.Logger; import org.springframework.transaction.annotation.Propagation; -import java.sql.Connection; import java.util.List; import java.util.Set; import java.util.stream.Stream; @@ -271,9 +268,11 @@ public class JobInstanceProcessor { myJobPersistence.enqueueWorkChunkForProcessing(chunkId, updated -> { if (updated == 1) { JobWorkCursor jobWorkCursor = - JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, theChunk.getTargetStepId()); + JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, theChunk.getTargetStepId()); - if (theJobDefinition.isGatedExecution() && jobWorkCursor.isFinalStep() && jobWorkCursor.isReductionStep()) { + if (theJobDefinition.isGatedExecution() + && jobWorkCursor.isFinalStep() + && jobWorkCursor.isReductionStep()) { // reduction steps are processed by // ReductionStepExecutorServiceImpl // which does not wait for steps off the queue but reads all the @@ -284,20 +283,20 @@ public class JobInstanceProcessor { // send to the queue // we use current step id because it has not been moved to the next step (yet) JobWorkNotification workNotification = new JobWorkNotification( - theJobDefinition.getJobDefinitionId(), - theJobDefinition.getJobDefinitionVersion(), - theInstance.getInstanceId(), - theChunk.getTargetStepId(), - chunkId); + theJobDefinition.getJobDefinitionId(), + theJobDefinition.getJobDefinitionVersion(), + theInstance.getInstanceId(), + theChunk.getTargetStepId(), + chunkId); myBatchJobSender.sendWorkChannelMessage(workNotification); } else { // means the work chunk is likely already gone... // we'll log and skip it. If it's still in the DB, the next pass // will pick it up. Otherwise, it's no longer important ourLog.error( - "Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; skipping work chunk.", - theInstance.getInstanceId(), - theChunk.getId()); + "Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; skipping work chunk.", + theInstance.getInstanceId(), + theChunk.getId()); } }); }