From bb863f26fab72566042c0e446abc484d1e2ae546 Mon Sep 17 00:00:00 2001 From: Tadgh Date: Tue, 2 Jun 2020 13:44:23 -0700 Subject: [PATCH] Initial run through the first part of batch integration --- .../fhir/jpa/bulk/batch/BulkItemReader.java | 106 ++++++++++++++++++ .../ca/uhn/fhir/jpa/config/BaseConfig.java | 2 + .../jpa/bulk/BulkDataExportSvcImplR4Test.java | 23 +++- .../uhn/fhir/jpa/config/TestDstu3Config.java | 34 +----- .../ca/uhn/fhir/jpa/config/TestR4Config.java | 66 +++++++++++ .../java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java | 3 + 6 files changed, 205 insertions(+), 29 deletions(-) create mode 100644 hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkItemReader.java diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkItemReader.java new file mode 100644 index 00000000000..8a2be2b017a --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkItemReader.java @@ -0,0 +1,106 @@ +package ca.uhn.fhir.jpa.bulk.batch; + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.interceptor.model.RequestPartitionId; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.dao.IResultIterator; +import ca.uhn.fhir.jpa.dao.ISearchBuilder; +import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; +import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao; +import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity; +import ca.uhn.fhir.jpa.entity.BulkExportJobEntity; +import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; +import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; +import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; +import ca.uhn.fhir.rest.param.DateRangeParam; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.Optional; + +public class BulkItemReader extends AbstractItemCountingItemStreamItemReader { + private static final Logger ourLog = LoggerFactory.getLogger(BulkItemReader.class); + + + @Autowired + private IBulkExportJobDao myBulkExportJobDao; + + @Autowired + private DaoRegistry myDaoRegistry; + + @Autowired + private FhirContext myContext; + + @Autowired + private SearchBuilderFactory mySearchBuilderFactory; + + private BulkExportJobEntity myJobEntity; + + private String myJobUUID; + + private IResultIterator myResultIterator; + + + @Override + protected ResourcePersistentId doRead() throws Exception { + if (myJobEntity == null) { + loadData(); + } + if (myResultIterator.hasNext()) { + return myResultIterator.next(); + } else { + return null; + } + } + + + private void loadData() { + Optional jobOpt = myBulkExportJobDao.findByJobId(myJobUUID); + if (!jobOpt.isPresent()) { + ourLog.info("Job appears to be deleted"); + return; + } + myJobEntity = jobOpt.get(); + ourLog.info("Bulk export starting generation for batch export job: {}", myJobEntity); + + for (BulkExportCollectionEntity nextCollection : myJobEntity.getCollections()) { + String nextType = nextCollection.getResourceType(); + IFhirResourceDao dao = myDaoRegistry.getResourceDao(nextType); + + ourLog.info("Bulk export assembling export of type {} for job {}", nextType, myJobUUID); + + Class nextTypeClass = myContext.getResourceDefinition(nextType).getImplementingClass(); + ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, nextType, nextTypeClass); + + SearchParameterMap map = new SearchParameterMap(); + map.setLoadSynchronous(true); + if (myJobEntity.getSince() != null) { + map.setLastUpdated(new DateRangeParam(myJobEntity.getSince(), null)); + } + + myResultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions()); + } + + + ourLog.info("Bulk export completed job in"); + } + + + @Override + protected void doOpen() throws Exception { + + } + + @Override + protected void doClose() throws Exception { + + } + + public void setJobUUID(String theUUID) { + this.myJobUUID = theUUID; + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java index 02e95306451..bd702a52a07 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java @@ -53,6 +53,7 @@ import ca.uhn.fhir.rest.server.interceptor.partition.RequestTenantPartitionInter import org.hibernate.jpa.HibernatePersistenceProvider; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.utilities.graphql.IGraphQLStorageServices; +import org.springframework.batch.core.Job; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; @@ -305,6 +306,7 @@ public abstract class BaseConfig { return new BulkDataExportSvcImpl(); } + @Bean @Lazy public BulkDataExportProvider bulkDataExportProvider() { diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java index 34e835f5e83..da093705279 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java @@ -1,5 +1,6 @@ package ca.uhn.fhir.jpa.bulk; +import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao; import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao; import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao; @@ -21,7 +22,11 @@ import org.hl7.fhir.r4.model.Patient; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersBuilder; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import java.util.Date; import java.util.UUID; @@ -41,6 +46,10 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { private IBulkExportCollectionFileDao myBulkExportCollectionFileDao; @Autowired private IBulkDataExportSvc myBulkDataExportSvc; + @Autowired + private IBatchJobSubmitter myBatchJobSubmitter; + @Autowired() + private Job myBulkJob; @Test @@ -234,7 +243,19 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { } - @Test + @Test + public void testBatchJob() { + createResources(); + + // Create a bulk job + IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(null, Sets.newHashSet("Patient", "Observation"), null, null); + //Add the UUID to the job + JobParametersBuilder paramBuilder = new JobParametersBuilder().addString("jobUUID", jobDetails.getJobId()); + + myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters()); + + } + @Test public void testSubmit_WithSince() throws InterruptedException { // Create some resources to load diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu3Config.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu3Config.java index 15b6ee4bbba..9bfe6a68e2e 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu3Config.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu3Config.java @@ -2,11 +2,13 @@ package ca.uhn.fhir.jpa.config; import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; import ca.uhn.fhir.jpa.batch.svc.BatchJobSubmitterImpl; +import ca.uhn.fhir.jpa.bulk.batch.BulkItemReader; import ca.uhn.fhir.jpa.search.LuceneSearchMappingFactory; import ca.uhn.fhir.jpa.subscription.match.deliver.email.IEmailSender; import ca.uhn.fhir.jpa.subscription.match.deliver.email.JavaMailEmailSender; import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener; import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener; +import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor; import ca.uhn.fhir.validation.ResultSeverityEnum; import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder; @@ -16,8 +18,12 @@ import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.ItemStreamWriter; +import org.springframework.batch.item.ItemWriter; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.*; import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean; @@ -38,34 +44,6 @@ public class TestDstu3Config extends BaseJavaConfigDstu3 { static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(TestDstu3Config.class); private Exception myLastStackTrace; - @Autowired - private StepBuilderFactory myStepBuilderFactory; - - @Autowired - private JobBuilderFactory myJobBuilderFactory; - - @Bean - public Job testJob() { - return myJobBuilderFactory.get("testJob") - .start(taskletStep()) - .build(); - } - @Bean - public Step taskletStep() { - return myStepBuilderFactory.get("testSte") - .tasklet((stepContribution, chunkContext) -> { - System.out.println("It works!"); - return RepeatStatus.FINISHED; - }) - .build(); - } - - @Bean - public Job expungeJob() { - return myJobBuilderFactory.get("expungeJob") - .start(taskletStep()) - - } @Bean public CircularQueueCaptureQueriesListener captureQueriesListener() { diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR4Config.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR4Config.java index 3955361d5fd..51a125824b6 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR4Config.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR4Config.java @@ -2,8 +2,10 @@ package ca.uhn.fhir.jpa.config; import ca.uhn.fhir.jpa.binstore.IBinaryStorageSvc; import ca.uhn.fhir.jpa.binstore.MemoryBinaryStorageSvcImpl; +import ca.uhn.fhir.jpa.bulk.batch.BulkItemReader; import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener; import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener; +import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor; import ca.uhn.fhir.validation.ResultSeverityEnum; import net.ttddyy.dsproxy.listener.SingleQueryCountHolder; @@ -11,7 +13,15 @@ import net.ttddyy.dsproxy.listener.logging.SLF4JLogLevel; import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder; import org.apache.commons.dbcp2.BasicDataSource; import org.hibernate.dialect.H2Dialect; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @@ -46,6 +56,12 @@ public class TestR4Config extends BaseJavaConfigR4 { } } + @Autowired + private StepBuilderFactory myStepBuilderFactory; + + @Autowired + private JobBuilderFactory myJobBuilderFactory; + @Autowired private Environment myEnvironment; @@ -55,6 +71,56 @@ public class TestR4Config extends BaseJavaConfigR4 { public CircularQueueCaptureQueriesListener captureQueriesListener() { return new CircularQueueCaptureQueriesListener(); } + @Bean + public Job bulkExportJob() { + return myJobBuilderFactory.get("bulkExportJob") + .start(readPidsStep()) + .build(); + + } + + @Bean + public Step readPidsStep() { + return myStepBuilderFactory.get("readPidsToBeExportedStep") + . chunk(100) + .reader(myBulkItemReader(null)) + .writer(mySimplePrinter()) + .build(); + } + + @Bean + public ItemWriter mySimplePrinter() { + return (theResourcePersistentIds) -> { + System.out.println("PRINTING CHUNK"); + theResourcePersistentIds.stream().forEach(pid -> { + System.out.println("zoop -> " + pid.toString()); + }); + }; + } + + @Bean + @StepScope + public BulkItemReader myBulkItemReader(@Value("#{jobParameters['jobUUID']}") String theJobUUID) { + BulkItemReader bulkItemReader = new BulkItemReader(); + bulkItemReader.setJobUUID(theJobUUID); + return bulkItemReader; + } + + @Bean + public Job testJob() { + return myJobBuilderFactory.get("testJob") + .start(taskletStep()) + .build(); + } + @Bean + public Step taskletStep() { + return myStepBuilderFactory.get("testSte") + .tasklet((stepContribution, chunkContext) -> { + System.out.println("It works!"); + return RepeatStatus.FINISHED; + }) + .build(); + } @Bean public DataSource dataSource() { diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java index cca267e277e..9bbbf835971 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java @@ -8,6 +8,7 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.model.ExpungeOptions; import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; +import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc; import ca.uhn.fhir.jpa.dao.index.IdHelperService; import ca.uhn.fhir.jpa.entity.TermConcept; @@ -122,6 +123,8 @@ public abstract class BaseJpaTest extends BaseTest { protected IPartitionLookupSvc myPartitionConfigSvc; @Autowired private IdHelperService myIdHelperService; + @Autowired + protected IBatchJobSubmitter myBatchJobSubmitter; @After public void afterPerformCleanup() {