batch2 new svc methods (#6310)

* started work on adding new methods

* started work on adding new methods

* review feedback. move dto from api to model

* checkstyle
This commit is contained in:
Ken Stevens 2024-10-08 08:57:00 -04:00 committed by GitHub
parent 32651e1c36
commit 6f94e228b0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 131 additions and 3 deletions

View File

@ -21,6 +21,8 @@ package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobOperationResultJson; import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO;
import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO;
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest; import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.StatusEnum;
@ -258,6 +260,22 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
.execute(() -> myJobInstanceRepository.findById(theInstanceId).map(this::toInstance)); .execute(() -> myJobInstanceRepository.findById(theInstanceId).map(this::toInstance));
} }
@Nonnull
@Override
public List<BatchWorkChunkStatusDTO> fetchWorkChunkStatusForInstance(String theInstanceId) {
return myTransactionService
.withSystemRequestOnDefaultPartition()
.execute(() -> myWorkChunkRepository.fetchWorkChunkStatusForInstance(theInstanceId));
}
@Nonnull
@Override
public BatchInstanceStatusDTO fetchBatchInstanceStatus(String theInstanceId) {
return myTransactionService
.withSystemRequestOnDefaultPartition()
.execute(() -> myJobInstanceRepository.fetchBatchInstanceStatus(theInstanceId));
}
@Override @Override
@Transactional(propagation = Propagation.REQUIRES_NEW) @Transactional(propagation = Propagation.REQUIRES_NEW)
public List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int thePage, int theBatchSize) { public List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int thePage, int theBatchSize) {

View File

@ -19,6 +19,7 @@
*/ */
package ca.uhn.fhir.jpa.dao.data; package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO;
import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity; import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Pageable;
@ -91,4 +92,8 @@ public interface IBatch2JobInstanceRepository
@Query("SELECT e FROM Batch2JobInstanceEntity e WHERE e.myDefinitionId = :jobDefinitionId") @Query("SELECT e FROM Batch2JobInstanceEntity e WHERE e.myDefinitionId = :jobDefinitionId")
List<Batch2JobInstanceEntity> findInstancesByJobDefinitionId( List<Batch2JobInstanceEntity> findInstancesByJobDefinitionId(
@Param("jobDefinitionId") String theJobDefinitionId, Pageable thePageRequest); @Param("jobDefinitionId") String theJobDefinitionId, Pageable thePageRequest);
@Query(
"SELECT new ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO(e.myId, e.myStatus, e.myStartTime, e.myEndTime) FROM Batch2JobInstanceEntity e WHERE e.myId = :id")
BatchInstanceStatusDTO fetchBatchInstanceStatus(@Param("id") String theInstanceId);
} }

View File

@ -19,6 +19,7 @@
*/ */
package ca.uhn.fhir.jpa.dao.data; package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity; import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Pageable;
@ -147,4 +148,8 @@ public interface IBatch2WorkChunkRepository
@Param("instanceId") String theInstanceId, @Param("instanceId") String theInstanceId,
@Param("stepId") String theStepId, @Param("stepId") String theStepId,
@Param("status") WorkChunkStatusEnum theStatus); @Param("status") WorkChunkStatusEnum theStatus);
@Query(
"SELECT new ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO(e.myTargetStepId, e.myStatus, min(e.myStartTime), max(e.myEndTime), avg(e.myEndTime - e.myStartTime), count(*)) FROM Batch2WorkChunkEntity e WHERE e.myInstanceId=:instanceId GROUP BY e.myTargetStepId, e.myStatus")
List<BatchWorkChunkStatusDTO> fetchWorkChunkStatusForInstance(@Param("instanceId") String theInstanceId);
} }

View File

