diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java index 5ea89d2adb4..c0826ad4dc0 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java @@ -21,6 +21,8 @@ package ca.uhn.fhir.jpa.batch2; import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.api.JobOperationResultJson; +import ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO; +import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO; import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.StatusEnum; @@ -258,6 +260,22 @@ public class JpaJobPersistenceImpl implements IJobPersistence { .execute(() -> myJobInstanceRepository.findById(theInstanceId).map(this::toInstance)); } + @Nonnull + @Override + public List fetchWorkChunkStatusForInstance(String theInstanceId) { + return myTransactionService + .withSystemRequestOnDefaultPartition() + .execute(() -> myWorkChunkRepository.fetchWorkChunkStatusForInstance(theInstanceId)); + } + + @Nonnull + @Override + public BatchInstanceStatusDTO fetchBatchInstanceStatus(String theInstanceId) { + return myTransactionService + .withSystemRequestOnDefaultPartition() + .execute(() -> myJobInstanceRepository.fetchBatchInstanceStatus(theInstanceId)); + } + @Override @Transactional(propagation = Propagation.REQUIRES_NEW) public List fetchInstances(FetchJobInstancesRequest theRequest, int thePage, int theBatchSize) { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2JobInstanceRepository.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2JobInstanceRepository.java index 023fd93af64..b027702cf6e 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2JobInstanceRepository.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2JobInstanceRepository.java @@ -19,6 +19,7 @@ */ package ca.uhn.fhir.jpa.dao.data; +import ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO; import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity; import org.springframework.data.domain.Pageable; @@ -91,4 +92,8 @@ public interface IBatch2JobInstanceRepository @Query("SELECT e FROM Batch2JobInstanceEntity e WHERE e.myDefinitionId = :jobDefinitionId") List findInstancesByJobDefinitionId( @Param("jobDefinitionId") String theJobDefinitionId, Pageable thePageRequest); + + @Query( + "SELECT new ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO(e.myId, e.myStatus, e.myStartTime, e.myEndTime) FROM Batch2JobInstanceEntity e WHERE e.myId = :id") + BatchInstanceStatusDTO fetchBatchInstanceStatus(@Param("id") String theInstanceId); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java index 52319b8efe1..e9611614e45 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java @@ -19,6 +19,7 @@ */ package ca.uhn.fhir.jpa.dao.data; +import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity; import org.springframework.data.domain.Pageable; @@ -147,4 +148,8 @@ public interface IBatch2WorkChunkRepository @Param("instanceId") String theInstanceId, @Param("stepId") String theStepId, @Param("status") WorkChunkStatusEnum theStatus); + + @Query( + "SELECT new ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO(e.myTargetStepId, e.myStatus, min(e.myStartTime), max(e.myEndTime), avg(e.myEndTime - e.myStartTime), count(*)) FROM Batch2WorkChunkEntity e WHERE e.myInstanceId=:instanceId GROUP BY e.myTargetStepId, e.myStatus") + List fetchWorkChunkStatusForInstance(@Param("instanceId") String theInstanceId); } diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java index 3c1721d9a78..d7f85325a7f 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java @@ -1,5 +1,7 @@ package ca.uhn.fhir.jpa.batch2; +import ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO; +import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO; import ca.uhn.fhir.batch2.api.IJobMaintenanceService; import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.api.JobOperationResultJson; @@ -875,6 +877,38 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { } + @Test + public void testFetchInstanceAndWorkChunkStatus() { + // Setup + + List chunkIds = new ArrayList<>(); + JobInstance instance = createInstance(); + String instanceId = mySvc.storeNewInstance(instance); + for (int i = 0; i < 5; i++) { + chunkIds.add(storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, i, JsonUtil.serialize(new NdJsonFileJson().setNdJsonText("{}")), false)); + } + + runInTransaction(() -> { + myWorkChunkRepository.updateChunkStatus(chunkIds.get(0), WorkChunkStatusEnum.READY, WorkChunkStatusEnum.COMPLETED); + myWorkChunkRepository.updateChunkStatus(chunkIds.get(1), WorkChunkStatusEnum.READY, WorkChunkStatusEnum.COMPLETED); + }); + + // Execute + BatchInstanceStatusDTO istatus = mySvc.fetchBatchInstanceStatus(instanceId); + assertEquals(instanceId, istatus.id); + assertEquals(StatusEnum.QUEUED, istatus.status); + + List result = mySvc.fetchWorkChunkStatusForInstance(instanceId); + assertThat(result).hasSize(2); + BatchWorkChunkStatusDTO result0 = result.get(0); + assertEquals(WorkChunkStatusEnum.COMPLETED, result0.status); + assertEquals(2, result0.totalChunks); + + BatchWorkChunkStatusDTO result1 = result.get(1); + assertEquals(WorkChunkStatusEnum.READY, result1.status); + assertEquals(3, result1.totalChunks); + } + private WorkChunk freshFetchWorkChunk(String chunkId) { return runInTransaction(() -> myWorkChunkRepository.findById(chunkId) diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobCoordinator.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobCoordinator.java index 03db05f36e3..85713548976 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobCoordinator.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobCoordinator.java @@ -19,9 +19,7 @@ */ package ca.uhn.fhir.batch2.api; -import ca.uhn.fhir.batch2.model.JobInstance; -import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; -import ca.uhn.fhir.batch2.model.StatusEnum; +import ca.uhn.fhir.batch2.model.*; import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest; import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; import ca.uhn.fhir.rest.api.server.RequestDetails; @@ -105,4 +103,8 @@ public interface IJobCoordinator { * Fetches all jobs by job definition id */ List getJobInstancesByJobDefinitionId(String theJobDefinitionId, int theCount, int theStart); + + List getWorkChunkStatus(String theInstanceId); + + BatchInstanceStatusDTO getBatchInstanceStatus(String theInstanceId); } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java index 196b94ccec3..739a5530858 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java @@ -19,6 +19,8 @@ */ package ca.uhn.fhir.batch2.api; +import ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO; +import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO; import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest; import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobInstance; @@ -77,6 +79,12 @@ public interface IJobPersistence extends IWorkChunkPersistence { List fetchInstances( String theJobDefinitionId, Set theStatuses, Date theCutoff, Pageable thePageable); + @Nonnull + List fetchWorkChunkStatusForInstance(String theInstanceId); + + @Nonnull + BatchInstanceStatusDTO fetchBatchInstanceStatus(String theInstanceId); + /** * Fetches any existing jobs matching provided request parameters * diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java index 751b096f8a1..f0f7a4500e4 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java @@ -24,6 +24,8 @@ import ca.uhn.fhir.batch2.api.IJobMaintenanceService; import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.api.JobOperationResultJson; import ca.uhn.fhir.batch2.channel.BatchJobSender; +import ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO; +import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO; import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest; import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobInstance; @@ -195,6 +197,16 @@ public class JobCoordinatorImpl implements IJobCoordinator { theJobDefinitionId, new HashSet<>(Arrays.asList(StatusEnum.values())), theCount, theStart); } + @Override + public List getWorkChunkStatus(String theInstanceId) { + return myJobPersistence.fetchWorkChunkStatusForInstance(theInstanceId); + } + + @Override + public BatchInstanceStatusDTO getBatchInstanceStatus(String theInstanceId) { + return myJobPersistence.fetchBatchInstanceStatus(theInstanceId); + } + @Override public Page fetchAllJobInstances(JobInstanceFetchRequest theFetchRequest) { return myJobQuerySvc.fetchAllInstances(theFetchRequest); diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/BatchInstanceStatusDTO.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/BatchInstanceStatusDTO.java new file mode 100644 index 00000000000..0168a186b8f --- /dev/null +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/BatchInstanceStatusDTO.java @@ -0,0 +1,17 @@ +package ca.uhn.fhir.batch2.model; + +import java.util.Date; + +public class BatchInstanceStatusDTO { + public final String id; + public final StatusEnum status; + public final Date start; + public final Date stop; + + public BatchInstanceStatusDTO(String theId, StatusEnum theStatus, Date theStart, Date theStop) { + id = theId; + status = theStatus; + start = theStart; + stop = theStop; + } +} diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/BatchWorkChunkStatusDTO.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/BatchWorkChunkStatusDTO.java new file mode 100644 index 00000000000..fb7cadb1b0a --- /dev/null +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/BatchWorkChunkStatusDTO.java @@ -0,0 +1,27 @@ +package ca.uhn.fhir.batch2.model; + +import java.util.Date; + +public class BatchWorkChunkStatusDTO { + public final String stepId; + public final WorkChunkStatusEnum status; + public final Date start; + public final Date stop; + public final Double avg; + public final Long totalChunks; + + public BatchWorkChunkStatusDTO( + String theStepId, + WorkChunkStatusEnum theStatus, + Date theStart, + Date theStop, + Double theAvg, + Long theTotalChunks) { + stepId = theStepId; + status = theStatus; + start = theStart; + stop = theStop; + avg = theAvg; + totalChunks = theTotalChunks; + } +}