Start with failling test and partial job config

This commit is contained in:
Tadgh 2021-03-04 16:15:51 -05:00
parent 8021396f4f
commit ff1c6e4d27
7 changed files with 150 additions and 17 deletions

View File

@ -33,4 +33,5 @@ import org.springframework.context.annotation.Import;
public class BatchJobsConfig { public class BatchJobsConfig {
public static final String BULK_EXPORT_JOB_NAME = "bulkExportJob"; public static final String BULK_EXPORT_JOB_NAME = "bulkExportJob";
public static final String GROUP_BULK_EXPORT_JOB_NAME = "groupBulkExportJob"; public static final String GROUP_BULK_EXPORT_JOB_NAME = "groupBulkExportJob";
public static final String PATIENT_BULK_EXPORT_JOB_NAME = "patientBulkExport";
} }

View File

@ -8,12 +8,14 @@ public class BulkDataExportOptions {
private final Set<String> myResourceTypes; private final Set<String> myResourceTypes;
private final Date mySince; private final Date mySince;
private final Set<String> myFilters; private final Set<String> myFilters;
private boolean mySystemLevel;
public BulkDataExportOptions(String theOutputFormat, Set<String> theResourceTypes, Date theSince, Set<String> theFilters) { public BulkDataExportOptions(String theOutputFormat, Set<String> theResourceTypes, Date theSince, Set<String> theFilters, boolean theSystemLevel) {
myOutputFormat = theOutputFormat; myOutputFormat = theOutputFormat;
myResourceTypes = theResourceTypes; myResourceTypes = theResourceTypes;
mySince = theSince; mySince = theSince;
myFilters = theFilters; myFilters = theFilters;
mySystemLevel = theSystemLevel;
} }
public String getOutputFormat() { public String getOutputFormat() {
@ -31,4 +33,8 @@ public class BulkDataExportOptions {
public Set<String> getFilters() { public Set<String> getFilters() {
return myFilters; return myFilters;
} }
public boolean isSystemLevel() {
return mySystemLevel;
}
} }

View File

@ -92,6 +92,17 @@ public class BulkExportJobConfig {
.build(); .build();
} }
@Bean
@Lazy
public Job patientBulkExportJob() {
return myJobBuilderFactory.get(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME)
.validator(bulkJobParameterValidator())
.start(createBulkExportEntityStep())
.next(patientPartitionStep())
.next(closeJobStep())
.build();
}
@Bean @Bean
public GroupIdPresentValidator groupBulkJobParameterValidator() { public GroupIdPresentValidator groupBulkJobParameterValidator() {
return new GroupIdPresentValidator(); return new GroupIdPresentValidator();
@ -122,7 +133,7 @@ public class BulkExportJobConfig {
.reader(groupBulkItemReader()) .reader(groupBulkItemReader())
.processor(myPidToIBaseResourceProcessor) .processor(myPidToIBaseResourceProcessor)
.writer(resourceToFileWriter()) .writer(resourceToFileWriter())
.listener(bulkExportGenrateResourceFilesStepListener()) .listener(bulkExportGenerateResourceFilesStepListener())
.build(); .build();
} }
@ -140,7 +151,7 @@ public class BulkExportJobConfig {
.reader(bulkItemReader()) .reader(bulkItemReader())
.processor(myPidToIBaseResourceProcessor) .processor(myPidToIBaseResourceProcessor)
.writer(resourceToFileWriter()) .writer(resourceToFileWriter())
.listener(bulkExportGenrateResourceFilesStepListener()) .listener(bulkExportGenerateResourceFilesStepListener())
.build(); .build();
} }
@ -165,10 +176,17 @@ public class BulkExportJobConfig {
@Bean @Bean
@JobScope @JobScope
public BulkExportGenerateResourceFilesStepListener bulkExportGenrateResourceFilesStepListener() { public BulkExportGenerateResourceFilesStepListener bulkExportGenerateResourceFilesStepListener() {
return new BulkExportGenerateResourceFilesStepListener(); return new BulkExportGenerateResourceFilesStepListener();
} }
@Bean
public Step partitionStep() {
return myStepBuilderFactory.get("partitionStep")
.partitioner("bulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner())
.step(bulkExportGenerateResourceFilesStep())
.build();
}
@Bean @Bean
public Step groupPartitionStep() { public Step groupPartitionStep() {
@ -177,14 +195,32 @@ public class BulkExportJobConfig {
.step(groupBulkExportGenerateResourceFilesStep()) .step(groupBulkExportGenerateResourceFilesStep())
.build(); .build();
} }
@Bean @Bean
public Step partitionStep() { public Step patientPartitionStep() {
return myStepBuilderFactory.get("partitionStep") return myStepBuilderFactory.get("partitionStep")
.partitioner("bulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner()) .partitioner("patientBulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner())
.step(bulkExportGenerateResourceFilesStep()) .step(patientBulkExportGenerateResourceFilesStep())
.build(); .build();
} }
@Bean
public Step patientBulkExportGenerateResourceFilesStep() {
return myStepBuilderFactory.get("patientBulkExportGenerateResourceFilesStep")
.<List<ResourcePersistentId>, List<IBaseResource>> chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time.
.reader(patientBulkItemReader())
.processor(myPidToIBaseResourceProcessor)
.writer(resourceToFileWriter())
.listener(bulkExportGenerateResourceFilesStepListener())
.build();
}
@Bean
@StepScope
public PatientBulkItemReader patientBulkItemReader() {
return new PatientBulkItemReader();
}
@Bean @Bean
@StepScope @StepScope
public BulkItemReader bulkItemReader(){ public BulkItemReader bulkItemReader(){

View File

@ -49,6 +49,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseBinary; import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Group;
import org.hl7.fhir.r4.model.InstantType; import org.hl7.fhir.r4.model.InstantType;
import org.quartz.JobExecutionContext; import org.quartz.JobExecutionContext;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -229,6 +230,8 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
if (isGroupBulkJob(theBulkExportJobEntity)) { if (isGroupBulkJob(theBulkExportJobEntity)) {
enhanceBulkParametersWithGroupParameters(theBulkExportJobEntity, parameters); enhanceBulkParametersWithGroupParameters(theBulkExportJobEntity, parameters);
myJobSubmitter.runJob(myGroupBulkExportJob, parameters.toJobParameters()); myJobSubmitter.runJob(myGroupBulkExportJob, parameters.toJobParameters());
} else if (isPatientBulkJob(theBulkExportJobEntity)) {
myJobSubmitter.runJob(myPatientBulkExportJob, parameters.toJobParameters());
} else { } else {
myJobSubmitter.runJob(myBulkExportJob, parameters.toJobParameters()); myJobSubmitter.runJob(myBulkExportJob, parameters.toJobParameters());
} }
@ -237,6 +240,10 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
} }
} }
private boolean isPatientBulkJob(BulkExportJobEntity theBulkExportJobEntity) {
//TODO GGG
}
private void enhanceBulkParametersWithGroupParameters(BulkExportJobEntity theBulkExportJobEntity, JobParametersBuilder theParameters) { private void enhanceBulkParametersWithGroupParameters(BulkExportJobEntity theBulkExportJobEntity, JobParametersBuilder theParameters) {
String theGroupId = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_GROUP_ID); String theGroupId = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_GROUP_ID);
String expandMdm = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_MDM); String expandMdm = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_MDM);
@ -282,7 +289,14 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
// TODO GGG KS can we encode BulkDataExportOptions as a JSON string as opposed to this request string. Feels like it would be a more extensible encoding... // TODO GGG KS can we encode BulkDataExportOptions as a JSON string as opposed to this request string. Feels like it would be a more extensible encoding...
//Probably yes, but this will all need to be rebuilt when we remove this bridge entity //Probably yes, but this will all need to be rebuilt when we remove this bridge entity
StringBuilder requestBuilder = new StringBuilder(); StringBuilder requestBuilder = new StringBuilder();
requestBuilder.append("/").append(JpaConstants.OPERATION_EXPORT); requestBuilder.append("/");
//Prefix the export url with Group/[id]/
if (theBulkDataExportOptions instanceof GroupBulkDataExportOptions) {
requestBuilder.append(((GroupBulkDataExportOptions)theBulkDataExportOptions).getGroupId()).append("/");
}
requestBuilder.append(JpaConstants.OPERATION_EXPORT);
requestBuilder.append("?").append(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT).append("=").append(escapeUrlParam(outputFormat)); requestBuilder.append("?").append(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT).append("=").append(escapeUrlParam(outputFormat));
Set<String> resourceTypes = theBulkDataExportOptions.getResourceTypes(); Set<String> resourceTypes = theBulkDataExportOptions.getResourceTypes();
if (resourceTypes != null) { if (resourceTypes != null) {
@ -300,6 +314,9 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_GROUP_ID).append("=").append(groupOptions.getGroupId().getValue()); requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_GROUP_ID).append("=").append(groupOptions.getGroupId().getValue());
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_MDM).append("=").append(groupOptions.isMdm()); requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_MDM).append("=").append(groupOptions.isMdm());
} }
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_SYSTEM_LEVEL).append("=").append(theBulkDataExportOptions.isSystemLevel());
String request = requestBuilder.toString(); String request = requestBuilder.toString();
Date cutoff = DateUtils.addMilliseconds(new Date(), -myReuseBulkExportForMillis); Date cutoff = DateUtils.addMilliseconds(new Date(), -myReuseBulkExportForMillis);

