From 954704884886e1153ff69f0b2e61110c50c7cd4b Mon Sep 17 00:00:00 2001
From: Tadgh
Date: Thu, 4 Mar 2021 20:16:50 -0500
Subject: [PATCH] Add support for Patient export

---
 .../uhn/fhir/jpa/batch/BatchJobsConfig.java   |  2 +-
 .../bulk/api/GroupBulkDataExportOptions.java  |  2 +-
 .../jpa/bulk/job/BulkExportJobConfig.java     | 34 +++++++++----------
 .../job/CreateBulkExportEntityTasklet.java    |  2 +-
 .../jpa/bulk/job/PatientBulkItemReader.java   |  3 +-
 .../bulk/provider/BulkDataExportProvider.java |  4 +--
 .../jpa/bulk/svc/BulkDataExportSvcImpl.java   |  5 +++
 .../jpa/bulk/BulkDataExportSvcImplR4Test.java | 33 ++++++++----------
 8 files changed, 44 insertions(+), 41 deletions(-)

diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/BatchJobsConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/BatchJobsConfig.java
index 5cb70419561..178ee7358d5 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/BatchJobsConfig.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/BatchJobsConfig.java
@@ -33,5 +33,5 @@ import org.springframework.context.annotation.Import;
 public class BatchJobsConfig {
 	public static final String BULK_EXPORT_JOB_NAME = "bulkExportJob";
 	public static final String GROUP_BULK_EXPORT_JOB_NAME = "groupBulkExportJob";
-	public static final String PATIENT_BULK_EXPORT_JOB_NAME = "patientBulkExport";
+	public static final String PATIENT_BULK_EXPORT_JOB_NAME = "patientBulkExportJob";
 }

diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/api/GroupBulkDataExportOptions.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/api/GroupBulkDataExportOptions.java
index fb80bf2843f..2718f4e461c 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/api/GroupBulkDataExportOptions.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/api/GroupBulkDataExportOptions.java
@@ -10,7 +10,7 @@ public class GroupBulkDataExportOptions extends BulkDataExportOptions {
 	private final boolean myMdm;
 
 	public GroupBulkDataExportOptions(String theOutputFormat, Set<String> theResourceTypes, Date theSince, Set<String> theFilters, IIdType theGroupId, boolean theMdm) {
-		super(theOutputFormat, theResourceTypes, theSince, theFilters);
+		super(theOutputFormat, theResourceTypes, theSince, theFilters, false);
 		myGroupId = theGroupId;
 		myMdm = theMdm;
 	}

diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java
index 31cf3b93e7c..a2b8d804e3b 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java
@@ -95,7 +95,7 @@ public class BulkExportJobConfig {
 	@Bean
 	@Lazy
 	public Job patientBulkExportJob() {
-		return myJobBuilderFactory.get(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME)
+		return myJobBuilderFactory.get(BatchJobsConfig.PATIENT_BULK_EXPORT_JOB_NAME)
 			.validator(bulkJobParameterValidator())
 			.start(createBulkExportEntityStep())
 			.next(patientPartitionStep())
@@ -126,6 +126,7 @@ public class BulkExportJobConfig {
 		return new BulkExportJobParameterValidator();
 	}
 
+	//Writers
 	@Bean
 	public Step groupBulkExportGenerateResourceFilesStep() {
 		return myStepBuilderFactory.get("groupBulkExportGenerateResourceFilesStep")
 			.<List<ResourcePersistentId>, List<IBaseResource>>chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time.
 			.reader(groupBulkItemReader())
 			.processor(myPidToIBaseResourceProcessor)
 			.writer(resourceToFileWriter())
 			.listener(bulkExportGenerateResourceFilesStepListener())
 			.build();
 	}
-	@Bean
-	@StepScope
-	public GroupBulkItemReader groupBulkItemReader(){
-		return new GroupBulkItemReader();
-	}
-
-
 	@Bean
 	public Step bulkExportGenerateResourceFilesStep() {
 		return myStepBuilderFactory.get("bulkExportGenerateResourceFilesStep")
 			.<List<ResourcePersistentId>, List<IBaseResource>>chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time.
 			.reader(bulkItemReader())
 			.processor(myPidToIBaseResourceProcessor)
 			.writer(resourceToFileWriter())
 			.listener(bulkExportGenerateResourceFilesStepListener())
 			.build();
 	}
+	@Bean
+	public Step patientBulkExportGenerateResourceFilesStep() {
+		return myStepBuilderFactory.get("patientBulkExportGenerateResourceFilesStep")
+			.<List<ResourcePersistentId>, List<IBaseResource>>chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time.
+			.reader(patientBulkItemReader())
+			.processor(myPidToIBaseResourceProcessor)
+			.writer(resourceToFileWriter())
+			.listener(bulkExportGenerateResourceFilesStepListener())
+			.build();
+	}
 
 	@Bean
 	@JobScope
@@ -204,15 +208,11 @@ public class BulkExportJobConfig {
 			.build();
 	}
 
+
 	@Bean
-	public Step patientBulkExportGenerateResourceFilesStep() {
-		return myStepBuilderFactory.get("patientBulkExportGenerateResourceFilesStep")
-			.<List<ResourcePersistentId>, List<IBaseResource>>chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time.
-			.reader(patientBulkItemReader())
-			.processor(myPidToIBaseResourceProcessor)
-			.writer(resourceToFileWriter())
-			.listener(bulkExportGenerateResourceFilesStepListener())
-			.build();
+	@StepScope
+	public GroupBulkItemReader groupBulkItemReader(){
+		return new GroupBulkItemReader();
 	}
 
 	@Bean
@@ -235,7 +235,7 @@ public class BulkExportJobConfig {
 
 	@Bean
 	@StepScope
-	public ItemWriter<List<IBaseResource>> resourceToFileWriter() {
+	public ResourceToFileWriter resourceToFileWriter() {
 		return new ResourceToFileWriter();
 	}
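Note on the step definitions above: Spring Batch's `.<I, O>chunk(n)` call fixes each step's input/output types and its commit interval, which is why the generics restored above matter for reading the configuration. The sketch below is illustrative only (it is not part of the patch, and the class name is invented); it shows the contract the three `*GenerateResourceFilesStep` beans rely on, assuming a `CHUNK_SIZE` of 100 and the reader page size of 10 implied by the inline comment, which multiply out to the "1000 resources per generated file" noted there.

```java
// Illustrative chunk loop, not part of the patch. I is the reader/processor
// input (here: a page of ResourcePersistentIds), O the processor output
// (here: a page of IBaseResources); the writer flushes every chunkSize items.
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

final class ChunkLoopSketch<I, O> {
	void run(Supplier<I> reader, Function<I, O> processor, Consumer<List<O>> writer, int chunkSize) {
		List<O> buffer = new ArrayList<>();
		I item;
		while ((item = reader.get()) != null) { // a null read signals end of input
			buffer.add(processor.apply(item));
			if (buffer.size() == chunkSize) {   // commit interval reached: flush one chunk
				writer.accept(buffer);
				buffer = new ArrayList<>();
			}
		}
		if (!buffer.isEmpty()) {
			writer.accept(buffer);              // flush the final partial chunk
		}
	}
}
```

With a chunk size of 100 and ten resources per reader page, each flush covers 1000 resources, matching the "1000 resources per generated file" comment in the step definitions.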
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/CreateBulkExportEntityTasklet.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/CreateBulkExportEntityTasklet.java
index 91df88ef7cf..b6466e7532c 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/CreateBulkExportEntityTasklet.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/CreateBulkExportEntityTasklet.java
@@ -66,7 +66,7 @@ public class CreateBulkExportEntityTasklet implements Tasklet {
 			outputFormat = Constants.CT_FHIR_NDJSON;
 		}
 
-		IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(outputFormat, resourceTypeSet, since, filterSet));
+		IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(outputFormat, resourceTypeSet, since, filterSet, true));
 		addUUIDToJobContext(theChunkContext, jobInfo.getJobId());
 		return RepeatStatus.FINISHED;

diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/PatientBulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/PatientBulkItemReader.java
index 46c72ac6200..c470b7f77c4 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/PatientBulkItemReader.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/PatientBulkItemReader.java
@@ -29,6 +29,7 @@ import ca.uhn.fhir.jpa.dao.ISearchBuilder;
 import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
 import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
 import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
+import ca.uhn.fhir.rest.param.ReferenceParam;
 import ca.uhn.fhir.rest.param.StringParam;
 import org.slf4j.Logger;
 import org.springframework.batch.item.ItemReader;
@@ -71,7 +72,7 @@ public class PatientBulkItemReader extends BaseBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
 		if (!myResourceType.equalsIgnoreCase("Patient")) {
 			//Now that we have our basic built Bulk Export SP map, we inject the condition that the resources returned
 			//must have a patient= or subject= reference set.
-			map.add(patientSearchParam, new StringParam().setMissing(false));
+			map.add(patientSearchParam, new ReferenceParam().setMissing(false));
 		}
 
 		ISearchBuilder sb = getSearchBuilderForLocalResourceType();
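A note on the `PatientBulkItemReader` change above: `new ReferenceParam().setMissing(false)` translates to a `:missing=false` modifier, meaning "only resources where this reference is populated", which is exactly what the inline comment describes (the flattened source showed `setMissing(true)`, which would have matched the opposite set). A minimal standalone sketch of the same filter follows; the literal parameter name `"patient"` is an assumption for illustration, since the reader actually derives `patientSearchParam` per resource type.

```java
// Standalone sketch of the injected filter, mirroring the patch's map.add(...)
// call. Equivalent search: GET [base]/Observation?patient:missing=false --
// i.e. only Observations that actually carry a patient/subject reference.
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.param.ReferenceParam;

public class PatientReferenceFilterSketch {
	public static SearchParameterMap observationsWithPatientReference() {
		SearchParameterMap map = new SearchParameterMap();
		// setMissing(false) => ":missing=false" => the reference must be set
		map.add("patient", new ReferenceParam().setMissing(false));
		return map;
	}
}
```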
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/provider/BulkDataExportProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/provider/BulkDataExportProvider.java
index e251bcd31d1..df3fcf2aeb9 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/provider/BulkDataExportProvider.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/provider/BulkDataExportProvider.java
@@ -75,7 +75,7 @@ public class BulkDataExportProvider {
 		@OperationParam(name = JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theOutputFormat,
 		@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theType,
 		@OperationParam(name = JpaConstants.PARAM_EXPORT_SINCE, min = 0, max = 1, typeName = "instant") IPrimitiveType<Date> theSince,
-		@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theTypeFilter,
+		@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = OperationParam.MAX_UNLIMITED, typeName = "string") IPrimitiveType<String> theTypeFilter,
 		ServletRequestDetails theRequestDetails
 	) {
@@ -102,7 +102,7 @@ public class BulkDataExportProvider {
 			filters = ArrayUtil.commaSeparatedListToCleanSet(theTypeFilter.getValueAsString());
 		}
 
-		IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(outputFormat, resourceTypes, since, filters));
+		IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(outputFormat, resourceTypes, since, filters, true));
 
 		String serverBase = getServerBase(theRequestDetails);
 		String pollLocation = serverBase + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" + JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + outcome.getJobId();
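For orientation, the provider above implements the FHIR bulk-data async kick-off: the client calls `$export` with `Prefer: respond-async`, and the poll URL it receives back is the string this code assembles from `JpaConstants.OPERATION_EXPORT_POLL_STATUS` and `PARAM_EXPORT_POLL_STATUS_JOB_ID`. A hedged client-side sketch follows; the server base URL is an assumption, and the literal `$export-poll-status` and `_jobId` strings are how those constants are conventionally rendered.

```java
// Client-side sketch, not part of the patch. Assumes a HAPI FHIR server at
// http://localhost:8080/fhir. The kick-off should answer 202 Accepted with a
// Content-Location of the form [base]/$export-poll-status?_jobId=<job uuid>.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class BulkExportKickoffSketch {
	public static void main(String[] args) throws Exception {
		HttpClient client = HttpClient.newHttpClient();
		HttpRequest kickoff = HttpRequest.newBuilder()
			.uri(URI.create("http://localhost:8080/fhir/$export?_type=Patient,Observation"))
			.header("Prefer", "respond-async")         // async pattern required for $export
			.header("Accept", "application/fhir+json")
			.GET()
			.build();
		HttpResponse<Void> response = client.send(kickoff, HttpResponse.BodyHandlers.discarding());
		System.out.println(response.statusCode());     // expect 202
		System.out.println(response.headers().firstValue("Content-Location").orElse("(no poll location)"));
	}
}
```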
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java
index a1145c97609..d2f989814bb 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java
@@ -114,6 +114,10 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
 	@Qualifier(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME)
 	private org.springframework.batch.core.Job myGroupBulkExportJob;
 
+	@Autowired
+	@Qualifier(BatchJobsConfig.PATIENT_BULK_EXPORT_JOB_NAME)
+	private org.springframework.batch.core.Job myPatientBulkExportJob;
+
 	private final int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR);
 
 	/**
@@ -242,6 +246,7 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
 
 	private boolean isPatientBulkJob(BulkExportJobEntity theBulkExportJobEntity) {
 		//TODO GGG
+		return true;
 	}
 
 	private void enhanceBulkParametersWithGroupParameters(BulkExportJobEntity theBulkExportJobEntity, JobParametersBuilder theParameters) {
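The `isPatientBulkJob` stub above is the loose end of this patch: it is tagged `//TODO GGG` and unconditionally returns `true`, so job-type dispatch for the new flavor is not finished here. Purely as a hypothetical completion (not in this patch; both the `getRequest()` accessor and the URL-prefix convention are assumptions), the check could dispatch on the stored kick-off URL, analogous to recognizing a group job by its `/Group/.../$export` request:

```java
// Hypothetical completion of the TODO, not part of this patch. Assumes
// BulkExportJobEntity keeps the original kick-off request URL and that
// patient-level jobs were submitted against "/Patient/$export".
private boolean isPatientBulkJob(BulkExportJobEntity theBulkExportJobEntity) {
	String request = theBulkExportJobEntity.getRequest(); // assumed accessor
	return request != null && request.startsWith("/Patient/");
}
```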
diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java
index 986cba83cfd..9dbc82b709c 100644
--- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java
+++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java
@@ -159,7 +159,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
 	@Test
 	public void testSubmit_InvalidOutputFormat() {
 		try {
-			myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_JSON_NEW, Sets.newHashSet("Patient", "Observation"), null, null));
+			myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_JSON_NEW, Sets.newHashSet("Patient", "Observation"), null, null, true));
 			fail();
 		} catch (InvalidRequestException e) {
 			assertEquals("Invalid output format: application/fhir+json", e.getMessage());
@@ -169,7 +169,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
 	@Test
 	public void testSubmit_OnlyBinarySelected() {
 		try {
-			myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_JSON_NEW, Sets.newHashSet("Binary"), null, null));
+			myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_JSON_NEW, Sets.newHashSet("Binary"), null, null, true));
 			fail();
 		} catch (InvalidRequestException e) {
 			assertEquals("Invalid output format: application/fhir+json", e.getMessage());
@@ -179,7 +179,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
 	@Test
 	public void testSubmit_InvalidResourceTypes() {
 		try {
-			myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient", "FOO"), null, null));
+			myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient", "FOO"), null, null, true));
 			fail();
 		} catch (InvalidRequestException e) {
 			assertEquals("Unknown or unsupported resource type: FOO", e.getMessage());
@@ -189,7 +189,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
 	@Test
 	public void testSubmit_MultipleTypeFiltersForSameType() {
 		try {
-			myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Patient?name=a", "Patient?active=true")));
+			myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Patient?name=a", "Patient?active=true"), true));
 			fail();
 		} catch (InvalidRequestException e) {
 			assertEquals("Invalid _typeFilter value \"Patient?name=a\". Multiple filters found for type Patient", e.getMessage());
@@ -199,7 +199,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
 	@Test
 	public void testSubmit_TypeFilterForNonSelectedType() {
 		try {
-			myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Observation?code=123")));
+			myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Observation?code=123"), true));
 			fail();
 		} catch (InvalidRequestException e) {
 			assertEquals("Invalid _typeFilter value \"Observation?code=123\". Resource type does not appear in _type list", e.getMessage());
@@ -209,7 +209,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
 	@Test
 	public void testSubmit_TypeFilterInvalid() {
 		try {
-			myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Hello")));
+			myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Hello"), true));
 			fail();
 		} catch (InvalidRequestException e) {
 			assertEquals("Invalid _typeFilter value \"Hello\". Must be in the form [ResourceType]?[params]", e.getMessage());
@@ -220,11 +220,11 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
 	public void testSubmit_ReusesExisting() {
 
 		// Submit
-		IBulkDataExportSvc.JobInfo jobDetails1 = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null));
+		IBulkDataExportSvc.JobInfo jobDetails1 = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null, true));
 		assertNotNull(jobDetails1.getJobId());
 
 		// Submit again
-		IBulkDataExportSvc.JobInfo jobDetails2 = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null));
+		IBulkDataExportSvc.JobInfo jobDetails2 = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null, true));
 		assertNotNull(jobDetails2.getJobId());
 
 		assertEquals(jobDetails1.getJobId(), jobDetails2.getJobId());
@@ -245,7 +245,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
 		createResources();
 
 		// Create a bulk job
-		IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient"), null, null));
+		IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient"), null, null, true));
 		assertNotNull(jobDetails.getJobId());
 
 		// Check the status
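All of these test call sites pick up the same API change: `BulkDataExportOptions` has grown a fifth constructor argument, passed as `true` here and as `false` from `GroupBulkDataExportOptions`. The patch never shows the widened constructor itself, so the sketch below is only an assumed shape, and the `theSystemLevel` name is a guess inferred from the call sites.

```java
// Assumed shape of the widened constructor -- the parameter's real name is
// not visible in this patch. Call sites suggest: true for system/patient-level
// submissions, false when a group export supplies its own scoping.
import java.util.Date;
import java.util.Set;

public class BulkDataExportOptionsSketch {
	private final boolean mySystemLevel; // the new fifth argument

	public BulkDataExportOptionsSketch(String theOutputFormat, Set<String> theResourceTypes,
												  Date theSince, Set<String> theFilters, boolean theSystemLevel) {
		mySystemLevel = theSystemLevel;
		// the remaining arguments mirror the pre-patch constructor and are omitted here
	}

	public boolean isSystemLevel() {
		return mySystemLevel;
	}
}
```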
@@ -275,7 +275,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
 		createResources();
 
 		// Create a bulk job
-		IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, Sets.newHashSet(TEST_FILTER)));
+		IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, Sets.newHashSet(TEST_FILTER), true));
 		assertNotNull(jobDetails.getJobId());
 
 		// Check the status
@@ -328,7 +328,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
 		myBinaryDao.create(b);
 
 		// Create a bulk job
-		IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, null, null, null));
+		IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, null, null, null, true));
 		assertNotNull(jobDetails.getJobId());
 
 		// Check the status
@@ -382,7 +382,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
 		// Create a bulk job
 		HashSet<String> types = Sets.newHashSet("Patient");
 		Set<String> typeFilters = Sets.newHashSet("Patient?_has:Observation:patient:identifier=SYS|VAL3");
-		IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, types, null, typeFilters));
+		IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, types, null, typeFilters, true));
 		assertNotNull(jobDetails.getJobId());
 
 		// Check the status
@@ -434,7 +434,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
 		}
 
 		// Create a bulk job
-		IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), cutoff.getValue(), null));
+		IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), cutoff.getValue(), null, true));
 		assertNotNull(jobDetails.getJobId());
 
 		// Check the status
@@ -513,7 +513,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
 		createResources();
 
 		// Create a bulk job
-		IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null));
+		IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null, true));
 
 		//Add the UUID to the job
 		BulkExportJobParametersBuilder paramBuilder = new BulkExportJobParametersBuilder()
@@ -584,7 +584,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
 		IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
 		assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
-		assertThat(jobInfo.getFiles().size(), equalTo(1));
+		assertThat(jobInfo.getFiles().size(), equalTo(2));
 		assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
 
 		// Iterate over the files
 	}
-
-	}
-
 	// CareTeam has two patient references: participant and patient. This test checks if we find the patient if participant is null but patient is not null
 	@Test
 	public void testGroupBatchJobCareTeam() throws Exception {