WIP supporting multiple type filters

This commit is contained in:
Tadgh 2021-03-09 12:42:39 -05:00
parent 27952ab5e4
commit 1a1a5ee661
7 changed files with 152 additions and 86 deletions

View File

@ -24,6 +24,7 @@ import org.springframework.beans.factory.annotation.Value;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -92,15 +93,24 @@ public abstract class BaseBulkItemReader implements ItemReader<List<ResourcePers
.map(filter -> buildSearchParameterMapForTypeFilter(filter, theDef))
.collect(Collectors.toList());
return maps;
} else {
SearchParameterMap map = new SearchParameterMap();
enhanceSearchParameterMapWithCommonParameters(map);
return Collections.singletonList(map);
}
}
private void enhanceSearchParameterMapWithCommonParameters(SearchParameterMap map) {
map.setLoadSynchronous(true);
if (getJobEntity().getSince() != null) {
map.setLastUpdated(new DateRangeParam(getJobEntity().getSince(), null));
}
}
public SearchParameterMap buildSearchParameterMapForTypeFilter(String theFilter, RuntimeResourceDefinition theDef) {
SearchParameterMap searchParameterMap = myMatchUrlService.translateMatchUrl(theFilter, theDef);
if (getJobEntity().getSince() != null) {
searchParameterMap.setLastUpdated(new DateRangeParam(getJobEntity().getSince(), null));
}
searchParameterMap.setLoadSynchronous(true);
enhanceSearchParameterMapWithCommonParameters(searchParameterMap);
return searchParameterMap;
}
protected RuntimeResourceDefinition getResourceDefinition() {

View File

@ -30,8 +30,10 @@ import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
* Basic Bulk Export implementation which simply reads all type filters and applies them, along with the _since param
@ -40,19 +42,21 @@ import java.util.List;
public class BulkItemReader extends BaseBulkItemReader {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@Override
Iterator<ResourcePersistentId> getResourcePidIterator() {
ourLog.info("Bulk export assembling export of type {} for job {}", myResourceType, myJobUUID);
Set<ResourcePersistentId> myReadPids = new HashSet<>();
SearchParameterMap map = createSearchParameterMapsForResourceType();
List<SearchParameterMap> map = createSearchParameterMapsForResourceType();
ISearchBuilder sb = getSearchBuilderForLocalResourceType();
IResultIterator myResultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
List<ResourcePersistentId> myReadPids = new ArrayList<>();
while (myResultIterator.hasNext()) {
myReadPids.add(myResultIterator.next());
for (SearchParameterMap spMap: map) {
ourLog.debug("About to evaluate query {}", spMap.toNormalizedQueryString(myContext));
IResultIterator myResultIterator = sb.createQuery(spMap, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
while (myResultIterator.hasNext()) {
myReadPids.add(myResultIterator.next());
}
}
return myReadPids.iterator();
}

View File

@ -27,6 +27,7 @@ import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.QueryChunker;
@ -79,7 +80,7 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
@Override
Iterator<ResourcePersistentId> getResourcePidIterator() {
List<ResourcePersistentId> myReadPids = new ArrayList<>();
Set<ResourcePersistentId> myReadPids = new HashSet<>();
//Short circuit out if we detect we are attempting to extract patients
if (myResourceType.equalsIgnoreCase("Patient")) {
@ -171,24 +172,26 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
return expandedIds;
}
private void queryResourceTypeWithReferencesToPatients(List<ResourcePersistentId> myReadPids, List<String> idChunk) {
private void queryResourceTypeWithReferencesToPatients(Set<ResourcePersistentId> myReadPids, List<String> idChunk) {
//Build SP map
//First, inject the _typeFilters and _since from the export job
SearchParameterMap expandedSpMap = createSearchParameterMapsForResourceType();
List<SearchParameterMap> expandedSpMaps = createSearchParameterMapsForResourceType();
for (SearchParameterMap expandedSpMap: expandedSpMaps) {
//Since we are in a bulk job, we have to ensure the user didn't jam in a patient search param, since we need to manually set that.
validateSearchParameters(expandedSpMap);
//Since we are in a bulk job, we have to ensure the user didn't jam in a patient search param, since we need to manually set that.
validateSearchParameters(expandedSpMap);
// Now, further filter the query with patient references defined by the chunk of IDs we have.
filterSearchByResourceIds(idChunk, expandedSpMap);
// Now, further filter the query with patient references defined by the chunk of IDs we have.
filterSearchByResourceIds(idChunk, expandedSpMap);
// Fetch and cache a search builder for this resource type
ISearchBuilder searchBuilder = getSearchBuilderForLocalResourceType();
// Fetch and cache a search builder for this resource type
ISearchBuilder searchBuilder = getSearchBuilderForLocalResourceType();
//Execute query and all found pids to our local iterator.
IResultIterator resultIterator = searchBuilder.createQuery(expandedSpMap, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
while (resultIterator.hasNext()) {
myReadPids.add(resultIterator.next());
//Execute query and all found pids to our local iterator.
IResultIterator resultIterator = searchBuilder.createQuery(expandedSpMap, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
while (resultIterator.hasNext()) {
myReadPids.add(resultIterator.next());
}
}
}

View File

@ -71,27 +71,28 @@ public class PatientBulkItemReader extends BaseBulkItemReader implements ItemRea
List<ResourcePersistentId> myReadPids = new ArrayList<>();
//use _typeFilter and _since and all those fancy bits and bobs to generate our basic SP map.
SearchParameterMap map = createSearchParameterMapsForResourceType();
List<SearchParameterMap> maps = createSearchParameterMapsForResourceType();
String patientSearchParam = getPatientSearchParamForCurrentResourceType().getName();
//Ensure users did not monkey with the patient compartment search parameter.
validateSearchParameters(map);
for (SearchParameterMap map: maps) {
//Ensure users did not monkey with the patient compartment search parameter.
validateSearchParameters(map);
//Skip adding the parameter querying for patient= if we are in fact querying the patient resource type.
if (!myResourceType.equalsIgnoreCase("Patient")) {
//Now that we have our basic built Bulk Export SP map, we inject the condition that the resources returned
//must have a patient= or subject= reference set.
map.add(patientSearchParam, new ReferenceParam().setMissing(false));
}
//Skip adding the parameter querying for patient= if we are in fact querying the patient resource type.
if (!myResourceType.equalsIgnoreCase("Patient")) {
//Now that we have our basic built Bulk Export SP map, we inject the condition that the resources returned
//must have a patient= or subject= reference set.
map.add(patientSearchParam, new ReferenceParam().setMissing(false));
ourLog.debug("About to execute query {}", map.toNormalizedQueryString(myContext));
ISearchBuilder sb = getSearchBuilderForLocalResourceType();
IResultIterator myResultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
while (myResultIterator.hasNext()) {
myReadPids.add(myResultIterator.next());
}
}
ISearchBuilder sb = getSearchBuilderForLocalResourceType();
IResultIterator myResultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
while (myResultIterator.hasNext()) {
myReadPids.add(myResultIterator.next());
}
return myReadPids.iterator();
}

View File

@ -389,7 +389,6 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
public void validateTypeFilters(Set<String> theTheFilters, Set<String> theResourceTypes) {
if (theTheFilters != null) {
Set<String> types = new HashSet<>();
for (String next : theTheFilters) {
if (!next.contains("?")) {
throw new InvalidRequestException("Invalid " + JpaConstants.PARAM_EXPORT_TYPE_FILTER + " value \"" + next + "\". Must be in the form [ResourceType]?[params]");
@ -398,9 +397,6 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
if (!theResourceTypes.contains(resourceType)) {
throw new InvalidRequestException("Invalid " + JpaConstants.PARAM_EXPORT_TYPE_FILTER + " value \"" + next + "\". Resource type does not appear in " + JpaConstants.PARAM_EXPORT_TYPE+ " list");
}
if (!types.add(resourceType)) {
throw new InvalidRequestException("Invalid " + JpaConstants.PARAM_EXPORT_TYPE_FILTER + " value \"" + next + "\". Multiple filters found for type " + resourceType);
}
}
}
}

View File

@ -337,7 +337,6 @@ public class BulkDataExportProviderTest {
@Test
public void testInitiateWithGetAndMultipleTypeFilters() throws IOException {
//TODO GGG FIX ME
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo()
.setJobId(A_JOB_ID);
when(myBulkDataExportSvc.submitJob(any())).thenReturn(jobInfo);
@ -385,9 +384,8 @@ public class BulkDataExportProviderTest {
Parameters input = new Parameters();
input.addParameter(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, new StringType(Constants.CT_FHIR_NDJSON));
input.addParameter(JpaConstants.PARAM_EXPORT_STYLE, new StringType("Patient, Practitioner"));
input.addParameter(JpaConstants.PARAM_EXPORT_SINCE, now);
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE_FILTER, new StringType("Patient?identifier=foo"));
input.addParameter(JpaConstants.PARAM_EXPORT_STYLE, new StringType("Patient"));
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE_FILTER, new StringType("Patient?gender=male,Patient?gender=female"));
ourLog.info(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(input));
@ -406,9 +404,8 @@ public class BulkDataExportProviderTest {
verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture());
BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue();
assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat());
assertThat(options.getResourceTypes(), containsInAnyOrder("Patient", "Practitioner"));
assertThat(options.getSince(), notNullValue());
assertThat(options.getFilters(), containsInAnyOrder("Patient?identifier=foo"));
assertThat(options.getResourceTypes(), containsInAnyOrder("Patient"));
assertThat(options.getFilters(), containsInAnyOrder("Patient?gender=male", "Patient?gender=female"));
}
@Test
@ -447,5 +444,4 @@ public class BulkDataExportProviderTest {
assertThat(options.getSince(), notNullValue());
assertThat(options.getFilters(), containsInAnyOrder("Immunization?vaccine-code=foo"));
}
}

View File

@ -39,6 +39,7 @@ import org.hl7.fhir.r4.model.InstantType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Reference;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -613,10 +614,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Iterate over the files
Binary nextBinary = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", nextBinary.getResourceType(), nextContents);
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
assertThat(nextContents, is(containsString("IMM0")));
@ -652,10 +650,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Iterate over the files
Binary nextBinary = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", nextBinary.getResourceType(), nextContents);
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
assertThat(nextContents, is(containsString("IMM0")));
@ -695,16 +690,9 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
// GroupBulkExportJobParametersBuilder paramBuilder = new GroupBulkExportJobParametersBuilder();
// paramBuilder.setGroupId(myPatientGroupId.getIdPart());
// paramBuilder.setJobUUID(jobDetails.getJobId());
// paramBuilder.setReadChunkSize(10L);
//
// JobExecution jobExecution = myBatchJobSubmitter.runJob(myGroupBulkJob, paramBuilder.toJobParameters());
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
// awaitJobCompletion(jobExecution);
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
@ -712,10 +700,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("CareTeam")));
// Iterate over the files
Binary nextBinary = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", nextBinary.getResourceType(), nextContents);
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("CareTeam")));
assertThat(nextContents, is(containsString("CT0")));
@ -731,12 +716,93 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
JobParametersBuilder paramBuilder = new JobParametersBuilder().addString("jobUUID", "I'm not real!");
try {
myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
fail("Should have had invalid parameter execption!");
fail("Should have had invalid parameter exception!");
} catch (JobParametersInvalidException e) {
// good
}
}
@Test
public void testSystemExportWithMultipleTypeFilters() {
createResources();
// Create a bulk job
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Immunization"));
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
options.setFilters(Sets.newHashSet("Immunization?vaccine-code=Flu", "Immunization?patient=Patient/PAT1"));
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(1));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Iterate over the files
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
//These are the COVID-19 entries
assertThat(nextContents, is(containsString("IMM0")));
assertThat(nextContents, is(containsString("IMM2")));
assertThat(nextContents, is(containsString("IMM4")));
assertThat(nextContents, is(containsString("IMM6")));
assertThat(nextContents, is(containsString("IMM8")));
//This is the entry for the one referencing patient/1
assertThat(nextContents, is(containsString("IMM1")));
}
@Test
public void testGroupExportWithMultipleTypeFilters() {
createResources();
// Create a bulk job
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Observation"));
options.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
options.setGroupId(myPatientGroupId);
options.setExpandMdm(false);
options.setFilters(Sets.newHashSet("Observation?identifier=VAL0,VAL2", "Observation?identifer=VAL4"));
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(1));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Observation")));
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
//These are the COVID-19 entries
assertThat(nextContents, is(containsString("IMM0")));
assertThat(nextContents, is(containsString("IMM2")));
assertThat(nextContents, is(containsString("IMM4")));
assertThat(nextContents, is(containsString("IMM6")));
assertThat(nextContents, is(containsString("IMM8")));
//This is the entry for the one referencing patient/1
assertThat(nextContents, is(containsString("IMM1")));
}
@NotNull
public String getBinaryContents(IBulkDataExportSvc.JobInfo theJobInfo, int theIndex) {
// Iterate over the files
Binary nextBinary = myBinaryDao.read(theJobInfo.getFiles().get(theIndex).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", nextBinary.getResourceType(), nextContents);
return nextContents;
}
@Test
public void testMdmExpansionSuccessfullyExtractsPatients() throws JobParametersInvalidException {
@ -761,10 +827,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(jobInfo.getFiles().size(), equalTo(1));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Patient")));
Binary patientExportContent = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, patientExportContent.getContentType());
String nextContents = new String(patientExportContent.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", patientExportContent.getResourceType(), nextContents);
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Patient")));
//Output contains The entire group, plus the Mdm expansion, plus the golden resource
@ -797,10 +860,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Check immunization Content
Binary immunizationExportContent = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, immunizationExportContent.getContentType());
String nextContents = new String(immunizationExportContent.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", immunizationExportContent.getResourceType(), nextContents);
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
assertThat(nextContents, is(containsString("IMM0")));
assertThat(nextContents, is(containsString("IMM2")));
@ -865,12 +925,8 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Check immunization Content
Binary immunizationExportContent = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, immunizationExportContent.getContentType());
String nextContents = new String(immunizationExportContent.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", immunizationExportContent.getResourceType(), nextContents);
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
assertThat(nextContents, is(containsString("IMM1")));
assertThat(nextContents, is(containsString("IMM3")));
assertThat(nextContents, is(containsString("IMM5")));