@ -1,5 +1,7 @@
package ca.uhn.fhir.jpa.batch2; package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO;
import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService; import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobOperationResultJson; import ca.uhn.fhir.batch2.api.JobOperationResultJson;
@ -875,6 +877,38 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
} }
@Test
public void testFetchInstanceAndWorkChunkStatus() {
// Setup
List<String> chunkIds = new ArrayList<>();
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
for (int i = 0; i < 5; i++) {
chunkIds.add(storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, i, JsonUtil.serialize(new NdJsonFileJson().setNdJsonText("{}")), false));
}
runInTransaction(() -> {
myWorkChunkRepository.updateChunkStatus(chunkIds.get(0), WorkChunkStatusEnum.READY, WorkChunkStatusEnum.COMPLETED);
myWorkChunkRepository.updateChunkStatus(chunkIds.get(1), WorkChunkStatusEnum.READY, WorkChunkStatusEnum.COMPLETED);
});
// Execute
BatchInstanceStatusDTO istatus = mySvc.fetchBatchInstanceStatus(instanceId);
assertEquals(instanceId, istatus.id);
assertEquals(StatusEnum.QUEUED, istatus.status);
List<BatchWorkChunkStatusDTO> result = mySvc.fetchWorkChunkStatusForInstance(instanceId);
assertThat(result).hasSize(2);
BatchWorkChunkStatusDTO result0 = result.get(0);
assertEquals(WorkChunkStatusEnum.COMPLETED, result0.status);
assertEquals(2, result0.totalChunks);
BatchWorkChunkStatusDTO result1 = result.get(1);
assertEquals(WorkChunkStatusEnum.READY, result1.status);
assertEquals(3, result1.totalChunks);
}
private WorkChunk freshFetchWorkChunk(String chunkId) { private WorkChunk freshFetchWorkChunk(String chunkId) {
return runInTransaction(() -> return runInTransaction(() ->
myWorkChunkRepository.findById(chunkId) myWorkChunkRepository.findById(chunkId)

View File

@ -19,9 +19,7 @@
*/ */
package ca.uhn.fhir.batch2.api; package ca.uhn.fhir.batch2.api;
import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.*;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest; import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.RequestDetails;
@ -105,4 +103,8 @@ public interface IJobCoordinator {
* Fetches all jobs by job definition id * Fetches all jobs by job definition id
*/ */
List<JobInstance> getJobInstancesByJobDefinitionId(String theJobDefinitionId, int theCount, int theStart); List<JobInstance> getJobInstancesByJobDefinitionId(String theJobDefinitionId, int theCount, int theStart);
List<BatchWorkChunkStatusDTO> getWorkChunkStatus(String theInstanceId);
BatchInstanceStatusDTO getBatchInstanceStatus(String theInstanceId);
} }

View File

@ -19,6 +19,8 @@
*/ */
package ca.uhn.fhir.batch2.api; package ca.uhn.fhir.batch2.api;
import ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO;
import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO;
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest; import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstance;
@ -77,6 +79,12 @@ public interface IJobPersistence extends IWorkChunkPersistence {
List<JobInstance> fetchInstances( List<JobInstance> fetchInstances(
String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable); String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable);
@Nonnull
List<BatchWorkChunkStatusDTO> fetchWorkChunkStatusForInstance(String theInstanceId);
@Nonnull
BatchInstanceStatusDTO fetchBatchInstanceStatus(String theInstanceId);
/** /**
* Fetches any existing jobs matching provided request parameters * Fetches any existing jobs matching provided request parameters
* *

View File

@ -24,6 +24,8 @@ import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobOperationResultJson; import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.channel.BatchJobSender; import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO;
import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO;
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest; import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstance;
@ -195,6 +197,16 @@ public class JobCoordinatorImpl implements IJobCoordinator {
theJobDefinitionId, new HashSet<>(Arrays.asList(StatusEnum.values())), theCount, theStart); theJobDefinitionId, new HashSet<>(Arrays.asList(StatusEnum.values())), theCount, theStart);
} }
@Override
public List<BatchWorkChunkStatusDTO> getWorkChunkStatus(String theInstanceId) {
return myJobPersistence.fetchWorkChunkStatusForInstance(theInstanceId);
}
@Override
public BatchInstanceStatusDTO getBatchInstanceStatus(String theInstanceId) {
return myJobPersistence.fetchBatchInstanceStatus(theInstanceId);
}
@Override @Override
public Page<JobInstance> fetchAllJobInstances(JobInstanceFetchRequest theFetchRequest) { public Page<JobInstance> fetchAllJobInstances(JobInstanceFetchRequest theFetchRequest) {
return myJobQuerySvc.fetchAllInstances(theFetchRequest); return myJobQuerySvc.fetchAllInstances(theFetchRequest);

View File

@ -0,0 +1,17 @@
package ca.uhn.fhir.batch2.model;
import java.util.Date;
public class BatchInstanceStatusDTO {
public final String id;
public final StatusEnum status;
public final Date start;
public final Date stop;
public BatchInstanceStatusDTO(String theId, StatusEnum theStatus, Date theStart, Date theStop) {
id = theId;
status = theStatus;
start = theStart;
stop = theStop;
}
}

View File

@ -0,0 +1,27 @@
package ca.uhn.fhir.batch2.model;
import java.util.Date;
public class BatchWorkChunkStatusDTO {
public final String stepId;
public final WorkChunkStatusEnum status;
public final Date start;
public final Date stop;
public final Double avg;
public final Long totalChunks;
public BatchWorkChunkStatusDTO(
String theStepId,
WorkChunkStatusEnum theStatus,
Date theStart,
Date theStop,
Double theAvg,
Long theTotalChunks) {
stepId = theStepId;
status = theStatus;
start = theStart;
stop = theStop;
avg = theAvg;
totalChunks = theTotalChunks;
}
}