diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkExportDaoSvc.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkExportDaoSvc.java
new file mode 100644
index 00000000000..4133f107309
--- /dev/null
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkExportDaoSvc.java
@@ -0,0 +1,42 @@
+package ca.uhn.fhir.jpa.bulk.batch;
+
+import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
+import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
+import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
+import org.slf4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.transaction.Transactional;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+@Service
+public class BulkExportDaoSvc {
+	private static final Logger ourLog = getLogger(BulkExportDaoSvc.class);
+
+	@Autowired
+	IBulkExportJobDao myBulkExportJobDao;
+
+	@Transactional
+	public List<String> getBulkJobResourceTypes(String theJobUUID) {
+		BulkExportJobEntity bulkExportJobEntity = loadJob(theJobUUID);
+		if (bulkExportJobEntity == null) {
+			// loadJob() returns null for a deleted job; guard against the NPE
+			return Collections.emptyList();
+		}
+		return bulkExportJobEntity.getCollections()
+			.stream()
+			.map(BulkExportCollectionEntity::getResourceType)
+			.collect(Collectors.toList());
+	}
+
+	private BulkExportJobEntity loadJob(String theJobUUID) {
+		Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(theJobUUID);
+		if (!jobOpt.isPresent()) {
+			ourLog.info("Job with UUID {} appears to be deleted", theJobUUID);
+			return null;
+		}
+		return jobOpt.get();
+	}
+
+}
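Note: BulkExportDaoSvc gives the batch components a @Transactional seam, so BulkExportJobEntity.getCollections(), presumably a lazy association, is walked while a persistence session is still open. A minimal caller sketch, assuming only a Spring context that provides this service (the class name and output below are illustrative, not part of this change):

    import java.util.List;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    import ca.uhn.fhir.jpa.bulk.batch.BulkExportDaoSvc;

    @Component
    public class BulkJobTypeLogger { // hypothetical helper, not in this diff

        @Autowired
        private BulkExportDaoSvc myBulkExportDaoSvc;

        public void logTypes(String theJobUUID) {
            // One entry per BulkExportCollectionEntity attached to the job
            List<String> types = myBulkExportDaoSvc.getBulkJobResourceTypes(theJobUUID);
            types.forEach(t -> System.out.println("Job " + theJobUUID + " exports " + t));
        }
    }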
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkItemReader.java
index 8a2be2b017a..3582f724366 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkItemReader.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkItemReader.java
@@ -8,7 +8,6 @@ import ca.uhn.fhir.jpa.dao.IResultIterator;
 import ca.uhn.fhir.jpa.dao.ISearchBuilder;
 import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
 import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
-import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
 import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
 import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
 import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
@@ -17,9 +16,16 @@ import ca.uhn.fhir.rest.param.DateRangeParam;
 import org.hl7.fhir.instance.model.api.IBaseResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.StepExecutionListener;
 import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Optional;
 
 public class BulkItemReader extends AbstractItemCountingItemStreamItemReader<ResourcePersistentId> {
@@ -42,54 +48,24 @@ public class BulkItemReader extends AbstractItemCountingItemStreamItemReader<ResourcePersistentId> {
+	// Set from the partition's ExecutionContext (see ResourceTypePartitioner)
+	@Value("#{stepExecutionContext['resourceType']}")
+	private String myResourceType;
+
+	private Iterator<ResourcePersistentId> myPidIterator;
 
 	@Override
 	protected ResourcePersistentId doRead() throws Exception {
-		if (myJobEntity == null) {
-			loadData();
+		if (myPidIterator == null) {
+			loadResourcePids();
 		}
-		if (myResultIterator.hasNext()) {
-			return myResultIterator.next();
+		if (myPidIterator.hasNext()) {
+			return myPidIterator.next();
 		} else {
 			return null;
 		}
 	}
 
-	private void loadData() {
-		Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
-		if (!jobOpt.isPresent()) {
-			ourLog.info("Job appears to be deleted");
-			return;
-		}
-		myJobEntity = jobOpt.get();
-		ourLog.info("Bulk export starting generation for batch export job: {}", myJobEntity);
-
-		for (BulkExportCollectionEntity nextCollection : myJobEntity.getCollections()) {
-			String nextType = nextCollection.getResourceType();
-			IFhirResourceDao dao = myDaoRegistry.getResourceDao(nextType);
-
-			ourLog.info("Bulk export assembling export of type {} for job {}", nextType, myJobUUID);
-
-			Class<? extends IBaseResource> nextTypeClass = myContext.getResourceDefinition(nextType).getImplementingClass();
-			ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, nextType, nextTypeClass);
-
-			SearchParameterMap map = new SearchParameterMap();
-			map.setLoadSynchronous(true);
-			if (myJobEntity.getSince() != null) {
-				map.setLastUpdated(new DateRangeParam(myJobEntity.getSince(), null));
-			}
-
-			myResultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
-		}
-
-		ourLog.info("Bulk export completed job in");
-	}
-
 	@Override
 	protected void doOpen() throws Exception {
@@ -100,7 +76,39 @@ public class BulkItemReader extends AbstractItemCountingItemStreamItemReader<ResourcePersistentId> {
 	}
 
+	private void loadResourcePids() {
+		Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
+		if (!jobOpt.isPresent()) {
+			ourLog.info("Job appears to be deleted");
+			return;
+		}
+		myJobEntity = jobOpt.get();
+		ourLog.info("Bulk export starting generation for batch export job: {}", myJobEntity);
+
+		IFhirResourceDao dao = myDaoRegistry.getResourceDao(myResourceType);
+
+		ourLog.info("Bulk export assembling export of type {} for job {}", myResourceType, myJobUUID);
+
+		Class<? extends IBaseResource> nextTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
+		ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, nextTypeClass);
+
+		SearchParameterMap map = new SearchParameterMap();
+		map.setLoadSynchronous(true);
+		if (myJobEntity.getSince() != null) {
+			map.setLastUpdated(new DateRangeParam(myJobEntity.getSince(), null));
+		}
+
+		IResultIterator myResultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
+		List<ResourcePersistentId> myReadPids = new ArrayList<>();
+		while (myResultIterator.hasNext()) {
+			myReadPids.add(myResultIterator.next());
+		}
+		myPidIterator = myReadPids.iterator();
+	}
+
 	public void setJobUUID(String theUUID) {
 		this.myJobUUID = theUUID;
 	}
+
 }
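Note: the reader now materializes every matching ResourcePersistentId up front and hands them out one per doRead() call, letting AbstractItemCountingItemStreamItemReader take care of the item-count and restart bookkeeping. The same lazy-load-then-drain shape in isolation, reduced to its essentials (class name and data below are illustrative, not from this change):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;

    // Minimal sketch of the pattern BulkItemReader uses
    public class EagerListItemReader extends AbstractItemCountingItemStreamItemReader<Long> {

        private Iterator<Long> myIterator;

        public EagerListItemReader() {
            // The base class requires a name when state saving is enabled;
            // it keys the reader's restart data in the ExecutionContext
            setName("eagerListItemReader");
        }

        @Override
        protected Long doRead() {
            if (myIterator == null) {
                load();
            }
            return myIterator.hasNext() ? myIterator.next() : null; // null signals end of input
        }

        private void load() {
            // Stand-in for the search-builder query in BulkItemReader
            List<Long> pids = Arrays.asList(1L, 2L, 3L);
            myIterator = pids.iterator();
        }

        @Override
        protected void doOpen() {
            // no external resource to open
        }

        @Override
        protected void doClose() {
            // no external resource to close
        }
    }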
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkItemResourceLoaderProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkItemResourceLoaderProcessor.java
new file mode 100644
index 00000000000..8faf0c144b1
--- /dev/null
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/BulkItemResourceLoaderProcessor.java
@@ -0,0 +1,42 @@
+package ca.uhn.fhir.jpa.bulk.batch;
+
+import ca.uhn.fhir.context.FhirContext;
+import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
+import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
+import ca.uhn.fhir.jpa.dao.ISearchBuilder;
+import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
+import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
+import org.hl7.fhir.instance.model.api.IBaseResource;
+import org.springframework.batch.item.ItemProcessor;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class BulkItemResourceLoaderProcessor implements ItemProcessor<ResourcePersistentId, IBaseResource> {
+
+	@Autowired
+	private SearchBuilderFactory mySearchBuilderFactory;
+
+	@Autowired
+	private DaoRegistry myDaoRegistry;
+
+	@Value("#{stepExecutionContext['resourceType']}")
+	private String myResourceType;
+
+	@Autowired
+	private FhirContext myContext;
+
+	@Override
+	public IBaseResource process(ResourcePersistentId theResourcePersistentId) throws Exception {
+		IFhirResourceDao dao = myDaoRegistry.getResourceDao(myResourceType);
+		Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
+		ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, resourceTypeClass);
+		List<IBaseResource> outgoing = new ArrayList<>();
+		sb.loadResourcesByPid(Collections.singletonList(theResourcePersistentId), Collections.emptyList(), outgoing, false, null);
+		return outgoing.get(0);
+	}
+}
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/ResourceTypePartitioner.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/ResourceTypePartitioner.java
new file mode 100644
index 00000000000..c0ed560539c
--- /dev/null
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/batch/ResourceTypePartitioner.java
@@ -0,0 +1,53 @@
+package ca.uhn.fhir.jpa.bulk.batch;
+
+import org.slf4j.Logger;
+import org.springframework.batch.core.partition.support.Partitioner;
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+public class ResourceTypePartitioner implements Partitioner {
+	private static final Logger ourLog = getLogger(ResourceTypePartitioner.class);
+
+	private String myJobUUID;
+
+	@Autowired
+	private BulkExportDaoSvc myBulkExportDaoSvc;
+
+	public ResourceTypePartitioner(String theJobUUID) {
+		myJobUUID = theJobUUID;
+	}
+
+	@Override
+	public Map<String, ExecutionContext> partition(int gridSize) {
+		Map<String, ExecutionContext> partitionContextMap = new HashMap<>();
+
+		List<String> resourceTypes = myBulkExportDaoSvc.getBulkJobResourceTypes(myJobUUID);
+
+		resourceTypes.forEach(resourceType -> {
+			// One partition, and hence one worker step execution, per resource type
+			ExecutionContext context = new ExecutionContext();
+			context.putString("resourceType", resourceType);
+			context.putString("jobUUID", myJobUUID);
+			partitionContextMap.put(resourceType, context);
+		});
+
+		return partitionContextMap;
+	}
+
+}
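Note: each ExecutionContext the partitioner returns becomes one worker step execution, and its resourceType/jobUUID entries are resolved into step-scoped beans by Spring Batch late binding, which is how BulkItemReader and BulkItemResourceLoaderProcessor pick up their resource type. An illustrative binding under the same key names (this config class is not part of the change):

    import org.springframework.batch.core.StepContribution;
    import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.batch.core.scope.context.ChunkContext;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class PartitionContextDemoConfig { // hypothetical, for illustration only

        // Step-scoped: the SpEL expressions resolve against the ExecutionContext
        // that ResourceTypePartitioner created for this particular partition
        @Bean
        @StepScope
        public Tasklet partitionAwareTasklet(
                @Value("#{stepExecutionContext['resourceType']}") String theResourceType,
                @Value("#{stepExecutionContext['jobUUID']}") String theJobUUID) {
            return (StepContribution theContribution, ChunkContext theChunkContext) -> {
                System.out.println("Partition handles " + theResourceType + " for job " + theJobUUID);
                return RepeatStatus.FINISHED;
            };
        }
    }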
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBulkExportCollectionDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBulkExportCollectionDao.java
index 59dd52c05df..48d362dffb1 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBulkExportCollectionDao.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBulkExportCollectionDao.java
@@ -36,4 +36,6 @@ public interface IBulkExportCollectionDao extends JpaRepository<BulkExportCollectionEntity, Long> {

[Lines missing from the source here: the two lines added to IBulkExportCollectionDao, and the diff header plus opening context of the Spring Batch job configuration class that the following hunk modifies.]

-			.<ResourcePersistentId, ResourcePersistentId> chunk(2)
+	public Step slaveResourceStep() {
+		return myStepBuilderFactory.get("slaveResourceStep")
+			.<ResourcePersistentId, IBaseResource> chunk(2)
 			.reader(myBulkItemReader(null))
+			.processor(pidToResourceProcessor(null))
 			.writer(mySimplePrinter())
+			.listener(myBulkItemReader(null))
+			.build();
+	}
+
+	@Bean
+	public Step partitionStep() {
+		return myStepBuilderFactory.get("partitionStep")
+			.partitioner("slaveResourceStep", partitioner(null))
+			.step(slaveResourceStep())
 			.build();
 	}
+
 	@Bean
-	public ItemWriter<ResourcePersistentId> mySimplePrinter() {
+	public BulkExportDaoSvc bulkExportDaoSvc() {
+		return new BulkExportDaoSvc();
+	}
+
+	@Bean
+	@JobScope
+	public ResourceTypePartitioner partitioner(@Value("#{jobParameters['jobUUID']}") String theJobUUID) {
+		return new ResourceTypePartitioner(theJobUUID);
+	}
+
+	@Bean
+	@StepScope
+	public ItemProcessor<ResourcePersistentId, IBaseResource> pidToResourceProcessor(@Value("#{jobParameters['jobUUID']}") String theUUID) {
+		return new BulkItemResourceLoaderProcessor();
+	}
+
+	@Bean
+	public ItemWriter<IBaseResource> mySimplePrinter() {
 		return (theResourcePersistentIds) -> {
 			System.out.println("PRINTING CHUNK");
-			theResourcePersistentIds.stream().forEach(pid -> {
-				System.out.println("zoop -> " + pid.toString());
+			theResourcePersistentIds.stream().forEach(theIBaseResource -> {
+				System.out.println("zoop -> " + theIBaseResource);
 			});
 		};
 	}
diff --git a/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/BatchJobConfig.java b/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/BatchJobConfig.java
index 01a72858dc3..1ff59b04263 100644
--- a/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/BatchJobConfig.java
+++ b/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/BatchJobConfig.java
@@ -7,6 +7,10 @@ import org.springframework.batch.core.Job;
 import org.springframework.batch.core.Step;
 import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
 import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
+import org.springframework.batch.core.configuration.annotation.StepScope;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.item.ItemReader;
+import org.springframework.batch.item.ItemWriter;
 import org.springframework.batch.repeat.RepeatStatus;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
@@ -35,13 +39,35 @@ public class BatchJobConfig implements IPointcutLatch {
 
 	@Bean
 	public Step testStep() {
-		return myStepBuilderFactory.get("testStep").tasklet((theStepContribution, theChunkContext) -> {
-			System.out.println("woo!");
-			myPointcutLatch.call(theChunkContext);
-			return RepeatStatus.FINISHED;
-		}).build();
+		//return myStepBuilderFactory.get("testStep").tasklet(sampleTasklet()).build();
+		return myStepBuilderFactory.get("testStep")
+			.<String, String> chunk(100)
+			.reader(reader())
+			.writer(simpleWriter())
+			.build();
 	}
 
+	@Bean
+	@StepScope
+	public Tasklet sampleTasklet() {
+		return new SampleTasklet();
+	}
+
+	@Bean
+	@StepScope
+	public ItemReader<String> reader() {
+		return new SampleItemReader();
+	}
+
+	@Bean
+	public ItemWriter<String> simpleWriter() {
+		return new ItemWriter<String>() {
+			@Override
+			public void write(List<? extends String> theList) throws Exception {
+				theList.forEach(System.out::println);
+			}
+		};
+	}
 
 	@Override
 	public void clear() {
 		myPointcutLatch.clear();
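Note: because partitioner(...) is @JobScope and bound to #{jobParameters['jobUUID']}, the UUID has to be supplied when the job is launched. A launch sketch, assuming the partitioned job is exposed as a bean (the bean names here are assumptions, not from this diff):

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class BulkExportJobStarter { // hypothetical helper

        @Autowired
        private JobLauncher myJobLauncher;

        @Autowired
        private Job myBulkExportJob; // assumed: the job containing partitionStep()

        public JobExecution start(String theJobUUID) throws Exception {
            // "jobUUID" feeds the @JobScope partitioner via #{jobParameters['jobUUID']}
            JobParameters params = new JobParametersBuilder()
                .addString("jobUUID", theJobUUID)
                .toJobParameters();
            return myJobLauncher.run(myBulkExportJob, params);
        }
    }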
diff --git a/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/SampleItemReader.java b/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/SampleItemReader.java
new file mode 100644
index 00000000000..2fe3a648d41
--- /dev/null
+++ b/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/SampleItemReader.java
@@ -0,0 +1,15 @@
+package ca.uhn.fhir.jpa.batch.config;
+
+import org.springframework.batch.item.ItemReader;
+import org.springframework.batch.item.NonTransientResourceException;
+import org.springframework.batch.item.ParseException;
+import org.springframework.batch.item.UnexpectedInputException;
+
+public class SampleItemReader implements ItemReader<String> {
+
+	private boolean myDone;
+
+	@Override
+	public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
+		// Return one item, then null to signal end of input; otherwise the chunk step never completes
+		if (myDone) {
+			return null;
+		}
+		myDone = true;
+		return "zoop";
+	}
+}
diff --git a/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/SampleTasklet.java b/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/SampleTasklet.java
new file mode 100644
index 00000000000..c577e3f6821
--- /dev/null
+++ b/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/SampleTasklet.java
@@ -0,0 +1,14 @@
+package ca.uhn.fhir.jpa.batch.config;
+
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+
+public class SampleTasklet implements Tasklet {
+	@Override
+	public RepeatStatus execute(StepContribution theStepContribution, ChunkContext theChunkContext) throws Exception {
+		System.out.println("woo");
+		return RepeatStatus.FINISHED;
+	}
+}
diff --git a/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/svc/BatchSvcTest.java b/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/svc/BatchSvcTest.java
index b2994871fd7..2c20e7fc6dd 100644
--- a/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/svc/BatchSvcTest.java
+++ b/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/svc/BatchSvcTest.java
@@ -20,4 +20,5 @@ public class BatchSvcTest extends BaseBatchR4Test {
 		myJobLauncher.run(myJob, new JobParameters());
 		myBatchJobConfig.awaitExpected();
 	}
+
 }
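Note: BatchSvcTest still launches myJob with empty JobParameters, which exercises only the simple reader/writer step. A partitioned bulk-export run would additionally need the jobUUID parameter; a sketch of such a test, assuming BaseBatchR4Test exposes myJobLauncher and that a myBulkExportJob bean exists (neither is defined in this diff):

    import org.junit.Test;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;

    // Hypothetical test; assumes the same package and fixtures as BatchSvcTest
    public class BulkExportJobSmokeTest extends BaseBatchR4Test {

        @Test
        public void testPartitionedJobRequiresJobUUID() throws Exception {
            // "jobUUID" must reference a persisted BulkExportJobEntity, or the
            // partitioner will find no resource types and create no partitions
            JobParameters params = new JobParametersBuilder()
                .addString("jobUUID", "some-persisted-job-uuid") // placeholder value
                .toJobParameters();
            myJobLauncher.run(myBulkExportJob, params); // myBulkExportJob: assumed bean
        }
    }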