Scheduled batch 2 bulk export job and binary delete (#4492)

* First commit:  Scheduled batch 2 bulk export job delete and binary, incomplete mock-based unit test, and a mess of TODOs and code that needs to be deleted.

* Refine solution and add a concrete unit test but still work to do.

* Comment out code in cancelAndPurgeAllJobs() and see if it breaks the pipeline.

* Unit tests complete.  New Msg code for new IJobPersistence.fetchInstances() method.  Cleanup TODOs and add others.

* Finish final touches on implementation.

* Add changelog.

* Various cleanup.

* Code review feedback.

* Small tweak to changelog.

* Last code review tweak.

* Address more code review comments.

* Reverse changes to consider work chunks.  Add a constant for write-to-binary.
This commit is contained in:
Luke deGruchy 2023-02-06 09:04:04 -05:00 committed by GitHub
parent 0996124778
commit 2a963acde2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 655 additions and 201 deletions

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 4500
jira: SMILE-6001
title: "Schedule bulk export job and binary was not working with relational databases. This has now been fixed with a reimplementation for batch 2."

View File

@ -135,6 +135,12 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
return entity.getId(); return entity.getId();
} }
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public List<JobInstance> fetchInstances(String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) {
return toInstanceList(myJobInstanceRepository.findInstancesByJobIdAndStatusAndExpiry(theJobDefinitionId, theStatuses, theCutoff, thePageable));
}
@Override @Override
@Transactional(propagation = Propagation.REQUIRES_NEW) @Transactional(propagation = Propagation.REQUIRES_NEW)
public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) { public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) {

View File

@ -20,38 +20,47 @@ package ca.uhn.fhir.jpa.bulk.export.svc;
* #L% * #L%
*/ */
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.jobs.export.BulkExportAppCtx;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportBinaryFileId;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions; import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper; import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs; import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails; import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
import ca.uhn.fhir.util.JsonUtil;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseBinary; import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Binary;
import org.quartz.JobExecutionContext; import org.quartz.JobExecutionContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate; import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date; import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import static org.slf4j.LoggerFactory.getLogger; import static org.slf4j.LoggerFactory.getLogger;
@ -59,26 +68,24 @@ import static org.slf4j.LoggerFactory.getLogger;
public class BulkDataExportJobSchedulingHelperImpl implements IBulkDataExportJobSchedulingHelper, IHasScheduledJobs { public class BulkDataExportJobSchedulingHelperImpl implements IBulkDataExportJobSchedulingHelper, IHasScheduledJobs {
private static final Logger ourLog = getLogger(BulkDataExportJobSchedulingHelperImpl.class); private static final Logger ourLog = getLogger(BulkDataExportJobSchedulingHelperImpl.class);
@Autowired private final DaoRegistry myDaoRegistry;
private DaoRegistry myDaoRegistry;
@Autowired private final PlatformTransactionManager myTxManager;
private IBulkExportCollectionDao myBulkExportCollectionDao;
@Autowired
private IBulkExportCollectionFileDao myBulkExportCollectionFileDao;
@Autowired
private PlatformTransactionManager myTxManager;
private TransactionTemplate myTxTemplate; private TransactionTemplate myTxTemplate;
@Autowired private final DaoConfig myDaoConfig;
private IBulkExportJobDao myBulkExportJobDao; private final BulkExportHelperService myBulkExportHelperSvc;
@Autowired private final IJobPersistence myJpaJobPersistence;
private DaoConfig myDaoConfig;
@Autowired public BulkDataExportJobSchedulingHelperImpl(DaoRegistry theDaoRegistry, PlatformTransactionManager theTxManager, DaoConfig theDaoConfig, BulkExportHelperService theBulkExportHelperSvc, IJobPersistence theJpaJobPersistence, TransactionTemplate theTxTemplate) {
private BulkExportHelperService myBulkExportHelperSvc; myDaoRegistry = theDaoRegistry;
myTxManager = theTxManager;
myDaoConfig = theDaoConfig;
myBulkExportHelperSvc = theBulkExportHelperSvc;
myJpaJobPersistence = theJpaJobPersistence;
myTxTemplate = theTxTemplate;
}
@PostConstruct @PostConstruct
public void start() { public void start() {
@ -97,15 +104,10 @@ public class BulkDataExportJobSchedulingHelperImpl implements IBulkDataExportJob
@Override @Override
@Transactional(propagation = Propagation.NEVER) @Transactional(propagation = Propagation.NEVER)
public synchronized void cancelAndPurgeAllJobs() { public synchronized void cancelAndPurgeAllJobs() {
myTxTemplate.execute(t -> { // This is called by unit test code that also calls ExpungeEverythingService,
ourLog.info("Deleting all files"); // which explicitly deletes both Batch2WorkChunkEntity and Batch2JobInstanceEntity, as well as ResourceTable, in
myBulkExportCollectionFileDao.deleteAllFiles(); // which Binary's are stored
ourLog.info("Deleting all collections"); // Long story short, this method no longer needs to do anything
myBulkExportCollectionDao.deleteAllFiles();
ourLog.info("Deleting all jobs");
myBulkExportJobDao.deleteAllFiles();
return null;
});
} }
/** /**
@ -116,51 +118,111 @@ public class BulkDataExportJobSchedulingHelperImpl implements IBulkDataExportJob
@Override @Override
public void purgeExpiredFiles() { public void purgeExpiredFiles() {
if (!myDaoConfig.isEnableTaskBulkExportJobExecution()) { if (!myDaoConfig.isEnableTaskBulkExportJobExecution()) {
ourLog.debug("bulk export disabled: doing nothing");
return; return;
} }
Optional<BulkExportJobEntity> jobToDelete = myTxTemplate.execute(t -> { final List<JobInstance> jobInstancesToDelete = myTxTemplate.execute(t ->
Pageable page = PageRequest.of(0, 1); myJpaJobPersistence.fetchInstances(Batch2JobDefinitionConstants.BULK_EXPORT,
Slice<BulkExportJobEntity> submittedJobs = myBulkExportJobDao.findNotRunningByExpiry(page, new Date()); StatusEnum.getEndedStatuses(),
if (submittedJobs.isEmpty()) { computeCutoffFromConfig(),
return Optional.empty(); PageRequest.of(0, 50))
} );
return Optional.of(submittedJobs.getContent().get(0));
});
if (jobToDelete.isPresent()) { if (jobInstancesToDelete == null || jobInstancesToDelete.isEmpty()) {
ourLog.info("Deleting bulk export job: {}", jobToDelete.get()); ourLog.debug("No batch 2 bulk export jobs found! Nothing to do!");
ourLog.info("Finished bulk export job deletion with nothing to do");
return;
}
for (JobInstance jobInstance : jobInstancesToDelete) {
ourLog.info("Deleting batch 2 bulk export job: {}", jobInstance);
myTxTemplate.execute(t -> { myTxTemplate.execute(t -> {
BulkExportJobEntity job = myBulkExportJobDao.getOne(jobToDelete.get().getId()); final Optional<JobInstance> optJobInstanceForInstanceId = myJpaJobPersistence.fetchInstance(jobInstance.getInstanceId());
for (BulkExportCollectionEntity nextCollection : job.getCollections()) {
for (BulkExportCollectionFileEntity nextFile : nextCollection.getFiles()) {
ourLog.info("Purging bulk data file: {}", nextFile.getResourceId());
IIdType id = myBulkExportHelperSvc.toId(nextFile.getResourceId());
getBinaryDao().delete(id, new SystemRequestDetails());
getBinaryDao().forceExpungeInExistingTransaction(id, new ExpungeOptions().setExpungeDeletedResources(true).setExpungeOldVersions(true), new SystemRequestDetails());
myBulkExportCollectionFileDao.deleteByPid(nextFile.getId());
if (optJobInstanceForInstanceId.isEmpty()) {
ourLog.error("Can't find job instance for ID: {} despite having retrieved it in the first step", jobInstance.getInstanceId());
return null;
} }
myBulkExportCollectionDao.deleteByPid(nextCollection.getId()); final JobInstance jobInstanceForInstanceId = optJobInstanceForInstanceId.get();
ourLog.info("Deleting bulk export job: {}", jobInstanceForInstanceId);
// We need to keep these for investigation but we also need a process to manually delete these jobs once we're done investigating
if (StatusEnum.FAILED == jobInstanceForInstanceId.getStatus()) {
ourLog.info("skipping because the status is FAILED for ID: {}" + jobInstanceForInstanceId.getInstanceId());
return null;
} }
ourLog.debug("*** About to delete job with ID {}", job.getId()); purgeBinariesIfNeeded(jobInstanceForInstanceId, jobInstanceForInstanceId.getReport());
myBulkExportJobDao.deleteByPid(job.getId());
final String batch2BulkExportJobInstanceId = jobInstanceForInstanceId.getInstanceId();
ourLog.debug("*** About to delete batch 2 bulk export job with ID {}", batch2BulkExportJobInstanceId);
myJpaJobPersistence.deleteInstanceAndChunks(batch2BulkExportJobInstanceId);
ourLog.info("Finished deleting bulk export job: {}", jobInstance.getInstanceId());
return null; return null;
}); });
ourLog.info("Finished deleting bulk export job: {}", jobToDelete.get()); ourLog.info("Finished deleting bulk export jobs");
} }
} }
private void purgeBinariesIfNeeded(JobInstance theJobInstanceForInstanceId, String theJobInstanceReportString) {
final Optional<BulkExportJobResults> optBulkExportJobResults = getBulkExportJobResults(theJobInstanceReportString);
if (optBulkExportJobResults.isPresent()) {
final BulkExportJobResults bulkExportJobResults = optBulkExportJobResults.get();
ourLog.debug("job: {} resource type to binary ID: {}", theJobInstanceForInstanceId.getInstanceId(), bulkExportJobResults.getResourceTypeToBinaryIds());
final Map<String, List<String>> resourceTypeToBinaryIds = bulkExportJobResults.getResourceTypeToBinaryIds();
for (String resourceType : resourceTypeToBinaryIds.keySet()) {
final List<String> binaryIds = resourceTypeToBinaryIds.get(resourceType);
for (String binaryId : binaryIds) {
ourLog.info("Purging batch 2 bulk export binary: {}", binaryId);
IIdType id = myBulkExportHelperSvc.toId(binaryId);
getBinaryDao().delete(id, new SystemRequestDetails());
}
}
} // else we can't know what the binary IDs are, so delete this job and move on
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private IFhirResourceDao<IBaseBinary> getBinaryDao() { private IFhirResourceDao<IBaseBinary> getBinaryDao() {
return myDaoRegistry.getResourceDao("Binary"); return myDaoRegistry.getResourceDao(Binary.class.getSimpleName());
} }
@Nonnull
private Optional<BulkExportJobResults> getBulkExportJobResults(String theJobInstanceReportString) {
if (StringUtils.isBlank(theJobInstanceReportString)) {
ourLog.error(String.format("Cannot parse job report string because it's null or blank: %s", theJobInstanceReportString));
return Optional.empty();
}
try {
return Optional.of(JsonUtil.deserialize(theJobInstanceReportString, BulkExportJobResults.class));
} catch (Exception theException) {
ourLog.error(String.format("Cannot parse job report string: %s", theJobInstanceReportString), theException);
return Optional.empty();
}
}
@Nonnull
private Date computeCutoffFromConfig() {
final int bulkExportFileRetentionPeriodHours = myDaoConfig.getBulkExportFileRetentionPeriodHours();
final LocalDateTime cutoffLocalDateTime = LocalDateTime.now()
.minusHours(bulkExportFileRetentionPeriodHours);
return Date.from(cutoffLocalDateTime
.atZone(ZoneId.systemDefault())
.toInstant());
}
public static class PurgeExpiredFilesJob implements HapiJob { public static class PurgeExpiredFilesJob implements HapiJob {
@Autowired @Autowired
private IBulkDataExportJobSchedulingHelper myTarget; private IBulkDataExportJobSchedulingHelper myTarget;

View File

@ -1,41 +0,0 @@
package ca.uhn.fhir.jpa.bulk.export.svc;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class BulkExportCollectionFileDaoSvc {
@Autowired
private IBulkExportCollectionFileDao myBulkExportCollectionFileDao;
@Transactional
public void save(BulkExportCollectionFileEntity theBulkExportCollectionEntity) {
myBulkExportCollectionFileDao.saveAndFlush(theBulkExportCollectionEntity);
}
}

View File

@ -1,5 +1,6 @@
package ca.uhn.fhir.jpa.config; package ca.uhn.fhir.jpa.config;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.jobs.expunge.DeleteExpungeJobSubmitterImpl; import ca.uhn.fhir.batch2.jobs.expunge.DeleteExpungeJobSubmitterImpl;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum; import ca.uhn.fhir.context.FhirVersionEnum;
@ -16,6 +17,7 @@ import ca.uhn.fhir.jpa.binary.provider.BinaryAccessProvider;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper; import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.bulk.export.provider.BulkDataExportProvider; import ca.uhn.fhir.jpa.bulk.export.provider.BulkDataExportProvider;
import ca.uhn.fhir.jpa.bulk.export.svc.BulkDataExportJobSchedulingHelperImpl; import ca.uhn.fhir.jpa.bulk.export.svc.BulkDataExportJobSchedulingHelperImpl;
import ca.uhn.fhir.jpa.bulk.export.svc.BulkExportHelperService;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc; import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.svc.BulkDataImportSvcImpl; import ca.uhn.fhir.jpa.bulk.imprt.svc.BulkDataImportSvcImpl;
import ca.uhn.fhir.jpa.cache.IResourceVersionSvc; import ca.uhn.fhir.jpa.cache.IResourceVersionSvc;
@ -161,6 +163,7 @@ import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
import org.springframework.scheduling.concurrent.ScheduledExecutorFactoryBean; import org.springframework.scheduling.concurrent.ScheduledExecutorFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
@ -451,8 +454,8 @@ public class JpaConfig {
} }
@Bean @Bean
public IBulkDataExportJobSchedulingHelper bulkDataExportJobSchedulingHelper() { public IBulkDataExportJobSchedulingHelper bulkDataExportJobSchedulingHelper(DaoRegistry theDaoRegistry, PlatformTransactionManager theTxManager, DaoConfig theDaoConfig, BulkExportHelperService theBulkExportHelperSvc, IJobPersistence theJpaJobPersistence) {
return new BulkDataExportJobSchedulingHelperImpl(); return new BulkDataExportJobSchedulingHelperImpl(theDaoRegistry, theTxManager, theDaoConfig, theBulkExportHelperSvc, theJpaJobPersistence, null);
} }
@Bean @Bean

View File

@ -28,6 +28,7 @@ import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query; import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param; import org.springframework.data.repository.query.Param;
import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -60,6 +61,14 @@ public interface IBatch2JobInstanceRepository extends JpaRepository<Batch2JobIns
Pageable thePageable Pageable thePageable
); );
@Query("SELECT b from Batch2JobInstanceEntity b WHERE b.myDefinitionId = :defId AND b.myStatus IN( :stats ) AND b.myEndTime < :cutoff")
List<Batch2JobInstanceEntity> findInstancesByJobIdAndStatusAndExpiry(
@Param("defId") String theDefinitionId,
@Param("stats") Set<StatusEnum> theStatus,
@Param("cutoff") Date theCutoff,
Pageable thePageable
);
@Query("SELECT e FROM Batch2JobInstanceEntity e WHERE e.myDefinitionId = :jobDefinitionId AND e.myStatus IN :statuses") @Query("SELECT e FROM Batch2JobInstanceEntity e WHERE e.myDefinitionId = :jobDefinitionId AND e.myStatus IN :statuses")
List<Batch2JobInstanceEntity> fetchInstancesByJobDefinitionIdAndStatus(@Param("jobDefinitionId") String theJobDefinitionId, @Param("statuses") Set<StatusEnum> theIncompleteStatuses, Pageable thePageRequest); List<Batch2JobInstanceEntity> fetchInstancesByJobDefinitionIdAndStatus(@Param("jobDefinitionId") String theJobDefinitionId, @Param("statuses") Set<StatusEnum> theIncompleteStatuses, Pageable thePageRequest);

View File

@ -1,39 +0,0 @@
package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
/*
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
@Deprecated
public interface IBulkExportCollectionDao extends JpaRepository<BulkExportCollectionEntity, Long>, IHapiFhirJpaRepository {
@Modifying
@Query("DELETE FROM BulkExportCollectionEntity t")
void deleteAllFiles();
@Modifying
@Query("DELETE FROM BulkExportCollectionEntity t WHERE t.myId = :pid")
void deleteByPid(@Param("pid") Long theId);
}

View File

@ -1,39 +0,0 @@
package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
/*
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
public interface IBulkExportCollectionFileDao extends JpaRepository<BulkExportCollectionFileEntity, Long>, IHapiFhirJpaRepository {
@Modifying
@Query("DELETE FROM BulkExportCollectionFileEntity t")
void deleteAllFiles();
@Modifying
@Query("DELETE FROM BulkExportCollectionFileEntity t WHERE t.myId = :pid")
void deleteByPid(@Param("pid") Long theId);
}

View File

@ -34,25 +34,6 @@ import java.util.Optional;
public interface IBulkExportJobDao extends JpaRepository<BulkExportJobEntity, Long>, IHapiFhirJpaRepository { public interface IBulkExportJobDao extends JpaRepository<BulkExportJobEntity, Long>, IHapiFhirJpaRepository {
@Query("SELECT j FROM BulkExportJobEntity j WHERE j.myJobId = :jobid")
Optional<BulkExportJobEntity> findByJobId(@Param("jobid") String theUuid);
@Query("SELECT j FROM BulkExportJobEntity j WHERE j.myStatus = :status")
Slice<BulkExportJobEntity> findByStatus(Pageable thePage, @Param("status") BulkExportJobStatusEnum theSubmitted);
@Query("SELECT j FROM BulkExportJobEntity j WHERE j.myExpiry < :cutoff")
Slice<BulkExportJobEntity> findByExpiry(Pageable thePage, @Param("cutoff") Date theCutoff);
@Query("SELECT j FROM BulkExportJobEntity j WHERE j.myExpiry IS NOT NULL and j.myExpiry < :cutoff AND j.myStatus <> 'BUILDING'")
Slice<BulkExportJobEntity> findNotRunningByExpiry(Pageable thePage, @Param("cutoff") Date theCutoff);
@Query("SELECT j FROM BulkExportJobEntity j WHERE j.myRequest = :request AND j.myCreated > :createdAfter AND j.myStatus <> :status ORDER BY j.myCreated DESC")
Slice<BulkExportJobEntity> findExistingJob(Pageable thePage, @Param("request") String theRequest, @Param("createdAfter") Date theCreatedAfter, @Param("status") BulkExportJobStatusEnum theNotStatus);
@Modifying
@Query("DELETE FROM BulkExportJobEntity t")
void deleteAllFiles();
@Modifying @Modifying
@Query("DELETE FROM BulkExportJobEntity t WHERE t.myId = :pid") @Query("DELETE FROM BulkExportJobEntity t WHERE t.myId = :pid")
void deleteByPid(@Param("pid") Long theId); void deleteByPid(@Param("pid") Long theId);

View File

@ -0,0 +1,382 @@
package ca.uhn.fhir.jpa.bulk.export.svc;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
import ca.uhn.fhir.util.JsonUtil;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Binary;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;
import org.mockito.stubbing.OngoingStubbing;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.util.Pair;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Nonnull;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class BulkDataExportJobSchedulingHelperImplTest {
@Mock
private DaoConfig myDaoConfig;
@Mock
private PlatformTransactionManager myTxManager;
@Mock
private TransactionTemplate myTxTemplate;
@Mock
private IJobPersistence myJpaJobPersistence;
@Mock
private BulkExportHelperService myBulkExportHelperSvc;
@Mock
private DaoRegistry myDaoRegistry;
@Mock
private IFhirResourceDao<IBaseBinary> myBinaryDao;
@Captor
private ArgumentCaptor<Date> myCutoffCaptor;
private BulkDataExportJobSchedulingHelperImpl myBulkDataExportJobSchedulingHelper;
private final FhirContext myFhirContext = FhirContext.forR4Cached();
@Test
public void testPurgeExpiredFilesDisabledDoesNothing() {
setupTestDisabled();
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
verify(myJpaJobPersistence, never()).fetchInstance(anyString());
verify(myBulkExportHelperSvc, never()).toId(anyString());
verify(myJpaJobPersistence, never()).deleteInstanceAndChunks(anyString());
}
@Test
public void purgeExpiredFilesNothingToDeleteOneHourRetention() {
final int expectedRetentionHours = 1;
setupTestEnabled(expectedRetentionHours, List.of());
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
verify(myJpaJobPersistence, never()).fetchInstance(anyString());
verify(myBulkExportHelperSvc, never()).toId(anyString());
verify(myBinaryDao, never()).delete(any(IIdType.class), any(SystemRequestDetails.class));
verify(myJpaJobPersistence, never()).deleteInstanceAndChunks(anyString());
final Date cutoffDate = myCutoffCaptor.getValue();
assertEquals(DateUtils.truncate(computeDateFromConfig(expectedRetentionHours), Calendar.SECOND), DateUtils.truncate(cutoffDate, Calendar.SECOND));
}
@Test
public void purgeExpiredFilesSingleJobSingleBinaryOneHourRetention_NULL_reportString() {
final int expectedRetentionHours = 1;
final int numBinariesPerJob = 1;
final List<JobInstance> jobInstances = getJobInstances(numBinariesPerJob, StatusEnum.COMPLETED);
jobInstances.get(0).setReport(null);
setupTestEnabledNoBinaries(expectedRetentionHours, jobInstances);
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
for (JobInstance jobInstance : jobInstances) {
verify(myJpaJobPersistence).fetchInstance(jobInstance.getInstanceId());
verify(myBulkExportHelperSvc, never()).toId(anyString());
verify(myBinaryDao, never()).delete(any(IIdType.class), any(SystemRequestDetails.class));
verify(myJpaJobPersistence).deleteInstanceAndChunks(jobInstance.getInstanceId());
}
final Date cutoffDate = myCutoffCaptor.getValue();
assertEquals(DateUtils.truncate(computeDateFromConfig(expectedRetentionHours), Calendar.MINUTE), DateUtils.truncate(cutoffDate, Calendar.MINUTE));
}
@Test
public void purgeExpiredFilesSingleJobSingleBinaryOneHourRetention_BAD_reportString() {
final int expectedRetentionHours = 1;
final int numBinariesPerJob = 1;
final List<JobInstance> jobInstances = getJobInstances(numBinariesPerJob, StatusEnum.COMPLETED);
jobInstances.get(0).setReport("{garbage}");
setupTestEnabledNoBinaries(expectedRetentionHours, jobInstances);
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
for (JobInstance jobInstance : jobInstances) {
verify(myJpaJobPersistence).fetchInstance(jobInstance.getInstanceId());
verify(myBulkExportHelperSvc, never()).toId(anyString());
verify(myBinaryDao, never()).delete(any(IIdType.class), any(SystemRequestDetails.class));
verify(myJpaJobPersistence).deleteInstanceAndChunks(jobInstance.getInstanceId());
}
final Date cutoffDate = myCutoffCaptor.getValue();
assertEquals(DateUtils.truncate(computeDateFromConfig(expectedRetentionHours), Calendar.MINUTE), DateUtils.truncate(cutoffDate, Calendar.MINUTE));
}
@Test
public void purgeExpiredFilesSingleJobSingleBinaryOneHourRetention() {
final int expectedRetentionHours = 1;
final int numBinariesPerJob = 1;
final List<JobInstance> jobInstances = getJobInstances(numBinariesPerJob, StatusEnum.COMPLETED);
setupTestEnabled(expectedRetentionHours, jobInstances);
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
for (JobInstance jobInstance : jobInstances) {
verify(myJpaJobPersistence).fetchInstance(jobInstance.getInstanceId());
for (int index = 0; index < numBinariesPerJob; index++) {
verify(myBulkExportHelperSvc).toId(jobInstance.getInstanceId() + "-binary-" + index);
verify(myBinaryDao).delete(eq(toId(jobInstance.getInstanceId() + "-binary-" + index)), any(SystemRequestDetails.class));
}
verify(myJpaJobPersistence).deleteInstanceAndChunks(jobInstance.getInstanceId());
}
final Date cutoffDate = myCutoffCaptor.getValue();
assertEquals(DateUtils.truncate(computeDateFromConfig(expectedRetentionHours), Calendar.MINUTE), DateUtils.truncate(cutoffDate, Calendar.MINUTE));
}
@Test
public void purgeExpiredFilesSingleJobSingleBinaryOneHourRetentionStatusFailed() {
final int expectedRetentionHours = 1;
final int numBinariesPerJob = 1;
final List<JobInstance> jobInstances = getJobInstances(numBinariesPerJob, StatusEnum.COMPLETED);
setupTestEnabled(expectedRetentionHours, jobInstances);
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
for (JobInstance jobInstance : jobInstances) {
verify(myJpaJobPersistence).fetchInstance(jobInstance.getInstanceId());
for (int index = 0; index < numBinariesPerJob; index++) {
verify(myBulkExportHelperSvc).toId(jobInstance.getInstanceId() + "-binary-" + index);
verify(myBinaryDao).delete(eq(toId(jobInstance.getInstanceId() + "-binary-" + index)), any(SystemRequestDetails.class));
}
verify(myJpaJobPersistence).deleteInstanceAndChunks(jobInstance.getInstanceId());
}
final Date cutoffDate = myCutoffCaptor.getValue();
assertEquals(DateUtils.truncate(computeDateFromConfig(expectedRetentionHours), Calendar.MINUTE), DateUtils.truncate(cutoffDate, Calendar.MINUTE));
}
@Test
public void purgeExpiredFilesSingleJobSingleBinaryTwoHourRetention() {
final int expectedRetentionHours = 2;
final int numBinariesPerJob = 1;
final List<JobInstance> jobInstances = getJobInstances(numBinariesPerJob, StatusEnum.COMPLETED);
setupTestEnabled(expectedRetentionHours, jobInstances);
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
for (JobInstance jobInstance : jobInstances) {
verify(myJpaJobPersistence).fetchInstance(jobInstance.getInstanceId());
for (int index = 0; index < numBinariesPerJob; index++) {
verify(myBulkExportHelperSvc).toId(jobInstance.getInstanceId() + "-binary-" + index);
verify(myBinaryDao).delete(eq(toId(jobInstance.getInstanceId() + "-binary-" + index)), any(SystemRequestDetails.class));
}
verify(myJpaJobPersistence).deleteInstanceAndChunks(jobInstance.getInstanceId());
}
final Date cutoffDate = myCutoffCaptor.getValue();
assertEquals(DateUtils.truncate(computeDateFromConfig(expectedRetentionHours), Calendar.MINUTE), DateUtils.truncate(cutoffDate, Calendar.MINUTE));
}
@Test
public void purgeExpiredFilesMultipleJobsMultipleBinariesTwoHourRetention() {
final int expectedRetentionHours = 2;
final int numBinariesPerJob = 3;
final List<JobInstance> jobInstances = getJobInstances( numBinariesPerJob, StatusEnum.COMPLETED, StatusEnum.COMPLETED, StatusEnum.COMPLETED);
setupTestEnabled(expectedRetentionHours, jobInstances);
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
for (JobInstance jobInstance : jobInstances) {
verify(myJpaJobPersistence).fetchInstance(jobInstance.getInstanceId());
for (int index = 0; index < numBinariesPerJob; index++) {
verify(myBulkExportHelperSvc).toId(jobInstance.getInstanceId() + "-binary-" + index);
verify(myBinaryDao).delete(eq(toId(jobInstance.getInstanceId() + "-binary-" + index)), any(SystemRequestDetails.class));
}
verify(myJpaJobPersistence).deleteInstanceAndChunks(jobInstance.getInstanceId());
}
final Date cutoffDate = myCutoffCaptor.getValue();
assertEquals(DateUtils.truncate(computeDateFromConfig(expectedRetentionHours), Calendar.MINUTE), DateUtils.truncate(cutoffDate, Calendar.MINUTE));
}
@Test
public void purgeExpiredFilesMultipleJobsMultipleBinariesTwoHourRetentionMixedStatuses() {
final int expectedRetentionHours = 2;
final int numBinariesPerJob = 3;
final List<JobInstance> jobInstances = getJobInstances( numBinariesPerJob, StatusEnum.COMPLETED, StatusEnum.FAILED, StatusEnum.COMPLETED);
setupTestEnabled(expectedRetentionHours, jobInstances);
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
for (JobInstance jobInstance : jobInstances) {
verify(myJpaJobPersistence).fetchInstance(jobInstance.getInstanceId());
if (StatusEnum.FAILED != jobInstance.getStatus()) {
for (int index = 0; index < numBinariesPerJob; index++) {
verify(myBulkExportHelperSvc).toId(jobInstance.getInstanceId() + "-binary-" + index);
verify(myBinaryDao).delete(eq(toId(jobInstance.getInstanceId() + "-binary-" + index)), any(SystemRequestDetails.class));
}
verify(myJpaJobPersistence).deleteInstanceAndChunks(jobInstance.getInstanceId());
}
}
final Date cutoffDate = myCutoffCaptor.getValue();
assertEquals(DateUtils.truncate(computeDateFromConfig(expectedRetentionHours), Calendar.MINUTE), DateUtils.truncate(cutoffDate, Calendar.MINUTE));
}
@Nonnull
private static List<JobInstance> getJobInstances(int theNumBinaries, StatusEnum... theStatusEnums) {
return IntStream.range(0, theStatusEnums.length)
.mapToObj(index -> Pair.of(index, theStatusEnums[index]))
.map(pair -> {
final JobInstance jobInstance = new JobInstance();
final StatusEnum status = pair.getSecond();
final String instanceId = status.name() + pair.getFirst();
jobInstance.setInstanceId(instanceId);
jobInstance.setReport(serialize(getBulkExportJobResults(instanceId, theNumBinaries)));
jobInstance.setStatus(status);
return jobInstance;
}).toList();
}
private static String serialize(BulkExportJobResults theBulkExportJobResults) {
return JsonUtil.serialize(theBulkExportJobResults);
}
@Nonnull
private static BulkExportJobResults getBulkExportJobResults(String theInstanceId, int theNumBinaries) {
final BulkExportJobResults bulkExportJobResults = new BulkExportJobResults();
bulkExportJobResults.setResourceTypeToBinaryIds(Map.of("Patient",
IntStream.range(0, theNumBinaries)
.mapToObj(theInt -> theInstanceId + "-binary-" + theInt)
.toList()));
return bulkExportJobResults;
}
@Nonnull
private Date computeDateFromConfig(int theExpectedRetentionHours) {
return Date.from(LocalDateTime.now()
.minusHours(theExpectedRetentionHours)
.atZone(ZoneId.systemDefault())
.toInstant());
}
private void setupTestDisabled() {
setupTest(false, -1, List.of(), false);
}
private void setupTestEnabled(int theRetentionHours, List<JobInstance> theJobInstances) {
setupTest(true, theRetentionHours, theJobInstances, true);
}
private void setupTestEnabledNoBinaries(int theRetentionHours, List<JobInstance> theJobInstances) {
setupTest(true, theRetentionHours, theJobInstances, false);
}
private void setupTest(boolean theIsEnabled, int theRetentionHours, List<JobInstance> theJobInstances, boolean theIsEnableBinaryMocks) {
myBulkDataExportJobSchedulingHelper = new BulkDataExportJobSchedulingHelperImpl(myDaoRegistry, myTxManager, myDaoConfig, myBulkExportHelperSvc, myJpaJobPersistence, myTxTemplate);
when(myDaoConfig.isEnableTaskBulkExportJobExecution()).thenReturn(theIsEnabled);
if (!theIsEnabled) {
return;
}
final Answer<List<JobInstance>> fetchInstancesAnswer = theInvocationOnMock -> {
final TransactionCallback<List<JobInstance>> transactionCallback = theInvocationOnMock.getArgument(0);
return transactionCallback.doInTransaction(null);
};
final Answer<Void> purgeExpiredJobsAnswer = theInvocationOnMock -> {
final TransactionCallback<Optional<JobInstance>> transactionCallback = theInvocationOnMock.getArgument(0);
transactionCallback.doInTransaction(null);
return null;
};
when(myJpaJobPersistence.fetchInstances(eq(Batch2JobDefinitionConstants.BULK_EXPORT),
eq(StatusEnum.getEndedStatuses()),
myCutoffCaptor.capture(),
any(PageRequest.class)))
.thenReturn(theJobInstances);
when(myTxTemplate.execute(any()))
.thenAnswer(fetchInstancesAnswer).thenAnswer(purgeExpiredJobsAnswer);
when(myDaoConfig.getBulkExportFileRetentionPeriodHours())
.thenReturn(theRetentionHours);
if (theJobInstances.isEmpty()) {
return;
}
OngoingStubbing<Optional<JobInstance>> when = when(myJpaJobPersistence.fetchInstance(anyString()));
for (JobInstance jobInstance : theJobInstances) {
when = when.thenReturn(Optional.of(jobInstance));
}
if (!theIsEnableBinaryMocks) {
return;
}
when(myBulkExportHelperSvc.toId(anyString()))
.thenAnswer(theInvocationOnMock -> toId(theInvocationOnMock.getArgument(0)));
when(myDaoRegistry.getResourceDao(Binary.class.getSimpleName())).thenReturn(myBinaryDao);
}
private IIdType toId(String theResourceId) {
final IIdType retVal = myFhirContext.getVersion().newIdType();
retVal.setValue(theResourceId);
return retVal;
}
}

View File

@ -21,8 +21,11 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.HashSet; import java.util.HashSet;
@ -133,6 +136,86 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
}); });
} }
@Test
public void testFetchInstanceWithStatusAndCutoff_statues() {
myCaptureQueriesListener.clear();
final String completedId = storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 1);
final String failedId = storeJobInstanceAndUpdateWithEndTime(StatusEnum.FAILED, 1);
final String erroredId = storeJobInstanceAndUpdateWithEndTime(StatusEnum.ERRORED, 1);
final String cancelledId = storeJobInstanceAndUpdateWithEndTime(StatusEnum.CANCELLED, 1);
storeJobInstanceAndUpdateWithEndTime(StatusEnum.QUEUED, 1);
storeJobInstanceAndUpdateWithEndTime(StatusEnum.IN_PROGRESS, 1);
storeJobInstanceAndUpdateWithEndTime(StatusEnum.FINALIZE, 1);
final LocalDateTime cutoffLocalDateTime = LocalDateTime.now()
.minusMinutes(0);
final Date cutoffDate = Date.from(cutoffLocalDateTime
.atZone(ZoneId.systemDefault())
.toInstant());
final List<JobInstance> jobInstancesByCutoff =
mySvc.fetchInstances(JOB_DEFINITION_ID, StatusEnum.getEndedStatuses(), cutoffDate, PageRequest.of(0, 100));
assertEquals(Set.of(completedId, failedId, erroredId, cancelledId),
jobInstancesByCutoff.stream()
.map(JobInstance::getInstanceId)
.collect(Collectors.toUnmodifiableSet()));
}
@Test
public void testFetchInstanceWithStatusAndCutoff_cutoffs() {
myCaptureQueriesListener.clear();
storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 3);
storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 4);
final String sevenMinutesAgoId = storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 7);
final String eightMinutesAgoId = storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 8);
final LocalDateTime cutoffLocalDateTime = LocalDateTime.now()
.minusMinutes(6);
final Date cutoffDate = Date.from(cutoffLocalDateTime
.atZone(ZoneId.systemDefault())
.toInstant());
final List<JobInstance> jobInstancesByCutoff =
mySvc.fetchInstances(JOB_DEFINITION_ID, StatusEnum.getEndedStatuses(), cutoffDate, PageRequest.of(0, 100));
myCaptureQueriesListener.logSelectQueries();
myCaptureQueriesListener.getSelectQueries().forEach(query -> ourLog.info("query: {}", query.getSql(true, true)));
assertEquals(Set.of(sevenMinutesAgoId, eightMinutesAgoId),
jobInstancesByCutoff.stream()
.map(JobInstance::getInstanceId)
.collect(Collectors.toUnmodifiableSet()));
}
@Test
public void testFetchInstanceWithStatusAndCutoff_pages() {
final String job1 = storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 5);
final String job2 = storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 5);
storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 5);
storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 5);
storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 5);
storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 5);
final LocalDateTime cutoffLocalDateTime = LocalDateTime.now()
.minusMinutes(0);
final Date cutoffDate = Date.from(cutoffLocalDateTime
.atZone(ZoneId.systemDefault())
.toInstant());
final List<JobInstance> jobInstancesByCutoff =
mySvc.fetchInstances(JOB_DEFINITION_ID, StatusEnum.getEndedStatuses(), cutoffDate, PageRequest.of(0, 2));
assertEquals(Set.of(job1, job2),
jobInstancesByCutoff.stream()
.map(JobInstance::getInstanceId)
.collect(Collectors.toUnmodifiableSet()));
}
/** /**
* Returns a set of statuses, and whether they should be successfully picked up and started by a consumer. * Returns a set of statuses, and whether they should be successfully picked up and started by a consumer.
* @return * @return
@ -548,4 +631,29 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
return instance; return instance;
} }
@Nonnull
private String storeJobInstanceAndUpdateWithEndTime(StatusEnum theStatus, int minutes) {
final JobInstance jobInstance = new JobInstance();
jobInstance.setJobDefinitionId(JOB_DEFINITION_ID);
jobInstance.setStatus(theStatus);
jobInstance.setJobDefinitionVersion(JOB_DEF_VER);
jobInstance.setParameters(CHUNK_DATA);
jobInstance.setReport("TEST");
final String id = mySvc.storeNewInstance(jobInstance);
jobInstance.setInstanceId(id);
final LocalDateTime localDateTime = LocalDateTime.now()
.minusMinutes(minutes);
ourLog.info("localDateTime: {}", localDateTime);
jobInstance.setEndTime(Date.from(localDateTime
.atZone(ZoneId.systemDefault())
.toInstant()));
mySvc.updateInstance(jobInstance);
return id;
}
} }

View File

@ -36,6 +36,8 @@ import org.springframework.context.annotation.Scope;
@Configuration @Configuration
public class BulkExportAppCtx { public class BulkExportAppCtx {
public static final String WRITE_TO_BINARIES = "write-to-binaries";
@Bean @Bean
public JobDefinition bulkExportJobDefinition() { public JobDefinition bulkExportJobDefinition() {
JobDefinition.Builder<IModelJson, VoidModel> builder = JobDefinition.newBuilder(); JobDefinition.Builder<IModelJson, VoidModel> builder = JobDefinition.newBuilder();
@ -63,7 +65,7 @@ public class BulkExportAppCtx {
) )
// write binaries and save to db // write binaries and save to db
.addIntermediateStep( .addIntermediateStep(
"write-to-binaries", WRITE_TO_BINARIES,
"Writes the expanded resources to the binaries and saves", "Writes the expanded resources to the binaries and saves",
BulkExportBinaryFileId.class, BulkExportBinaryFileId.class,
writeBinaryStep() writeBinaryStep()

View File

@ -27,10 +27,13 @@ import ca.uhn.fhir.batch2.model.MarkWorkChunkAsErrorRequest;
import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest; import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.i18n.Msg;
import org.springframework.data.domain.Page; import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@ -73,6 +76,10 @@ public interface IJobPersistence {
*/ */
Optional<JobInstance> fetchInstance(String theInstanceId); Optional<JobInstance> fetchInstance(String theInstanceId);
default List<JobInstance> fetchInstances(String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) {
throw new UnsupportedOperationException(Msg.code(2271) + "Unsupported operation in this implementation");
}
/** /**
* Fetches any existing jobs matching provided request parameters * Fetches any existing jobs matching provided request parameters
* @return * @return

View File

@ -29,7 +29,9 @@ import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest; import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import org.springframework.data.domain.Page; import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import java.util.Date;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@ -66,6 +68,11 @@ public class SynchronizedJobPersistenceWrapper implements IJobPersistence {
return myWrap.fetchInstance(theInstanceId); return myWrap.fetchInstance(theInstanceId);
} }
@Override
public List<JobInstance> fetchInstances(String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) {
return myWrap.fetchInstances(theJobDefinitionId, theStatuses, theCutoff, thePageable);
}
@Override @Override
public synchronized List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int theStart, int theBatchSize) { public synchronized List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int theStart, int theBatchSize) {
return myWrap.fetchInstances(theRequest, theStart, theBatchSize); return myWrap.fetchInstances(theRequest, theStart, theBatchSize);

View File

@ -35,5 +35,6 @@ public interface IBulkDataExportJobSchedulingHelper {
* Stops all invoked jobs, and then purges them. * Stops all invoked jobs, and then purges them.
*/ */
@Transactional(propagation = Propagation.NEVER) @Transactional(propagation = Propagation.NEVER)
@Deprecated
void cancelAndPurgeAllJobs(); void cancelAndPurgeAllJobs();
} }