diff --git a/hapi-fhir-base/src/main/resources/ca/uhn/fhir/i18n/hapi-messages.properties b/hapi-fhir-base/src/main/resources/ca/uhn/fhir/i18n/hapi-messages.properties index 3257ffb37e2..aecaba0f495 100644 --- a/hapi-fhir-base/src/main/resources/ca/uhn/fhir/i18n/hapi-messages.properties +++ b/hapi-fhir-base/src/main/resources/ca/uhn/fhir/i18n/hapi-messages.properties @@ -63,8 +63,8 @@ ca.uhn.fhir.validation.ValidationResult.noIssuesDetected=No issues detected duri # JPA Messages -ca.uhn.fhir.jpa.bulk.BulkDataExportSvcImpl.onlyBinarySelected=Binary resources may not be exported with bulk export -ca.uhn.fhir.jpa.bulk.BulkDataExportSvcImpl.unknownResourceType=Unknown or unsupported resource type: {0} +ca.uhn.fhir.jpa.bulk.svc.BulkDataExportSvcImpl.onlyBinarySelected=Binary resources may not be exported with bulk export +ca.uhn.fhir.jpa.bulk.svc.BulkDataExportSvcImpl.unknownResourceType=Unknown or unsupported resource type: {0} ca.uhn.fhir.jpa.config.HapiFhirHibernateJpaDialect.resourceVersionConstraintFailure=The operation has failed with a version constraint failure. This generally means that two clients/threads were trying to update the same resource at the same time, and this request was chosen as the failing request. ca.uhn.fhir.jpa.config.HapiFhirHibernateJpaDialect.resourceIndexedCompositeStringUniqueConstraintFailure=The operation has failed with a unique index constraint failure. This probably means that the operation was trying to create/update a resource that would have resulted in a duplicate value for a unique index. ca.uhn.fhir.jpa.config.HapiFhirHibernateJpaDialect.forcedIdConstraintFailure=The operation has failed with a client-assigned ID constraint failure. This typically means that multiple client threads are trying to create a new resource with the same client-assigned ID at the same time, and this thread was chosen to be rejected. diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/BatchJobsConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/BatchJobsConfig.java new file mode 100644 index 00000000000..fab1bfe43cd --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/BatchJobsConfig.java @@ -0,0 +1,14 @@ +package ca.uhn.fhir.jpa.batch; + +import ca.uhn.fhir.jpa.bulk.job.BulkExportJobConfig; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +@Configuration +//When you define a new batch job, add it here. +@Import({ + CommonBatchJobConfig.class, + BulkExportJobConfig.class,}) +public class BatchJobsConfig { + //Empty config, as this is just an aggregator for all the various batch jobs defined around the system. +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/CommonBatchJobConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/CommonBatchJobConfig.java new file mode 100644 index 00000000000..a344a4290f4 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/CommonBatchJobConfig.java @@ -0,0 +1,17 @@ +package ca.uhn.fhir.jpa.batch; + +import ca.uhn.fhir.jpa.batch.processors.PidToIBaseResourceProcessor; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class CommonBatchJobConfig { + + @Bean + @StepScope + public PidToIBaseResourceProcessor pidToResourceProcessor() { + return new PidToIBaseResourceProcessor(); + } + +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkItemResourceLoaderProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/PidToIBaseResourceProcessor.java similarity index 82% rename from hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkItemResourceLoaderProcessor.java rename to hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/PidToIBaseResourceProcessor.java index 0a46aa62e4e..a6257196213 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkItemResourceLoaderProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/PidToIBaseResourceProcessor.java @@ -1,4 +1,4 @@ -package ca.uhn.fhir.jpa.bulk.batch; +package ca.uhn.fhir.jpa.batch.processors; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; @@ -7,7 +7,6 @@ import ca.uhn.fhir.jpa.dao.ISearchBuilder; import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; import org.hl7.fhir.instance.model.api.IBaseResource; -import org.slf4j.Logger; import org.springframework.batch.item.ItemProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -16,11 +15,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import static org.slf4j.LoggerFactory.getLogger; - -public class BulkItemResourceLoaderProcessor implements ItemProcessor { - private static final Logger ourLog = getLogger(BulkItemResourceLoaderProcessor.class); - +/** + * Reusable Item Processor which converts a ResourcePersistentId to its IBaseResource + */ +public class PidToIBaseResourceProcessor implements ItemProcessor { @Autowired private SearchBuilderFactory mySearchBuilderFactory; @@ -34,8 +32,6 @@ public class BulkItemResourceLoaderProcessor implements ItemProcessor theResourceTypes, Date theSince, Set theFilters); - JobInfo getJobStatusOrThrowResourceNotFound(String theJobId); + JobInfo getJobInfoOrThrowResourceNotFound(String theJobId); void cancelAndPurgeAllJobs(); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkExportJobCompletionListener.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkExportJobCompletionListener.java deleted file mode 100644 index 8658a79ab3d..00000000000 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkExportJobCompletionListener.java +++ /dev/null @@ -1,39 +0,0 @@ -package ca.uhn.fhir.jpa.bulk.batch; - -import ca.uhn.fhir.jpa.bulk.BulkJobStatusEnum; -import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao; -import ca.uhn.fhir.jpa.entity.BulkExportJobEntity; -import org.springframework.batch.core.BatchStatus; -import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.JobExecutionListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; - -import java.util.Optional; - -public class BulkExportJobCompletionListener implements JobExecutionListener { - @Value("#{jobParameters['jobUUID']}") - private String myJobUUID; - - @Autowired - private IBulkExportJobDao myBulkExportJobDao; - - @Override - public void beforeJob(JobExecution theJobExecution) { - - } - - @Override - public void afterJob(JobExecution theJobExecution) { - if (theJobExecution.getStatus() == BatchStatus.COMPLETED) { - Optional byJobId = myBulkExportJobDao.findByJobId(myJobUUID); - if (byJobId.isPresent()) { - BulkExportJobEntity bulkExportJobEntity = byJobId.get(); - bulkExportJobEntity.setStatus(BulkJobStatusEnum.COMPLETE); - myBulkExportJobDao.save(bulkExportJobEntity); - } - - } - - } -} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java new file mode 100644 index 00000000000..af5bd822e3c --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java @@ -0,0 +1,82 @@ +package ca.uhn.fhir.jpa.bulk.job; + +import ca.uhn.fhir.jpa.batch.processors.PidToIBaseResourceProcessor; +import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.JobScope; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class BulkExportJobConfig { + + @Autowired + private StepBuilderFactory myStepBuilderFactory; + + @Autowired + private JobBuilderFactory myJobBuilderFactory; + + @Autowired + private PidToIBaseResourceProcessor myPidToIBaseResourceProcessor; + + @Bean + public Job bulkExportJob() { + return myJobBuilderFactory.get("bulkExportJob") + .start(partitionStep()) + .listener(bulkExportJobCompletionListener()) + .build(); + } + + @Bean + public Step bulkExportGenerateResourceFilesStep() { + return myStepBuilderFactory.get("bulkExportGenerateResourceFilesStep") + . chunk(1000) //1000 resources per generated file + .reader(bulkItemReader(null)) + .processor(myPidToIBaseResourceProcessor) + .writer(resourceToFileWriter()) + .build(); + } + + @Bean + @JobScope + public BulkExportJobStatusChangeListener bulkExportJobCompletionListener() { + return new BulkExportJobStatusChangeListener(); + } + + @Bean + public Step partitionStep() { + return myStepBuilderFactory.get("partitionStep") + .partitioner("bulkExportGenerateResourceFilesStep", partitioner(null)) + .step(bulkExportGenerateResourceFilesStep()) + .build(); + } + + @Bean + @StepScope + public BulkItemReader bulkItemReader(@Value("#{jobParameters['jobUUID']}") String theJobUUID) { + BulkItemReader bulkItemReader = new BulkItemReader(); + bulkItemReader.setJobUUID(theJobUUID); + return bulkItemReader; + } + + @Bean + @JobScope + public ResourceTypePartitioner partitioner(@Value("#{jobParameters['jobUUID']}") String theJobUUID) { + return new ResourceTypePartitioner(theJobUUID); + } + + @Bean + @StepScope + public ItemWriter resourceToFileWriter() { + return new ResourceToFileWriter(); + } + +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobStatusChangeListener.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobStatusChangeListener.java new file mode 100644 index 00000000000..ebb92f011ac --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobStatusChangeListener.java @@ -0,0 +1,39 @@ +package ca.uhn.fhir.jpa.bulk.job; + +import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum; +import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc; +import org.springframework.batch.core.BatchStatus; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobExecutionListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; + +/** + * Will run before and after a job to set the status to whatever is appropriate. + */ +public class BulkExportJobStatusChangeListener implements JobExecutionListener { + + @Value("#{jobParameters['jobUUID']}") + private String myJobUUID; + + @Autowired + private BulkExportDaoSvc myBulkExportDaoSvc; + + @Override + public void beforeJob(JobExecution theJobExecution) { + if (theJobExecution.getStatus() == BatchStatus.STARTING) { + myBulkExportDaoSvc.setJobToStatus(myJobUUID, BulkJobStatusEnum.BUILDING); + } + + } + + @Override + public void afterJob(JobExecution theJobExecution) { + if (theJobExecution.getStatus() == BatchStatus.COMPLETED) { + myBulkExportDaoSvc.setJobToStatus(myJobUUID, BulkJobStatusEnum.COMPLETE); + } else { + //If the job didn't complete successfully, just set it back to submitted so that it gets picked up again by the scheduler. + myBulkExportDaoSvc.setJobToStatus(myJobUUID, BulkJobStatusEnum.SUBMITTED); + } + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkItemReader.java similarity index 86% rename from hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkItemReader.java rename to hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkItemReader.java index ffd2d87a8a9..6b7131dbcdb 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkItemReader.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkItemReader.java @@ -1,4 +1,4 @@ -package ca.uhn.fhir.jpa.bulk.batch; +package ca.uhn.fhir.jpa.bulk.job; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.interceptor.model.RequestPartitionId; @@ -16,7 +16,10 @@ import ca.uhn.fhir.rest.param.DateRangeParam; import org.hl7.fhir.instance.model.api.IBaseResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.NonTransientResourceException; +import org.springframework.batch.item.ParseException; +import org.springframework.batch.item.UnexpectedInputException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -25,7 +28,7 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; -public class BulkItemReader extends AbstractItemCountingItemStreamItemReader { +public class BulkItemReader implements ItemReader { private static final Logger ourLog = LoggerFactory.getLogger(BulkItemReader.class); @Autowired @@ -47,32 +50,8 @@ public class BulkItemReader extends AbstractItemCountingItemStreamItemReader myPidIterator; - - protected ResourcePersistentId doRead() throws Exception { - if (myPidIterator == null) { - loadResourcePids(); - } - if (myPidIterator.hasNext()) { - return myPidIterator.next(); - } else { - return null; - } - } - - @Override - protected void doOpen() throws Exception { - - } - - @Override - protected void doClose() throws Exception { - - } - - private void loadResourcePids() { Optional jobOpt = myBulkExportJobDao.findByJobId(myJobUUID); if (!jobOpt.isPresent()) { @@ -107,4 +86,15 @@ public class BulkItemReader extends AbstractItemCountingItemStreamItemReader, CompletionPolicy { private static final Logger ourLog = getLogger(ResourceToFileWriter.class); - - @Autowired private FhirContext myContext; - - @Autowired - private IBulkExportCollectionDao myBulkExportCollectionDao; - @Autowired private DaoRegistry myDaoRegistry; - private BulkExportCollectionEntity myBulkExportCollectionEntity; + @Autowired + private BulkExportDaoSvc myBulkExportDaoSvc; private ByteArrayOutputStream myOutputStream; private OutputStreamWriter myWriter; @@ -54,8 +48,6 @@ public class ResourceToFileWriter implements ItemWriter, Completi private IFhirResourceDao myBinaryDao; - @Autowired - private BulkExportDaoSvc myBulkExportDaoSvc; public ResourceToFileWriter() { myOutputStream = new ByteArrayOutputStream(); @@ -92,19 +84,6 @@ public class ResourceToFileWriter implements ItemWriter, Completi return myBinaryDao.create(binary).getResource().getIdElement(); } - private BulkExportCollectionEntity getOrLoadBulkExportCollectionEntity() { - if (myBulkExportCollectionEntity == null) { - Optional oBulkExportCollectionEntity = myBulkExportCollectionDao.findById(myBulkExportCollectionEntityId); - if (!oBulkExportCollectionEntity.isPresent()) { - throw new IllegalArgumentException("This BulkExportCollectionEntity doesn't exist!"); - } else { - myBulkExportCollectionEntity = oBulkExportCollectionEntity.get(); - } - } - return myBulkExportCollectionEntity; - - } - @Override public void write(List resources) throws Exception { @@ -114,7 +93,7 @@ public class ResourceToFileWriter implements ItemWriter, Completi } Optional createdId = flushToFiles(); - createdId.ifPresent(theIIdType -> ourLog.warn("Created resources for bulk export file containing {}:{} resources of type ", theIIdType.toUnqualifiedVersionless().getValue(), myBulkExportCollectionEntity.getResourceType())); + createdId.ifPresent(theIIdType -> ourLog.warn("Created resources for bulk export file containing {} resources of type ", theIIdType.toUnqualifiedVersionless().getValue())); } @SuppressWarnings("unchecked") diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/ResourceTypePartitioner.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/ResourceTypePartitioner.java similarity index 95% rename from hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/ResourceTypePartitioner.java rename to hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/ResourceTypePartitioner.java index f8cea8f6d56..2b3c916e9a0 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/ResourceTypePartitioner.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/ResourceTypePartitioner.java @@ -1,5 +1,6 @@ -package ca.uhn.fhir.jpa.bulk.batch; +package ca.uhn.fhir.jpa.bulk.job; +import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc; import org.slf4j.Logger; import org.springframework.batch.core.partition.support.Partitioner; import org.springframework.batch.item.ExecutionContext; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkExportResponseJson.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/model/BulkExportResponseJson.java similarity index 98% rename from hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkExportResponseJson.java rename to hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/model/BulkExportResponseJson.java index 23e254a6a91..27e9d71e8cb 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkExportResponseJson.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/model/BulkExportResponseJson.java @@ -1,4 +1,4 @@ -package ca.uhn.fhir.jpa.bulk; +package ca.uhn.fhir.jpa.bulk.model; /*- * #%L diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkJobStatusEnum.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/model/BulkJobStatusEnum.java similarity index 95% rename from hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkJobStatusEnum.java rename to hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/model/BulkJobStatusEnum.java index 902f969a67a..9eb8a7f8c83 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkJobStatusEnum.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/model/BulkJobStatusEnum.java @@ -1,4 +1,4 @@ -package ca.uhn.fhir.jpa.bulk; +package ca.uhn.fhir.jpa.bulk.model; /*- * #%L diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkDataExportProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/provider/BulkDataExportProvider.java similarity index 97% rename from hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkDataExportProvider.java rename to hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/provider/BulkDataExportProvider.java index abccc5547b0..0cccd295086 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkDataExportProvider.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/provider/BulkDataExportProvider.java @@ -1,4 +1,4 @@ -package ca.uhn.fhir.jpa.bulk; +package ca.uhn.fhir.jpa.bulk.provider; /*- * #%L @@ -21,6 +21,8 @@ package ca.uhn.fhir.jpa.bulk; */ import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; +import ca.uhn.fhir.jpa.bulk.model.BulkExportResponseJson; import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.rest.annotation.Operation; import ca.uhn.fhir.rest.annotation.OperationParam; @@ -123,7 +125,7 @@ public class BulkDataExportProvider { HttpServletResponse response = theRequestDetails.getServletResponse(); theRequestDetails.getServer().addHeadersToResponse(response); - IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(theJobId.getValueAsString()); + IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(theJobId.getValueAsString()); switch (status.getStatus()) { case SUBMITTED: diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java similarity index 68% rename from hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImpl.java rename to hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java index 7423d111c3a..256e8bf7b5c 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java @@ -1,4 +1,4 @@ -package ca.uhn.fhir.jpa.bulk; +package ca.uhn.fhir.jpa.bulk.svc; /*- * #%L @@ -21,13 +21,12 @@ package ca.uhn.fhir.jpa.bulk; */ import ca.uhn.fhir.context.FhirContext; -import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.model.ExpungeOptions; -import ca.uhn.fhir.jpa.dao.IResultIterator; -import ca.uhn.fhir.jpa.dao.ISearchBuilder; -import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; +import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; +import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum; +import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao; import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao; import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao; @@ -37,28 +36,21 @@ import ca.uhn.fhir.jpa.entity.BulkExportJobEntity; import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; -import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; import ca.uhn.fhir.jpa.model.util.JpaConstants; -import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; -import ca.uhn.fhir.parser.IParser; import ca.uhn.fhir.rest.api.Constants; -import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; -import ca.uhn.fhir.rest.param.DateRangeParam; -import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; -import ca.uhn.fhir.util.BinaryUtil; -import ca.uhn.fhir.util.StopWatch; -import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.time.DateUtils; import org.hl7.fhir.instance.model.api.IBaseBinary; -import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.r4.model.InstantType; import org.quartz.JobExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersBuilder; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Slice; @@ -67,18 +59,10 @@ import org.springframework.transaction.support.TransactionTemplate; import javax.annotation.PostConstruct; import javax.transaction.Transactional; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.util.ArrayList; -import java.util.Collections; import java.util.Date; -import java.util.List; import java.util.Optional; import java.util.Set; import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static ca.uhn.fhir.util.UrlUtil.escapeUrlParam; @@ -103,11 +87,15 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { private FhirContext myContext; @Autowired private PlatformTransactionManager myTxManager; - @Autowired - private SearchBuilderFactory mySearchBuilderFactory; private TransactionTemplate myTxTemplate; - private long myFileMaxChars = 500 * FileUtils.ONE_KB; + @Autowired + private IBatchJobSubmitter myJobSubmitter; + + @Autowired + @Qualifier("bulkExportJob") + private org.springframework.batch.core.Job myBulkExportJob; + private int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR); /** @@ -134,10 +122,7 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { String jobUuid = jobToProcessOpt.get().getJobId(); try { - myTxTemplate.execute(t -> { - processJob(jobUuid); - return null; - }); + processJob(jobUuid); } catch (Exception e) { ourLog.error("Failure while preparing bulk export extract", e); myTxTemplate.execute(t -> { @@ -203,112 +188,13 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { } private void processJob(String theJobUuid) { + JobParameters parameters = new JobParametersBuilder() + .addString("jobUUID", theJobUuid) + .toJobParameters(); - Optional jobOpt = myBulkExportJobDao.findByJobId(theJobUuid); - if (!jobOpt.isPresent()) { - ourLog.info("Job appears to be deleted"); - return; - } - - StopWatch jobStopwatch = new StopWatch(); - AtomicInteger jobResourceCounter = new AtomicInteger(); - - BulkExportJobEntity job = jobOpt.get(); - ourLog.info("Bulk export starting generation for batch export job: {}", job); - - for (BulkExportCollectionEntity nextCollection : job.getCollections()) { - - String nextType = nextCollection.getResourceType(); - IFhirResourceDao dao = myDaoRegistry.getResourceDao(nextType); - - ourLog.info("Bulk export assembling export of type {} for job {}", nextType, theJobUuid); - - Class nextTypeClass = myContext.getResourceDefinition(nextType).getImplementingClass(); - ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, nextType, nextTypeClass); - - SearchParameterMap map = new SearchParameterMap(); - map.setLoadSynchronous(true); - if (job.getSince() != null) { - map.setLastUpdated(new DateRangeParam(job.getSince(), null)); - } - - IResultIterator resultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, theJobUuid), null, RequestPartitionId.allPartitions()); - storeResultsToFiles(nextCollection, sb, resultIterator, jobResourceCounter, jobStopwatch); - } - - job.setStatus(BulkJobStatusEnum.COMPLETE); - updateExpiry(job); - myBulkExportJobDao.save(job); - - ourLog.info("Bulk export completed job in {}: {}", jobStopwatch, job); - + myJobSubmitter.runJob(myBulkExportJob, parameters); } - private void storeResultsToFiles(BulkExportCollectionEntity theExportCollection, ISearchBuilder theSearchBuilder, IResultIterator theResultIterator, AtomicInteger theJobResourceCounter, StopWatch theJobStopwatch) { - - try (IResultIterator query = theResultIterator) { - if (!query.hasNext()) { - return; - } - - AtomicInteger fileCounter = new AtomicInteger(0); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - OutputStreamWriter writer = new OutputStreamWriter(outputStream, Constants.CHARSET_UTF8); - IParser parser = myContext.newJsonParser().setPrettyPrint(false); - - List pidsSpool = new ArrayList<>(); - List resourcesSpool = new ArrayList<>(); - while (query.hasNext()) { - pidsSpool.add(query.next()); - fileCounter.incrementAndGet(); - theJobResourceCounter.incrementAndGet(); - - if (pidsSpool.size() >= 10 || !query.hasNext()) { - - theSearchBuilder.loadResourcesByPid(pidsSpool, Collections.emptyList(), resourcesSpool, false, null); - - for (IBaseResource nextFileResource : resourcesSpool) { - parser.encodeResourceToWriter(nextFileResource, writer); - writer.append("\n"); - } - - pidsSpool.clear(); - resourcesSpool.clear(); - - if (outputStream.size() >= myFileMaxChars || !query.hasNext()) { - Optional createdId = flushToFiles(theExportCollection, fileCounter, outputStream); - createdId.ifPresent(theIIdType -> ourLog.info("Created resource {} for bulk export file containing {} resources of type {} - Total {} resources ({}/sec)", theIIdType.toUnqualifiedVersionless().getValue(), fileCounter.get(), theExportCollection.getResourceType(), theJobResourceCounter.get(), theJobStopwatch.formatThroughput(theJobResourceCounter.get(), TimeUnit.SECONDS))); - fileCounter.set(0); - } - - } - } - - } catch (IOException e) { - throw new InternalErrorException(e); - } - } - - private Optional flushToFiles(BulkExportCollectionEntity theCollection, AtomicInteger theCounter, ByteArrayOutputStream theOutputStream) { - if (theOutputStream.size() > 0) { - IBaseBinary binary = BinaryUtil.newBinary(myContext); - binary.setContentType(Constants.CT_FHIR_NDJSON); - binary.setContent(theOutputStream.toByteArray()); - - IIdType createdId = getBinaryDao().create(binary).getResource().getIdElement(); - - BulkExportCollectionFileEntity file = new BulkExportCollectionFileEntity(); - theCollection.getFiles().add(file); - file.setCollection(theCollection); - file.setResource(createdId.getIdPart()); - myBulkExportCollectionFileDao.saveAndFlush(file); - theOutputStream.reset(); - - return Optional.of(createdId); - } - - return Optional.empty(); - } @SuppressWarnings("unchecked") private IFhirResourceDao getBinaryDao() { @@ -429,7 +315,7 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { @Transactional @Override - public JobInfo getJobStatusOrThrowResourceNotFound(String theJobId) { + public JobInfo getJobInfoOrThrowResourceNotFound(String theJobId) { BulkExportJobEntity job = myBulkExportJobDao .findByJobId(theJobId) .orElseThrow(() -> new ResourceNotFoundException(theJobId)); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkExportCollectionFileDaoSvc.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkExportCollectionFileDaoSvc.java similarity index 75% rename from hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkExportCollectionFileDaoSvc.java rename to hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkExportCollectionFileDaoSvc.java index f523f6d6f78..337ad45d204 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkExportCollectionFileDaoSvc.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkExportCollectionFileDaoSvc.java @@ -1,18 +1,14 @@ -package ca.uhn.fhir.jpa.bulk.batch; +package ca.uhn.fhir.jpa.bulk.svc; import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao; import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity; -import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.transaction.Transactional; -import static org.slf4j.LoggerFactory.getLogger; - @Service public class BulkExportCollectionFileDaoSvc { - private static final Logger ourLog = getLogger(BulkExportCollectionFileDaoSvc.class); @Autowired private IBulkExportCollectionFileDao myBulkExportCollectionFileDao; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkExportDaoSvc.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkExportDaoSvc.java similarity index 79% rename from hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkExportDaoSvc.java rename to hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkExportDaoSvc.java index 23f33f8b37e..d7467b97042 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkExportDaoSvc.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkExportDaoSvc.java @@ -1,5 +1,6 @@ -package ca.uhn.fhir.jpa.bulk.batch; +package ca.uhn.fhir.jpa.bulk.svc; +import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum; import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao; import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao; import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao; @@ -63,4 +64,18 @@ public class BulkExportDaoSvc { return jobOpt.get(); } + @Transactional + public void setJobToStatus(String theJobUUID, BulkJobStatusEnum theStatus) { + Optional oJob = myBulkExportJobDao.findByJobId(theJobUUID); + if (!oJob.isPresent()) { + ourLog.error("Job doesn't exist!"); + } else { + ourLog.info("Setting job with UUID {} to {}", theJobUUID, theStatus); + BulkExportJobEntity bulkExportJobEntity = oJob.get(); + bulkExportJobEntity.setStatus(theStatus); + myBulkExportJobDao.save(bulkExportJobEntity); + } + + } + } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java index bd702a52a07..5a8ec091485 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java @@ -6,11 +6,13 @@ import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.interceptor.executor.InterceptorService; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IDao; +import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; +import ca.uhn.fhir.jpa.batch.svc.BatchJobSubmitterImpl; import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider; import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor; -import ca.uhn.fhir.jpa.bulk.BulkDataExportProvider; -import ca.uhn.fhir.jpa.bulk.BulkDataExportSvcImpl; -import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc; +import ca.uhn.fhir.jpa.bulk.provider.BulkDataExportProvider; +import ca.uhn.fhir.jpa.bulk.svc.BulkDataExportSvcImpl; +import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.dao.HistoryBuilder; import ca.uhn.fhir.jpa.dao.HistoryBuilderFactory; import ca.uhn.fhir.jpa.dao.ISearchBuilder; @@ -46,6 +48,7 @@ import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc; import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc; import ca.uhn.fhir.jpa.search.reindex.ResourceReindexingSvcImpl; import ca.uhn.fhir.jpa.searchparam.config.SearchParamConfig; +import ca.uhn.fhir.jpa.batch.BatchJobsConfig; import ca.uhn.fhir.jpa.searchparam.extractor.IResourceLinkResolver; import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.server.interceptor.consent.IConsentContextServices; @@ -53,7 +56,6 @@ import ca.uhn.fhir.rest.server.interceptor.partition.RequestTenantPartitionInter import org.hibernate.jpa.HibernatePersistenceProvider; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.utilities.graphql.IGraphQLStorageServices; -import org.springframework.batch.core.Job; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; @@ -109,7 +111,7 @@ import java.util.Date; @ComponentScan.Filter(type = FilterType.REGEX, pattern = "ca.uhn.fhir.jpa.empi.*") }) @Import({ - SearchParamConfig.class + SearchParamConfig.class, BatchJobsConfig.class }) public abstract class BaseConfig { @@ -138,6 +140,11 @@ public abstract class BaseConfig { return new DatabaseBackedPagingProvider(); } + @Bean + public IBatchJobSubmitter batchJobSubmitter() { + return new BatchJobSubmitterImpl(); + } + /** * This method should be overridden to provide an actual completed * bean, but it provides a partially completed entity manager @@ -306,7 +313,6 @@ public abstract class BaseConfig { return new BulkDataExportSvcImpl(); } - @Bean @Lazy public BulkDataExportProvider bulkDataExportProvider() { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBulkExportJobDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBulkExportJobDao.java index 2b91972f66a..1c213ae289b 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBulkExportJobDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBulkExportJobDao.java @@ -1,6 +1,6 @@ package ca.uhn.fhir.jpa.dao.data; -import ca.uhn.fhir.jpa.bulk.BulkJobStatusEnum; +import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum; import ca.uhn.fhir.jpa.entity.BulkExportJobEntity; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Slice; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/BulkExportJobEntity.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/BulkExportJobEntity.java index f7a022dd31c..4aa73968d40 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/BulkExportJobEntity.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/BulkExportJobEntity.java @@ -20,7 +20,7 @@ package ca.uhn.fhir.jpa.entity; * #L% */ -import ca.uhn.fhir.jpa.bulk.BulkJobStatusEnum; +import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; import org.hl7.fhir.r5.model.InstantType; diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportProviderTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportProviderTest.java index 0d269f0e5e3..7e9c8100e0e 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportProviderTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportProviderTest.java @@ -1,6 +1,10 @@ package ca.uhn.fhir.jpa.bulk; import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; +import ca.uhn.fhir.jpa.bulk.model.BulkExportResponseJson; +import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum; +import ca.uhn.fhir.jpa.bulk.provider.BulkDataExportProvider; import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.client.apache.ResourceEntity; @@ -179,7 +183,7 @@ public class BulkDataExportProviderTest { .setJobId(A_JOB_ID) .setStatus(BulkJobStatusEnum.BUILDING) .setStatusTime(InstantType.now().getValue()); - when(myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(eq(A_JOB_ID))).thenReturn(jobInfo); + when(myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(eq(A_JOB_ID))).thenReturn(jobInfo); String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" + JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID; @@ -204,7 +208,7 @@ public class BulkDataExportProviderTest { .setStatus(BulkJobStatusEnum.ERROR) .setStatusTime(InstantType.now().getValue()) .setStatusMessage("Some Error Message"); - when(myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(eq(A_JOB_ID))).thenReturn(jobInfo); + when(myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(eq(A_JOB_ID))).thenReturn(jobInfo); String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" + JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID; @@ -233,7 +237,7 @@ public class BulkDataExportProviderTest { jobInfo.addFile().setResourceType("Patient").setResourceId(new IdType("Binary/111")); jobInfo.addFile().setResourceType("Patient").setResourceId(new IdType("Binary/222")); jobInfo.addFile().setResourceType("Patient").setResourceId(new IdType("Binary/333")); - when(myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(eq(A_JOB_ID))).thenReturn(jobInfo); + when(myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(eq(A_JOB_ID))).thenReturn(jobInfo); String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" + JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID; @@ -263,7 +267,7 @@ public class BulkDataExportProviderTest { @Test public void testPollForStatus_Gone() throws IOException { - when(myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(eq(A_JOB_ID))).thenThrow(new ResourceNotFoundException("Unknown job: AAA")); + when(myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(eq(A_JOB_ID))).thenThrow(new ResourceNotFoundException("Unknown job: AAA")); String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" + JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID; diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java index 0ab087ceaea..2a701e69b9d 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java @@ -1,6 +1,8 @@ package ca.uhn.fhir.jpa.bulk; import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; +import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; +import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum; import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao; import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao; import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao; @@ -17,17 +19,15 @@ import org.hamcrest.Matchers; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.r4.model.Binary; import org.hl7.fhir.r4.model.InstantType; +import org.hl7.fhir.r4.model.Observation; import org.hl7.fhir.r4.model.Patient; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobParametersBuilder; -import org.springframework.batch.core.JobParametersInvalidException; -import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; -import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; -import org.springframework.batch.core.repository.JobRestartException; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import java.util.Date; import java.util.UUID; @@ -53,7 +53,9 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { private IBulkDataExportSvc myBulkDataExportSvc; @Autowired private IBatchJobSubmitter myBatchJobSubmitter; + @Autowired + @Qualifier("bulkExportJob") private Job myBulkJob; @@ -151,7 +153,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { assertNotNull(jobDetails.getJobId()); // Check the status - IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId()); + IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); assertEquals(BulkJobStatusEnum.SUBMITTED, status.getStatus()); assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Observation,Patient&_typeFilter="+TEST_FILTER, status.getRequest()); @@ -159,7 +161,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { myBulkDataExportSvc.buildExportFiles(); // Fetch the job again - status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId()); + status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); assertEquals(BulkJobStatusEnum.COMPLETE, status.getStatus()); assertEquals(2, status.getFiles().size()); @@ -201,7 +203,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { assertNotNull(jobDetails.getJobId()); // Check the status - IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId()); + IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); assertEquals(BulkJobStatusEnum.SUBMITTED, status.getStatus()); assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson", status.getRequest()); @@ -209,7 +211,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { myBulkDataExportSvc.buildExportFiles(); // Fetch the job again - status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId()); + status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); assertEquals(BulkJobStatusEnum.COMPLETE, status.getStatus()); assertEquals(2, status.getFiles().size()); @@ -249,7 +251,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { @Test - public void testBatchJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException { + public void testBatchJob() throws InterruptedException { createResources(); // Create a bulk job @@ -258,10 +260,19 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { JobParametersBuilder paramBuilder = new JobParametersBuilder().addString("jobUUID", jobDetails.getJobId()); myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters()); - IBulkDataExportSvc.JobInfo jobStatusOrThrowResourceNotFound = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId()); - assertThat(jobStatusOrThrowResourceNotFound.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE)); - + IBulkDataExportSvc.JobInfo jobInfo; + while(true) { + jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); + if (jobInfo.getStatus() != BulkJobStatusEnum.COMPLETE) { + Thread.sleep(1000L); + ourLog.warn("waiting.."); + } else { + break; + } + } + assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE)); } + @Test public void testSubmit_WithSince() throws InterruptedException { @@ -284,7 +295,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { assertNotNull(jobDetails.getJobId()); // Check the status - IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId()); + IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); assertEquals(BulkJobStatusEnum.SUBMITTED, status.getStatus()); assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Observation,Patient&_since=" + cutoff.setTimeZoneZulu(true).getValueAsString(), status.getRequest()); @@ -292,7 +303,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { myBulkDataExportSvc.buildExportFiles(); // Fetch the job again - status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId()); + status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); assertEquals(BulkJobStatusEnum.COMPLETE, status.getStatus()); assertEquals(1, status.getFiles().size()); @@ -320,11 +331,11 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i); IIdType patId = myPatientDao.update(patient).getId().toUnqualifiedVersionless(); - //Observation obs = new Observation(); - //obs.setId("OBS" + i); - //obs.setStatus(Observation.ObservationStatus.FINAL); - //obs.getSubject().setReference(patId.getValue()); - // myObservationDao.update(obs); + Observation obs = new Observation(); + obs.setId("OBS" + i); + obs.setStatus(Observation.ObservationStatus.FINAL); + obs.getSubject().setReference(patId.getValue()); + myObservationDao.update(obs); } } } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu3Config.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu3Config.java index 78399337d0b..819872868b4 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu3Config.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu3Config.java @@ -1,29 +1,15 @@ package ca.uhn.fhir.jpa.config; -import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; -import ca.uhn.fhir.jpa.batch.svc.BatchJobSubmitterImpl; -import ca.uhn.fhir.jpa.bulk.batch.BulkItemReader; import ca.uhn.fhir.jpa.search.LuceneSearchMappingFactory; import ca.uhn.fhir.jpa.subscription.match.deliver.email.IEmailSender; import ca.uhn.fhir.jpa.subscription.match.deliver.email.JavaMailEmailSender; import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener; import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener; -import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor; import ca.uhn.fhir.validation.ResultSeverityEnum; import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder; import org.apache.commons.dbcp2.BasicDataSource; import org.hibernate.dialect.H2Dialect; -import org.springframework.batch.core.Job; -import org.springframework.batch.core.Step; -import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; -import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; -import org.springframework.batch.core.configuration.annotation.StepScope; -import org.springframework.batch.item.ItemStreamWriter; -import org.springframework.batch.item.ItemWriter; -import org.springframework.batch.repeat.RepeatStatus; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.*; import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean; diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR4Config.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR4Config.java index edc0240dd62..e8441768d67 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR4Config.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR4Config.java @@ -1,18 +1,13 @@ package ca.uhn.fhir.jpa.config; +import ca.uhn.fhir.jpa.batch.BatchJobsConfig; import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; import ca.uhn.fhir.jpa.batch.svc.BatchJobSubmitterImpl; import ca.uhn.fhir.jpa.binstore.IBinaryStorageSvc; import ca.uhn.fhir.jpa.binstore.MemoryBinaryStorageSvcImpl; -import ca.uhn.fhir.jpa.bulk.batch.BulkExportDaoSvc; -import ca.uhn.fhir.jpa.bulk.batch.BulkExportJobCompletionListener; -import ca.uhn.fhir.jpa.bulk.batch.BulkItemReader; -import ca.uhn.fhir.jpa.bulk.batch.BulkItemResourceLoaderProcessor; -import ca.uhn.fhir.jpa.bulk.batch.ResourceToFileWriter; -import ca.uhn.fhir.jpa.bulk.batch.ResourceTypePartitioner; +import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc; import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener; import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener; -import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor; import ca.uhn.fhir.validation.ResultSeverityEnum; import net.ttddyy.dsproxy.listener.SingleQueryCountHolder; @@ -20,18 +15,6 @@ import net.ttddyy.dsproxy.listener.logging.SLF4JLogLevel; import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder; import org.apache.commons.dbcp2.BasicDataSource; import org.hibernate.dialect.H2Dialect; -import org.hl7.fhir.instance.model.api.IBaseResource; -import org.springframework.batch.core.Job; -import org.springframework.batch.core.Step; -import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; -import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; -import org.springframework.batch.core.configuration.annotation.JobScope; -import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; -import org.springframework.batch.core.configuration.annotation.StepScope; -import org.springframework.batch.item.ItemProcessor; -import org.springframework.batch.item.ItemWriter; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @@ -42,20 +25,17 @@ import org.springframework.transaction.annotation.EnableTransactionManagement; import javax.sql.DataSource; import java.sql.Connection; import java.util.Properties; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.junit.Assert.fail; @Configuration -@Import(TestJPAConfig.class) +@Import({TestJPAConfig.class, BatchJobsConfig.class}) @EnableTransactionManagement() -@EnableBatchProcessing public class TestR4Config extends BaseJavaConfigR4 { - /** - * NANI - */ - public static final String WILL_LATE_BIND = null; + CountDownLatch jobLatch = new CountDownLatch(1); private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(TestR4Config.class); public static Integer ourMaxThreads; @@ -67,15 +47,10 @@ public class TestR4Config extends BaseJavaConfigR4 { * starvation */ if (ourMaxThreads == null) { - ourMaxThreads = (int) (Math.random() * 6.0) + 1; + ourMaxThreads = (int) (Math.random() * 6.0) + 3; } } - @Autowired - private StepBuilderFactory myStepBuilderFactory; - - @Autowired - private JobBuilderFactory myJobBuilderFactory; private Exception myLastStackTrace; @@ -90,73 +65,11 @@ public class TestR4Config extends BaseJavaConfigR4 { return new CircularQueueCaptureQueriesListener(); } - @Bean - public Job bulkExportJob() { - return myJobBuilderFactory.get("bulkExportJob") - .start(partitionStep()) - .listener(bulkExportJobCompletionListener()) - .build(); - } - - @Bean - public Step workerResourceStep() { - return myStepBuilderFactory.get("workerResourceStep") - . chunk(2) - .reader(myBulkItemReader(WILL_LATE_BIND)) - .processor(pidToResourceProcessor()) - .writer(resourceToFileWriter()) - .build(); - } - - @Bean - @JobScope - public BulkExportJobCompletionListener bulkExportJobCompletionListener() { - return new BulkExportJobCompletionListener(); - } - - @Bean - @StepScope - public ItemWriter resourceToFileWriter() { - return new ResourceToFileWriter(); - } - - @Bean - public Step partitionStep() { - return myStepBuilderFactory.get("partitionStep") - .partitioner("workerResourceStep", partitioner(null)) - .step(workerResourceStep()) - .build(); - } - - @Bean public BulkExportDaoSvc bulkExportDaoSvc() { return new BulkExportDaoSvc(); } - @Bean - @JobScope - public ResourceTypePartitioner partitioner(@Value("#{jobParameters['jobUUID']}") String theJobUUID) { - return new ResourceTypePartitioner(theJobUUID); - } - - - @Bean - @StepScope - public ItemProcessor pidToResourceProcessor() { - return new BulkItemResourceLoaderProcessor(); - } - - @Bean - @StepScope - public BulkItemReader myBulkItemReader(@Value("#{jobParameters['jobUUID']}") String theJobUUID) { - BulkItemReader bulkItemReader = new BulkItemReader(); - bulkItemReader.setJobUUID(theJobUUID); - bulkItemReader.setName("bulkItemReader"); - return bulkItemReader; - } - - @Bean public DataSource dataSource() { BasicDataSource retVal = new BasicDataSource() { diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java index c9ff08dec08..c507dd255f5 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java @@ -8,8 +8,7 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.model.ExpungeOptions; import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; -import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; -import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc; +import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.dao.index.IdHelperService; import ca.uhn.fhir.jpa.entity.TermConcept; import ca.uhn.fhir.jpa.model.util.JpaConstants; diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu2/BaseJpaDstu2Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu2/BaseJpaDstu2Test.java index d9a7983f4e1..d5912201826 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu2/BaseJpaDstu2Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu2/BaseJpaDstu2Test.java @@ -8,7 +8,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoSubscription; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet; import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; -import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc; +import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.config.TestDstu2Config; import ca.uhn.fhir.jpa.dao.BaseJpaTest; import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc; diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu3/BaseJpaDstu3Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu3/BaseJpaDstu3Test.java index bd50098e617..046bbdcd2c5 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu3/BaseJpaDstu3Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu3/BaseJpaDstu3Test.java @@ -13,7 +13,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet; import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; -import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc; +import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.config.TestDstu3Config; import ca.uhn.fhir.jpa.dao.BaseJpaTest; import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc; @@ -107,7 +107,6 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.runner.RunWith; -import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationContext; diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/BaseJpaR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/BaseJpaR4Test.java index 7e6956a6fab..e64d97ccb4c 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/BaseJpaR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/BaseJpaR4Test.java @@ -17,7 +17,7 @@ import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider; import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor; -import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc; +import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.config.TestR4Config; import ca.uhn.fhir.jpa.dao.BaseJpaTest; import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc; diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchWithElasticSearchTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchWithElasticSearchTest.java index 1ce3057a2bb..413b6aa4393 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchWithElasticSearchTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchWithElasticSearchTest.java @@ -6,7 +6,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet; import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; -import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc; +import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.config.TestR4ConfigWithElasticSearch; import ca.uhn.fhir.jpa.dao.BaseJpaTest; import ca.uhn.fhir.jpa.dao.data.IResourceTableDao; diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchWithLuceneDisabledTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchWithLuceneDisabledTest.java index 02eb8159ebc..7e3d68545a2 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchWithLuceneDisabledTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchWithLuceneDisabledTest.java @@ -7,7 +7,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet; import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; -import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc; +import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.config.TestR4WithLuceneDisabledConfig; import ca.uhn.fhir.jpa.dao.BaseJpaTest; import ca.uhn.fhir.jpa.dao.dstu2.FhirResourceDaoDstu2SearchNoFtTest; diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r5/BaseJpaR5Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r5/BaseJpaR5Test.java index fdd69f0a730..767aa723c89 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r5/BaseJpaR5Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r5/BaseJpaR5Test.java @@ -16,7 +16,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider; import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor; -import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc; +import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.config.TestR5Config; import ca.uhn.fhir.jpa.dao.BaseJpaTest; import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc; diff --git a/hapi-fhir-jpaserver-batch/src/main/java/ca/uhn/fhir/jpa/batch/config/JpaBatchConfigurer.java b/hapi-fhir-jpaserver-batch/src/main/java/ca/uhn/fhir/jpa/batch/config/JpaBatchConfigurer.java deleted file mode 100644 index 690b69ee0a8..00000000000 --- a/hapi-fhir-jpaserver-batch/src/main/java/ca/uhn/fhir/jpa/batch/config/JpaBatchConfigurer.java +++ /dev/null @@ -1,23 +0,0 @@ -package ca.uhn.fhir.jpa.batch.config; - -import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.orm.jpa.JpaTransactionManager; -import org.springframework.stereotype.Component; -import org.springframework.transaction.PlatformTransactionManager; -/*// -@Component -public class JpaBatchConfigurer extends DefaultBatchConfigurer { - - @Autowired - @Qualifier - private JpaTransactionManager myPlatformTransactionManager; - - @Override - public PlatformTransactionManager getTransactionManager() { - return myPlatformTransactionManager; - } - -} -*/ diff --git a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/TestRestfulServer.java b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/TestRestfulServer.java index 552891cfc47..0b763f1afd5 100644 --- a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/TestRestfulServer.java +++ b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/TestRestfulServer.java @@ -5,7 +5,7 @@ import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; -import ca.uhn.fhir.jpa.bulk.BulkDataExportProvider; +import ca.uhn.fhir.jpa.bulk.provider.BulkDataExportProvider; import ca.uhn.fhir.jpa.provider.DiffProvider; import ca.uhn.fhir.jpa.subscription.match.config.WebsocketDispatcherConfig; import ca.uhn.fhir.jpa.interceptor.CascadingDeleteInterceptor; @@ -21,7 +21,6 @@ import ca.uhn.fhir.jpa.provider.r5.JpaConformanceProviderR5; import ca.uhn.fhir.jpa.provider.r5.JpaSystemProviderR5; import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider; import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry; -import ca.uhn.fhir.jpa.subscription.match.config.WebsocketDispatcherConfig; import ca.uhn.fhir.narrative.DefaultThymeleafNarrativeGenerator; import ca.uhn.fhir.rest.api.EncodingEnum; import ca.uhn.fhir.rest.server.ETagSupportEnum;