From a2133f6940ca4d029940a26f824e7b3c1fa55348 Mon Sep 17 00:00:00 2001 From: Tadgh Date: Thu, 3 Mar 2022 06:37:40 -0800 Subject: [PATCH] Break a couple Circular Bean Dependencies (#3437) * Rely on context refresh event instead of postconstruct. Load bean lazily. * Remove parent autowire * Break termcodesystem circular dependency with new bean * Break cycle between BaseTermReadSvcImpl and TermCodeSystemStorageSvcImpl * move back into helper method * Rip out scheduling and submission to spring batch away from BulkDataExportSvcImpl * wip * Fix bean naming * Revert * re-add autowiring * Reformat * docs and reformat * Back out change * feedback from review * merge conflicts --- .../ca/uhn/fhir/util/SearchParameterUtil.java | 9 + ...BulkDataExportJobSchedulingHelperImpl.java | 299 ++++++++++++++++++ .../export/svc/BulkDataExportSvcImpl.java | 295 +---------------- .../ca/uhn/fhir/jpa/config/JpaConfig.java | 8 + .../jpa/config/SharedConfigDstu3Plus.java | 6 + .../jpa/dao/index/JpaIdHelperService.java | 10 +- .../fhir/jpa/term/BaseTermReadSvcImpl.java | 22 +- .../term/TermCodeSystemStorageSvcImpl.java | 49 +-- .../uhn/fhir/jpa/term/TermConceptDaoSvc.java | 110 +++++++ .../jpa/term/TermDeferredStorageSvcImpl.java | 11 +- .../fhir/jpa/term/TermReindexingSvcImpl.java | 6 +- .../term/api/ITermCodeSystemStorageSvc.java | 1 - .../jpa/bulk/BulkDataExportSvcImplR4Test.java | 44 +-- .../java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java | 7 +- .../fhir/jpa/dao/dstu2/BaseJpaDstu2Test.java | 5 +- .../fhir/jpa/dao/dstu3/BaseJpaDstu3Test.java | 7 +- .../ca/uhn/fhir/jpa/dao/r4/BaseJpaR4Test.java | 5 +- ...esourceDaoR4SearchWithElasticSearchIT.java | 6 +- ...urceDaoR4SearchWithLuceneDisabledTest.java | 5 +- ...sourceDaoR4TerminologyElasticsearchIT.java | 5 +- .../ca/uhn/fhir/jpa/dao/r5/BaseJpaR5Test.java | 5 +- .../TokenAutocompleteElasticsearchIT.java | 5 +- ...stractValueSetFreeTextExpansionR4Test.java | 7 +- .../term/TermDeferredStorageSvcImplTest.java | 26 +- .../ValueSetExpansionR4ElasticsearchIT.java | 5 +- .../registry/SearchParamRegistryImpl.java | 10 + .../IBulkDataExportJobSchedulingHelper.java | 25 ++ .../bulk/export/api/IBulkDataExportSvc.java | 7 - .../job/CreateBulkExportEntityTasklet.java | 3 +- 29 files changed, 583 insertions(+), 420 deletions(-) create mode 100644 hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/BulkDataExportJobSchedulingHelperImpl.java create mode 100644 hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermConceptDaoSvc.java create mode 100644 hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/api/IBulkDataExportJobSchedulingHelper.java diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java index 0f05da79ab8..024e948ae47 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java @@ -110,6 +110,15 @@ public class SearchParameterUtil { } + /** + * Return true if any search parameter in the resource can point at a patient, false otherwise + */ + public static boolean isResourceTypeInPatientCompartment(FhirContext theFhirContext, String theResourceType) { + RuntimeResourceDefinition runtimeResourceDefinition = theFhirContext.getResourceDefinition(theResourceType); + return getAllPatientCompartmentRuntimeSearchParams(runtimeResourceDefinition).size() > 0; + } + + @Nullable public static String getCode(FhirContext theContext, IBaseResource theResource) { return getStringChild(theContext, theResource, "code"); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/BulkDataExportJobSchedulingHelperImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/BulkDataExportJobSchedulingHelperImpl.java new file mode 100644 index 00000000000..d81854bcb65 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/BulkDataExportJobSchedulingHelperImpl.java @@ -0,0 +1,299 @@ +package ca.uhn.fhir.jpa.bulk.export.svc; + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.api.config.DaoConfig; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.api.model.ExpungeOptions; +import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; +import ca.uhn.fhir.jpa.batch.config.BatchConstants; +import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper; +import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum; +import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao; +import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao; +import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao; +import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity; +import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity; +import ca.uhn.fhir.jpa.entity.BulkExportJobEntity; +import ca.uhn.fhir.jpa.model.sched.HapiJob; +import ca.uhn.fhir.jpa.model.sched.ISchedulerService; +import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; +import ca.uhn.fhir.jpa.model.util.JpaConstants; +import ca.uhn.fhir.jpa.partition.SystemRequestDetails; +import ca.uhn.fhir.util.UrlUtil; +import org.apache.commons.lang3.time.DateUtils; +import org.hl7.fhir.instance.model.api.IBaseBinary; +import org.hl7.fhir.instance.model.api.IIdType; +import org.quartz.JobExecutionContext; +import org.slf4j.Logger; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.JobParametersInvalidException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Pageable; +import org.springframework.data.domain.Slice; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.support.TransactionTemplate; + +import javax.annotation.PostConstruct; +import javax.transaction.Transactional; +import java.util.Date; +import java.util.Map; +import java.util.Optional; + +import static org.slf4j.LoggerFactory.getLogger; + +public class BulkDataExportJobSchedulingHelperImpl implements IBulkDataExportJobSchedulingHelper { + private static final Logger ourLog = getLogger(BulkDataExportJobSchedulingHelperImpl.class); + + private static final Long READ_CHUNK_SIZE = 10L; + + @Autowired + private DaoRegistry myDaoRegistry; + + @Autowired + private IBatchJobSubmitter myJobSubmitter; + + @Autowired + private IBulkExportCollectionDao myBulkExportCollectionDao; + + @Autowired + private IBulkExportCollectionFileDao myBulkExportCollectionFileDao; + + @Autowired + private PlatformTransactionManager myTxManager; + private TransactionTemplate myTxTemplate; + + @Autowired + private ISchedulerService mySchedulerService; + + @Autowired + @Qualifier(BatchConstants.BULK_EXPORT_JOB_NAME) + private org.springframework.batch.core.Job myBulkExportJob; + + @Autowired + @Qualifier(BatchConstants.GROUP_BULK_EXPORT_JOB_NAME) + private org.springframework.batch.core.Job myGroupBulkExportJob; + + @Autowired + @Qualifier(BatchConstants.PATIENT_BULK_EXPORT_JOB_NAME) + private org.springframework.batch.core.Job myPatientBulkExportJob; + + @Autowired + private IBulkExportJobDao myBulkExportJobDao; + + @Autowired + private DaoConfig myDaoConfig; + + @Autowired + private FhirContext myContext; + + @PostConstruct + public void start() { + myTxTemplate = new TransactionTemplate(myTxManager); + + ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); + jobDetail.setId(Job.class.getName()); + jobDetail.setJobClass(Job.class); + mySchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail); + + jobDetail = new ScheduledJobDefinition(); + jobDetail.setId(PurgeExpiredFilesJob.class.getName()); + jobDetail.setJobClass(PurgeExpiredFilesJob.class); + mySchedulerService.scheduleClusteredJob(DateUtils.MILLIS_PER_HOUR, jobDetail); + } + + /** + * This method is called by the scheduler to run a pass of the + * generator + */ + @Transactional(value = Transactional.TxType.NEVER) + @Override + public synchronized void startSubmittedJobs() { + if (!myDaoConfig.isEnableTaskBulkExportJobExecution()) { + return; + } + + Optional jobToProcessOpt = myTxTemplate.execute(t -> { + Pageable page = PageRequest.of(0, 1); + Slice submittedJobs = myBulkExportJobDao.findByStatus(page, BulkExportJobStatusEnum.SUBMITTED); + if (submittedJobs.isEmpty()) { + return Optional.empty(); + } + return Optional.of(submittedJobs.getContent().get(0)); + }); + + if (!jobToProcessOpt.isPresent()) { + return; + } + + BulkExportJobEntity bulkExportJobEntity = jobToProcessOpt.get(); + + String jobUuid = bulkExportJobEntity.getJobId(); + try { + processJob(bulkExportJobEntity); + } catch (Exception e) { + ourLog.error("Failure while preparing bulk export extract", e); + myTxTemplate.execute(t -> { + Optional submittedJobs = myBulkExportJobDao.findByJobId(jobUuid); + if (submittedJobs.isPresent()) { + BulkExportJobEntity jobEntity = submittedJobs.get(); + jobEntity.setStatus(BulkExportJobStatusEnum.ERROR); + jobEntity.setStatusMessage(e.getMessage()); + myBulkExportJobDao.save(jobEntity); + } + return null; + }); + } + + } + + + + + + @Override + @Transactional(Transactional.TxType.NEVER) + public synchronized void cancelAndPurgeAllJobs() { + myTxTemplate.execute(t -> { + ourLog.info("Deleting all files"); + myBulkExportCollectionFileDao.deleteAllFiles(); + ourLog.info("Deleting all collections"); + myBulkExportCollectionDao.deleteAllFiles(); + ourLog.info("Deleting all jobs"); + myBulkExportJobDao.deleteAllFiles(); + return null; + }); + } + + /** + * This method is called by the scheduler to run a pass of the + * generator + */ + @Transactional(value = Transactional.TxType.NEVER) + @Override + public void purgeExpiredFiles() { + if (!myDaoConfig.isEnableTaskBulkExportJobExecution()) { + return; + } + + Optional jobToDelete = myTxTemplate.execute(t -> { + Pageable page = PageRequest.of(0, 1); + Slice submittedJobs = myBulkExportJobDao.findByExpiry(page, new Date()); + if (submittedJobs.isEmpty()) { + return Optional.empty(); + } + return Optional.of(submittedJobs.getContent().get(0)); + }); + + if (jobToDelete.isPresent()) { + + ourLog.info("Deleting bulk export job: {}", jobToDelete.get()); + + myTxTemplate.execute(t -> { + + BulkExportJobEntity job = myBulkExportJobDao.getOne(jobToDelete.get().getId()); + for (BulkExportCollectionEntity nextCollection : job.getCollections()) { + for (BulkExportCollectionFileEntity nextFile : nextCollection.getFiles()) { + + ourLog.info("Purging bulk data file: {}", nextFile.getResourceId()); + getBinaryDao().delete(toId(nextFile.getResourceId()), new SystemRequestDetails()); + getBinaryDao().forceExpungeInExistingTransaction(toId(nextFile.getResourceId()), new ExpungeOptions().setExpungeDeletedResources(true).setExpungeOldVersions(true), new SystemRequestDetails()); + myBulkExportCollectionFileDao.deleteByPid(nextFile.getId()); + + } + + myBulkExportCollectionDao.deleteByPid(nextCollection.getId()); + } + + ourLog.debug("*** About to delete job with ID {}", job.getId()); + myBulkExportJobDao.deleteByPid(job.getId()); + return null; + }); + + ourLog.info("Finished deleting bulk export job: {}", jobToDelete.get()); + } + } + + @SuppressWarnings("unchecked") + private IFhirResourceDao getBinaryDao() { + return myDaoRegistry.getResourceDao("Binary"); + } + + private IIdType toId(String theResourceId) { + IIdType retVal = myContext.getVersion().newIdType(); + retVal.setValue(theResourceId); + return retVal; + } + + private void processJob(BulkExportJobEntity theBulkExportJobEntity) { + String theJobUuid = theBulkExportJobEntity.getJobId(); + JobParametersBuilder parameters = new JobParametersBuilder() + .addString(BatchConstants.JOB_UUID_PARAMETER, theJobUuid) + .addLong(BatchConstants.READ_CHUNK_PARAMETER, READ_CHUNK_SIZE); + + ourLog.info("Submitting bulk export job {} to job scheduler", theJobUuid); + + try { + if (isGroupBulkJob(theBulkExportJobEntity)) { + enhanceBulkParametersWithGroupParameters(theBulkExportJobEntity, parameters); + myJobSubmitter.runJob(myGroupBulkExportJob, parameters.toJobParameters()); + } else if (isPatientBulkJob(theBulkExportJobEntity)) { + myJobSubmitter.runJob(myPatientBulkExportJob, parameters.toJobParameters()); + } else { + myJobSubmitter.runJob(myBulkExportJob, parameters.toJobParameters()); + } + } catch (JobParametersInvalidException theE) { + ourLog.error("Unable to start job with UUID: {}, the parameters are invalid. {}", theJobUuid, theE.getMessage()); + } + } + + private String getQueryParameterIfPresent(String theRequestString, String theParameter) { + Map stringMap = UrlUtil.parseQueryString(theRequestString); + if (stringMap != null) { + String[] strings = stringMap.get(theParameter); + if (strings != null) { + return String.join(",", strings); + } + } + return null; + + } + + private boolean isPatientBulkJob(BulkExportJobEntity theBulkExportJobEntity) { + return theBulkExportJobEntity.getRequest().startsWith("/Patient/"); + } + + private boolean isGroupBulkJob(BulkExportJobEntity theBulkExportJobEntity) { + return theBulkExportJobEntity.getRequest().startsWith("/Group/"); + } + + private void enhanceBulkParametersWithGroupParameters(BulkExportJobEntity theBulkExportJobEntity, JobParametersBuilder theParameters) { + String theGroupId = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_GROUP_ID); + String expandMdm = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_MDM); + theParameters.addString(BatchConstants.GROUP_ID_PARAMETER, theGroupId); + theParameters.addString(BatchConstants.EXPAND_MDM_PARAMETER, expandMdm); + } + + public static class Job implements HapiJob { + @Autowired + private IBulkDataExportJobSchedulingHelper myTarget; + + @Override + public void execute(JobExecutionContext theContext) { + myTarget.startSubmittedJobs(); + } + } + + public static class PurgeExpiredFilesJob implements HapiJob { + @Autowired + private IBulkDataExportJobSchedulingHelper myTarget; + + @Override + public void execute(JobExecutionContext theContext) { + myTarget.purgeExpiredFiles(); + } + } +} + diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/BulkDataExportSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/BulkDataExportSvcImpl.java index 34f9e8e2899..e81e9132d52 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/BulkDataExportSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/BulkDataExportSvcImpl.java @@ -20,32 +20,20 @@ package ca.uhn.fhir.jpa.bulk.export.svc; * #L% */ -import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.context.FhirContext; -import ca.uhn.fhir.context.RuntimeResourceDefinition; -import ca.uhn.fhir.context.RuntimeSearchParam; +import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.interceptor.api.Pointcut; -import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; -import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; -import ca.uhn.fhir.jpa.api.model.ExpungeOptions; -import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; -import ca.uhn.fhir.jpa.batch.config.BatchConstants; import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum; import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao; -import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao; import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao; import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity; import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity; import ca.uhn.fhir.jpa.entity.BulkExportJobEntity; -import ca.uhn.fhir.jpa.model.sched.HapiJob; -import ca.uhn.fhir.jpa.model.sched.ISchedulerService; -import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.model.util.JpaConstants; -import ca.uhn.fhir.jpa.partition.SystemRequestDetails; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions; @@ -53,30 +41,19 @@ import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; -import ca.uhn.fhir.util.UrlUtil; +import ca.uhn.fhir.util.SearchParameterUtil; import org.apache.commons.lang3.time.DateUtils; -import org.hl7.fhir.instance.model.api.IBaseBinary; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.r4.model.InstantType; -import org.quartz.JobExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.batch.core.JobParametersBuilder; -import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Slice; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.support.TransactionTemplate; -import javax.annotation.PostConstruct; import javax.transaction.Transactional; import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -90,7 +67,6 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; public class BulkDataExportSvcImpl implements IBulkDataExportSvc { - private static final Long READ_CHUNK_SIZE = 10L; private static final Logger ourLog = LoggerFactory.getLogger(BulkDataExportSvcImpl.class); private final int myReuseBulkExportForMillis = (int) (60 * DateUtils.MILLIS_PER_MINUTE); @@ -99,205 +75,14 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { @Autowired private IBulkExportCollectionDao myBulkExportCollectionDao; @Autowired - private IBulkExportCollectionFileDao myBulkExportCollectionFileDao; - @Autowired - private ISchedulerService mySchedulerService; - @Autowired private DaoRegistry myDaoRegistry; @Autowired private FhirContext myContext; - @Autowired - private PlatformTransactionManager myTxManager; - private TransactionTemplate myTxTemplate; - - @Autowired - private IBatchJobSubmitter myJobSubmitter; - - @Autowired - @Qualifier(BatchConstants.BULK_EXPORT_JOB_NAME) - private org.springframework.batch.core.Job myBulkExportJob; - - @Autowired - @Qualifier(BatchConstants.GROUP_BULK_EXPORT_JOB_NAME) - private org.springframework.batch.core.Job myGroupBulkExportJob; - - @Autowired - @Qualifier(BatchConstants.PATIENT_BULK_EXPORT_JOB_NAME) - private org.springframework.batch.core.Job myPatientBulkExportJob; private Set myCompartmentResources; private final int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR); - /** - * This method is called by the scheduler to run a pass of the - * generator - */ - @Transactional(value = Transactional.TxType.NEVER) - @Override - public synchronized void buildExportFiles() { - if (!myDaoConfig.isEnableTaskBulkExportJobExecution()) { - return; - } - - Optional jobToProcessOpt = myTxTemplate.execute(t -> { - Pageable page = PageRequest.of(0, 1); - Slice submittedJobs = myBulkExportJobDao.findByStatus(page, BulkExportJobStatusEnum.SUBMITTED); - if (submittedJobs.isEmpty()) { - return Optional.empty(); - } - return Optional.of(submittedJobs.getContent().get(0)); - }); - - if (!jobToProcessOpt.isPresent()) { - return; - } - - BulkExportJobEntity bulkExportJobEntity = jobToProcessOpt.get(); - - String jobUuid = bulkExportJobEntity.getJobId(); - try { - processJob(bulkExportJobEntity); - } catch (Exception e) { - ourLog.error("Failure while preparing bulk export extract", e); - myTxTemplate.execute(t -> { - Optional submittedJobs = myBulkExportJobDao.findByJobId(jobUuid); - if (submittedJobs.isPresent()) { - BulkExportJobEntity jobEntity = submittedJobs.get(); - jobEntity.setStatus(BulkExportJobStatusEnum.ERROR); - jobEntity.setStatusMessage(e.getMessage()); - myBulkExportJobDao.save(jobEntity); - } - return null; - }); - } - - } - - private String getQueryParameterIfPresent(String theRequestString, String theParameter) { - Map stringMap = UrlUtil.parseQueryString(theRequestString); - if (stringMap != null) { - String[] strings = stringMap.get(theParameter); - if (strings != null) { - return String.join(",", strings); - } - } - return null; - - } - - @Autowired - private DaoConfig myDaoConfig; - - /** - * This method is called by the scheduler to run a pass of the - * generator - */ - @Transactional(value = Transactional.TxType.NEVER) - @Override - public void purgeExpiredFiles() { - if (!myDaoConfig.isEnableTaskBulkExportJobExecution()) { - return; - } - - Optional jobToDelete = myTxTemplate.execute(t -> { - Pageable page = PageRequest.of(0, 1); - Slice submittedJobs = myBulkExportJobDao.findByExpiry(page, new Date()); - if (submittedJobs.isEmpty()) { - return Optional.empty(); - } - return Optional.of(submittedJobs.getContent().get(0)); - }); - - if (jobToDelete.isPresent()) { - - ourLog.info("Deleting bulk export job: {}", jobToDelete.get()); - - myTxTemplate.execute(t -> { - - BulkExportJobEntity job = myBulkExportJobDao.getOne(jobToDelete.get().getId()); - for (BulkExportCollectionEntity nextCollection : job.getCollections()) { - for (BulkExportCollectionFileEntity nextFile : nextCollection.getFiles()) { - - ourLog.info("Purging bulk data file: {}", nextFile.getResourceId()); - getBinaryDao().delete(toId(nextFile.getResourceId()), new SystemRequestDetails()); - getBinaryDao().forceExpungeInExistingTransaction(toId(nextFile.getResourceId()), new ExpungeOptions().setExpungeDeletedResources(true).setExpungeOldVersions(true), new SystemRequestDetails()); - myBulkExportCollectionFileDao.deleteByPid(nextFile.getId()); - - } - - myBulkExportCollectionDao.deleteByPid(nextCollection.getId()); - } - - ourLog.info("*** ABOUT TO DELETE"); - myBulkExportJobDao.deleteByPid(job.getId()); - return null; - }); - - ourLog.info("Finished deleting bulk export job: {}", jobToDelete.get()); - - } - - } - - private void processJob(BulkExportJobEntity theBulkExportJobEntity) { - String theJobUuid = theBulkExportJobEntity.getJobId(); - JobParametersBuilder parameters = new JobParametersBuilder() - .addString(BatchConstants.JOB_UUID_PARAMETER, theJobUuid) - .addLong(BatchConstants.READ_CHUNK_PARAMETER, READ_CHUNK_SIZE); - - ourLog.info("Submitting bulk export job {} to job scheduler", theJobUuid); - - try { - if (isGroupBulkJob(theBulkExportJobEntity)) { - enhanceBulkParametersWithGroupParameters(theBulkExportJobEntity, parameters); - myJobSubmitter.runJob(myGroupBulkExportJob, parameters.toJobParameters()); - } else if (isPatientBulkJob(theBulkExportJobEntity)) { - myJobSubmitter.runJob(myPatientBulkExportJob, parameters.toJobParameters()); - } else { - myJobSubmitter.runJob(myBulkExportJob, parameters.toJobParameters()); - } - } catch (JobParametersInvalidException theE) { - ourLog.error("Unable to start job with UUID: {}, the parameters are invalid. {}", theJobUuid, theE.getMessage()); - } - } - - private boolean isPatientBulkJob(BulkExportJobEntity theBulkExportJobEntity) { - return theBulkExportJobEntity.getRequest().startsWith("/Patient/"); - } - - private boolean isGroupBulkJob(BulkExportJobEntity theBulkExportJobEntity) { - return theBulkExportJobEntity.getRequest().startsWith("/Group/"); - } - - private void enhanceBulkParametersWithGroupParameters(BulkExportJobEntity theBulkExportJobEntity, JobParametersBuilder theParameters) { - String theGroupId = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_GROUP_ID); - String expandMdm = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_MDM); - theParameters.addString(BatchConstants.GROUP_ID_PARAMETER, theGroupId); - theParameters.addString(BatchConstants.EXPAND_MDM_PARAMETER, expandMdm); - } - - - @SuppressWarnings("unchecked") - private IFhirResourceDao getBinaryDao() { - return myDaoRegistry.getResourceDao("Binary"); - } - - @PostConstruct - public void start() { - myTxTemplate = new TransactionTemplate(myTxManager); - - ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); - jobDetail.setId(Job.class.getName()); - jobDetail.setJobClass(Job.class); - mySchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail); - - jobDetail = new ScheduledJobDefinition(); - jobDetail.setId(PurgeExpiredFilesJob.class.getName()); - jobDetail.setJobClass(PurgeExpiredFilesJob.class); - mySchedulerService.scheduleClusteredJob(DateUtils.MILLIS_PER_HOUR, jobDetail); - } - @Transactional @Override @Deprecated @@ -391,32 +176,32 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { .filter(t -> !"Binary".equals(t)) .collect(Collectors.toSet()); - BulkExportJobEntity job = new BulkExportJobEntity(); - job.setJobId(UUID.randomUUID().toString()); - job.setStatus(BulkExportJobStatusEnum.SUBMITTED); - job.setSince(since); - job.setCreated(new Date()); - job.setRequest(request); + BulkExportJobEntity jobEntity = new BulkExportJobEntity(); + jobEntity.setJobId(UUID.randomUUID().toString()); + jobEntity.setStatus(BulkExportJobStatusEnum.SUBMITTED); + jobEntity.setSince(since); + jobEntity.setCreated(new Date()); + jobEntity.setRequest(request); // Validate types validateTypes(resourceTypes); validateTypeFilters(theBulkDataExportOptions.getFilters(), resourceTypes); - updateExpiry(job); - myBulkExportJobDao.save(job); + updateExpiry(jobEntity); + myBulkExportJobDao.save(jobEntity); for (String nextType : resourceTypes) { BulkExportCollectionEntity collection = new BulkExportCollectionEntity(); - collection.setJob(job); + collection.setJob(jobEntity); collection.setResourceType(nextType); - job.getCollections().add(collection); + jobEntity.getCollections().add(collection); myBulkExportCollectionDao.save(collection); } - ourLog.info("Bulk export job submitted: {}", job.toString()); + ourLog.info("Bulk export job submitted: {}", jobEntity.toString()); - return toSubmittedJobInfo(job); + return toSubmittedJobInfo(jobEntity); } public void validateTypes(Set theResourceTypes) { @@ -484,21 +269,12 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { public Set getPatientCompartmentResources() { if (myCompartmentResources == null) { myCompartmentResources = myContext.getResourceTypes().stream() - .filter(this::resourceTypeIsInPatientCompartment) + .filter(resType -> SearchParameterUtil.isResourceTypeInPatientCompartment(myContext, resType)) .collect(Collectors.toSet()); } return myCompartmentResources; } - /** - * Return true if any search parameter in the resource can point at a patient, false otherwise - */ - private boolean resourceTypeIsInPatientCompartment(String theResourceType) { - RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(theResourceType); - List searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient"); - return searchParams != null && searchParams.size() >= 1; - } - public Set getAllowedResourceTypesForBulkExportStyle(BulkDataExportOptions.ExportStyle theExportStyle) { if (theExportStyle.equals(SYSTEM)) { return myContext.getResourceTypes(); @@ -509,51 +285,10 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { } } - private IIdType toId(String theResourceId) { - IIdType retVal = myContext.getVersion().newIdType(); - retVal.setValue(theResourceId); - return retVal; - } - private IIdType toQualifiedBinaryId(String theIdPart) { IIdType retVal = myContext.getVersion().newIdType(); retVal.setParts(null, "Binary", theIdPart, null); return retVal; } - - @Override - @Transactional(Transactional.TxType.NEVER) - public synchronized void cancelAndPurgeAllJobs() { - myTxTemplate.execute(t -> { - ourLog.info("Deleting all files"); - myBulkExportCollectionFileDao.deleteAllFiles(); - ourLog.info("Deleting all collections"); - myBulkExportCollectionDao.deleteAllFiles(); - ourLog.info("Deleting all jobs"); - myBulkExportJobDao.deleteAllFiles(); - return null; - }); - } - - public static class Job implements HapiJob { - @Autowired - private IBulkDataExportSvc myTarget; - - @Override - public void execute(JobExecutionContext theContext) { - myTarget.buildExportFiles(); - } - } - - public static class PurgeExpiredFilesJob implements HapiJob { - @Autowired - private IBulkDataExportSvc myTarget; - - @Override - public void execute(JobExecutionContext theContext) { - myTarget.purgeExpiredFiles(); - } - } - } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/JpaConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/JpaConfig.java index cd0ba886c30..1ce6b4a17d2 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/JpaConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/JpaConfig.java @@ -17,8 +17,10 @@ import ca.uhn.fhir.jpa.batch.mdm.MdmClearJobSubmitterImpl; import ca.uhn.fhir.jpa.batch.reader.BatchResourceSearcher; import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider; import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor; +import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper; import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.bulk.export.provider.BulkDataExportProvider; +import ca.uhn.fhir.jpa.bulk.export.svc.BulkDataExportJobSchedulingHelperImpl; import ca.uhn.fhir.jpa.bulk.export.svc.BulkDataExportSvcImpl; import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc; import ca.uhn.fhir.jpa.bulk.imprt.svc.BulkDataImportSvcImpl; @@ -438,6 +440,12 @@ public class JpaConfig { return new BulkDataExportSvcImpl(); } + @Bean + @Lazy + public IBulkDataExportJobSchedulingHelper bulkDataExportJobSchedulingHelper() { + return new BulkDataExportJobSchedulingHelperImpl(); + } + @Bean @Lazy public BulkDataExportProvider bulkDataExportProvider() { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/SharedConfigDstu3Plus.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/SharedConfigDstu3Plus.java index 656c818a0f2..6627d470556 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/SharedConfigDstu3Plus.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/SharedConfigDstu3Plus.java @@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.config; import ca.uhn.fhir.jpa.dao.ObservationLastNIndexPersistSvc; import ca.uhn.fhir.jpa.term.TermCodeSystemStorageSvcImpl; +import ca.uhn.fhir.jpa.term.TermConceptDaoSvc; import ca.uhn.fhir.jpa.term.TermDeferredStorageSvcImpl; import ca.uhn.fhir.jpa.term.TermReindexingSvcImpl; import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc; @@ -38,6 +39,11 @@ public class SharedConfigDstu3Plus { return new TermCodeSystemStorageSvcImpl(); } + @Bean + public TermConceptDaoSvc termConceptDaoSvc() { + return new TermConceptDaoSvc(); + } + @Bean public ITermDeferredStorageSvc termDeferredStorageSvc() { return new TermDeferredStorageSvcImpl(); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/index/JpaIdHelperService.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/index/JpaIdHelperService.java index 2d525ae3541..178c9a49b60 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/index/JpaIdHelperService.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/index/JpaIdHelperService.java @@ -50,8 +50,6 @@ import static ca.uhn.fhir.jpa.dao.index.IdHelperService.RESOURCE_PID; public class JpaIdHelperService extends IdHelperService implements IJpaIdHelperService, IIdHelperService { @Autowired protected IResourceTableDao myResourceTableDao; - @Autowired - private IIdHelperService myIdHelperService; /** * @deprecated This method doesn't take a partition ID as input, so it is unsafe. It @@ -61,7 +59,7 @@ public class JpaIdHelperService extends IdHelperService implements IJpaIdHelperS @Deprecated @Nonnull public List getPidsOrThrowException(List theIds) { - List resourcePersistentIds = myIdHelperService.resolveResourcePersistentIdsWithCache(RequestPartitionId.allPartitions(), theIds); + List resourcePersistentIds = super.resolveResourcePersistentIdsWithCache(RequestPartitionId.allPartitions(), theIds); return resourcePersistentIds.stream().map(ResourcePersistentId::getIdAsLong).collect(Collectors.toList()); } @@ -80,7 +78,7 @@ public class JpaIdHelperService extends IdHelperService implements IJpaIdHelperS if (retVal == null) { IIdType id = theResource.getIdElement(); try { - retVal = myIdHelperService.resolveResourcePersistentIds(RequestPartitionId.allPartitions(), id.getResourceType(), id.getIdPart()).getIdAsLong(); + retVal = super.resolveResourcePersistentIds(RequestPartitionId.allPartitions(), id.getResourceType(), id.getIdPart()).getIdAsLong(); } catch (ResourceNotFoundException e) { return null; } @@ -100,7 +98,7 @@ public class JpaIdHelperService extends IdHelperService implements IJpaIdHelperS assert TransactionSynchronizationManager.isSynchronizationActive(); List ids = Collections.singletonList(theId); - List resourcePersistentIds = myIdHelperService.resolveResourcePersistentIdsWithCache(RequestPartitionId.allPartitions(), ids); + List resourcePersistentIds = super.resolveResourcePersistentIdsWithCache(RequestPartitionId.allPartitions(), ids); return resourcePersistentIds.get(0).getIdAsLong(); } @@ -139,7 +137,7 @@ public class JpaIdHelperService extends IdHelperService implements IJpaIdHelperS public Set translatePidsToFhirResourceIds(Set thePids) { assert TransactionSynchronizationManager.isSynchronizationActive(); - Map> pidToForcedIdMap = myIdHelperService.translatePidsToForcedIds(thePids); + Map> pidToForcedIdMap = super.translatePidsToForcedIds(thePids); //If the result of the translation is an empty optional, it means there is no forced id, and we can use the PID as the resource ID. Set resolvedResourceIds = pidToForcedIdMap.entrySet().stream() diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseTermReadSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseTermReadSvcImpl.java index 2638a5f8ce9..79861f19484 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseTermReadSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseTermReadSvcImpl.java @@ -26,11 +26,13 @@ import ca.uhn.fhir.context.support.IValidationSupport; import ca.uhn.fhir.context.support.ValidationSupportContext; import ca.uhn.fhir.context.support.ValueSetExpansionOptions; import ca.uhn.fhir.i18n.Msg; +import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IDao; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoCodeSystem; +import ca.uhn.fhir.jpa.api.svc.IIdHelperService; import ca.uhn.fhir.jpa.config.HibernatePropertiesProvider; import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc; import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao; @@ -63,8 +65,6 @@ import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.jpa.search.ElasticsearchNestedQueryBuilderUtil; import ca.uhn.fhir.jpa.search.builder.SearchBuilder; -import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc; -import ca.uhn.fhir.jpa.term.api.ITermConceptMappingSvc; import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc; import ca.uhn.fhir.jpa.term.api.ITermReadSvc; import ca.uhn.fhir.jpa.term.ex.ExpansionTooCostlyException; @@ -132,6 +132,7 @@ import org.hl7.fhir.r4.model.IntegerType; import org.hl7.fhir.r4.model.StringType; import org.hl7.fhir.r4.model.ValueSet; import org.hl7.fhir.r4.model.codesystems.ConceptSubsumptionOutcome; +import org.jetbrains.annotations.NotNull; import org.quartz.JobExecutionContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; @@ -257,12 +258,11 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc { private ISchedulerService mySchedulerService; @Autowired(required = false) private ITermDeferredStorageSvc myDeferredStorageSvc; - @Autowired(required = false) - private ITermCodeSystemStorageSvc myConceptStorageSvc; + @Autowired + private IIdHelperService myIdHelperService; + @Autowired private ApplicationContext myApplicationContext; - @Autowired - private ITermConceptMappingSvc myTermConceptMappingSvc; private volatile IValidationSupport myJpaValidationSupport; private volatile IValidationSupport myValidationSupport; @@ -1580,18 +1580,24 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc { } private Optional fetchValueSetEntity(ValueSet theValueSet) { - ResourcePersistentId valueSetResourcePid = myConceptStorageSvc.getValueSetResourcePid(theValueSet.getIdElement()); + ResourcePersistentId valueSetResourcePid = getValueSetResourcePersistentId(theValueSet); Optional optionalTermValueSet = myTermValueSetDao.findByResourcePid(valueSetResourcePid.getIdAsLong()); return optionalTermValueSet; } + private ResourcePersistentId getValueSetResourcePersistentId(ValueSet theValueSet) { + ResourcePersistentId valueSetResourcePid = myIdHelperService.resolveResourcePersistentIds(RequestPartitionId.allPartitions(), theValueSet.getIdElement().getResourceType(), theValueSet.getIdElement().getIdPart()); + return valueSetResourcePid; + } + protected IValidationSupport.CodeValidationResult validateCodeIsInPreExpandedValueSet( ConceptValidationOptions theValidationOptions, ValueSet theValueSet, String theSystem, String theCode, String theDisplay, Coding theCoding, CodeableConcept theCodeableConcept) { assert TransactionSynchronizationManager.isSynchronizationActive(); ValidateUtil.isNotNullOrThrowUnprocessableEntity(theValueSet.hasId(), "ValueSet.id is required"); - ResourcePersistentId valueSetResourcePid = myConceptStorageSvc.getValueSetResourcePid(theValueSet.getIdElement()); + ResourcePersistentId valueSetResourcePid = getValueSetResourcePersistentId(theValueSet); + List concepts = new ArrayList<>(); if (isNotBlank(theCode)) { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermCodeSystemStorageSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermCodeSystemStorageSvcImpl.java index 769cf5d1ef3..ae24bc455cd 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermCodeSystemStorageSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermCodeSystemStorageSvcImpl.java @@ -26,7 +26,6 @@ import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.svc.IIdHelperService; -import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao; import ca.uhn.fhir.jpa.dao.data.IResourceTableDao; import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao; @@ -89,7 +88,6 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import static ca.uhn.fhir.jpa.batch.config.BatchConstants.TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.hl7.fhir.common.hapi.validation.support.ValidationConstants.LOINC_LOW; @@ -127,17 +125,8 @@ public class TermCodeSystemStorageSvcImpl implements ITermCodeSystemStorageSvc { private IResourceTableDao myResourceTableDao; @Autowired - private IBatchJobSubmitter myJobSubmitter; + private TermConceptDaoSvc myTermConceptDaoSvc; - @Autowired - @Qualifier(TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME) - private Job myTermCodeSystemVersionDeleteJob; - - - @Override - public ResourcePersistentId getValueSetResourcePid(IIdType theIdType) { - return myIdHelperService.resolveResourcePersistentIds(RequestPartitionId.allPartitions(), theIdType.getResourceType(), theIdType.getIdPart()); - } @Transactional @Override @@ -277,41 +266,7 @@ public class TermCodeSystemStorageSvcImpl implements ITermCodeSystemStorageSvc { */ @Override public int saveConcept(TermConcept theConcept) { - int retVal = 0; - - /* - * If the concept has an ID, we're reindexing, so there's no need to - * save parent concepts first (it's way too slow to do that) - */ - if (theConcept.getId() == null) { - boolean needToSaveParents = false; - for (TermConceptParentChildLink next : theConcept.getParents()) { - if (next.getParent().getId() == null) { - needToSaveParents = true; - } - } - if (needToSaveParents) { - retVal += ensureParentsSaved(theConcept.getParents()); - } - } - - if (theConcept.getId() == null || theConcept.getIndexStatus() == null) { - retVal++; - theConcept.setIndexStatus(BaseHapiFhirDao.INDEX_STATUS_INDEXED); - theConcept.setUpdated(new Date()); - myConceptDao.save(theConcept); - - for (TermConceptProperty next : theConcept.getProperties()) { - myConceptPropertyDao.save(next); - } - - for (TermConceptDesignation next : theConcept.getDesignations()) { - myConceptDesignationDao.save(next); - } - } - - ourLog.trace("Saved {} and got PID {}", theConcept.getCode(), theConcept.getId()); - return retVal; + return myTermConceptDaoSvc.saveConcept(theConcept); } @Override diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermConceptDaoSvc.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermConceptDaoSvc.java new file mode 100644 index 00000000000..13f4b8fc9b0 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermConceptDaoSvc.java @@ -0,0 +1,110 @@ +package ca.uhn.fhir.jpa.term; + +/*- + * #%L + * HAPI FHIR JPA Server + * %% + * Copyright (C) 2014 - 2022 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao; +import ca.uhn.fhir.jpa.dao.data.ITermConceptDao; +import ca.uhn.fhir.jpa.dao.data.ITermConceptDesignationDao; +import ca.uhn.fhir.jpa.dao.data.ITermConceptPropertyDao; +import ca.uhn.fhir.jpa.entity.TermConcept; +import ca.uhn.fhir.jpa.entity.TermConceptDesignation; +import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink; +import ca.uhn.fhir.jpa.entity.TermConceptProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.Collection; +import java.util.Date; + +public class TermConceptDaoSvc { + private static final Logger ourLog = LoggerFactory.getLogger(TermCodeSystemStorageSvcImpl.class); + + @Autowired + protected ITermConceptPropertyDao myConceptPropertyDao; + + @Autowired + protected ITermConceptDao myConceptDao; + + @Autowired + protected ITermConceptDesignationDao myConceptDesignationDao; + + public int saveConcept(TermConcept theConcept) { + int retVal = 0; + + /* + * If the concept has an ID, we're reindexing, so there's no need to + * save parent concepts first (it's way too slow to do that) + */ + if (theConcept.getId() == null) { + boolean needToSaveParents = false; + for (TermConceptParentChildLink next : theConcept.getParents()) { + if (next.getParent().getId() == null) { + needToSaveParents = true; + break; + } + } + if (needToSaveParents) { + retVal += ensureParentsSaved(theConcept.getParents()); + } + } + + if (theConcept.getId() == null || theConcept.getIndexStatus() == null) { + retVal++; + theConcept.setIndexStatus(BaseHapiFhirDao.INDEX_STATUS_INDEXED); + theConcept.setUpdated(new Date()); + myConceptDao.save(theConcept); + + for (TermConceptProperty next : theConcept.getProperties()) { + myConceptPropertyDao.save(next); + } + + for (TermConceptDesignation next : theConcept.getDesignations()) { + myConceptDesignationDao.save(next); + } + + } + + ourLog.trace("Saved {} and got PID {}", theConcept.getCode(), theConcept.getId()); + return retVal; + } + + private int ensureParentsSaved(Collection theParents) { + ourLog.trace("Checking {} parents", theParents.size()); + int retVal = 0; + + for (TermConceptParentChildLink nextLink : theParents) { + if (nextLink.getRelationshipType() == TermConceptParentChildLink.RelationshipTypeEnum.ISA) { + TermConcept nextParent = nextLink.getParent(); + retVal += ensureParentsSaved(nextParent.getParents()); + if (nextParent.getId() == null) { + nextParent.setUpdated(new Date()); + myConceptDao.saveAndFlush(nextParent); + retVal++; + ourLog.debug("Saved parent code {} and got id {}", nextParent.getCode(), nextParent.getId()); + } + } + } + + return retVal; + } +} + diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java index c37345bce3b..97f4a1cdbad 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java @@ -34,7 +34,6 @@ import ca.uhn.fhir.jpa.model.entity.ResourceTable; import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; -import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc; import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc; import ca.uhn.fhir.jpa.term.api.ITermVersionAdapterSvc; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; @@ -103,8 +102,9 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc { private ISchedulerService mySchedulerService; @Autowired private ITermVersionAdapterSvc myTerminologyVersionAdapterSvc; + @Autowired - private ITermCodeSystemStorageSvc myCodeSystemStorageSvc; + private TermConceptDaoSvc myTermConceptDaoSvc; @Autowired private IBatchJobSubmitter myJobSubmitter; @@ -189,7 +189,7 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc { TermConcept next = myDeferredConcepts.remove(0); if (myCodeSystemVersionDao.findById(next.getCodeSystemVersion().getPid()).isPresent()) { try { - codeCount += myCodeSystemStorageSvc.saveConcept(next); + codeCount += myTermConceptDaoSvc.saveConcept(next); } catch (Exception theE) { ourLog.error("Exception thrown when attempting to save TermConcept {} in Code System {}", next.getCode(), next.getCodeSystemVersion().getCodeSystemDisplayName(), theE); @@ -465,9 +465,10 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc { myTransactionMgr = theTxManager; } + @VisibleForTesting - void setCodeSystemStorageSvcForUnitTest(ITermCodeSystemStorageSvc theCodeSystemStorageSvc) { - myCodeSystemStorageSvc = theCodeSystemStorageSvc; + void setTermConceptDaoSvc(TermConceptDaoSvc theTermConceptDaoSvc) { + myTermConceptDaoSvc = theTermConceptDaoSvc; } @VisibleForTesting diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermReindexingSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermReindexingSvcImpl.java index 07246061972..8af8fa7cd3b 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermReindexingSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermReindexingSvcImpl.java @@ -63,11 +63,11 @@ public class TermReindexingSvcImpl implements ITermReindexingSvc { @Autowired private ITermConceptParentChildLinkDao myConceptParentChildLinkDao; @Autowired - private ITermCodeSystemStorageSvc myConceptStorageSvc; - @Autowired private ITermDeferredStorageSvc myDeferredStorageSvc; @Autowired private ISchedulerService mySchedulerService; + @Autowired + private TermConceptDaoSvc myTermConceptDaoSvc; @Override public void processReindexing() { @@ -138,7 +138,7 @@ public class TermReindexingSvcImpl implements ITermReindexingSvc { nextConcept.setParentPids(parentsBuilder.toString()); } - myConceptStorageSvc.saveConcept(nextConcept); + myTermConceptDaoSvc.saveConcept(nextConcept); count++; } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/api/ITermCodeSystemStorageSvc.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/api/ITermCodeSystemStorageSvc.java index fded6f4d70f..c702a8e8915 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/api/ITermCodeSystemStorageSvc.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/api/ITermCodeSystemStorageSvc.java @@ -92,5 +92,4 @@ public interface ITermCodeSystemStorageSvc { int saveConcept(TermConcept theNextConcept); - ResourcePersistentId getValueSetResourcePid(IIdType theIdElement); } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java index 6a5a3cf6711..2631168afda 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java @@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome; import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; import ca.uhn.fhir.jpa.batch.config.BatchConstants; +import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper; import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.bulk.export.job.BulkExportJobParametersBuilder; import ca.uhn.fhir.jpa.bulk.export.job.GroupBulkExportJobParametersBuilder; @@ -98,6 +99,8 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { @Autowired private IBulkDataExportSvc myBulkDataExportSvc; @Autowired + private IBulkDataExportJobSchedulingHelper myBulkDataExportJobSchedulingHelper; + @Autowired private IBatchJobSubmitter myBatchJobSubmitter; @Autowired private BatchJobHelper myBatchJobHelper; @@ -165,7 +168,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { }); // Run a purge pass - myBulkDataExportSvc.purgeExpiredFiles(); + myBulkDataExportJobSchedulingHelper.purgeExpiredFiles(); // Check that things were deleted runInTransaction(() -> { @@ -313,7 +316,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { assertEquals(BulkExportJobStatusEnum.SUBMITTED, status.getStatus()); // Run a scheduled pass to build the export - myBulkDataExportSvc.buildExportFiles(); + myBulkDataExportJobSchedulingHelper.startSubmittedJobs(); awaitAllBulkJobCompletions(); @@ -359,8 +362,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Observation,Patient&_typeFilter=" + UrlUtil.escapeUrlParam(TEST_FILTER), status.getRequest()); // Run a scheduled pass to build the export - myBulkDataExportSvc.buildExportFiles(); - + myBulkDataExportJobSchedulingHelper.startSubmittedJobs(); awaitAllBulkJobCompletions(); // Fetch the job again @@ -414,7 +416,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson", status.getRequest()); // Run a scheduled pass to build the export - myBulkDataExportSvc.buildExportFiles(); + myBulkDataExportJobSchedulingHelper.startSubmittedJobs(); awaitAllBulkJobCompletions(); @@ -465,7 +467,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); - myBulkDataExportSvc.buildExportFiles(); + myBulkDataExportJobSchedulingHelper.startSubmittedJobs(); awaitAllBulkJobCompletions(); IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); @@ -497,7 +499,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Patient&_typeFilter=Patient%3F_has%3AObservation%3Apatient%3Aidentifier%3DSYS%7CVAL3", status.getRequest()); // Run a scheduled pass to build the export - myBulkDataExportSvc.buildExportFiles(); + myBulkDataExportJobSchedulingHelper.startSubmittedJobs(); awaitAllBulkJobCompletions(); @@ -551,7 +553,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=EpisodeOfCare,Patient&_typeFilter=Patient%3F_id%3DP999999990&_typeFilter=EpisodeOfCare%3Fpatient%3DP999999990", status.getRequest()); // Run a scheduled pass to build the export - myBulkDataExportSvc.buildExportFiles(); + myBulkDataExportJobSchedulingHelper.startSubmittedJobs(); awaitAllBulkJobCompletions(); @@ -611,7 +613,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Observation,Patient&_since=" + cutoff.setTimeZoneZulu(true).getValueAsString(), status.getRequest()); // Run a scheduled pass to build the export - myBulkDataExportSvc.buildExportFiles(); + myBulkDataExportJobSchedulingHelper.startSubmittedJobs(); awaitAllBulkJobCompletions(); @@ -705,7 +707,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); - myBulkDataExportSvc.buildExportFiles(); + myBulkDataExportJobSchedulingHelper.startSubmittedJobs(); awaitAllBulkJobCompletions(); IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); @@ -740,7 +742,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP); IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); - myBulkDataExportSvc.buildExportFiles(); + myBulkDataExportJobSchedulingHelper.startSubmittedJobs(); awaitAllBulkJobCompletions(); IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); @@ -847,7 +849,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { // Create a bulk job IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); - myBulkDataExportSvc.buildExportFiles(); + myBulkDataExportJobSchedulingHelper.startSubmittedJobs(); awaitAllBulkJobCompletions(); IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); @@ -890,7 +892,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { options.setFilters(Sets.newHashSet("Immunization?vaccine-code=Flu", "Immunization?patient=Patient/PAT1")); IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options); - myBulkDataExportSvc.buildExportFiles(); + myBulkDataExportJobSchedulingHelper.startSubmittedJobs(); awaitAllBulkJobCompletions(); @@ -928,7 +930,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { options.setFilters(Sets.newHashSet("Observation?identifier=VAL0,VAL2", "Observation?identifier=VAL4")); IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options); - myBulkDataExportSvc.buildExportFiles(); + myBulkDataExportJobSchedulingHelper.startSubmittedJobs(); awaitAllBulkJobCompletions(); IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); @@ -970,7 +972,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP); IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); - myBulkDataExportSvc.buildExportFiles(); + myBulkDataExportJobSchedulingHelper.startSubmittedJobs(); awaitAllBulkJobCompletions(); IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); @@ -1000,7 +1002,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP); IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); - myBulkDataExportSvc.buildExportFiles(); + myBulkDataExportJobSchedulingHelper.startSubmittedJobs(); awaitAllBulkJobCompletions(); IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); @@ -1066,7 +1068,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP); IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); - myBulkDataExportSvc.buildExportFiles(); + myBulkDataExportJobSchedulingHelper.startSubmittedJobs(); awaitAllBulkJobCompletions(); IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); @@ -1104,7 +1106,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { //Patient-style IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); - myBulkDataExportSvc.buildExportFiles(); + myBulkDataExportJobSchedulingHelper.startSubmittedJobs(); awaitAllBulkJobCompletions(); IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); assertThat(jobInfo.getStatus(), is(equalTo(BulkExportJobStatusEnum.COMPLETE))); @@ -1113,7 +1115,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP); bulkDataExportOptions.setGroupId(myPatientGroupId); jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); - myBulkDataExportSvc.buildExportFiles(); + myBulkDataExportJobSchedulingHelper.startSubmittedJobs(); awaitAllBulkJobCompletions(); jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); assertThat(jobInfo.getStatus(), is(equalTo(BulkExportJobStatusEnum.COMPLETE))); @@ -1121,7 +1123,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { //System-style bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM); jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); - myBulkDataExportSvc.buildExportFiles(); + myBulkDataExportJobSchedulingHelper.startSubmittedJobs(); awaitAllBulkJobCompletions(); jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); assertThat(jobInfo.getStatus(), is(equalTo(BulkExportJobStatusEnum.COMPLETE))); @@ -1183,7 +1185,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP); IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); - myBulkDataExportSvc.buildExportFiles(); + myBulkDataExportJobSchedulingHelper.startSubmittedJobs(); awaitAllBulkJobCompletions(); IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java index 2f4370408c6..de055869dcd 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java @@ -10,7 +10,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.model.ExpungeOptions; import ca.uhn.fhir.jpa.api.svc.IIdHelperService; import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; -import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc; +import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper; import ca.uhn.fhir.jpa.config.JpaConfig; import ca.uhn.fhir.jpa.dao.data.IForcedIdDao; import ca.uhn.fhir.jpa.dao.data.IResourceHistoryTableDao; @@ -75,7 +75,6 @@ import org.hl7.fhir.dstu3.model.Resource; import org.hl7.fhir.instance.model.api.IBaseBundle; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -736,10 +735,10 @@ public abstract class BaseJpaTest extends BaseTest { } @SuppressWarnings("BusyWait") - protected static void purgeDatabase(DaoConfig theDaoConfig, IFhirSystemDao theSystemDao, IResourceReindexingSvc theResourceReindexingSvc, ISearchCoordinatorSvc theSearchCoordinatorSvc, ISearchParamRegistry theSearchParamRegistry, IBulkDataExportSvc theBulkDataExportSvc) { + protected static void purgeDatabase(DaoConfig theDaoConfig, IFhirSystemDao theSystemDao, IResourceReindexingSvc theResourceReindexingSvc, ISearchCoordinatorSvc theSearchCoordinatorSvc, ISearchParamRegistry theSearchParamRegistry, IBulkDataExportJobSchedulingHelper theBulkDataJobActivator) { theSearchCoordinatorSvc.cancelAllActiveSearches(); theResourceReindexingSvc.cancelAndPurgeAllJobs(); - theBulkDataExportSvc.cancelAndPurgeAllJobs(); + theBulkDataJobActivator.cancelAndPurgeAllJobs(); boolean expungeEnabled = theDaoConfig.isExpungeEnabled(); theDaoConfig.setExpungeEnabled(true); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu2/BaseJpaDstu2Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu2/BaseJpaDstu2Test.java index 44ead5d6a4c..3a12504ad10 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu2/BaseJpaDstu2Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu2/BaseJpaDstu2Test.java @@ -8,6 +8,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoSubscription; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet; import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; +import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper; import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.config.TestDstu2Config; import ca.uhn.fhir.jpa.dao.BaseJpaTest; @@ -217,7 +218,7 @@ public abstract class BaseJpaDstu2Test extends BaseJpaTest { @Autowired protected SubscriptionLoader mySubscriptionLoader; @Autowired - private IBulkDataExportSvc myBulkDataExportSvc; + private IBulkDataExportJobSchedulingHelper myBulkExportJobSchedulingHelper; @Autowired private ValidationSupportChain myJpaValidationSupportChain; @@ -232,7 +233,7 @@ public abstract class BaseJpaDstu2Test extends BaseJpaTest { @BeforeEach @Transactional() public void beforePurgeDatabase() { - purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc); + purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkExportJobSchedulingHelper); } @BeforeEach diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu3/BaseJpaDstu3Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu3/BaseJpaDstu3Test.java index c458aacded1..dc58a138897 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu3/BaseJpaDstu3Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu3/BaseJpaDstu3Test.java @@ -13,6 +13,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet; import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; +import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper; import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.config.TestDstu3Config; import ca.uhn.fhir.jpa.dao.BaseJpaTest; @@ -344,9 +345,9 @@ public abstract class BaseJpaDstu3Test extends BaseJpaTest { @Autowired private IValidationSupport myJpaValidationSupportChainDstu3; @Autowired - private IBulkDataExportSvc myBulkDataExportSvc; - @Autowired protected ITermValueSetDao myTermValueSetDao; + @Autowired + private IBulkDataExportJobSchedulingHelper myBulkDataScheduleHelper; @AfterEach() public void afterCleanupDao() { @@ -388,7 +389,7 @@ public abstract class BaseJpaDstu3Test extends BaseJpaTest { @BeforeEach @Transactional() public void beforePurgeDatabase() { - purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc); + purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataScheduleHelper); } @BeforeEach diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/BaseJpaR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/BaseJpaR4Test.java index 50c0e79d56f..6719b675832 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/BaseJpaR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/BaseJpaR4Test.java @@ -17,6 +17,7 @@ import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider; import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor; +import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper; import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.config.TestR4Config; import ca.uhn.fhir.jpa.dao.BaseJpaTest; @@ -502,7 +503,7 @@ public abstract class BaseJpaR4Test extends BaseJpaTest implements ITestDataBuil private IValidationSupport myJpaValidationSupportChainR4; private PerformanceTracingLoggingInterceptor myPerformanceTracingLoggingInterceptor; @Autowired - private IBulkDataExportSvc myBulkDataExportSvc; + private IBulkDataExportJobSchedulingHelper myBulkDataScheduleHelper; @AfterEach() public void afterCleanupDao() { @@ -560,7 +561,7 @@ public abstract class BaseJpaR4Test extends BaseJpaTest implements ITestDataBuil runInTransaction(() -> { myMdmLinkDao.deleteAll(); }); - purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc); + purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataScheduleHelper); } @BeforeEach diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchWithElasticSearchIT.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchWithElasticSearchIT.java index db7fbbc2385..c3ec50d0184 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchWithElasticSearchIT.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchWithElasticSearchIT.java @@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet; import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome; import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; +import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper; import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.config.TestHibernateSearchAddInConfig; import ca.uhn.fhir.jpa.config.TestR4Config; @@ -126,16 +127,15 @@ public class FhirResourceDaoR4SearchWithElasticSearchIT extends BaseJpaTest { @Autowired private IResourceReindexingSvc myResourceReindexingSvc; @Autowired - private IBulkDataExportSvc myBulkDataExportSvc; + private IBulkDataExportJobSchedulingHelper myBulkDataScheduleHelper; @Autowired private ITermCodeSystemStorageSvc myTermCodeSystemStorageSvc; @Autowired private DaoRegistry myDaoRegistry; - private boolean myContainsSettings; @BeforeEach public void beforePurgeDatabase() { - purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc); + purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataScheduleHelper); } @Override diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchWithLuceneDisabledTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchWithLuceneDisabledTest.java index 2aac22a2ddd..29a9f1dbc5d 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchWithLuceneDisabledTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchWithLuceneDisabledTest.java @@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet; import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; +import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper; import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.config.TestHibernateSearchAddInConfig; import ca.uhn.fhir.jpa.config.TestR4Config; @@ -144,14 +145,14 @@ public class FhirResourceDaoR4SearchWithLuceneDisabledTest extends BaseJpaTest { @Autowired private IResourceReindexingSvc myResourceReindexingSvc; @Autowired - private IBulkDataExportSvc myBulkDataExportSvc; + private IBulkDataExportJobSchedulingHelper myBulkDataScheduleHelper; @Autowired private ITermReadSvc myTermSvc; @BeforeEach @Transactional() public void beforePurgeDatabase() { - purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc); + purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataScheduleHelper); } @Override diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4TerminologyElasticsearchIT.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4TerminologyElasticsearchIT.java index 412cc94cd58..44649e6445e 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4TerminologyElasticsearchIT.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4TerminologyElasticsearchIT.java @@ -6,6 +6,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoCodeSystem; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet; import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; +import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper; import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.config.TestHibernateSearchAddInConfig; import ca.uhn.fhir.jpa.config.TestR4Config; @@ -85,7 +86,7 @@ public class FhirResourceDaoR4TerminologyElasticsearchIT extends BaseJpaTest { @Autowired private ISearchParamRegistry mySearchParamRegistry; @Autowired - private IBulkDataExportSvc myBulkDataExportSvc; + private IBulkDataExportJobSchedulingHelper myBulkDataScheduleHelper; @BeforeEach @@ -158,7 +159,7 @@ public class FhirResourceDaoR4TerminologyElasticsearchIT extends BaseJpaTest { @AfterEach public void afterPurgeDatabase() { - purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc); + purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataScheduleHelper); } @AfterAll diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r5/BaseJpaR5Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r5/BaseJpaR5Test.java index 2678243f65a..5eba74b66c9 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r5/BaseJpaR5Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r5/BaseJpaR5Test.java @@ -16,6 +16,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider; import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor; +import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper; import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.config.TestR5Config; import ca.uhn.fhir.jpa.dao.BaseJpaTest; @@ -406,7 +407,7 @@ public abstract class BaseJpaR5Test extends BaseJpaTest implements ITestDataBuil @Autowired private DaoRegistry myDaoRegistry; @Autowired - private IBulkDataExportSvc myBulkDataExportSvc; + private IBulkDataExportJobSchedulingHelper myBulkDataSchedulerHelper; @Override public IIdType doCreateResource(IBaseResource theResource) { @@ -480,7 +481,7 @@ public abstract class BaseJpaR5Test extends BaseJpaTest implements ITestDataBuil @BeforeEach @Transactional() public void beforePurgeDatabase() { - purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc); + purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataSchedulerHelper); } @BeforeEach diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/autocomplete/TokenAutocompleteElasticsearchIT.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/autocomplete/TokenAutocompleteElasticsearchIT.java index 376c59be7b8..da7be9c2d05 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/autocomplete/TokenAutocompleteElasticsearchIT.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/autocomplete/TokenAutocompleteElasticsearchIT.java @@ -5,6 +5,7 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; +import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper; import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.config.TestDataBuilderConfig; import ca.uhn.fhir.jpa.config.TestHibernateSearchAddInConfig; @@ -76,7 +77,7 @@ public class TokenAutocompleteElasticsearchIT extends BaseJpaTest{ @Autowired IResourceReindexingSvc myResourceReindexingSvc; @Autowired - IBulkDataExportSvc myBulkDataExportSvc; + private IBulkDataExportJobSchedulingHelper myBulkDataScheduleHelper; @Autowired ITestDataBuilder myDataBuilder; @@ -86,7 +87,7 @@ public class TokenAutocompleteElasticsearchIT extends BaseJpaTest{ @BeforeEach public void beforePurgeDatabase() { - purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc); + purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataScheduleHelper); myDaoConfig.setAdvancedLuceneIndexing(true); } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/term/AbstractValueSetFreeTextExpansionR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/term/AbstractValueSetFreeTextExpansionR4Test.java index c77ea6ad336..c994ffed8cb 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/term/AbstractValueSetFreeTextExpansionR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/term/AbstractValueSetFreeTextExpansionR4Test.java @@ -7,6 +7,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoCodeSystem; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet; import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; +import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper; import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.dao.BaseJpaTest; import ca.uhn.fhir.jpa.dao.data.IResourceTableDao; @@ -143,7 +144,7 @@ public abstract class AbstractValueSetFreeTextExpansionR4Test extends BaseJpaTes private ISearchParamRegistry mySearchParamRegistry; @Autowired - private IBulkDataExportSvc myBulkDataExportSvc; + private IBulkDataExportJobSchedulingHelper myBulkDataScheduleHelper; @Autowired protected ITermCodeSystemVersionDao myTermCodeSystemVersionDao; @@ -166,7 +167,7 @@ public abstract class AbstractValueSetFreeTextExpansionR4Test extends BaseJpaTes @BeforeEach @Transactional() public void beforePurgeDatabase() { - purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc); + purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataScheduleHelper); } @AfterEach @@ -174,7 +175,7 @@ public abstract class AbstractValueSetFreeTextExpansionR4Test extends BaseJpaTes myDaoConfig.setDeferIndexingForCodesystemsOfSize(new DaoConfig().getDeferIndexingForCodesystemsOfSize()); TermReindexingSvcImpl.setForceSaveDeferredAlwaysForUnitTest(false); myDaoConfig.setMaximumExpansionSize(DaoConfig.DEFAULT_MAX_EXPANSION_SIZE); - purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc); + purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataScheduleHelper); } @AfterEach() diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImplTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImplTest.java index 9298b802926..d4bc8c99ea2 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImplTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImplTest.java @@ -32,7 +32,7 @@ public class TermDeferredStorageSvcImplTest { @Mock private PlatformTransactionManager myTxManager; @Mock - private ITermCodeSystemStorageSvc myTermConceptStorageSvc; + private TermConceptDaoSvc myTermConceptDaoSvc; @Mock private ITermConceptDao myConceptDao; @Mock @@ -71,15 +71,15 @@ public class TermDeferredStorageSvcImplTest { TermDeferredStorageSvcImpl svc = new TermDeferredStorageSvcImpl(); svc.setTransactionManagerForUnitTest(myTxManager); - svc.setCodeSystemStorageSvcForUnitTest(myTermConceptStorageSvc); + svc.setTermConceptDaoSvc(myTermConceptDaoSvc); when(myTermCodeSystemVersionDao.findById(anyLong())).thenReturn(Optional.of(myTermCodeSystemVersion)); svc.setCodeSystemVersionDaoForUnitTest(myTermCodeSystemVersionDao); svc.setProcessDeferred(true); svc.addConceptToStorageQueue(concept); svc.saveDeferred(); - verify(myTermConceptStorageSvc, times(1)).saveConcept(same(concept)); - verifyNoMoreInteractions(myTermConceptStorageSvc); + verify(myTermConceptDaoSvc, times(1)).saveConcept(same(concept)); + verifyNoMoreInteractions(myTermConceptDaoSvc); } @@ -94,7 +94,7 @@ public class TermDeferredStorageSvcImplTest { TermDeferredStorageSvcImpl svc = new TermDeferredStorageSvcImpl(); svc.setTransactionManagerForUnitTest(myTxManager); - svc.setCodeSystemStorageSvcForUnitTest(myTermConceptStorageSvc); + svc.setTermConceptDaoSvc(myTermConceptDaoSvc); when(myTermCodeSystemVersionDao.findById(anyLong())).thenReturn(Optional.empty()); svc.setCodeSystemVersionDaoForUnitTest(myTermCodeSystemVersionDao); @@ -102,8 +102,8 @@ public class TermDeferredStorageSvcImplTest { svc.addConceptToStorageQueue(concept); svc.saveDeferred(); - verify(myTermConceptStorageSvc, times(0)).saveConcept(same(concept)); - verifyNoMoreInteractions(myTermConceptStorageSvc); + verify(myTermConceptDaoSvc, times(0)).saveConcept(same(concept)); + verifyNoMoreInteractions(myTermConceptDaoSvc); } @@ -119,18 +119,18 @@ public class TermDeferredStorageSvcImplTest { TermDeferredStorageSvcImpl svc = new TermDeferredStorageSvcImpl(); svc.setTransactionManagerForUnitTest(myTxManager); - svc.setCodeSystemStorageSvcForUnitTest(myTermConceptStorageSvc); + svc.setTermConceptDaoSvc(myTermConceptDaoSvc); // Simulate the case where an exception is thrown despite a valid code system version. when(myTermCodeSystemVersionDao.findById(anyLong())).thenReturn(Optional.of(myTermCodeSystemVersion)); - when(myTermConceptStorageSvc.saveConcept(concept)).thenThrow(new RuntimeException("Foreign Constraint Violation")); + when(myTermConceptDaoSvc.saveConcept(concept)).thenThrow(new RuntimeException("Foreign Constraint Violation")); svc.setCodeSystemVersionDaoForUnitTest(myTermCodeSystemVersionDao); svc.setProcessDeferred(true); svc.addConceptToStorageQueue(concept); svc.saveDeferred(); - verify(myTermConceptStorageSvc, times(1)).saveConcept(same(concept)); - verifyNoMoreInteractions(myTermConceptStorageSvc); + verify(myTermConceptDaoSvc, times(1)).saveConcept(same(concept)); + verifyNoMoreInteractions(myTermConceptDaoSvc); } @@ -142,13 +142,13 @@ public class TermDeferredStorageSvcImplTest { TermDeferredStorageSvcImpl svc = new TermDeferredStorageSvcImpl(); svc.setTransactionManagerForUnitTest(myTxManager); - svc.setCodeSystemStorageSvcForUnitTest(myTermConceptStorageSvc); + svc.setTermConceptDaoSvc(myTermConceptDaoSvc); svc.setConceptDaoForUnitTest(myConceptDao); svc.setProcessDeferred(true); svc.addConceptLinkToStorageQueue(conceptLink); svc.saveDeferred(); - verifyNoMoreInteractions(myTermConceptStorageSvc); + verifyNoMoreInteractions(myTermConceptDaoSvc); } } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/term/ValueSetExpansionR4ElasticsearchIT.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/term/ValueSetExpansionR4ElasticsearchIT.java index cc5f1ff0b6d..186340cdd98 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/term/ValueSetExpansionR4ElasticsearchIT.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/term/ValueSetExpansionR4ElasticsearchIT.java @@ -7,6 +7,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoCodeSystem; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet; import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; +import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper; import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.config.TestHibernateSearchAddInConfig; import ca.uhn.fhir.jpa.config.TestR4Config; @@ -90,7 +91,7 @@ public class ValueSetExpansionR4ElasticsearchIT extends BaseJpaTest { @Autowired private ISearchParamRegistry mySearchParamRegistry; @Autowired - private IBulkDataExportSvc myBulkDataExportSvc; + private IBulkDataExportJobSchedulingHelper myBulkDataExportJobSchedulingHelper; @Mock private IValueSetConceptAccumulator myValueSetCodeAccumulator; @@ -108,7 +109,7 @@ public class ValueSetExpansionR4ElasticsearchIT extends BaseJpaTest { @AfterEach public void afterPurgeDatabase() { - purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc); + purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportJobSchedulingHelper); } void createCodeSystem() { diff --git a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/registry/SearchParamRegistryImpl.java b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/registry/SearchParamRegistryImpl.java index 9d319ccc912..bb069a54492 100644 --- a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/registry/SearchParamRegistryImpl.java +++ b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/registry/SearchParamRegistryImpl.java @@ -46,6 +46,8 @@ import org.hl7.fhir.instance.model.api.IIdType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -265,6 +267,14 @@ public class SearchParamRegistryImpl implements ISearchParamRegistry, IResourceC myResourceChangeListenerRegistry = theResourceChangeListenerRegistry; } + + /** + * + * There is a circular reference between this class and the ResourceChangeListenerRegistry: + * SearchParamRegistryImpl -> ResourceChangeListenerRegistry -> InMemoryResourceMatcher -> SearchParamRegistryImpl. Sicne we only need this once on boot-up, we delay + * until ContextRefreshedEvent. + * + */ @PostConstruct public void registerListener() { myResourceChangeListenerCache = myResourceChangeListenerRegistry.registerResourceResourceChangeListener("SearchParameter", SearchParameterMap.newSynchronous(), this, REFRESH_INTERVAL); diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/api/IBulkDataExportJobSchedulingHelper.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/api/IBulkDataExportJobSchedulingHelper.java new file mode 100644 index 00000000000..e10b7e536c5 --- /dev/null +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/api/IBulkDataExportJobSchedulingHelper.java @@ -0,0 +1,25 @@ +package ca.uhn.fhir.jpa.bulk.export.api; + +import javax.transaction.Transactional; + +public interface IBulkDataExportJobSchedulingHelper { + + /** + * invoked via scheduled task, purges any tasks which are past their cutoff point. + */ + @Transactional(value = Transactional.TxType.NEVER) + void purgeExpiredFiles(); + + /** + * Stops all invoked jobs, and then purges them. + */ + @Transactional(value = Transactional.TxType.NEVER) + void cancelAndPurgeAllJobs(); + + /** + * Given all Bulk Export jobs that have been created since the last scheduled run, this method will + * start them all. This is invoked primarily via a scheduler. + */ + @Transactional(value = Transactional.TxType.NEVER) + void startSubmittedJobs(); +} diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/api/IBulkDataExportSvc.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/api/IBulkDataExportSvc.java index 1385b4cd7df..39c0e665399 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/api/IBulkDataExportSvc.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/api/IBulkDataExportSvc.java @@ -32,11 +32,6 @@ import java.util.List; import java.util.Set; public interface IBulkDataExportSvc { - void buildExportFiles(); - - @Transactional(value = Transactional.TxType.NEVER) - void purgeExpiredFiles(); - /** * Deprecated - Use {@link #submitJob(BulkDataExportOptions, Boolean, RequestDetails)} instead */ @@ -52,8 +47,6 @@ public interface IBulkDataExportSvc { */ Set getPatientCompartmentResources(); - void cancelAndPurgeAllJobs(); - class JobInfo { private String myJobId; private BulkExportJobStatusEnum myStatus; diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/CreateBulkExportEntityTasklet.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/CreateBulkExportEntityTasklet.java index 040aceefcb5..b050f244f84 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/CreateBulkExportEntityTasklet.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/CreateBulkExportEntityTasklet.java @@ -40,8 +40,7 @@ import java.util.stream.Collectors; public class CreateBulkExportEntityTasklet implements Tasklet { - @Autowired - private IBulkDataExportSvc myBulkDataExportSvc; + @Autowired private IBulkDataExportSvc myBulkDataExportSvc; public static void addUUIDToJobContext(ChunkContext theChunkContext, String theJobUUID) { theChunkContext