Partitioning based on resource type complete. Still no threading.

This commit is contained in:
Tadgh 2020-06-03 14:58:42 -07:00
parent e8970bce46
commit 7d46b3cc10
11 changed files with 297 additions and 59 deletions
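
In outline, the commit wires up this partitioned Spring Batch topology (a sketch assembled from the diffs below, not code contained in the commit):

// bulkExportJob
//   └─ partitionStep: ResourceTypePartitioner creates one partition per resource type
//        └─ slaveResourceStep: chunk(2), executed once per partition
//             ├─ BulkItemReader: streams ResourcePersistentIds for the partition's type
//             ├─ BulkItemResourceLoaderProcessor: inflates each PID into an IBaseResource
//             └─ mySimplePrinter: stub ItemWriter that prints each resource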

View File

@@ -0,0 +1,42 @@
package ca.uhn.fhir.jpa.bulk.batch;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.transaction.Transactional;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.slf4j.LoggerFactory.getLogger;
@Service
public class BulkExportDaoSvc {
private static final Logger ourLog = getLogger(BulkExportDaoSvc.class);
@Autowired
IBulkExportJobDao myBulkExportJobDao;
@Transactional
public List<String> getBulkJobResourceTypes(String theJobUUID) {
BulkExportJobEntity bulkExportJobEntity = loadJob(theJobUUID);
if (bulkExportJobEntity == null) {
// The job may have been deleted between submission and execution.
return Collections.emptyList();
}
return bulkExportJobEntity.getCollections()
.stream()
.map(BulkExportCollectionEntity::getResourceType)
.collect(Collectors.toList());
}
private BulkExportJobEntity loadJob(String theJobUUID) {
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(theJobUUID);
if (!jobOpt.isPresent()) {
ourLog.warn("Bulk export job {} appears to have been deleted", theJobUUID);
return null;
}
return jobOpt.get();
}
}
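
A note on usage: getBulkJobResourceTypes must run inside the service's own transaction, because BulkExportJobEntity.getCollections() is (presumably) a lazy association. A minimal caller sketch; the surrounding bean and method are hypothetical, not part of this commit:

@Autowired
private BulkExportDaoSvc myBulkExportDaoSvc;

public void logJobResourceTypes(String theJobUUID) {
// No session handling needed here: the @Transactional boundary inside
// BulkExportDaoSvc keeps the lazy collection load in an active session.
List<String> types = myBulkExportDaoSvc.getBulkJobResourceTypes(theJobUUID);
types.forEach(type -> System.out.println("exporting type: " + type));
}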

View File

@@ -8,7 +8,6 @@ import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
@@ -17,9 +16,16 @@ import ca.uhn.fhir.rest.param.DateRangeParam;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
public class BulkItemReader extends AbstractItemCountingItemStreamItemReader<ResourcePersistentId> {
@@ -42,54 +48,24 @@ public class BulkItemReader extends AbstractItemCountingItemStreamItemReader<ResourcePersistentId> {
private String myJobUUID;
private IResultIterator myResultIterator;
+ @Value("#{stepExecutionContext['resourceType']}")
+ private String myResourceType;
+ Iterator<ResourcePersistentId> myPidIterator;
@Override
protected ResourcePersistentId doRead() throws Exception {
- if (myJobEntity == null) {
- loadData();
+ if (myPidIterator == null) {
+ loadResourcePids();
}
- if (myResultIterator.hasNext()) {
- return myResultIterator.next();
+ if (myPidIterator.hasNext()) {
+ return myPidIterator.next();
} else {
return null;
}
}
- private void loadData() {
- Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
- if (!jobOpt.isPresent()) {
- ourLog.info("Job appears to be deleted");
- return;
- }
- myJobEntity = jobOpt.get();
- ourLog.info("Bulk export starting generation for batch export job: {}", myJobEntity);
- for (BulkExportCollectionEntity nextCollection : myJobEntity.getCollections()) {
- String nextType = nextCollection.getResourceType();
- IFhirResourceDao dao = myDaoRegistry.getResourceDao(nextType);
- ourLog.info("Bulk export assembling export of type {} for job {}", nextType, myJobUUID);
- Class<? extends IBaseResource> nextTypeClass = myContext.getResourceDefinition(nextType).getImplementingClass();
- ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, nextType, nextTypeClass);
- SearchParameterMap map = new SearchParameterMap();
- map.setLoadSynchronous(true);
- if (myJobEntity.getSince() != null) {
- map.setLastUpdated(new DateRangeParam(myJobEntity.getSince(), null));
- }
- myResultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
- }
- ourLog.info("Bulk export completed job in");
- }
@Override
protected void doOpen() throws Exception {
@@ -100,7 +76,39 @@ public class BulkItemReader extends AbstractItemCountingItemStreamItemReader<ResourcePersistentId> {
}
private void loadResourcePids() {
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
if (!jobOpt.isPresent()) {
ourLog.warn("Bulk export job {} appears to have been deleted", myJobUUID);
return;
}
myJobEntity = jobOpt.get();
ourLog.info("Bulk export starting generation for batch export job: {}", myJobEntity);
IFhirResourceDao dao = myDaoRegistry.getResourceDao(myResourceType);
ourLog.info("Bulk export assembling export of type {} for job {}", myResourceType, myJobUUID);
Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, resourceTypeClass);
SearchParameterMap map = new SearchParameterMap();
map.setLoadSynchronous(true);
if (myJobEntity.getSince() != null) {
map.setLastUpdated(new DateRangeParam(myJobEntity.getSince(), null));
}
// Eagerly drain the search results so the reader can then page through plain PIDs.
IResultIterator resultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
List<ResourcePersistentId> readPids = new ArrayList<>();
while (resultIterator.hasNext()) {
readPids.add(resultIterator.next());
}
myPidIterator = readPids.iterator();
}
public void setJobUUID(String theUUID) {
this.myJobUUID = theUUID;
}
}
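
The diff never shows the myBulkItemReader bean that TestR4Config references below, but for the @Value("#{stepExecutionContext['resourceType']}") binding above to resolve, it has to be declared step-scoped, roughly along these lines (a sketch, not code from the commit):

@Bean
@StepScope
public BulkItemReader myBulkItemReader(@Value("#{jobParameters['jobUUID']}") String theJobUUID) {
BulkItemReader reader = new BulkItemReader();
reader.setJobUUID(theJobUUID);
// Required by AbstractItemCountingItemStreamItemReader when execution state is saved.
reader.setName("bulkItemReader");
// myResourceType is injected by Spring from the stepExecutionContext that
// ResourceTypePartitioner populates for each partition.
return reader;
}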

View File

@@ -0,0 +1,42 @@
package ca.uhn.fhir.jpa.bulk.batch;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class BulkItemResourceLoaderProcessor implements ItemProcessor<ResourcePersistentId, IBaseResource> {
@Autowired
private SearchBuilderFactory mySearchBuilderFactory;
@Autowired
private DaoRegistry myDaoRegistry;
@Value("#{stepExecutionContext['resourceType']}")
private String myResourceType;
@Autowired
private FhirContext myContext;
@Override
public IBaseResource process(ResourcePersistentId theResourcePersistentId) throws Exception {
IFhirResourceDao dao = myDaoRegistry.getResourceDao(myResourceType);
Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, resourceTypeClass);
List<IBaseResource> outgoing = new ArrayList<>();
sb.loadResourcesByPid(Collections.singletonList(theResourcePersistentId), Collections.emptyList(), outgoing, false, null);
// Returning null tells Spring Batch to filter the item, covering the rare
// case where the resource vanished between PID collection and loading.
return outgoing.isEmpty() ? null : outgoing.get(0);
}
}
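
Conceptually, the chunk loop invokes this processor once per PID emitted by the reader. A hypothetical direct invocation (in practice Spring autowires the fields, so this only works inside a container-managed test):

BulkItemResourceLoaderProcessor processor = new BulkItemResourceLoaderProcessor();
IBaseResource resource = processor.process(new ResourcePersistentId(123L)); // hypothetical PID
System.out.println(resource.getIdElement().getValue()); // prints the loaded resource's id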

View File

@@ -0,0 +1,53 @@
package ca.uhn.fhir.jpa.bulk.batch;
import org.slf4j.Logger;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.slf4j.LoggerFactory.getLogger;
public class ResourceTypePartitioner implements Partitioner {
private static final Logger ourLog = getLogger(ResourceTypePartitioner.class);
private String myJobUUID;
@Autowired
private BulkExportDaoSvc myBulkExportDaoSvc;
public ResourceTypePartitioner(String theJobUUID) {
myJobUUID = theJobUUID;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> partitionContextMap = new HashMap<>();
List<String> resourceTypes = myBulkExportDaoSvc.getBulkJobResourceTypes(myJobUUID);
// One partition, and hence one slave step execution, per resource type in the job.
resourceTypes.forEach(resourceType -> {
ExecutionContext context = new ExecutionContext();
context.putString("resourceType", resourceType);
context.putString("jobUUID", myJobUUID);
partitionContextMap.put(resourceType, context);
});
return partitionContextMap;
}
}
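
Illustratively, for a job exporting Patient and Observation, partition(gridSize) ignores the grid size and returns one keyed context per type; Spring Batch then launches one slave step execution per map entry (values below are hypothetical):

Map<String, ExecutionContext> partitions = partitioner.partition(1);
// partitions = { "Patient"     -> {resourceType=Patient,     jobUUID=<uuid>},
//                "Observation" -> {resourceType=Observation, jobUUID=<uuid>} }
partitions.forEach((name, ctx) -> System.out.println(name + " -> " + ctx.getString("resourceType")));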

View File

@@ -36,4 +36,6 @@ public interface IBulkExportCollectionDao extends JpaRepository<BulkExportCollectionEntity, Long> {
@Query("DELETE FROM BulkExportCollectionEntity t WHERE t.myId = :pid")
void deleteByPid(@Param("pid") Long theId);
// @Query("SELECT BulkExportCollectionEntity ")
// void findByJobId(Long theId);
}

View File

@@ -25,6 +25,11 @@ import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
@@ -244,14 +249,13 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
- public void testBatchJob() {
+ public void testBatchJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(null, Sets.newHashSet("Patient", "Observation"), null, null);
//Add the UUID to the job
JobParametersBuilder paramBuilder = new JobParametersBuilder().addString("jobUUID", jobDetails.getJobId());
myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
}
@@ -303,7 +307,6 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
} else {
fail(next.getResourceType());
}
}
}
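
The jobUUID parameter added above is what ties the test to the batch wiring below: the job-scoped partitioner resolves it via #{jobParameters['jobUUID']} and copies it into each partition's step execution context. A condensed view of that flow (the UUID value is hypothetical):

JobParameters params = new JobParametersBuilder()
.addString("jobUUID", "a1b2c3") // hypothetical value returned by submitJob()
.toJobParameters();
// runJob -> ResourceTypePartitioner reads #{jobParameters['jobUUID']}
//        -> each partition's stepExecutionContext carries resourceType + jobUUID
myBatchJobSubmitter.runJob(myBulkJob, params);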

View File

@@ -4,7 +4,10 @@ import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.batch.svc.BatchJobSubmitterImpl;
import ca.uhn.fhir.jpa.binstore.IBinaryStorageSvc;
import ca.uhn.fhir.jpa.binstore.MemoryBinaryStorageSvcImpl;
import ca.uhn.fhir.jpa.bulk.batch.BulkExportDaoSvc;
import ca.uhn.fhir.jpa.bulk.batch.BulkItemReader;
import ca.uhn.fhir.jpa.bulk.batch.BulkItemResourceLoaderProcessor;
import ca.uhn.fhir.jpa.bulk.batch.ResourceTypePartitioner;
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
@@ -15,11 +18,15 @@ import net.ttddyy.dsproxy.listener.logging.SLF4JLogLevel;
import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder;
import org.apache.commons.dbcp2.BasicDataSource;
import org.hibernate.dialect.H2Dialect;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
@@ -42,6 +49,7 @@ import static org.junit.Assert.fail;
@Configuration
@Import(TestJPAConfig.class)
@EnableTransactionManagement()
@EnableBatchProcessing
public class TestR4Config extends BaseJavaConfigR4 {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(TestR4Config.class);
@@ -64,9 +72,6 @@ public class TestR4Config extends BaseJavaConfigR4 {
@Autowired
private JobBuilderFactory myJobBuilderFactory;
@Autowired
private Environment myEnvironment;
private Exception myLastStackTrace;
@Bean
@@ -78,29 +83,56 @@ public class TestR4Config extends BaseJavaConfigR4 {
public CircularQueueCaptureQueriesListener captureQueriesListener() {
return new CircularQueueCaptureQueriesListener();
}
@Bean
public Job bulkExportJob() {
return myJobBuilderFactory.get("bulkExportJob")
- .start(readPidsStep())
+ .start(partitionStep())
.build();
}
@Bean
- public Step readPidsStep() {
- return myStepBuilderFactory.get("readPidsToBeExportedStep")
- .<ResourcePersistentId, ResourcePersistentId>chunk(2)
+ public Step slaveResourceStep() {
+ return myStepBuilderFactory.get("slaveResourceStep")
+ .<ResourcePersistentId, IBaseResource>chunk(2)
.reader(myBulkItemReader(null))
+ .processor(pidToResourceProcessor(null))
.writer(mySimplePrinter())
.listener(myBulkItemReader(null))
.build();
}
@Bean
public Step partitionStep() {
return myStepBuilderFactory.get("partitionStep")
.partitioner("slaveResourceStep", partitioner(null)).step(slaveResourceStep())
.build();
}
@Bean
- public ItemWriter<ResourcePersistentId> mySimplePrinter() {
+ public BulkExportDaoSvc bulkExportDaoSvc() {
return new BulkExportDaoSvc();
}
@Bean
@JobScope
public ResourceTypePartitioner partitioner(@Value("#{jobParameters['jobUUID']}") String theJobUUID) {
return new ResourceTypePartitioner(theJobUUID);
}
@Bean
@StepScope
public ItemProcessor<ResourcePersistentId, IBaseResource> pidToResourceProcessor(@Value("#{jobParameters['jobUUID']}") String theUUID) {
return new BulkItemResourceLoaderProcessor();
}
@Bean
public ItemWriter<IBaseResource> mySimplePrinter() {
return (theResourcePersistentIds) -> {
System.out.println("PRINTING CHUNK");
- theResourcePersistentIds.stream().forEach(pid -> {
- System.out.println("zoop -> " + pid.toString());
+ theResourcePersistentIds.stream().forEach(theIBaseResource -> {
+ System.out.println("zoop -> " + theIBaseResource);
});
};
}
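
As the commit message says, there is still no threading: partitionStep has no TaskExecutor, so the per-resource-type partitions run serially on the launching thread. A plausible follow-up (not part of this commit) would hand the partition handler an executor:

@Bean
public Step threadedPartitionStep() {
return myStepBuilderFactory.get("partitionStep")
.partitioner("slaveResourceStep", partitioner(null))
.step(slaveResourceStep())
// SimpleAsyncTaskExecutor spawns one thread per partition; a pooled
// executor would be the production choice.
.taskExecutor(new SimpleAsyncTaskExecutor())
.build();
}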

View File

@@ -7,6 +7,10 @@ import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
@@ -35,13 +39,35 @@ public class BatchJobConfig implements IPointcutLatch {
@Bean
public Step testStep() {
return myStepBuilderFactory.get("testStep").tasklet((theStepContribution, theChunkContext) -> {
System.out.println("woo!");
myPointcutLatch.call(theChunkContext);
return RepeatStatus.FINISHED;
}).build();
//return myStepBuilderFactory.get("testStep").tasklet(sampleTasklet()).build();
return myStepBuilderFactory.get("testStep")
.<String, String>chunk(100)
.reader(reader())
.writer(simpleWriter())
.build();
}
@Bean
@StepScope
public Tasklet sampleTasklet() {
return new SampleTasklet();
}
@Bean
@StepScope
public ItemReader<String> reader() {
return new SampleItemReader();
}
@Bean
public ItemWriter<String> simpleWriter() {
return new ItemWriter<String>() {
@Override
public void write(List<? extends String> theList) throws Exception {
theList.forEach(System.out::println);
}
};
}
@Override
public void clear() {
myPointcutLatch.clear();

View File

@@ -0,0 +1,15 @@
package ca.uhn.fhir.jpa.batch.config;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
public class SampleItemReader implements ItemReader<String> {
private boolean myReadAlready = false;
@Override
public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
// Return a single item and then null: a reader that never returns null
// never signals end-of-input, and the step would read forever.
if (myReadAlready) {
return null;
}
myReadAlready = true;
return "zoop";
}
}

View File

@@ -0,0 +1,14 @@
package ca.uhn.fhir.jpa.batch.config;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
public class SampleTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution theStepContribution, ChunkContext theChunkContext) throws Exception {
System.out.println("woo");
return RepeatStatus.FINISHED;
}
}

View File

@@ -20,4 +20,5 @@ public class BatchSvcTest extends BaseBatchR4Test {
myJobLauncher.run(myJob, new JobParameters());
// NB: testStep is now chunk-based and no longer invokes the pointcut latch,
// so this await depends on expectations being registered elsewhere.
myBatchJobConfig.awaitExpected();
}
}