Initial run through the first part of batch integration

This commit is contained in:
Tadgh 2020-06-02 13:44:23 -07:00
parent f8d699e13b
commit bb863f26fa
6 changed files with 205 additions and 29 deletions

View File

@@ -0,0 +1,106 @@
package ca.uhn.fhir.jpa.bulk.batch;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Optional;

public class BulkItemReader extends AbstractItemCountingItemStreamItemReader<ResourcePersistentId> {
	private static final Logger ourLog = LoggerFactory.getLogger(BulkItemReader.class);

	@Autowired
	private IBulkExportJobDao myBulkExportJobDao;
	@Autowired
	private DaoRegistry myDaoRegistry;
	@Autowired
	private FhirContext myContext;
	@Autowired
	private SearchBuilderFactory mySearchBuilderFactory;

	private BulkExportJobEntity myJobEntity;
	private String myJobUUID;
	private IResultIterator myResultIterator;

	public BulkItemReader() {
		// AbstractItemCountingItemStreamItemReader needs a name to build its
		// ExecutionContext keys when state saving is enabled
		setName("bulkItemReader");
	}

	@Override
	protected ResourcePersistentId doRead() throws Exception {
		if (myJobEntity == null) {
			loadData();
		}
		// Guard against the job having been deleted, in which case loadData()
		// never creates an iterator
		if (myResultIterator != null && myResultIterator.hasNext()) {
			return myResultIterator.next();
		} else {
			// Returning null signals end-of-data to Spring Batch
			return null;
		}
	}

	private void loadData() {
		Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
		if (!jobOpt.isPresent()) {
			ourLog.info("Job with UUID {} appears to be deleted", myJobUUID);
			return;
		}
		myJobEntity = jobOpt.get();
		ourLog.info("Bulk export starting generation for batch export job: {}", myJobEntity);

		for (BulkExportCollectionEntity nextCollection : myJobEntity.getCollections()) {
			String nextType = nextCollection.getResourceType();
			IFhirResourceDao dao = myDaoRegistry.getResourceDao(nextType);
			ourLog.info("Bulk export assembling export of type {} for job {}", nextType, myJobUUID);

			Class<? extends IBaseResource> nextTypeClass = myContext.getResourceDefinition(nextType).getImplementingClass();
			ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, nextType, nextTypeClass);

			SearchParameterMap map = new SearchParameterMap();
			// Run the query inline rather than through the paged search coordinator
			map.setLoadSynchronous(true);
			if (myJobEntity.getSince() != null) {
				map.setLastUpdated(new DateRangeParam(myJobEntity.getSince(), null));
			}

			// Note: each pass through this loop replaces myResultIterator, so with
			// more than one collection only the last resource type is actually read
			myResultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
		}
		ourLog.info("Bulk export query setup complete for job {}", myJobUUID);
	}

	@Override
	protected void doOpen() throws Exception {
		// No state to initialize; data is loaded lazily on the first doRead()
	}

	@Override
	protected void doClose() throws Exception {
		// Nothing held open here yet
	}

	public void setJobUUID(String theUUID) {
		myJobUUID = theUUID;
	}
}
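One possible follow-up (a hypothetical sketch, not part of this commit): queue one iterator per collection so that every requested resource type gets read, rather than only the last one:

	// requires java.util.ArrayDeque and java.util.Queue imports
	private final Queue<IResultIterator> myResultIterators = new ArrayDeque<>();

	// in loadData(), inside the collections loop, replace the assignment with:
	//   myResultIterators.add(sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions()));

	@Override
	protected ResourcePersistentId doRead() throws Exception {
		if (myJobEntity == null) {
			loadData();
		}
		while (!myResultIterators.isEmpty()) {
			if (myResultIterators.peek().hasNext()) {
				return myResultIterators.peek().next();
			}
			myResultIterators.remove(); // this collection is exhausted
		}
		return null; // all collections drained
	}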

View File

@@ -53,6 +53,7 @@ import ca.uhn.fhir.rest.server.interceptor.partition.RequestTenantPartitionInterceptor;
import org.hibernate.jpa.HibernatePersistenceProvider;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.utilities.graphql.IGraphQLStorageServices;
import org.springframework.batch.core.Job;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
@@ -305,6 +306,7 @@ public abstract class BaseConfig {
	return new BulkDataExportSvcImpl();
}
@Bean
@Lazy
public BulkDataExportProvider bulkDataExportProvider() {

View File

@@ -1,5 +1,6 @@
package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
@@ -21,7 +22,11 @@ import org.hl7.fhir.r4.model.Patient;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import java.util.Date;
import java.util.UUID;
@@ -41,6 +46,10 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
private IBulkExportCollectionFileDao myBulkExportCollectionFileDao;
@Autowired
private IBulkDataExportSvc myBulkDataExportSvc;
@Autowired
private IBatchJobSubmitter myBatchJobSubmitter;
// Two Job beans exist in the test config (bulkExportJob and testJob), so qualify by name
@Autowired
@Qualifier("bulkExportJob")
private Job myBulkJob;
@Test
@@ -234,7 +243,19 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
@Test
public void testBatchJob() {
	createResources();

	// Create a bulk job
	IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(null, Sets.newHashSet("Patient", "Observation"), null, null);

	// Add the UUID to the job
	JobParametersBuilder paramBuilder = new JobParametersBuilder().addString("jobUUID", jobDetails.getJobId());
	myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
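	// Nothing is awaited or asserted here yet; if the test JobLauncher runs
	// synchronously, a follow-up could capture the JobExecution returned by
	// runJob() and assert BatchStatus.COMPLETED on it (hypothetical next step)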
}
@Test
public void testSubmit_WithSince() throws InterruptedException {
// Create some resources to load

View File

@@ -2,11 +2,13 @@ package ca.uhn.fhir.jpa.config;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.batch.svc.BatchJobSubmitterImpl;
import ca.uhn.fhir.jpa.bulk.batch.BulkItemReader;
import ca.uhn.fhir.jpa.search.LuceneSearchMappingFactory;
import ca.uhn.fhir.jpa.subscription.match.deliver.email.IEmailSender;
import ca.uhn.fhir.jpa.subscription.match.deliver.email.JavaMailEmailSender;
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
import ca.uhn.fhir.validation.ResultSeverityEnum;
import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder;
@@ -16,8 +18,12 @@ import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemStreamWriter;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.*;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
@@ -38,34 +44,6 @@ public class TestDstu3Config extends BaseJavaConfigDstu3 {
static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(TestDstu3Config.class);
private Exception myLastStackTrace;
@Autowired
private StepBuilderFactory myStepBuilderFactory;
@Autowired
private JobBuilderFactory myJobBuilderFactory;
@Bean
public Job testJob() {
	return myJobBuilderFactory.get("testJob")
		.start(taskletStep())
		.build();
}

@Bean
public Step taskletStep() {
	return myStepBuilderFactory.get("testStep")
		.tasklet((stepContribution, chunkContext) -> {
			System.out.println("It works!");
			return RepeatStatus.FINISHED;
		})
		.build();
}

@Bean
public Job expungeJob() {
	return myJobBuilderFactory.get("expungeJob")
		.start(taskletStep())
		.build();
}
@Bean
public CircularQueueCaptureQueriesListener captureQueriesListener() {

View File

@@ -2,8 +2,10 @@ package ca.uhn.fhir.jpa.config;
import ca.uhn.fhir.jpa.binstore.IBinaryStorageSvc;
import ca.uhn.fhir.jpa.binstore.MemoryBinaryStorageSvcImpl;
import ca.uhn.fhir.jpa.bulk.batch.BulkItemReader;
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
import ca.uhn.fhir.validation.ResultSeverityEnum;
import net.ttddyy.dsproxy.listener.SingleQueryCountHolder;
@@ -11,7 +13,15 @@ import net.ttddyy.dsproxy.listener.logging.SLF4JLogLevel;
import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder;
import org.apache.commons.dbcp2.BasicDataSource;
import org.hibernate.dialect.H2Dialect;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@@ -46,6 +56,12 @@ public class TestR4Config extends BaseJavaConfigR4 {
}
}
@Autowired
private StepBuilderFactory myStepBuilderFactory;
@Autowired
private JobBuilderFactory myJobBuilderFactory;
@Autowired
private Environment myEnvironment;
@@ -55,6 +71,56 @@ public class TestR4Config extends BaseJavaConfigR4 {
public CircularQueueCaptureQueriesListener captureQueriesListener() {
	return new CircularQueueCaptureQueriesListener();
}
@Bean
public Job bulkExportJob() {
	return myJobBuilderFactory.get("bulkExportJob")
		.start(readPidsStep())
		.build();
}

@Bean
public Step readPidsStep() {
	return myStepBuilderFactory.get("readPidsToBeExportedStep")
		.<ResourcePersistentId, ResourcePersistentId>chunk(100)
		.reader(myBulkItemReader(null))
		.writer(mySimplePrinter())
		.build();
}
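// Chunk-oriented processing: Spring Batch calls the reader until it has 100
// ResourcePersistentIds (or the reader returns null), then hands the whole
// chunk to the writer in a single transaction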
@Bean
public ItemWriter<ResourcePersistentId> mySimplePrinter() {
	return (theResourcePersistentIds) -> {
		System.out.println("PRINTING CHUNK");
		theResourcePersistentIds.forEach(pid -> {
			System.out.println("zoop -> " + pid.toString());
		});
	};
}
@Bean
@StepScope
public BulkItemReader myBulkItemReader(@Value("#{jobParameters['jobUUID']}") String theJobUUID) {
	BulkItemReader bulkItemReader = new BulkItemReader();
	bulkItemReader.setJobUUID(theJobUUID);
	return bulkItemReader;
}
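// @StepScope defers creation of the reader until step execution, when Spring
// resolves #{jobParameters['jobUUID']} against the launching JobParameters.
// The null passed from readPidsStep() above is only a placeholder for the
// scoped proxy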
@Bean
public Job testJob() {
	return myJobBuilderFactory.get("testJob")
		.start(taskletStep())
		.build();
}

@Bean
public Step taskletStep() {
	return myStepBuilderFactory.get("testStep")
		.tasklet((stepContribution, chunkContext) -> {
			System.out.println("It works!");
			return RepeatStatus.FINISHED;
		})
		.build();
}
@Bean
public DataSource dataSource() {

View File

@@ -8,6 +8,7 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
import ca.uhn.fhir.jpa.entity.TermConcept;
@@ -122,6 +123,8 @@ public abstract class BaseJpaTest extends BaseTest {
protected IPartitionLookupSvc myPartitionConfigSvc;
@Autowired
private IdHelperService myIdHelperService;
@Autowired
protected IBatchJobSubmitter myBatchJobSubmitter;
@After
public void afterPerformCleanup() {