View File

@ -97,6 +97,10 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Qualifier(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME) @Qualifier(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME)
private Job myGroupBulkJob; private Job myGroupBulkJob;
@Autowired
@Qualifier(BatchJobsConfig.PATIENT_BULK_EXPORT_JOB_NAME)
private Job myPatientBulkJob;
private IIdType myPatientGroupId; private IIdType myPatientGroupId;
@Test @Test
@ -562,6 +566,58 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(nextContents, is(containsString("IMM8"))); assertThat(nextContents, is(containsString("IMM8")));
} }
@Test
public void testPatientLevelExportWorks() throws JobParametersInvalidException {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Immunization", "Observation"), null, null, false));
GroupBulkExportJobParametersBuilder paramBuilder = new GroupBulkExportJobParametersBuilder();
paramBuilder.setGroupId(myPatientGroupId.getIdPart());
paramBuilder.setJobUUID(jobDetails.getJobId());
paramBuilder.setReadChunkSize(10L);
JobExecution jobExecution = myBatchJobSubmitter.runJob(myPatientBulkJob, paramBuilder.toJobParameters());
awaitJobCompletion(jobExecution);
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(1));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Iterate over the files
Binary nextBinary = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", nextBinary.getResourceType(), nextContents);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
assertThat(nextContents, is(containsString("IMM0")));
assertThat(nextContents, is(containsString("IMM1")));
assertThat(nextContents, is(containsString("IMM2")));
assertThat(nextContents, is(containsString("IMM3")));
assertThat(nextContents, is(containsString("IMM4")));
assertThat(nextContents, is(containsString("IMM5")));
assertThat(nextContents, is(containsString("IMM6")));
assertThat(nextContents, is(containsString("IMM7")));
assertThat(nextContents, is(containsString("IMM8")));
assertThat(nextContents, is(containsString("IMM9")));
assertThat(nextContents, is(containsString("IMM999")));
assertThat(nextContents, is(not(containsString("IMM2000"))));
assertThat(nextContents, is(not(containsString("IMM2001"))));
assertThat(nextContents, is(not(containsString("IMM2002"))));
assertThat(nextContents, is(not(containsString("IMM2003"))));
assertThat(nextContents, is(not(containsString("IMM2004"))));
assertThat(nextContents, is(not(containsString("IMM2005"))));
}
}
// CareTeam has two patient references: participant and patient. This test checks if we find the patient if participant is null but patient is not null // CareTeam has two patient references: participant and patient. This test checks if we find the patient if participant is null but patient is not null
@Test @Test
public void testGroupBatchJobCareTeam() throws Exception { public void testGroupBatchJobCareTeam() throws Exception {
@ -789,15 +845,24 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
Long goldenPid2 = myIdHelperService.getPidOrNull(g2Outcome.getResource()); Long goldenPid2 = myIdHelperService.getPidOrNull(g2Outcome.getResource());
//Create some nongroup patients MDM linked to a different golden resource. They shouldnt be included in the query. //Create some nongroup patients MDM linked to a different golden resource. They shouldnt be included in the query.
for (int i = 1000; i < 1005; i++) { for (int i = 0; i < 5; i++) {
DaoMethodOutcome patientOutcome = createPatientWithIndex(i); int index = 1000 + i;
DaoMethodOutcome patientOutcome = createPatientWithIndex(index);
IIdType patId = patientOutcome.getId().toUnqualifiedVersionless(); IIdType patId = patientOutcome.getId().toUnqualifiedVersionless();
Long sourcePid = myIdHelperService.getPidOrNull(patientOutcome.getResource()); Long sourcePid = myIdHelperService.getPidOrNull(patientOutcome.getResource());
linkToGoldenResource(goldenPid2, sourcePid); linkToGoldenResource(goldenPid2, sourcePid);
createObservationWithIndex(i, patId); createObservationWithIndex(index, patId);
createImmunizationWithIndex(i, patId); createImmunizationWithIndex(index, patId);
createCareTeamWithIndex(i, patId); createCareTeamWithIndex(index, patId);
} }
//Create some Observations and immunizations which have _no subjects!_ These will be exlucded from the Patient level export.
for (int i = 0; i < 10; i++) {
int index = 2000 + i;
createObservationWithIndex(index, null);
createImmunizationWithIndex(index, null);
}
} }
private DaoMethodOutcome createPatientWithIndex(int i) { private DaoMethodOutcome createPatientWithIndex(int i) {
@ -820,7 +885,9 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
private void createImmunizationWithIndex(int i, IIdType patId) { private void createImmunizationWithIndex(int i, IIdType patId) {
Immunization immunization = new Immunization(); Immunization immunization = new Immunization();
immunization.setId("IMM" + i); immunization.setId("IMM" + i);
immunization.setPatient(new Reference(patId)); if (patId != null ) {
immunization.setPatient(new Reference(patId));
}
if (i % 2 == 0) { if (i % 2 == 0) {
CodeableConcept cc = new CodeableConcept(); CodeableConcept cc = new CodeableConcept();
cc.addCoding().setSystem("vaccines").setCode("Flu"); cc.addCoding().setSystem("vaccines").setCode("Flu");
@ -838,7 +905,9 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
obs.setId("OBS" + i); obs.setId("OBS" + i);
obs.addIdentifier().setSystem("SYS").setValue("VAL" + i); obs.addIdentifier().setSystem("SYS").setValue("VAL" + i);
obs.setStatus(Observation.ObservationStatus.FINAL); obs.setStatus(Observation.ObservationStatus.FINAL);
obs.getSubject().setReference(patId.getValue()); if (patId != null) {
obs.getSubject().setReference(patId.getValue());
}
myObservationDao.update(obs); myObservationDao.update(obs);
} }

View File

@ -194,6 +194,11 @@ public class JpaConstants {
*/ */
public static final String PARAM_EXPORT_MDM = "_mdm"; public static final String PARAM_EXPORT_MDM = "_mdm";
/**
* Whether the Export is for System level or Patient Level.
*/
public static final String PARAM_EXPORT_SYSTEM_LEVEL = "_system";
/** /**
* Parameter for delete to indicate the deleted resources should also be expunged * Parameter for delete to indicate the deleted resources should also be expunged
*/ */

View File

@ -67,8 +67,7 @@ public class TestJpaR4Config extends BaseJavaConfigR4 {
BasicDataSource retVal = new BasicDataSource(); BasicDataSource retVal = new BasicDataSource();
retVal.setDriver(new org.h2.Driver()); retVal.setDriver(new org.h2.Driver());
// retVal.setUrl("jdbc:h2:mem:testdb_r4"); retVal.setUrl("jdbc:h2:mem:testdb_r4");
retVal.setUrl("jdbc:h2:file:./testdb_r4;create=true");
retVal.setMaxWaitMillis(10000); retVal.setMaxWaitMillis(10000);
retVal.setUsername(""); retVal.setUsername("");
retVal.setPassword(""); retVal.setPassword("");