Break a couple Circular Bean Dependencies (#3437)

* Rely on context refresh event instead of @PostConstruct; load the bean lazily.

* Remove parent autowire

* Break the TermCodeSystem circular dependency with a new bean

* Break cycle between BaseTermReadSvcImpl and TermCodeSystemStorageSvcImpl

* Move back into helper method

* Move scheduling and Spring Batch submission out of BulkDataExportSvcImpl

* WIP

* Fix bean naming

* Revert

* Re-add autowiring

* Reformat

* Docs and reformat

* Back out change

* Feedback from review

* Merge conflicts
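
For context on the pattern these commits apply: a circular bean dependency arises when two Spring beans need each other during initialization, so neither can be fully constructed first. Below is a minimal illustrative sketch, not code from this PR (all class names are hypothetical), of the two fixes used here: extracting the shared logic into a third bean so the dependency graph becomes acyclic, and marking a bean @Lazy so it is created on first use instead of during eager context refresh.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

// Before (the cycle): ReadSvc -> StorageSvc -> ReadSvc. Constructor-injected
// cycles fail context refresh outright; field-injected ones can still leave a
// half-initialized bean visible during @PostConstruct.
//
// After: both services depend on an extracted dao-layer bean instead of on
// each other, mirroring the TermConceptDaoSvc extraction in this commit.
class ConceptDaoSvc {
	int saveConcept(Object theConcept) {
		return 0; // the shared persistence logic lives here
	}
}

class ReadSvc {
	private final ConceptDaoSvc myConceptDaoSvc;

	ReadSvc(ConceptDaoSvc theConceptDaoSvc) {
		myConceptDaoSvc = theConceptDaoSvc;
	}
}

class StorageSvc {
	private final ConceptDaoSvc myConceptDaoSvc;

	StorageSvc(ConceptDaoSvc theConceptDaoSvc) {
		myConceptDaoSvc = theConceptDaoSvc;
	}
}

@Configuration
class TermConfigSketch {
	@Bean
	ConceptDaoSvc conceptDaoSvc() {
		return new ConceptDaoSvc();
	}

	@Bean
	ReadSvc readSvc(ConceptDaoSvc theDao) {
		return new ReadSvc(theDao);
	}

	// @Lazy keeps a bean out of the eager startup ordering entirely; this is
	// the other fix applied below to the new bulk-export scheduling helper.
	@Bean
	@Lazy
	StorageSvc storageSvc(ConceptDaoSvc theDao) {
		return new StorageSvc(theDao);
	}
}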
Tadgh 2022-03-03 06:37:40 -08:00 committed by GitHub
parent 016774d653
commit a2133f6940
29 changed files with 583 additions and 420 deletions


@@ -110,6 +110,15 @@ public class SearchParameterUtil {
}
/**
* Return true if any search parameter in the resource can point at a patient, false otherwise
*/
public static boolean isResourceTypeInPatientCompartment(FhirContext theFhirContext, String theResourceType) {
RuntimeResourceDefinition runtimeResourceDefinition = theFhirContext.getResourceDefinition(theResourceType);
return getAllPatientCompartmentRuntimeSearchParams(runtimeResourceDefinition).size() > 0;
}
@Nullable
public static String getCode(FhirContext theContext, IBaseResource theResource) {
return getStringChild(theContext, theResource, "code");
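
For illustration, here is how the relocated helper can be called; a hedged sketch, assuming an R4 context (the surrounding class and main method are scaffolding, not part of this PR):

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.util.SearchParameterUtil;

class CompartmentCheckSketch {
	public static void main(String[] args) {
		FhirContext ctx = FhirContext.forR4();
		// Observation can point at a patient through its search parameters,
		// so this is expected to return true; resource types outside the
		// Patient compartment return false.
		boolean inPatientCompartment =
			SearchParameterUtil.isResourceTypeInPatientCompartment(ctx, "Observation");
		System.out.println(inPatientCompartment);
	}
}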


@@ -0,0 +1,299 @@
package ca.uhn.fhir.jpa.bulk.export.svc;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.util.UrlUtil;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IIdType;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.PostConstruct;
import javax.transaction.Transactional;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import static org.slf4j.LoggerFactory.getLogger;
public class BulkDataExportJobSchedulingHelperImpl implements IBulkDataExportJobSchedulingHelper {
private static final Logger ourLog = getLogger(BulkDataExportJobSchedulingHelperImpl.class);
private static final Long READ_CHUNK_SIZE = 10L;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private IBatchJobSubmitter myJobSubmitter;
@Autowired
private IBulkExportCollectionDao myBulkExportCollectionDao;
@Autowired
private IBulkExportCollectionFileDao myBulkExportCollectionFileDao;
@Autowired
private PlatformTransactionManager myTxManager;
private TransactionTemplate myTxTemplate;
@Autowired
private ISchedulerService mySchedulerService;
@Autowired
@Qualifier(BatchConstants.BULK_EXPORT_JOB_NAME)
private org.springframework.batch.core.Job myBulkExportJob;
@Autowired
@Qualifier(BatchConstants.GROUP_BULK_EXPORT_JOB_NAME)
private org.springframework.batch.core.Job myGroupBulkExportJob;
@Autowired
@Qualifier(BatchConstants.PATIENT_BULK_EXPORT_JOB_NAME)
private org.springframework.batch.core.Job myPatientBulkExportJob;
@Autowired
private IBulkExportJobDao myBulkExportJobDao;
@Autowired
private DaoConfig myDaoConfig;
@Autowired
private FhirContext myContext;
@PostConstruct
public void start() {
myTxTemplate = new TransactionTemplate(myTxManager);
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(Job.class.getName());
jobDetail.setJobClass(Job.class);
mySchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
jobDetail = new ScheduledJobDefinition();
jobDetail.setId(PurgeExpiredFilesJob.class.getName());
jobDetail.setJobClass(PurgeExpiredFilesJob.class);
mySchedulerService.scheduleClusteredJob(DateUtils.MILLIS_PER_HOUR, jobDetail);
}
/**
* This method is called by the scheduler to run a pass of the
* generator
*/
@Transactional(value = Transactional.TxType.NEVER)
@Override
public synchronized void startSubmittedJobs() {
if (!myDaoConfig.isEnableTaskBulkExportJobExecution()) {
return;
}
Optional<BulkExportJobEntity> jobToProcessOpt = myTxTemplate.execute(t -> {
Pageable page = PageRequest.of(0, 1);
Slice<BulkExportJobEntity> submittedJobs = myBulkExportJobDao.findByStatus(page, BulkExportJobStatusEnum.SUBMITTED);
if (submittedJobs.isEmpty()) {
return Optional.empty();
}
return Optional.of(submittedJobs.getContent().get(0));
});
if (!jobToProcessOpt.isPresent()) {
return;
}
BulkExportJobEntity bulkExportJobEntity = jobToProcessOpt.get();
String jobUuid = bulkExportJobEntity.getJobId();
try {
processJob(bulkExportJobEntity);
} catch (Exception e) {
ourLog.error("Failure while preparing bulk export extract", e);
myTxTemplate.execute(t -> {
Optional<BulkExportJobEntity> submittedJobs = myBulkExportJobDao.findByJobId(jobUuid);
if (submittedJobs.isPresent()) {
BulkExportJobEntity jobEntity = submittedJobs.get();
jobEntity.setStatus(BulkExportJobStatusEnum.ERROR);
jobEntity.setStatusMessage(e.getMessage());
myBulkExportJobDao.save(jobEntity);
}
return null;
});
}
}
@Override
@Transactional(Transactional.TxType.NEVER)
public synchronized void cancelAndPurgeAllJobs() {
myTxTemplate.execute(t -> {
ourLog.info("Deleting all files");
myBulkExportCollectionFileDao.deleteAllFiles();
ourLog.info("Deleting all collections");
myBulkExportCollectionDao.deleteAllFiles();
ourLog.info("Deleting all jobs");
myBulkExportJobDao.deleteAllFiles();
return null;
});
}
/**
* This method is called by the scheduler to run a pass of the
* generator
*/
@Transactional(value = Transactional.TxType.NEVER)
@Override
public void purgeExpiredFiles() {
if (!myDaoConfig.isEnableTaskBulkExportJobExecution()) {
return;
}
Optional<BulkExportJobEntity> jobToDelete = myTxTemplate.execute(t -> {
Pageable page = PageRequest.of(0, 1);
Slice<BulkExportJobEntity> submittedJobs = myBulkExportJobDao.findByExpiry(page, new Date());
if (submittedJobs.isEmpty()) {
return Optional.empty();
}
return Optional.of(submittedJobs.getContent().get(0));
});
if (jobToDelete.isPresent()) {
ourLog.info("Deleting bulk export job: {}", jobToDelete.get());
myTxTemplate.execute(t -> {
BulkExportJobEntity job = myBulkExportJobDao.getOne(jobToDelete.get().getId());
for (BulkExportCollectionEntity nextCollection : job.getCollections()) {
for (BulkExportCollectionFileEntity nextFile : nextCollection.getFiles()) {
ourLog.info("Purging bulk data file: {}", nextFile.getResourceId());
getBinaryDao().delete(toId(nextFile.getResourceId()), new SystemRequestDetails());
getBinaryDao().forceExpungeInExistingTransaction(toId(nextFile.getResourceId()), new ExpungeOptions().setExpungeDeletedResources(true).setExpungeOldVersions(true), new SystemRequestDetails());
myBulkExportCollectionFileDao.deleteByPid(nextFile.getId());
}
myBulkExportCollectionDao.deleteByPid(nextCollection.getId());
}
ourLog.debug("*** About to delete job with ID {}", job.getId());
myBulkExportJobDao.deleteByPid(job.getId());
return null;
});
ourLog.info("Finished deleting bulk export job: {}", jobToDelete.get());
}
}
@SuppressWarnings("unchecked")
private IFhirResourceDao<IBaseBinary> getBinaryDao() {
return myDaoRegistry.getResourceDao("Binary");
}
private IIdType toId(String theResourceId) {
IIdType retVal = myContext.getVersion().newIdType();
retVal.setValue(theResourceId);
return retVal;
}
private void processJob(BulkExportJobEntity theBulkExportJobEntity) {
String theJobUuid = theBulkExportJobEntity.getJobId();
JobParametersBuilder parameters = new JobParametersBuilder()
.addString(BatchConstants.JOB_UUID_PARAMETER, theJobUuid)
.addLong(BatchConstants.READ_CHUNK_PARAMETER, READ_CHUNK_SIZE);
ourLog.info("Submitting bulk export job {} to job scheduler", theJobUuid);
try {
if (isGroupBulkJob(theBulkExportJobEntity)) {
enhanceBulkParametersWithGroupParameters(theBulkExportJobEntity, parameters);
myJobSubmitter.runJob(myGroupBulkExportJob, parameters.toJobParameters());
} else if (isPatientBulkJob(theBulkExportJobEntity)) {
myJobSubmitter.runJob(myPatientBulkExportJob, parameters.toJobParameters());
} else {
myJobSubmitter.runJob(myBulkExportJob, parameters.toJobParameters());
}
} catch (JobParametersInvalidException theE) {
ourLog.error("Unable to start job with UUID: {}, the parameters are invalid. {}", theJobUuid, theE.getMessage());
}
}
private String getQueryParameterIfPresent(String theRequestString, String theParameter) {
Map<String, String[]> stringMap = UrlUtil.parseQueryString(theRequestString);
if (stringMap != null) {
String[] strings = stringMap.get(theParameter);
if (strings != null) {
return String.join(",", strings);
}
}
return null;
}
private boolean isPatientBulkJob(BulkExportJobEntity theBulkExportJobEntity) {
return theBulkExportJobEntity.getRequest().startsWith("/Patient/");
}
private boolean isGroupBulkJob(BulkExportJobEntity theBulkExportJobEntity) {
return theBulkExportJobEntity.getRequest().startsWith("/Group/");
}
private void enhanceBulkParametersWithGroupParameters(BulkExportJobEntity theBulkExportJobEntity, JobParametersBuilder theParameters) {
String theGroupId = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_GROUP_ID);
String expandMdm = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_MDM);
theParameters.addString(BatchConstants.GROUP_ID_PARAMETER, theGroupId);
theParameters.addString(BatchConstants.EXPAND_MDM_PARAMETER, expandMdm);
}
public static class Job implements HapiJob {
@Autowired
private IBulkDataExportJobSchedulingHelper myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.startSubmittedJobs();
}
}
public static class PurgeExpiredFilesJob implements HapiJob {
@Autowired
private IBulkDataExportJobSchedulingHelper myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.purgeExpiredFiles();
}
}
}
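
A note on the static Job and PurgeExpiredFilesJob classes above: the scheduler instantiates them itself (presumably through a Spring-aware job factory, given the @Autowired fields), so each job carries no state beyond a pointer to the scheduling-helper bean and simply delegates. A minimal sketch of that delegation shape, with hypothetical names:

import org.springframework.beans.factory.annotation.Autowired;

// The scheduler creates the job object; all real work is delegated to the
// Spring-managed service, so the heavyweight service never needs to exist
// at the moment the job class is registered.
interface INightlyPassService {
	void runNightlyPass();
}

class NightlyPassJob {
	@Autowired
	private INightlyPassService myTarget;

	public void execute() {
		myTarget.runNightlyPass();
	}
}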


@@ -20,32 +20,20 @@ package ca.uhn.fhir.jpa.bulk.export.svc;
* #L%
*/
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
@@ -53,30 +41,19 @@ import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
import ca.uhn.fhir.util.UrlUtil;
import ca.uhn.fhir.util.SearchParameterUtil;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.InstantType;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.PostConstruct;
import javax.transaction.Transactional;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -90,7 +67,6 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
private static final Long READ_CHUNK_SIZE = 10L;
private static final Logger ourLog = LoggerFactory.getLogger(BulkDataExportSvcImpl.class);
private final int myReuseBulkExportForMillis = (int) (60 * DateUtils.MILLIS_PER_MINUTE);
@@ -99,205 +75,14 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
@Autowired
private IBulkExportCollectionDao myBulkExportCollectionDao;
@Autowired
private IBulkExportCollectionFileDao myBulkExportCollectionFileDao;
@Autowired
private ISchedulerService mySchedulerService;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private FhirContext myContext;
@Autowired
private PlatformTransactionManager myTxManager;
private TransactionTemplate myTxTemplate;
@Autowired
private IBatchJobSubmitter myJobSubmitter;
@Autowired
@Qualifier(BatchConstants.BULK_EXPORT_JOB_NAME)
private org.springframework.batch.core.Job myBulkExportJob;
@Autowired
@Qualifier(BatchConstants.GROUP_BULK_EXPORT_JOB_NAME)
private org.springframework.batch.core.Job myGroupBulkExportJob;
@Autowired
@Qualifier(BatchConstants.PATIENT_BULK_EXPORT_JOB_NAME)
private org.springframework.batch.core.Job myPatientBulkExportJob;
private Set<String> myCompartmentResources;
private final int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR);
/**
* This method is called by the scheduler to run a pass of the
* generator
*/
@Transactional(value = Transactional.TxType.NEVER)
@Override
public synchronized void buildExportFiles() {
if (!myDaoConfig.isEnableTaskBulkExportJobExecution()) {
return;
}
Optional<BulkExportJobEntity> jobToProcessOpt = myTxTemplate.execute(t -> {
Pageable page = PageRequest.of(0, 1);
Slice<BulkExportJobEntity> submittedJobs = myBulkExportJobDao.findByStatus(page, BulkExportJobStatusEnum.SUBMITTED);
if (submittedJobs.isEmpty()) {
return Optional.empty();
}
return Optional.of(submittedJobs.getContent().get(0));
});
if (!jobToProcessOpt.isPresent()) {
return;
}
BulkExportJobEntity bulkExportJobEntity = jobToProcessOpt.get();
String jobUuid = bulkExportJobEntity.getJobId();
try {
processJob(bulkExportJobEntity);
} catch (Exception e) {
ourLog.error("Failure while preparing bulk export extract", e);
myTxTemplate.execute(t -> {
Optional<BulkExportJobEntity> submittedJobs = myBulkExportJobDao.findByJobId(jobUuid);
if (submittedJobs.isPresent()) {
BulkExportJobEntity jobEntity = submittedJobs.get();
jobEntity.setStatus(BulkExportJobStatusEnum.ERROR);
jobEntity.setStatusMessage(e.getMessage());
myBulkExportJobDao.save(jobEntity);
}
return null;
});
}
}
private String getQueryParameterIfPresent(String theRequestString, String theParameter) {
Map<String, String[]> stringMap = UrlUtil.parseQueryString(theRequestString);
if (stringMap != null) {
String[] strings = stringMap.get(theParameter);
if (strings != null) {
return String.join(",", strings);
}
}
return null;
}
@Autowired
private DaoConfig myDaoConfig;
/**
* This method is called by the scheduler to run a pass of the
* generator
*/
@Transactional(value = Transactional.TxType.NEVER)
@Override
public void purgeExpiredFiles() {
if (!myDaoConfig.isEnableTaskBulkExportJobExecution()) {
return;
}
Optional<BulkExportJobEntity> jobToDelete = myTxTemplate.execute(t -> {
Pageable page = PageRequest.of(0, 1);
Slice<BulkExportJobEntity> submittedJobs = myBulkExportJobDao.findByExpiry(page, new Date());
if (submittedJobs.isEmpty()) {
return Optional.empty();
}
return Optional.of(submittedJobs.getContent().get(0));
});
if (jobToDelete.isPresent()) {
ourLog.info("Deleting bulk export job: {}", jobToDelete.get());
myTxTemplate.execute(t -> {
BulkExportJobEntity job = myBulkExportJobDao.getOne(jobToDelete.get().getId());
for (BulkExportCollectionEntity nextCollection : job.getCollections()) {
for (BulkExportCollectionFileEntity nextFile : nextCollection.getFiles()) {
ourLog.info("Purging bulk data file: {}", nextFile.getResourceId());
getBinaryDao().delete(toId(nextFile.getResourceId()), new SystemRequestDetails());
getBinaryDao().forceExpungeInExistingTransaction(toId(nextFile.getResourceId()), new ExpungeOptions().setExpungeDeletedResources(true).setExpungeOldVersions(true), new SystemRequestDetails());
myBulkExportCollectionFileDao.deleteByPid(nextFile.getId());
}
myBulkExportCollectionDao.deleteByPid(nextCollection.getId());
}
ourLog.info("*** ABOUT TO DELETE");
myBulkExportJobDao.deleteByPid(job.getId());
return null;
});
ourLog.info("Finished deleting bulk export job: {}", jobToDelete.get());
}
}
private void processJob(BulkExportJobEntity theBulkExportJobEntity) {
String theJobUuid = theBulkExportJobEntity.getJobId();
JobParametersBuilder parameters = new JobParametersBuilder()
.addString(BatchConstants.JOB_UUID_PARAMETER, theJobUuid)
.addLong(BatchConstants.READ_CHUNK_PARAMETER, READ_CHUNK_SIZE);
ourLog.info("Submitting bulk export job {} to job scheduler", theJobUuid);
try {
if (isGroupBulkJob(theBulkExportJobEntity)) {
enhanceBulkParametersWithGroupParameters(theBulkExportJobEntity, parameters);
myJobSubmitter.runJob(myGroupBulkExportJob, parameters.toJobParameters());
} else if (isPatientBulkJob(theBulkExportJobEntity)) {
myJobSubmitter.runJob(myPatientBulkExportJob, parameters.toJobParameters());
} else {
myJobSubmitter.runJob(myBulkExportJob, parameters.toJobParameters());
}
} catch (JobParametersInvalidException theE) {
ourLog.error("Unable to start job with UUID: {}, the parameters are invalid. {}", theJobUuid, theE.getMessage());
}
}
private boolean isPatientBulkJob(BulkExportJobEntity theBulkExportJobEntity) {
return theBulkExportJobEntity.getRequest().startsWith("/Patient/");
}
private boolean isGroupBulkJob(BulkExportJobEntity theBulkExportJobEntity) {
return theBulkExportJobEntity.getRequest().startsWith("/Group/");
}
private void enhanceBulkParametersWithGroupParameters(BulkExportJobEntity theBulkExportJobEntity, JobParametersBuilder theParameters) {
String theGroupId = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_GROUP_ID);
String expandMdm = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_MDM);
theParameters.addString(BatchConstants.GROUP_ID_PARAMETER, theGroupId);
theParameters.addString(BatchConstants.EXPAND_MDM_PARAMETER, expandMdm);
}
@SuppressWarnings("unchecked")
private IFhirResourceDao<IBaseBinary> getBinaryDao() {
return myDaoRegistry.getResourceDao("Binary");
}
@PostConstruct
public void start() {
myTxTemplate = new TransactionTemplate(myTxManager);
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(Job.class.getName());
jobDetail.setJobClass(Job.class);
mySchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
jobDetail = new ScheduledJobDefinition();
jobDetail.setId(PurgeExpiredFilesJob.class.getName());
jobDetail.setJobClass(PurgeExpiredFilesJob.class);
mySchedulerService.scheduleClusteredJob(DateUtils.MILLIS_PER_HOUR, jobDetail);
}
@Transactional
@Override
@Deprecated
@@ -391,32 +176,32 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
.filter(t -> !"Binary".equals(t))
.collect(Collectors.toSet());
BulkExportJobEntity job = new BulkExportJobEntity();
job.setJobId(UUID.randomUUID().toString());
job.setStatus(BulkExportJobStatusEnum.SUBMITTED);
job.setSince(since);
job.setCreated(new Date());
job.setRequest(request);
BulkExportJobEntity jobEntity = new BulkExportJobEntity();
jobEntity.setJobId(UUID.randomUUID().toString());
jobEntity.setStatus(BulkExportJobStatusEnum.SUBMITTED);
jobEntity.setSince(since);
jobEntity.setCreated(new Date());
jobEntity.setRequest(request);
// Validate types
validateTypes(resourceTypes);
validateTypeFilters(theBulkDataExportOptions.getFilters(), resourceTypes);
updateExpiry(job);
myBulkExportJobDao.save(job);
updateExpiry(jobEntity);
myBulkExportJobDao.save(jobEntity);
for (String nextType : resourceTypes) {
BulkExportCollectionEntity collection = new BulkExportCollectionEntity();
collection.setJob(job);
collection.setJob(jobEntity);
collection.setResourceType(nextType);
job.getCollections().add(collection);
jobEntity.getCollections().add(collection);
myBulkExportCollectionDao.save(collection);
}
ourLog.info("Bulk export job submitted: {}", job.toString());
ourLog.info("Bulk export job submitted: {}", jobEntity.toString());
return toSubmittedJobInfo(job);
return toSubmittedJobInfo(jobEntity);
}
public void validateTypes(Set<String> theResourceTypes) {
@@ -484,21 +269,12 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
public Set<String> getPatientCompartmentResources() {
if (myCompartmentResources == null) {
myCompartmentResources = myContext.getResourceTypes().stream()
.filter(this::resourceTypeIsInPatientCompartment)
.filter(resType -> SearchParameterUtil.isResourceTypeInPatientCompartment(myContext, resType))
.collect(Collectors.toSet());
}
return myCompartmentResources;
}
/**
* Return true if any search parameter in the resource can point at a patient, false otherwise
*/
private boolean resourceTypeIsInPatientCompartment(String theResourceType) {
RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(theResourceType);
List<RuntimeSearchParam> searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient");
return searchParams != null && searchParams.size() >= 1;
}
public Set<String> getAllowedResourceTypesForBulkExportStyle(BulkDataExportOptions.ExportStyle theExportStyle) {
if (theExportStyle.equals(SYSTEM)) {
return myContext.getResourceTypes();
@@ -509,51 +285,10 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
}
}
private IIdType toId(String theResourceId) {
IIdType retVal = myContext.getVersion().newIdType();
retVal.setValue(theResourceId);
return retVal;
}
private IIdType toQualifiedBinaryId(String theIdPart) {
IIdType retVal = myContext.getVersion().newIdType();
retVal.setParts(null, "Binary", theIdPart, null);
return retVal;
}
@Override
@Transactional(Transactional.TxType.NEVER)
public synchronized void cancelAndPurgeAllJobs() {
myTxTemplate.execute(t -> {
ourLog.info("Deleting all files");
myBulkExportCollectionFileDao.deleteAllFiles();
ourLog.info("Deleting all collections");
myBulkExportCollectionDao.deleteAllFiles();
ourLog.info("Deleting all jobs");
myBulkExportJobDao.deleteAllFiles();
return null;
});
}
public static class Job implements HapiJob {
@Autowired
private IBulkDataExportSvc myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.buildExportFiles();
}
}
public static class PurgeExpiredFilesJob implements HapiJob {
@Autowired
private IBulkDataExportSvc myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.purgeExpiredFiles();
}
}
}


@@ -17,8 +17,10 @@ import ca.uhn.fhir.jpa.batch.mdm.MdmClearJobSubmitterImpl;
import ca.uhn.fhir.jpa.batch.reader.BatchResourceSearcher;
import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider;
import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.export.provider.BulkDataExportProvider;
import ca.uhn.fhir.jpa.bulk.export.svc.BulkDataExportJobSchedulingHelperImpl;
import ca.uhn.fhir.jpa.bulk.export.svc.BulkDataExportSvcImpl;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.svc.BulkDataImportSvcImpl;
@@ -438,6 +440,12 @@ public class JpaConfig {
return new BulkDataExportSvcImpl();
}
@Bean
@Lazy
public IBulkDataExportJobSchedulingHelper bulkDataExportJobSchedulingHelper() {
return new BulkDataExportJobSchedulingHelperImpl();
}
@Bean
@Lazy
public BulkDataExportProvider bulkDataExportProvider() {
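
The @Lazy on the new scheduling-helper bean is the other half of the fix: a lazy bean is not instantiated during context refresh, only on first request, which keeps it out of the eager initialization ordering where the cycle used to bite. A self-contained sketch of that behavior, with hypothetical names:

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

class ExpensiveHelper {
	ExpensiveHelper() {
		System.out.println("ExpensiveHelper constructed");
	}
}

@Configuration
class LazyDemoConfig {
	@Bean
	@Lazy
	ExpensiveHelper expensiveHelper() {
		return new ExpensiveHelper();
	}
}

class LazyDemo {
	public static void main(String[] args) {
		try (AnnotationConfigApplicationContext ctx =
				 new AnnotationConfigApplicationContext(LazyDemoConfig.class)) {
			System.out.println("Context refreshed; helper not yet constructed");
			ctx.getBean(ExpensiveHelper.class); // constructed here, on first use
		}
	}
}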


@@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.config;
import ca.uhn.fhir.jpa.dao.ObservationLastNIndexPersistSvc;
import ca.uhn.fhir.jpa.term.TermCodeSystemStorageSvcImpl;
import ca.uhn.fhir.jpa.term.TermConceptDaoSvc;
import ca.uhn.fhir.jpa.term.TermDeferredStorageSvcImpl;
import ca.uhn.fhir.jpa.term.TermReindexingSvcImpl;
import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc;
@@ -38,6 +39,11 @@ public class SharedConfigDstu3Plus {
return new TermCodeSystemStorageSvcImpl();
}
@Bean
public TermConceptDaoSvc termConceptDaoSvc() {
return new TermConceptDaoSvc();
}
@Bean
public ITermDeferredStorageSvc termDeferredStorageSvc() {
return new TermDeferredStorageSvcImpl();


@@ -50,8 +50,6 @@ import static ca.uhn.fhir.jpa.dao.index.IdHelperService.RESOURCE_PID;
public class JpaIdHelperService extends IdHelperService implements IJpaIdHelperService, IIdHelperService {
@Autowired
protected IResourceTableDao myResourceTableDao;
@Autowired
private IIdHelperService myIdHelperService;
/**
* @deprecated This method doesn't take a partition ID as input, so it is unsafe. It
@@ -61,7 +59,7 @@ public class JpaIdHelperService extends IdHelperService implements IJpaIdHelperService
@Deprecated
@Nonnull
public List<Long> getPidsOrThrowException(List<IIdType> theIds) {
List<ResourcePersistentId> resourcePersistentIds = myIdHelperService.resolveResourcePersistentIdsWithCache(RequestPartitionId.allPartitions(), theIds);
List<ResourcePersistentId> resourcePersistentIds = super.resolveResourcePersistentIdsWithCache(RequestPartitionId.allPartitions(), theIds);
return resourcePersistentIds.stream().map(ResourcePersistentId::getIdAsLong).collect(Collectors.toList());
}
@@ -80,7 +78,7 @@ public class JpaIdHelperService extends IdHelperService implements IJpaIdHelperService
if (retVal == null) {
IIdType id = theResource.getIdElement();
try {
retVal = myIdHelperService.resolveResourcePersistentIds(RequestPartitionId.allPartitions(), id.getResourceType(), id.getIdPart()).getIdAsLong();
retVal = super.resolveResourcePersistentIds(RequestPartitionId.allPartitions(), id.getResourceType(), id.getIdPart()).getIdAsLong();
} catch (ResourceNotFoundException e) {
return null;
}
@@ -100,7 +98,7 @@ public class JpaIdHelperService extends IdHelperService implements IJpaIdHelperService
assert TransactionSynchronizationManager.isSynchronizationActive();
List<IIdType> ids = Collections.singletonList(theId);
List<ResourcePersistentId> resourcePersistentIds = myIdHelperService.resolveResourcePersistentIdsWithCache(RequestPartitionId.allPartitions(), ids);
List<ResourcePersistentId> resourcePersistentIds = super.resolveResourcePersistentIdsWithCache(RequestPartitionId.allPartitions(), ids);
return resourcePersistentIds.get(0).getIdAsLong();
}
@@ -139,7 +137,7 @@ public class JpaIdHelperService extends IdHelperService implements IJpaIdHelperService
public Set<String> translatePidsToFhirResourceIds(Set<Long> thePids) {
assert TransactionSynchronizationManager.isSynchronizationActive();
Map<Long, Optional<String>> pidToForcedIdMap = myIdHelperService.translatePidsToForcedIds(thePids);
Map<Long, Optional<String>> pidToForcedIdMap = super.translatePidsToForcedIds(thePids);
//If the result of the translation is an empty optional, it means there is no forced id, and we can use the PID as the resource ID.
Set<String> resolvedResourceIds = pidToForcedIdMap.entrySet().stream()
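
The change above removes a self-referential injection: JpaIdHelperService extends IdHelperService yet also autowired an IIdHelperService field, which re-enters the bean graph (and can resolve to the very bean being constructed). Calling the inherited implementation through super sidesteps the lookup. A minimal sketch of the shape of the fix, with hypothetical names:

class BaseIdService {
	Long resolvePid(String theId) {
		return (long) theId.hashCode(); // stand-in for the real resolution logic
	}
}

class JpaIdService extends BaseIdService {
	// Before: @Autowired private BaseIdService myIdService;
	//         ... return myIdService.resolvePid(theId);
	// After: no injected self-reference, just the inherited method.
	Long getPidOrThrow(String theId) {
		return super.resolvePid(theId);
	}
}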


@@ -26,11 +26,13 @@ import ca.uhn.fhir.context.support.IValidationSupport;
import ca.uhn.fhir.context.support.ValidationSupportContext;
import ca.uhn.fhir.context.support.ValueSetExpansionOptions;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IDao;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoCodeSystem;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.config.HibernatePropertiesProvider;
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;
import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao;
@@ -63,8 +65,6 @@ import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.search.ElasticsearchNestedQueryBuilderUtil;
import ca.uhn.fhir.jpa.search.builder.SearchBuilder;
import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc;
import ca.uhn.fhir.jpa.term.api.ITermConceptMappingSvc;
import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc;
import ca.uhn.fhir.jpa.term.api.ITermReadSvc;
import ca.uhn.fhir.jpa.term.ex.ExpansionTooCostlyException;
@@ -132,6 +132,7 @@ import org.hl7.fhir.r4.model.IntegerType;
import org.hl7.fhir.r4.model.StringType;
import org.hl7.fhir.r4.model.ValueSet;
import org.hl7.fhir.r4.model.codesystems.ConceptSubsumptionOutcome;
import org.jetbrains.annotations.NotNull;
import org.quartz.JobExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
@@ -257,12 +258,11 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
private ISchedulerService mySchedulerService;
@Autowired(required = false)
private ITermDeferredStorageSvc myDeferredStorageSvc;
@Autowired(required = false)
private ITermCodeSystemStorageSvc myConceptStorageSvc;
@Autowired
private IIdHelperService myIdHelperService;
@Autowired
private ApplicationContext myApplicationContext;
@Autowired
private ITermConceptMappingSvc myTermConceptMappingSvc;
private volatile IValidationSupport myJpaValidationSupport;
private volatile IValidationSupport myValidationSupport;
@@ -1580,18 +1580,24 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
}
private Optional<TermValueSet> fetchValueSetEntity(ValueSet theValueSet) {
ResourcePersistentId valueSetResourcePid = myConceptStorageSvc.getValueSetResourcePid(theValueSet.getIdElement());
ResourcePersistentId valueSetResourcePid = getValueSetResourcePersistentId(theValueSet);
Optional<TermValueSet> optionalTermValueSet = myTermValueSetDao.findByResourcePid(valueSetResourcePid.getIdAsLong());
return optionalTermValueSet;
}
private ResourcePersistentId getValueSetResourcePersistentId(ValueSet theValueSet) {
ResourcePersistentId valueSetResourcePid = myIdHelperService.resolveResourcePersistentIds(RequestPartitionId.allPartitions(), theValueSet.getIdElement().getResourceType(), theValueSet.getIdElement().getIdPart());
return valueSetResourcePid;
}
protected IValidationSupport.CodeValidationResult validateCodeIsInPreExpandedValueSet(
ConceptValidationOptions theValidationOptions,
ValueSet theValueSet, String theSystem, String theCode, String theDisplay, Coding theCoding, CodeableConcept theCodeableConcept) {
assert TransactionSynchronizationManager.isSynchronizationActive();
ValidateUtil.isNotNullOrThrowUnprocessableEntity(theValueSet.hasId(), "ValueSet.id is required");
ResourcePersistentId valueSetResourcePid = myConceptStorageSvc.getValueSetResourcePid(theValueSet.getIdElement());
ResourcePersistentId valueSetResourcePid = getValueSetResourcePersistentId(theValueSet);
List<TermValueSetConcept> concepts = new ArrayList<>();
if (isNotBlank(theCode)) {


@@ -26,7 +26,6 @@ import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao;
@@ -89,7 +88,6 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static ca.uhn.fhir.jpa.batch.config.BatchConstants.TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.hl7.fhir.common.hapi.validation.support.ValidationConstants.LOINC_LOW;
@@ -127,17 +125,8 @@ public class TermCodeSystemStorageSvcImpl implements ITermCodeSystemStorageSvc {
private IResourceTableDao myResourceTableDao;
@Autowired
private IBatchJobSubmitter myJobSubmitter;
private TermConceptDaoSvc myTermConceptDaoSvc;
@Autowired
@Qualifier(TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME)
private Job myTermCodeSystemVersionDeleteJob;
@Override
public ResourcePersistentId getValueSetResourcePid(IIdType theIdType) {
return myIdHelperService.resolveResourcePersistentIds(RequestPartitionId.allPartitions(), theIdType.getResourceType(), theIdType.getIdPart());
}
@Transactional
@Override
@@ -277,41 +266,7 @@
*/
@Override
public int saveConcept(TermConcept theConcept) {
int retVal = 0;
/*
* If the concept has an ID, we're reindexing, so there's no need to
* save parent concepts first (it's way too slow to do that)
*/
if (theConcept.getId() == null) {
boolean needToSaveParents = false;
for (TermConceptParentChildLink next : theConcept.getParents()) {
if (next.getParent().getId() == null) {
needToSaveParents = true;
}
}
if (needToSaveParents) {
retVal += ensureParentsSaved(theConcept.getParents());
}
}
if (theConcept.getId() == null || theConcept.getIndexStatus() == null) {
retVal++;
theConcept.setIndexStatus(BaseHapiFhirDao.INDEX_STATUS_INDEXED);
theConcept.setUpdated(new Date());
myConceptDao.save(theConcept);
for (TermConceptProperty next : theConcept.getProperties()) {
myConceptPropertyDao.save(next);
}
for (TermConceptDesignation next : theConcept.getDesignations()) {
myConceptDesignationDao.save(next);
}
}
ourLog.trace("Saved {} and got PID {}", theConcept.getCode(), theConcept.getId());
return retVal;
return myTermConceptDaoSvc.saveConcept(theConcept);
}
@Override


@@ -0,0 +1,110 @@
package ca.uhn.fhir.jpa.term;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.dao.data.ITermConceptDao;
import ca.uhn.fhir.jpa.dao.data.ITermConceptDesignationDao;
import ca.uhn.fhir.jpa.dao.data.ITermConceptPropertyDao;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.entity.TermConceptDesignation;
import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink;
import ca.uhn.fhir.jpa.entity.TermConceptProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Collection;
import java.util.Date;
public class TermConceptDaoSvc {
private static final Logger ourLog = LoggerFactory.getLogger(TermCodeSystemStorageSvcImpl.class);
@Autowired
protected ITermConceptPropertyDao myConceptPropertyDao;
@Autowired
protected ITermConceptDao myConceptDao;
@Autowired
protected ITermConceptDesignationDao myConceptDesignationDao;
public int saveConcept(TermConcept theConcept) {
int retVal = 0;
/*
* If the concept has an ID, we're reindexing, so there's no need to
* save parent concepts first (it's way too slow to do that)
*/
if (theConcept.getId() == null) {
boolean needToSaveParents = false;
for (TermConceptParentChildLink next : theConcept.getParents()) {
if (next.getParent().getId() == null) {
needToSaveParents = true;
break;
}
}
if (needToSaveParents) {
retVal += ensureParentsSaved(theConcept.getParents());
}
}
if (theConcept.getId() == null || theConcept.getIndexStatus() == null) {
retVal++;
theConcept.setIndexStatus(BaseHapiFhirDao.INDEX_STATUS_INDEXED);
theConcept.setUpdated(new Date());
myConceptDao.save(theConcept);
for (TermConceptProperty next : theConcept.getProperties()) {
myConceptPropertyDao.save(next);
}
for (TermConceptDesignation next : theConcept.getDesignations()) {
myConceptDesignationDao.save(next);
}
}
ourLog.trace("Saved {} and got PID {}", theConcept.getCode(), theConcept.getId());
return retVal;
}
private int ensureParentsSaved(Collection<TermConceptParentChildLink> theParents) {
ourLog.trace("Checking {} parents", theParents.size());
int retVal = 0;
for (TermConceptParentChildLink nextLink : theParents) {
if (nextLink.getRelationshipType() == TermConceptParentChildLink.RelationshipTypeEnum.ISA) {
TermConcept nextParent = nextLink.getParent();
retVal += ensureParentsSaved(nextParent.getParents());
if (nextParent.getId() == null) {
nextParent.setUpdated(new Date());
myConceptDao.saveAndFlush(nextParent);
retVal++;
ourLog.debug("Saved parent code {} and got id {}", nextParent.getCode(), nextParent.getId());
}
}
}
return retVal;
}
}


@@ -34,7 +34,6 @@ import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc;
import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc;
import ca.uhn.fhir.jpa.term.api.ITermVersionAdapterSvc;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
@@ -103,8 +102,9 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
private ISchedulerService mySchedulerService;
@Autowired
private ITermVersionAdapterSvc myTerminologyVersionAdapterSvc;
@Autowired
private ITermCodeSystemStorageSvc myCodeSystemStorageSvc;
private TermConceptDaoSvc myTermConceptDaoSvc;
@Autowired
private IBatchJobSubmitter myJobSubmitter;
@@ -189,7 +189,7 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
TermConcept next = myDeferredConcepts.remove(0);
if (myCodeSystemVersionDao.findById(next.getCodeSystemVersion().getPid()).isPresent()) {
try {
codeCount += myCodeSystemStorageSvc.saveConcept(next);
codeCount += myTermConceptDaoSvc.saveConcept(next);
} catch (Exception theE) {
ourLog.error("Exception thrown when attempting to save TermConcept {} in Code System {}",
next.getCode(), next.getCodeSystemVersion().getCodeSystemDisplayName(), theE);
@@ -465,9 +465,10 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
myTransactionMgr = theTxManager;
}
@VisibleForTesting
void setCodeSystemStorageSvcForUnitTest(ITermCodeSystemStorageSvc theCodeSystemStorageSvc) {
myCodeSystemStorageSvc = theCodeSystemStorageSvc;
void setTermConceptDaoSvc(TermConceptDaoSvc theTermConceptDaoSvc) {
myTermConceptDaoSvc = theTermConceptDaoSvc;
}
@VisibleForTesting


@@ -63,11 +63,11 @@ public class TermReindexingSvcImpl implements ITermReindexingSvc {
@Autowired
private ITermConceptParentChildLinkDao myConceptParentChildLinkDao;
@Autowired
private ITermCodeSystemStorageSvc myConceptStorageSvc;
@Autowired
private ITermDeferredStorageSvc myDeferredStorageSvc;
@Autowired
private ISchedulerService mySchedulerService;
@Autowired
private TermConceptDaoSvc myTermConceptDaoSvc;
@Override
public void processReindexing() {
@@ -138,7 +138,7 @@ public class TermReindexingSvcImpl implements ITermReindexingSvc {
nextConcept.setParentPids(parentsBuilder.toString());
}
myConceptStorageSvc.saveConcept(nextConcept);
myTermConceptDaoSvc.saveConcept(nextConcept);
count++;
}


@@ -92,5 +92,4 @@ public interface ITermCodeSystemStorageSvc {
int saveConcept(TermConcept theNextConcept);
ResourcePersistentId getValueSetResourcePid(IIdType theIdElement);
}


@@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.export.job.BulkExportJobParametersBuilder;
import ca.uhn.fhir.jpa.bulk.export.job.GroupBulkExportJobParametersBuilder;
@@ -98,6 +99,8 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Autowired
private IBulkDataExportSvc myBulkDataExportSvc;
@Autowired
private IBulkDataExportJobSchedulingHelper myBulkDataExportJobSchedulingHelper;
@Autowired
private IBatchJobSubmitter myBatchJobSubmitter;
@Autowired
private BatchJobHelper myBatchJobHelper;
@@ -165,7 +168,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
});
// Run a purge pass
myBulkDataExportSvc.purgeExpiredFiles();
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
// Check that things were deleted
runInTransaction(() -> {
@@ -313,7 +316,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertEquals(BulkExportJobStatusEnum.SUBMITTED, status.getStatus());
// Run a scheduled pass to build the export
myBulkDataExportSvc.buildExportFiles();
myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
awaitAllBulkJobCompletions();
@@ -359,8 +362,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Observation,Patient&_typeFilter=" + UrlUtil.escapeUrlParam(TEST_FILTER), status.getRequest());
// Run a scheduled pass to build the export
myBulkDataExportSvc.buildExportFiles();
myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
awaitAllBulkJobCompletions();
// Fetch the job again
@@ -414,7 +416,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson", status.getRequest());
// Run a scheduled pass to build the export
myBulkDataExportSvc.buildExportFiles();
myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
awaitAllBulkJobCompletions();
@@ -465,7 +467,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
@@ -497,7 +499,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Patient&_typeFilter=Patient%3F_has%3AObservation%3Apatient%3Aidentifier%3DSYS%7CVAL3", status.getRequest());
// Run a scheduled pass to build the export
myBulkDataExportSvc.buildExportFiles();
myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
awaitAllBulkJobCompletions();
@@ -551,7 +553,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=EpisodeOfCare,Patient&_typeFilter=Patient%3F_id%3DP999999990&_typeFilter=EpisodeOfCare%3Fpatient%3DP999999990", status.getRequest());
// Run a scheduled pass to build the export
myBulkDataExportSvc.buildExportFiles();
myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
awaitAllBulkJobCompletions();
@@ -611,7 +613,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Observation,Patient&_since=" + cutoff.setTimeZoneZulu(true).getValueAsString(), status.getRequest());
// Run a scheduled pass to build the export
myBulkDataExportSvc.buildExportFiles();
myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
awaitAllBulkJobCompletions();
@@ -705,7 +707,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
@@ -740,7 +742,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
@@ -847,7 +849,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
@@ -890,7 +892,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
options.setFilters(Sets.newHashSet("Immunization?vaccine-code=Flu", "Immunization?patient=Patient/PAT1"));
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options);
myBulkDataExportSvc.buildExportFiles();
myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
awaitAllBulkJobCompletions();
@@ -928,7 +930,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
options.setFilters(Sets.newHashSet("Observation?identifier=VAL0,VAL2", "Observation?identifier=VAL4"));
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options);
myBulkDataExportSvc.buildExportFiles();
myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
@@ -970,7 +972,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
@@ -1000,7 +1002,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
@@ -1066,7 +1068,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
@@ -1104,7 +1106,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
//Patient-style
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), is(equalTo(BulkExportJobStatusEnum.COMPLETE)));
@@ -1113,7 +1115,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
bulkDataExportOptions.setGroupId(myPatientGroupId);
jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
awaitAllBulkJobCompletions();
jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), is(equalTo(BulkExportJobStatusEnum.COMPLETE)));
@@ -1121,7 +1123,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
//System-style
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
awaitAllBulkJobCompletions();
jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), is(equalTo(BulkExportJobStatusEnum.COMPLETE)));
@@ -1183,7 +1185,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());


@@ -10,7 +10,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.config.JpaConfig;
import ca.uhn.fhir.jpa.dao.data.IForcedIdDao;
import ca.uhn.fhir.jpa.dao.data.IResourceHistoryTableDao;
@@ -75,7 +75,6 @@ import org.hl7.fhir.dstu3.model.Resource;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -736,10 +735,10 @@ public abstract class BaseJpaTest extends BaseTest {
}
@SuppressWarnings("BusyWait")
protected static void purgeDatabase(DaoConfig theDaoConfig, IFhirSystemDao<?, ?> theSystemDao, IResourceReindexingSvc theResourceReindexingSvc, ISearchCoordinatorSvc theSearchCoordinatorSvc, ISearchParamRegistry theSearchParamRegistry, IBulkDataExportSvc theBulkDataExportSvc) {
protected static void purgeDatabase(DaoConfig theDaoConfig, IFhirSystemDao<?, ?> theSystemDao, IResourceReindexingSvc theResourceReindexingSvc, ISearchCoordinatorSvc theSearchCoordinatorSvc, ISearchParamRegistry theSearchParamRegistry, IBulkDataExportJobSchedulingHelper theBulkDataJobActivator) {
theSearchCoordinatorSvc.cancelAllActiveSearches();
theResourceReindexingSvc.cancelAndPurgeAllJobs();
theBulkDataExportSvc.cancelAndPurgeAllJobs();
theBulkDataJobActivator.cancelAndPurgeAllJobs();
boolean expungeEnabled = theDaoConfig.isExpungeEnabled();
theDaoConfig.setExpungeEnabled(true);


@@ -8,6 +8,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoSubscription;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestDstu2Config;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
@@ -217,7 +218,7 @@ public abstract class BaseJpaDstu2Test extends BaseJpaTest {
@Autowired
protected SubscriptionLoader mySubscriptionLoader;
@Autowired
private IBulkDataExportSvc myBulkDataExportSvc;
private IBulkDataExportJobSchedulingHelper myBulkExportJobSchedulingHelper;
@Autowired
private ValidationSupportChain myJpaValidationSupportChain;
@@ -232,7 +233,7 @@ public abstract class BaseJpaDstu2Test extends BaseJpaTest {
@BeforeEach
@Transactional()
public void beforePurgeDatabase() {
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc);
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkExportJobSchedulingHelper);
}
@BeforeEach


@@ -13,6 +13,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestDstu3Config;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
@@ -344,9 +345,9 @@ public abstract class BaseJpaDstu3Test extends BaseJpaTest {
@Autowired
private IValidationSupport myJpaValidationSupportChainDstu3;
@Autowired
private IBulkDataExportSvc myBulkDataExportSvc;
@Autowired
protected ITermValueSetDao myTermValueSetDao;
@Autowired
private IBulkDataExportJobSchedulingHelper myBulkDataScheduleHelper;
@AfterEach()
public void afterCleanupDao() {
@@ -388,7 +389,7 @@ public abstract class BaseJpaDstu3Test extends BaseJpaTest {
@BeforeEach
@Transactional()
public void beforePurgeDatabase() {
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc);
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataScheduleHelper);
}
@BeforeEach


@@ -17,6 +17,7 @@ import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider;
import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestR4Config;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
@@ -502,7 +503,7 @@ public abstract class BaseJpaR4Test extends BaseJpaTest implements ITestDataBuilder
private IValidationSupport myJpaValidationSupportChainR4;
private PerformanceTracingLoggingInterceptor myPerformanceTracingLoggingInterceptor;
@Autowired
private IBulkDataExportSvc myBulkDataExportSvc;
private IBulkDataExportJobSchedulingHelper myBulkDataScheduleHelper;
@AfterEach()
public void afterCleanupDao() {
@ -560,7 +561,7 @@ public abstract class BaseJpaR4Test extends BaseJpaTest implements ITestDataBuil
runInTransaction(() -> {
myMdmLinkDao.deleteAll();
});
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc);
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataScheduleHelper);
}
@BeforeEach

View File

@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestHibernateSearchAddInConfig;
import ca.uhn.fhir.jpa.config.TestR4Config;
@ -126,16 +127,15 @@ public class FhirResourceDaoR4SearchWithElasticSearchIT extends BaseJpaTest {
@Autowired
private IResourceReindexingSvc myResourceReindexingSvc;
@Autowired
private IBulkDataExportSvc myBulkDataExportSvc;
private IBulkDataExportJobSchedulingHelper myBulkDataScheduleHelper;
@Autowired
private ITermCodeSystemStorageSvc myTermCodeSystemStorageSvc;
@Autowired
private DaoRegistry myDaoRegistry;
private boolean myContainsSettings;
@BeforeEach
public void beforePurgeDatabase() {
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc);
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataScheduleHelper);
}
@Override

View File

@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestHibernateSearchAddInConfig;
import ca.uhn.fhir.jpa.config.TestR4Config;
@ -144,14 +145,14 @@ public class FhirResourceDaoR4SearchWithLuceneDisabledTest extends BaseJpaTest {
@Autowired
private IResourceReindexingSvc myResourceReindexingSvc;
@Autowired
private IBulkDataExportSvc myBulkDataExportSvc;
private IBulkDataExportJobSchedulingHelper myBulkDataScheduleHelper;
@Autowired
private ITermReadSvc myTermSvc;
@BeforeEach
@Transactional()
public void beforePurgeDatabase() {
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc);
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataScheduleHelper);
}
@Override

View File

@ -6,6 +6,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoCodeSystem;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestHibernateSearchAddInConfig;
import ca.uhn.fhir.jpa.config.TestR4Config;
@ -85,7 +86,7 @@ public class FhirResourceDaoR4TerminologyElasticsearchIT extends BaseJpaTest {
@Autowired
private ISearchParamRegistry mySearchParamRegistry;
@Autowired
private IBulkDataExportSvc myBulkDataExportSvc;
private IBulkDataExportJobSchedulingHelper myBulkDataScheduleHelper;
@BeforeEach
@ -158,7 +159,7 @@ public class FhirResourceDaoR4TerminologyElasticsearchIT extends BaseJpaTest {
@AfterEach
public void afterPurgeDatabase() {
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc);
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataScheduleHelper);
}
@AfterAll

View File

@ -16,6 +16,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider;
import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestR5Config;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
@ -406,7 +407,7 @@ public abstract class BaseJpaR5Test extends BaseJpaTest implements ITestDataBuil
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private IBulkDataExportSvc myBulkDataExportSvc;
private IBulkDataExportJobSchedulingHelper myBulkDataSchedulerHelper;
@Override
public IIdType doCreateResource(IBaseResource theResource) {
@ -480,7 +481,7 @@ public abstract class BaseJpaR5Test extends BaseJpaTest implements ITestDataBuil
@BeforeEach
@Transactional()
public void beforePurgeDatabase() {
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc);
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataSchedulerHelper);
}
@BeforeEach

View File

@ -5,6 +5,7 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestDataBuilderConfig;
import ca.uhn.fhir.jpa.config.TestHibernateSearchAddInConfig;
@ -76,7 +77,7 @@ public class TokenAutocompleteElasticsearchIT extends BaseJpaTest{
@Autowired
IResourceReindexingSvc myResourceReindexingSvc;
@Autowired
IBulkDataExportSvc myBulkDataExportSvc;
private IBulkDataExportJobSchedulingHelper myBulkDataScheduleHelper;
@Autowired
ITestDataBuilder myDataBuilder;
@ -86,7 +87,7 @@ public class TokenAutocompleteElasticsearchIT extends BaseJpaTest{
@BeforeEach
public void beforePurgeDatabase() {
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc);
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataScheduleHelper);
myDaoConfig.setAdvancedLuceneIndexing(true);
}

View File

@ -7,6 +7,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoCodeSystem;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
@ -143,7 +144,7 @@ public abstract class AbstractValueSetFreeTextExpansionR4Test extends BaseJpaTes
private ISearchParamRegistry mySearchParamRegistry;
@Autowired
private IBulkDataExportSvc myBulkDataExportSvc;
private IBulkDataExportJobSchedulingHelper myBulkDataScheduleHelper;
@Autowired
protected ITermCodeSystemVersionDao myTermCodeSystemVersionDao;
@ -166,7 +167,7 @@ public abstract class AbstractValueSetFreeTextExpansionR4Test extends BaseJpaTes
@BeforeEach
@Transactional()
public void beforePurgeDatabase() {
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc);
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataScheduleHelper);
}
@AfterEach
@ -174,7 +175,7 @@ public abstract class AbstractValueSetFreeTextExpansionR4Test extends BaseJpaTes
myDaoConfig.setDeferIndexingForCodesystemsOfSize(new DaoConfig().getDeferIndexingForCodesystemsOfSize());
TermReindexingSvcImpl.setForceSaveDeferredAlwaysForUnitTest(false);
myDaoConfig.setMaximumExpansionSize(DaoConfig.DEFAULT_MAX_EXPANSION_SIZE);
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc);
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataScheduleHelper);
}
@AfterEach()

View File

@ -32,7 +32,7 @@ public class TermDeferredStorageSvcImplTest {
@Mock
private PlatformTransactionManager myTxManager;
@Mock
private ITermCodeSystemStorageSvc myTermConceptStorageSvc;
private TermConceptDaoSvc myTermConceptDaoSvc;
@Mock
private ITermConceptDao myConceptDao;
@Mock
@ -71,15 +71,15 @@ public class TermDeferredStorageSvcImplTest {
TermDeferredStorageSvcImpl svc = new TermDeferredStorageSvcImpl();
svc.setTransactionManagerForUnitTest(myTxManager);
svc.setCodeSystemStorageSvcForUnitTest(myTermConceptStorageSvc);
svc.setTermConceptDaoSvc(myTermConceptDaoSvc);
when(myTermCodeSystemVersionDao.findById(anyLong())).thenReturn(Optional.of(myTermCodeSystemVersion));
svc.setCodeSystemVersionDaoForUnitTest(myTermCodeSystemVersionDao);
svc.setProcessDeferred(true);
svc.addConceptToStorageQueue(concept);
svc.saveDeferred();
verify(myTermConceptStorageSvc, times(1)).saveConcept(same(concept));
verifyNoMoreInteractions(myTermConceptStorageSvc);
verify(myTermConceptDaoSvc, times(1)).saveConcept(same(concept));
verifyNoMoreInteractions(myTermConceptDaoSvc);
}
@ -94,7 +94,7 @@ public class TermDeferredStorageSvcImplTest {
TermDeferredStorageSvcImpl svc = new TermDeferredStorageSvcImpl();
svc.setTransactionManagerForUnitTest(myTxManager);
svc.setCodeSystemStorageSvcForUnitTest(myTermConceptStorageSvc);
svc.setTermConceptDaoSvc(myTermConceptDaoSvc);
when(myTermCodeSystemVersionDao.findById(anyLong())).thenReturn(Optional.empty());
svc.setCodeSystemVersionDaoForUnitTest(myTermCodeSystemVersionDao);
@ -102,8 +102,8 @@ public class TermDeferredStorageSvcImplTest {
svc.addConceptToStorageQueue(concept);
svc.saveDeferred();
verify(myTermConceptStorageSvc, times(0)).saveConcept(same(concept));
verifyNoMoreInteractions(myTermConceptStorageSvc);
verify(myTermConceptDaoSvc, times(0)).saveConcept(same(concept));
verifyNoMoreInteractions(myTermConceptDaoSvc);
}
@ -119,18 +119,18 @@ public class TermDeferredStorageSvcImplTest {
TermDeferredStorageSvcImpl svc = new TermDeferredStorageSvcImpl();
svc.setTransactionManagerForUnitTest(myTxManager);
svc.setCodeSystemStorageSvcForUnitTest(myTermConceptStorageSvc);
svc.setTermConceptDaoSvc(myTermConceptDaoSvc);
// Simulate the case where an exception is thrown despite a valid code system version.
when(myTermCodeSystemVersionDao.findById(anyLong())).thenReturn(Optional.of(myTermCodeSystemVersion));
when(myTermConceptStorageSvc.saveConcept(concept)).thenThrow(new RuntimeException("Foreign Constraint Violation"));
when(myTermConceptDaoSvc.saveConcept(concept)).thenThrow(new RuntimeException("Foreign Constraint Violation"));
svc.setCodeSystemVersionDaoForUnitTest(myTermCodeSystemVersionDao);
svc.setProcessDeferred(true);
svc.addConceptToStorageQueue(concept);
svc.saveDeferred();
verify(myTermConceptStorageSvc, times(1)).saveConcept(same(concept));
verifyNoMoreInteractions(myTermConceptStorageSvc);
verify(myTermConceptDaoSvc, times(1)).saveConcept(same(concept));
verifyNoMoreInteractions(myTermConceptDaoSvc);
}
@ -142,13 +142,13 @@ public class TermDeferredStorageSvcImplTest {
TermDeferredStorageSvcImpl svc = new TermDeferredStorageSvcImpl();
svc.setTransactionManagerForUnitTest(myTxManager);
svc.setCodeSystemStorageSvcForUnitTest(myTermConceptStorageSvc);
svc.setTermConceptDaoSvc(myTermConceptDaoSvc);
svc.setConceptDaoForUnitTest(myConceptDao);
svc.setProcessDeferred(true);
svc.addConceptLinkToStorageQueue(conceptLink);
svc.saveDeferred();
verifyNoMoreInteractions(myTermConceptStorageSvc);
verifyNoMoreInteractions(myTermConceptDaoSvc);
}
}

View File

@ -7,6 +7,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoCodeSystem;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestHibernateSearchAddInConfig;
import ca.uhn.fhir.jpa.config.TestR4Config;
@ -90,7 +91,7 @@ public class ValueSetExpansionR4ElasticsearchIT extends BaseJpaTest {
@Autowired
private ISearchParamRegistry mySearchParamRegistry;
@Autowired
private IBulkDataExportSvc myBulkDataExportSvc;
private IBulkDataExportJobSchedulingHelper myBulkDataExportJobSchedulingHelper;
@Mock
private IValueSetConceptAccumulator myValueSetCodeAccumulator;
@ -108,7 +109,7 @@ public class ValueSetExpansionR4ElasticsearchIT extends BaseJpaTest {
@AfterEach
public void afterPurgeDatabase() {
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc);
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportJobSchedulingHelper);
}
void createCodeSystem() {

View File

@ -46,6 +46,8 @@ import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@ -265,6 +267,14 @@ public class SearchParamRegistryImpl implements ISearchParamRegistry, IResourceC
myResourceChangeListenerRegistry = theResourceChangeListenerRegistry;
}
/**
 * There is a circular reference between this class and the ResourceChangeListenerRegistry:
 * SearchParamRegistryImpl -> ResourceChangeListenerRegistry -> InMemoryResourceMatcher -> SearchParamRegistryImpl.
 * Since we only need to register this listener once on boot-up, we delay registration until the
 * ContextRefreshedEvent fires.
 */
@PostConstruct
public void registerListener() {
myResourceChangeListenerCache = myResourceChangeListenerRegistry.registerResourceResourceChangeListener("SearchParameter", SearchParameterMap.newSynchronous(), this, REFRESH_INTERVAL);
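
A minimal sketch of that pattern in isolation, with hypothetical bean names (nothing below is from this commit): keep field injection so Spring can resolve the cycle, and defer the call into the collaborator until the whole context is up.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;

// Hypothetical collaborator that indirectly depends back on the bean below, forming the cycle.
interface ListenerRegistry {
	void register(Object theListener);
}

@Service
public class LazyRegisteringBean {

	// Field injection lets Spring satisfy the cycle through an early bean reference;
	// constructor injection would fail outright on a circular dependency.
	@Autowired
	private ListenerRegistry myRegistry;

	// Deferred from @PostConstruct: by the time ContextRefreshedEvent fires, every bean
	// in the cycle has been fully constructed and initialized.
	@EventListener(ContextRefreshedEvent.class)
	public void registerListener() {
		myRegistry.register(this);
	}
}

Note that ContextRefreshedEvent can fire more than once (for example with child contexts), so a simple idempotence guard is worth adding when registration must happen exactly once.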

View File

@ -0,0 +1,25 @@
package ca.uhn.fhir.jpa.bulk.export.api;
import javax.transaction.Transactional;
public interface IBulkDataExportJobSchedulingHelper {
/**
* Invoked via a scheduled task; purges any bulk export jobs and files which are past their cutoff point.
*/
@Transactional(value = Transactional.TxType.NEVER)
void purgeExpiredFiles();
/**
* Stops all in-progress jobs, and then purges them.
*/
@Transactional(value = Transactional.TxType.NEVER)
void cancelAndPurgeAllJobs();
/**
* Starts all Bulk Export jobs that have been submitted since the last scheduled run. This is
* invoked primarily via a scheduler.
*/
@Transactional(value = Transactional.TxType.NEVER)
void startSubmittedJobs();
}
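
As a rough sketch of how a caller might drive these hooks from HAPI's Quartz-based scheduler, following the ISchedulerService/HapiJob pattern used elsewhere in the codebase (the wiring class, poll interval, and scheduleJobs() entry point are illustrative assumptions, not part of this commit):

import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import org.apache.commons.lang3.time.DateUtils;
import org.quartz.JobExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;

public class BulkExportScheduleWiring {

	@Autowired
	private ISchedulerService mySchedulerService;

	public void scheduleJobs() {
		// Poll for newly submitted bulk export jobs every 10 seconds (interval is illustrative).
		ScheduledJobDefinition submitJobDefinition = new ScheduledJobDefinition();
		submitJobDefinition.setId(SubmitJob.class.getName());
		submitJobDefinition.setJobClass(SubmitJob.class);
		mySchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, submitJobDefinition);
	}

	// Quartz instantiates the job and HAPI's scheduler autowires it from the Spring context,
	// so the helper is injected rather than passed in.
	public static class SubmitJob implements HapiJob {
		@Autowired
		private IBulkDataExportJobSchedulingHelper myHelper;

		@Override
		public void execute(JobExecutionContext theContext) {
			myHelper.startSubmittedJobs();
		}
	}
}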

View File

@ -32,11 +32,6 @@ import java.util.List;
import java.util.Set;
public interface IBulkDataExportSvc {
void buildExportFiles();
@Transactional(value = Transactional.TxType.NEVER)
void purgeExpiredFiles();
/**
* Deprecated - Use {@link #submitJob(BulkDataExportOptions, Boolean, RequestDetails)} instead
*/
@ -52,8 +47,6 @@ public interface IBulkDataExportSvc {
*/
Set<String> getPatientCompartmentResources();
void cancelAndPurgeAllJobs();
class JobInfo {
private String myJobId;
private BulkExportJobStatusEnum myStatus;

View File

@ -40,8 +40,7 @@ import java.util.stream.Collectors;
public class CreateBulkExportEntityTasklet implements Tasklet {
@Autowired
private IBulkDataExportSvc myBulkDataExportSvc;
@Autowired private IBulkDataExportSvc myBulkDataExportSvc;
public static void addUUIDToJobContext(ChunkContext theChunkContext, String theJobUUID) {
theChunkContext