From a3a864c4f6aebf55503d0f72e2d582729ec67e02 Mon Sep 17 00:00:00 2001 From: leif stawnyczy Date: Wed, 6 Mar 2024 15:26:31 -0500 Subject: [PATCH] spotless --- .../jpa/batch2/JpaJobPersistenceImpl.java | 15 ++-- .../dao/data/IBatch2WorkChunkRepository.java | 17 ++--- .../uhn/fhir/batch2/api/IJobPersistence.java | 4 +- .../fhir/batch2/config/BaseBatch2Config.java | 22 +++--- .../coordinator/JobCoordinatorImpl.java | 3 - .../maintenance/JobInstanceProcessor.java | 75 +++++++++---------- .../JobMaintenanceServiceImpl.java | 7 +- 7 files changed, 66 insertions(+), 77 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java index c2337e1a969..18f3c62523d 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java @@ -293,8 +293,8 @@ public class JpaJobPersistenceImpl implements IJobPersistence { @Override public int enqueueWorkChunkForProcessing(String theChunkId) { - return myWorkChunkRepository.updateChunkStatus(theChunkId, - WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.READY); + return myWorkChunkRepository.updateChunkStatus( + theChunkId, WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.READY); } @Override @@ -410,7 +410,8 @@ public class JpaJobPersistenceImpl implements IJobPersistence { } @Override - public Set getDistinctWorkChunkStatesForJobAndStep(String theInstanceId, String theCurrentStepId) { + public Set getDistinctWorkChunkStatesForJobAndStep( + String theInstanceId, String theCurrentStepId) { if (getRunningJob(theInstanceId) == null) { return Collections.unmodifiableSet(new HashSet<>()); } @@ -484,9 +485,11 @@ public class JpaJobPersistenceImpl implements IJobPersistence { } @Override - public Stream fetchAllWorkChunksForJobInStates(String theInstanceId, Set theWorkChunkStatuses) { - return myWorkChunkRepository.fetchChunksForJobInStates(theInstanceId, theWorkChunkStatuses) - .map(this::toChunk); + public Stream fetchAllWorkChunksForJobInStates( + String theInstanceId, Set theWorkChunkStatuses) { + return myWorkChunkRepository + .fetchChunksForJobInStates(theInstanceId, theWorkChunkStatuses) + .map(this::toChunk); } @Override diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java index 3573595ab6d..f67cf7eae7e 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java @@ -63,11 +63,9 @@ public interface IBatch2WorkChunkRepository Stream fetchChunksForStep( @Param("instanceId") String theInstanceId, @Param("targetStepId") String theTargetStepId); - @Query( - "SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId AND e.myStatus IN :states" - ) + @Query("SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId AND e.myStatus IN :states") Stream fetchChunksForJobInStates( - @Param("instanceId") String theInstanceId, @Param("states") Collection theStates); + @Param("instanceId") String theInstanceId, @Param("states") Collection theStates); @Modifying @Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, " @@ -109,14 +107,11 @@ public interface IBatch2WorkChunkRepository @Param("startStatuses") Collection theStartStatuses); @Modifying - @Query( - "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :newStatus WHERE e.myId = :id AND e.myStatus = :oldStatus" - ) + @Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :newStatus WHERE e.myId = :id AND e.myStatus = :oldStatus") int updateChunkStatus( - @Param("id") String theChunkId, - @Param("newStatus") WorkChunkStatusEnum theNewStatus, - @Param("oldStatus") WorkChunkStatusEnum theOldStatus - ); + @Param("id") String theChunkId, + @Param("newStatus") WorkChunkStatusEnum theNewStatus, + @Param("oldStatus") WorkChunkStatusEnum theOldStatus); @Modifying @Query("DELETE FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId") diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java index 4c376803459..3f3762c9849 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java @@ -27,7 +27,6 @@ import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest; -import com.google.common.annotations.VisibleForTesting; import jakarta.annotation.Nonnull; import org.apache.commons.lang3.builder.ToStringBuilder; import org.slf4j.Logger; @@ -150,7 +149,8 @@ public interface IJobPersistence extends IWorkChunkPersistence { * @return a stream of work chunks */ @Transactional - Stream fetchAllWorkChunksForJobInStates(String theInstanceId, Set theWorkChunkStatuses); + Stream fetchAllWorkChunksForJobInStates( + String theInstanceId, Set theWorkChunkStatuses); /** * Callback to update a JobInstance within a locked transaction. diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java index 1773cd90e73..d54397ab9ca 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java @@ -106,19 +106,17 @@ public abstract class BaseBatch2Config { WorkChunkProcessor theExecutor, IReductionStepExecutorService theReductionStepExecutorService, IHapiTransactionService theTransactionService, - EntityManager theEntityManager - ) { + EntityManager theEntityManager) { return new JobMaintenanceServiceImpl( - theSchedulerService, - myPersistence, - theStorageSettings, - theJobDefinitionRegistry, - theBatchJobSender, - theExecutor, - theReductionStepExecutorService, - theTransactionService, - theEntityManager - ); + theSchedulerService, + myPersistence, + theStorageSettings, + theJobDefinitionRegistry, + theBatchJobSender, + theExecutor, + theReductionStepExecutorService, + theTransactionService, + theEntityManager); } @Bean diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java index 00187c6f6e6..751b096f8a1 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java @@ -28,7 +28,6 @@ import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest; import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; -import ca.uhn.fhir.batch2.model.JobWorkNotification; import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest; import ca.uhn.fhir.i18n.Msg; @@ -48,8 +47,6 @@ import org.slf4j.Logger; import org.springframework.data.domain.Page; import org.springframework.messaging.MessageHandler; import org.springframework.transaction.annotation.Propagation; -import org.springframework.transaction.support.TransactionSynchronization; -import org.springframework.transaction.support.TransactionSynchronizationManager; import java.util.Arrays; import java.util.HashSet; diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java index c37fa0e1a81..84a710eb5ef 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java @@ -32,11 +32,9 @@ import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator; import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater; -import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.model.api.IModelJson; -import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.util.Logs; import ca.uhn.fhir.util.StopWatch; import jakarta.persistence.EntityManager; @@ -74,8 +72,7 @@ public class JobInstanceProcessor { IReductionStepExecutorService theReductionStepExecutorService, JobDefinitionRegistry theJobDefinitionRegistry, EntityManager theEntityManager, - IHapiTransactionService theTransactionService - ) { + IHapiTransactionService theTransactionService) { myJobPersistence = theJobPersistence; myBatchJobSender = theBatchJobSender; myInstanceId = theInstanceId; @@ -106,7 +103,7 @@ public class JobInstanceProcessor { } JobDefinition jobDefinition = - myJobDefinitionegistry.getJobDefinitionOrThrowException(theInstance); + myJobDefinitionegistry.getJobDefinitionOrThrowException(theInstance); enqueueReadyChunks(theInstance, jobDefinition); cleanupInstance(theInstance); @@ -194,8 +191,8 @@ public class JobInstanceProcessor { return; } - JobWorkCursor jobWorkCursor = - JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, theInstance.getCurrentGatedStepId()); + JobWorkCursor jobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId( + theJobDefinition, theInstance.getCurrentGatedStepId()); // final step if (jobWorkCursor.isFinalStep() && !jobWorkCursor.isReductionStep()) { @@ -216,7 +213,7 @@ public class JobInstanceProcessor { if (jobWorkCursor.nextStep.isReductionStep()) { JobWorkCursor nextJobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId( - jobWorkCursor.getJobDefinition(), jobWorkCursor.nextStep.getStepId()); + jobWorkCursor.getJobDefinition(), jobWorkCursor.nextStep.getStepId()); myReductionStepExecutorService.triggerReductionStep(instanceId, nextJobWorkCursor); } else { // otherwise, continue processing as expected @@ -227,7 +224,7 @@ public class JobInstanceProcessor { "Not ready to advance gated execution of instance {} from step {} to {}.", instanceId, currentStepId, - jobWorkCursor.nextStep.getStepId()); + jobWorkCursor.nextStep.getStepId()); } } @@ -245,30 +242,27 @@ public class JobInstanceProcessor { // we need a transaction to access the stream of workchunks // because workchunks are created in READY state, there's an unknown // number of them (and so we could be reading many from the db) - getTxBuilder() - .withPropagation(Propagation.REQUIRES_NEW) - .execute(() -> { - Stream readyChunks = myJobPersistence.fetchAllWorkChunksForJobInStates(theJobInstance.getInstanceId(), - Set.of(WorkChunkStatusEnum.READY)); + getTxBuilder().withPropagation(Propagation.REQUIRES_NEW).execute(() -> { + Stream readyChunks = myJobPersistence.fetchAllWorkChunksForJobInStates( + theJobInstance.getInstanceId(), Set.of(WorkChunkStatusEnum.READY)); - readyChunks.forEach(chunk -> { - /* - * For each chunk id - * * Move to QUEUE'd - * * Send to topic - * * flush changes - * * commit - */ - getTxBuilder().execute(() -> { - if (updateChunkAndSendToQueue(chunk, theJobInstance, theJobDefinition)) { - // flush this transaction - myEntityManager.flush(); - myEntityManager.unwrap(Session.class) - .doWork(Connection::commit); - } - }); + readyChunks.forEach(chunk -> { + /* + * For each chunk id + * * Move to QUEUE'd + * * Send to topic + * * flush changes + * * commit + */ + getTxBuilder().execute(() -> { + if (updateChunkAndSendToQueue(chunk, theJobInstance, theJobDefinition)) { + // flush this transaction + myEntityManager.flush(); + myEntityManager.unwrap(Session.class).doWork(Connection::commit); + } }); }); + }); } /** @@ -280,13 +274,14 @@ public class JobInstanceProcessor { * * Returns true after processing. */ - private boolean updateChunkAndSendToQueue(WorkChunk theChunk, JobInstance theInstance, JobDefinition theJobDefinition) { + private boolean updateChunkAndSendToQueue( + WorkChunk theChunk, JobInstance theInstance, JobDefinition theJobDefinition) { String chunkId = theChunk.getId(); int updated = myJobPersistence.enqueueWorkChunkForProcessing(chunkId); if (updated == 1) { JobWorkCursor jobWorkCursor = - JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, theChunk.getTargetStepId()); + JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, theChunk.getTargetStepId()); if (theJobDefinition.isGatedExecution() && jobWorkCursor.isFinalStep() && jobWorkCursor.isReductionStep()) { // reduction steps are processed by @@ -299,16 +294,21 @@ public class JobInstanceProcessor { // send to the queue // we use current step id because it has not been moved to the next step (yet) JobWorkNotification workNotification = new JobWorkNotification( - theJobDefinition.getJobDefinitionId(), theJobDefinition.getJobDefinitionVersion(), - theInstance.getInstanceId(), theChunk.getTargetStepId(), chunkId); + theJobDefinition.getJobDefinitionId(), + theJobDefinition.getJobDefinitionVersion(), + theInstance.getInstanceId(), + theChunk.getTargetStepId(), + chunkId); myBatchJobSender.sendWorkChannelMessage(workNotification); return true; } else { // means the work chunk is likely already gone... // we'll log and skip it. If it's still in the DB, the next pass // will pick it up. Otherwise, it's no longer important - ourLog.error("Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; skipping work chunk.", - theInstance.getInstanceId(), theChunk.getId()); + ourLog.error( + "Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; skipping work chunk.", + theInstance.getInstanceId(), + theChunk.getId()); // nothing changed, nothing to commit return false; @@ -316,8 +316,7 @@ public class JobInstanceProcessor { } private IHapiTransactionService.IExecutionBuilder getTxBuilder() { - return myTransactionService.withSystemRequest() - .withRequestPartitionId(RequestPartitionId.allPartitions()); + return myTransactionService.withSystemRequest().withRequestPartitionId(RequestPartitionId.allPartitions()); } private void processChunksForNextSteps(JobInstance theInstance, String nextStepId) { diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImpl.java index dd971227821..1cadce83405 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImpl.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImpl.java @@ -27,7 +27,6 @@ import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry; import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.i18n.Msg; -import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.jpa.model.sched.HapiJob; @@ -114,8 +113,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc @Nonnull WorkChunkProcessor theExecutor, @Nonnull IReductionStepExecutorService theReductionStepExecutorService, IHapiTransactionService theTransactionService, - EntityManager theEntityManager - ) { + EntityManager theEntityManager) { myStorageSettings = theStorageSettings; myReductionStepExecutorService = theReductionStepExecutorService; Validate.notNull(theSchedulerService); @@ -245,8 +243,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc myReductionStepExecutorService, myJobDefinitionRegistry, myEntityManager, - myTransactionService - ); + myTransactionService); ourLog.debug( "Triggering maintenance process for instance {} in status {}", instanceId,