Add support for Patient export

Tadgh 2021-03-04 20:16:50 -05:00
parent f5372c9f07
commit 9547048848
8 changed files with 44 additions and 41 deletions

BatchJobsConfig.java

@@ -33,5 +33,5 @@ import org.springframework.context.annotation.Import;
public class BatchJobsConfig {
public static final String BULK_EXPORT_JOB_NAME = "bulkExportJob";
public static final String GROUP_BULK_EXPORT_JOB_NAME = "groupBulkExportJob";
public static final String PATIENT_BULK_EXPORT_JOB_NAME = "patientBulkExport";
public static final String PATIENT_BULK_EXPORT_JOB_NAME = "patientBulkExportJob";
}
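The value change above is load-bearing, not cosmetic: Spring registers a @Bean under its factory-method name and resolves @Qualifier against that bean name, so the constant must read "patientBulkExportJob" for the injection added later in this commit to work. The two sides of the contract, assembled from hunks below (the trailing .build() is assumed, as the hunk cuts off before it):

// In BulkExportJobConfig: the bean name defaults to the factory method name,
// i.e. "patientBulkExportJob".
@Bean
@Lazy
public Job patientBulkExportJob() {
	return myJobBuilderFactory.get(BatchJobsConfig.PATIENT_BULK_EXPORT_JOB_NAME)
		.validator(bulkJobParameterValidator())
		.start(createBulkExportEntityStep())
		.next(patientPartitionStep())
		.build(); // assumed; not shown in the hunk
}

// In BulkDataExportSvcImpl: injection by that same bean name.
@Autowired
@Qualifier(BatchJobsConfig.PATIENT_BULK_EXPORT_JOB_NAME)
private org.springframework.batch.core.Job myPatientBulkExportJob;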

GroupBulkDataExportOptions.java

@@ -10,7 +10,7 @@ public class GroupBulkDataExportOptions extends BulkDataExportOptions {
private final boolean myMdm;
public GroupBulkDataExportOptions(String theOutputFormat, Set<String> theResourceTypes, Date theSince, Set<String> theFilters, IIdType theGroupId, boolean theMdm) {
super(theOutputFormat, theResourceTypes, theSince, theFilters);
super(theOutputFormat, theResourceTypes, theSince, theFilters, false);
myGroupId = theGroupId;
myMdm = theMdm;
}
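The added false threads a new fifth boolean through to BulkDataExportOptions: group export opts out here, while CreateBulkExportEntityTasklet and BulkDataExportProvider (below) pass true. The base class itself is outside this diff; a minimal sketch of the extended constructor, with the hypothetical name theSystemLevel standing in for whatever the commit actually calls the parameter:

import java.util.Date;
import java.util.Set;

public class BulkDataExportOptions {
	private final String myOutputFormat;
	private final Set<String> myResourceTypes;
	private final Date mySince;
	private final Set<String> myFilters;
	// Hypothetical field name; only the extra boolean argument is visible in this diff.
	private final boolean mySystemLevel;

	public BulkDataExportOptions(String theOutputFormat, Set<String> theResourceTypes, Date theSince, Set<String> theFilters, boolean theSystemLevel) {
		myOutputFormat = theOutputFormat;
		myResourceTypes = theResourceTypes;
		mySince = theSince;
		myFilters = theFilters;
		mySystemLevel = theSystemLevel;
	}
}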

BulkExportJobConfig.java

@@ -95,7 +95,7 @@ public class BulkExportJobConfig {
@Bean
@Lazy
public Job patientBulkExportJob() {
return myJobBuilderFactory.get(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME)
return myJobBuilderFactory.get(BatchJobsConfig.PATIENT_BULK_EXPORT_JOB_NAME)
.validator(bulkJobParameterValidator())
.start(createBulkExportEntityStep())
.next(patientPartitionStep())
@@ -126,6 +126,7 @@ public class BulkExportJobConfig {
return new BulkExportJobParameterValidator();
}
//Writers
@Bean
public Step groupBulkExportGenerateResourceFilesStep() {
return myStepBuilderFactory.get("groupBulkExportGenerateResourceFilesStep")
@@ -137,13 +138,6 @@ public class BulkExportJobConfig {
.build();
}
@Bean
@StepScope
public GroupBulkItemReader groupBulkItemReader(){
return new GroupBulkItemReader();
}
@Bean
public Step bulkExportGenerateResourceFilesStep() {
return myStepBuilderFactory.get("bulkExportGenerateResourceFilesStep")
@@ -154,6 +148,16 @@ public class BulkExportJobConfig {
.listener(bulkExportGenerateResourceFilesStepListener())
.build();
}
@Bean
public Step patientBulkExportGenerateResourceFilesStep() {
return myStepBuilderFactory.get("patientBulkExportGenerateResourceFilesStep")
.<List<ResourcePersistentId>, List<IBaseResource>> chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time.
.reader(patientBulkItemReader())
.processor(myPidToIBaseResourceProcessor)
.writer(resourceToFileWriter())
.listener(bulkExportGenerateResourceFilesStepListener())
.build();
}
@Bean
@JobScope
@@ -204,15 +208,11 @@ public class BulkExportJobConfig {
.build();
}
@Bean
public Step patientBulkExportGenerateResourceFilesStep() {
return myStepBuilderFactory.get("patientBulkExportGenerateResourceFilesStep")
.<List<ResourcePersistentId>, List<IBaseResource>> chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time.
.reader(patientBulkItemReader())
.processor(myPidToIBaseResourceProcessor)
.writer(resourceToFileWriter())
.listener(bulkExportGenerateResourceFilesStepListener())
.build();
@StepScope
public GroupBulkItemReader groupBulkItemReader(){
return new GroupBulkItemReader();
}
@Bean
@@ -235,7 +235,7 @@ public class BulkExportJobConfig {
@Bean
@StepScope
public ItemWriter<List<IBaseResource>> resourceToFileWriter() {
public ResourceToFileWriter resourceToFileWriter() {
return new ResourceToFileWriter();
}
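The new patientBulkExportGenerateResourceFilesStep() above wires a patientBulkItemReader() bean that this diff does not show. Assuming it mirrors the groupBulkItemReader() bean, it would look like:

@Bean
@StepScope
public PatientBulkItemReader patientBulkItemReader() {
	// Step-scoped so each partitioned step execution gets its own reader.
	return new PatientBulkItemReader();
}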

CreateBulkExportEntityTasklet.java

@@ -66,7 +66,7 @@ public class CreateBulkExportEntityTasklet implements Tasklet {
outputFormat = Constants.CT_FHIR_NDJSON;
}
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(outputFormat, resourceTypeSet, since, filterSet));
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(outputFormat, resourceTypeSet, since, filterSet, true));
addUUIDToJobContext(theChunkContext, jobInfo.getJobId());
return RepeatStatus.FINISHED;

PatientBulkItemReader.java

@@ -29,6 +29,7 @@ import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.ReferenceParam;
import ca.uhn.fhir.rest.param.StringParam;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemReader;
@@ -71,7 +72,7 @@ public class PatientBulkItemReader extends BaseBulkItemReader implements ItemRea
if (!myResourceType.equalsIgnoreCase("Patient")) {
//Now that we have our basic built Bulk Export SP map, we inject the condition that the resources returned
//must have a patient= or subject= reference set.
map.add(patientSearchParam, new StringParam().setMissing(false));
map.add(patientSearchParam, new ReferenceParam().setMissing(true));
}
ISearchBuilder sb = getSearchBuilderForLocalResourceType();
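For context on the ReferenceParam change: HAPI renders setMissing(...) as the FHIR :missing modifier, and :missing=true matches resources in which the parameter has no value, e.g. Observation?subject:missing=true. Note that the inline comment above describes keeping resources that do have a patient= or subject= reference, which would correspond to :missing=false, the sense of the replaced line. A small sketch of the clause, with "subject" as a stand-in for the patientSearchParam variable:

import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.param.ReferenceParam;

SearchParameterMap map = new SearchParameterMap();
// :missing=true keeps only resources where the reference is absent,
// the equivalent of appending subject:missing=true to the query.
map.add("subject", new ReferenceParam().setMissing(true));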

BulkDataExportProvider.java

@@ -75,7 +75,7 @@ public class BulkDataExportProvider {
@OperationParam(name = JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theOutputFormat,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theType,
@OperationParam(name = JpaConstants.PARAM_EXPORT_SINCE, min = 0, max = 1, typeName = "instant") IPrimitiveType<Date> theSince,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theTypeFilter,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = OperationParam.MAX_UNLIMITED, typeName = "string") IPrimitiveType<String> theTypeFilter,
ServletRequestDetails theRequestDetails
) {
@@ -102,7 +102,7 @@ public class BulkDataExportProvider {
filters = ArrayUtil.commaSeparatedListToCleanSet(theTypeFilter.getValueAsString());
}
IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(outputFormat, resourceTypes, since, filters));
IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(outputFormat, resourceTypes, since, filters, true));
String serverBase = getServerBase(theRequestDetails);
String pollLocation = serverBase + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" + JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + outcome.getJobId();
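Raising max from 1 to OperationParam.MAX_UNLIMITED lets clients repeat the _typeFilter parameter; the provider still funnels the received value through ArrayUtil.commaSeparatedListToCleanSet (above), so comma-separated lists keep working. A usage sketch:

import ca.uhn.fhir.util.ArrayUtil;
import java.util.Set;

// One parameter value carrying two filters, split into a clean set.
Set<String> filters = ArrayUtil.commaSeparatedListToCleanSet("Patient?active=true,Observation?status=final");
// filters now contains "Patient?active=true" and "Observation?status=final"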

BulkDataExportSvcImpl.java

@@ -114,6 +114,10 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
@Qualifier(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME)
private org.springframework.batch.core.Job myGroupBulkExportJob;
@Autowired
@Qualifier(BatchJobsConfig.PATIENT_BULK_EXPORT_JOB_NAME)
private org.springframework.batch.core.Job myPatientBulkExportJob;
private final int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR);
/**
@@ -242,6 +246,7 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
private boolean isPatientBulkJob(BulkExportJobEntity theBulkExportJobEntity) {
//TODO GGG
return true;
}
private void enhanceBulkParametersWithGroupParameters(BulkExportJobEntity theBulkExportJobEntity, JobParametersBuilder theParameters) {
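isPatientBulkJob is still stubbed out (the TODO above), but together with the newly injected myPatientBulkExportJob it implies a three-way choice when a queued job is launched. A sketch of the presumed dispatch; apart from isPatientBulkJob, the helper and field names here are assumptions, since the launch code is outside this diff:

// Presumed selection of the Spring Batch job for a queued export.
org.springframework.batch.core.Job jobToRun;
if (isGroupBulkJob(theBulkExportJobEntity)) {          // assumed helper
	jobToRun = myGroupBulkExportJob;
} else if (isPatientBulkJob(theBulkExportJobEntity)) { // currently always true
	jobToRun = myPatientBulkExportJob;
} else {
	jobToRun = myBulkExportJob;                        // assumed system-level job field
}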

BulkDataExportSvcImplR4Test.java

@@ -159,7 +159,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testSubmit_InvalidOutputFormat() {
try {
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_JSON_NEW, Sets.newHashSet("Patient", "Observation"), null, null));
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_JSON_NEW, Sets.newHashSet("Patient", "Observation"), null, null, true));
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid output format: application/fhir+json", e.getMessage());
@@ -169,7 +169,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testSubmit_OnlyBinarySelected() {
try {
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_JSON_NEW, Sets.newHashSet("Binary"), null, null));
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_JSON_NEW, Sets.newHashSet("Binary"), null, null, true));
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid output format: application/fhir+json", e.getMessage());
@@ -179,7 +179,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testSubmit_InvalidResourceTypes() {
try {
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient", "FOO"), null, null));
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient", "FOO"), null, null, true));
fail();
} catch (InvalidRequestException e) {
assertEquals("Unknown or unsupported resource type: FOO", e.getMessage());
@@ -189,7 +189,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testSubmit_MultipleTypeFiltersForSameType() {
try {
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Patient?name=a", "Patient?active=true")));
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Patient?name=a", "Patient?active=true"), true));
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid _typeFilter value \"Patient?name=a\". Multiple filters found for type Patient", e.getMessage());
@@ -199,7 +199,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testSubmit_TypeFilterForNonSelectedType() {
try {
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Observation?code=123")));
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Observation?code=123"), true));
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid _typeFilter value \"Observation?code=123\". Resource type does not appear in _type list", e.getMessage());
@@ -209,7 +209,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testSubmit_TypeFilterInvalid() {
try {
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Hello")));
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Hello"), true));
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid _typeFilter value \"Hello\". Must be in the form [ResourceType]?[params]", e.getMessage());
@@ -220,11 +220,11 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
public void testSubmit_ReusesExisting() {
// Submit
IBulkDataExportSvc.JobInfo jobDetails1 = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null));
IBulkDataExportSvc.JobInfo jobDetails1 = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null, true));
assertNotNull(jobDetails1.getJobId());
// Submit again
IBulkDataExportSvc.JobInfo jobDetails2 = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null));
IBulkDataExportSvc.JobInfo jobDetails2 = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null, true));
assertNotNull(jobDetails2.getJobId());
assertEquals(jobDetails1.getJobId(), jobDetails2.getJobId());
@@ -245,7 +245,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient"), null, null));
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient"), null, null, true));
assertNotNull(jobDetails.getJobId());
// Check the status
@@ -275,7 +275,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, Sets.newHashSet(TEST_FILTER)));
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, Sets.newHashSet(TEST_FILTER), true));
assertNotNull(jobDetails.getJobId());
// Check the status
@@ -328,7 +328,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
myBinaryDao.create(b);
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, null, null, null));
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, null, null, null, true));
assertNotNull(jobDetails.getJobId());
// Check the status
@@ -382,7 +382,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
// Create a bulk job
HashSet<String> types = Sets.newHashSet("Patient");
Set<String> typeFilters = Sets.newHashSet("Patient?_has:Observation:patient:identifier=SYS|VAL3");
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, types, null, typeFilters));
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, types, null, typeFilters, true));
assertNotNull(jobDetails.getJobId());
// Check the status
@@ -434,7 +434,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), cutoff.getValue(), null));
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), cutoff.getValue(), null, true));
assertNotNull(jobDetails.getJobId());
// Check the status
@@ -513,7 +513,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null));
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null, true));
//Add the UUID to the job
BulkExportJobParametersBuilder paramBuilder = new BulkExportJobParametersBuilder()
@@ -584,7 +584,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(1));
assertThat(jobInfo.getFiles().size(), equalTo(2));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Iterate over the files
@@ -615,9 +615,6 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
}
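Every updated test passes the new boolean as true; a dedicated happy-path submission test for the patient flavor, in the same style as the tests above, might look like this (a sketch, not part of this commit):

@Test
public void testSubmitPatientLevelExport() {
	// Mirrors the submission pattern above, with the new boolean flag set.
	IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient"), null, null, true));
	assertNotNull(jobDetails.getJobId());
}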
// CareTeam has two patient references: participant and patient. This test checks if we find the patient if participant is null but patient is not null
@Test
public void testGroupBatchJobCareTeam() throws Exception {