Support forward references in Group Bulk Export (#3558)

* Preliminary test work to generate forward-reference data

* Complete implementation

* Permit Practitioner/Organization as forward references in bulk export group params

* Wip

* Fix test

* Add error number

* Address review comments

* Update assert
This commit is contained in:
Tadgh 2022-04-21 22:33:10 -07:00 committed by GitHub
parent f9824d6b15
commit 53c8b067d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 197 additions and 36 deletions

View File

@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.batch.config;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public final class BatchConstants {
@ -90,6 +91,8 @@ public final class BatchConstants {
*/
public static final String JOB_EXECUTION_RESOURCE_TYPE = "resourceType";
/**
 * Resource types that Group Bulk Export permits even though they are reached through a forward
 * reference from Patient (e.g. Patient.managingOrganization) rather than through a reference
 * that points back at the patient.
 */
public static final List<String> PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES = List.of("Practitioner", "Organization");
/**
* This Set contains the step names across all job types that are appropriate for
* someone to look at the write count for that given step in order to determine the

View File

@ -0,0 +1,4 @@
type: add
issue: 3557
jira: SMILE-4062
title: "Group Bulk Export (e.g. Group/123/$export) now additionally supports Organization and Practitioner as valid _type parameters. This works internally by querying using a `_has` parameter."

View File

@ -34,14 +34,18 @@ import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseExtension;
import org.hl7.fhir.instance.model.api.IBaseReference;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.lang.NonNull;
import java.util.List;
import java.util.Optional;
import static ca.uhn.fhir.jpa.batch.config.BatchConstants.PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES;
/**
* Reusable Item Processor which attaches an extension to any outgoing resource. This extension will contain a resource
* reference to the golden resource patient of the given resources' patient. (e.g. Observation.subject, Immunization.patient, etc)
@ -49,6 +53,7 @@ import java.util.Optional;
public class GoldenResourceAnnotatingProcessor implements ItemProcessor<List<IBaseResource>, List<IBaseResource>> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@Value("#{stepExecutionContext['resourceType']}")
private String myResourceType;
@ -79,21 +84,34 @@ public class GoldenResourceAnnotatingProcessor implements ItemProcessor<List<IBa
}
@Override
public List<IBaseResource> process(List<IBaseResource> theIBaseResources) throws Exception {
//If MDM expansion is enabled, add this magic new extension, otherwise, return the resource as is.
if (myMdmEnabled) {
if (myRuntimeSearchParam == null) {
populateRuntimeSearchParam();
}
if (myPatientFhirPath == null) {
populatePatientFhirPath();
}
theIBaseResources.forEach(this::annotateClinicalResourceWithRelatedGoldenResourcePatient);
public List<IBaseResource> process(@NonNull List<IBaseResource> theIBaseResources) throws Exception {
if (shouldAnnotateResource()) {
lazyLoadSearchParamsAndFhirPath();
theIBaseResources.forEach(this::annotateBackwardsReferences);
}
return theIBaseResources;
}
private void annotateClinicalResourceWithRelatedGoldenResourcePatient(IBaseResource iBaseResource) {
/**
 * Populates the runtime search parameter and the patient FHIRPath the first time they are needed.
 * Each one is only populated while it is still null.
 */
private void lazyLoadSearchParamsAndFhirPath() {
	final boolean searchParamMissing = (myRuntimeSearchParam == null);
	if (searchParamMissing) {
		populateRuntimeSearchParam();
	}

	// Evaluated only after the search-param step has run, preserving the original ordering.
	final boolean fhirPathMissing = (myPatientFhirPath == null);
	if (fhirPathMissing) {
		populatePatientFhirPath();
	}
}
/**
 * Decides whether resources of the current type receive the golden resource patient extension.
 * <p>
 * Resources that are pulled in through a forward reference from a patient (e.g. Patient.managingOrganization)
 * are skipped, because there is no way to fetch the referring patient at this point in time. This is a
 * shortcoming of including the forward reference types in a Group/Patient bulk export.
 *
 * @return true if the resource should be annotated with the golden resource patient reference
 */
private boolean shouldAnnotateResource() {
	if (!myMdmEnabled) {
		return false;
	}
	boolean isForwardReferenceType = PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(myResourceType);
	return !isForwardReferenceType;
}
private void annotateBackwardsReferences(IBaseResource iBaseResource) {
Optional<String> patientReference = getPatientReference(iBaseResource);
if (patientReference.isPresent()) {
addGoldenResourceExtension(iBaseResource, patientReference.get());
@ -104,13 +122,15 @@ public class GoldenResourceAnnotatingProcessor implements ItemProcessor<List<IBa
}
/**
 * Resolves the ID part of the patient that the given resource relates to.
 *
 * @param iBaseResource the resource being exported
 * @return the patient ID part, or an empty Optional when the patient FHIRPath yields no reference
 */
private Optional<String> getPatientReference(IBaseResource iBaseResource) {
	// In the case of patient, we will just use the raw ID.
	if (myResourceType.equalsIgnoreCase("Patient")) {
		return Optional.of(iBaseResource.getIdElement().getIdPart());
	}

	// Otherwise, we will perform evaluation of the fhirPath. Optional.map already yields an empty
	// Optional when no reference was found, so no explicit isPresent() branch is required.
	Optional<IBaseReference> optionalReference = getFhirParser().evaluateFirst(iBaseResource, myPatientFhirPath, IBaseReference.class);
	return optionalReference.map(theIBaseReference -> theIBaseReference.getReferenceElement().getIdPart());
}

View File

@ -38,6 +38,8 @@ import ca.uhn.fhir.jpa.util.QueryChunker;
import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.HasOrListParam;
import ca.uhn.fhir.rest.param.HasParam;
import ca.uhn.fhir.rest.param.ReferenceOrListParam;
import ca.uhn.fhir.rest.param.ReferenceParam;
import org.hl7.fhir.instance.model.api.IBaseResource;
@ -58,6 +60,8 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static ca.uhn.fhir.jpa.batch.config.BatchConstants.PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES;
/**
* Bulk Item reader for the Group Bulk Export job.
* Instead of performing a normal query on the resource type using type filters, we instead
@ -87,13 +91,14 @@ public class GroupBulkItemReader extends BaseJpaBulkItemReader implements ItemRe
@Override
protected Iterator<ResourcePersistentId> getResourcePidIterator() {
Set<ResourcePersistentId> myReadPids = new HashSet<>();
//Short circuit out if we detect we are attempting to extract patients
if (myResourceType.equalsIgnoreCase("Patient")) {
return getExpandedPatientIterator();
}
//First lets expand the group so we get a list of all patient IDs of the group, and MDM-matched patient IDs of the group.
Set<String> expandedMemberResourceIds = expandAllPatientPidsFromGroup();
if (ourLog.isDebugEnabled()) {
@ -102,15 +107,16 @@ public class GroupBulkItemReader extends BaseJpaBulkItemReader implements ItemRe
//Next, let's search for the target resources, with their correct patient references, chunked.
//The results will be jammed into myReadPids
Set<ResourcePersistentId> myExpandedMemberPids = new HashSet<>();
QueryChunker<String> queryChunker = new QueryChunker<>();
queryChunker.chunk(new ArrayList<>(expandedMemberResourceIds), QUERY_CHUNK_SIZE, (idChunk) -> {
queryResourceTypeWithReferencesToPatients(myReadPids, idChunk);
queryResourceTypeWithReferencesToPatients(myExpandedMemberPids, idChunk);
});
if (ourLog.isDebugEnabled()) {
ourLog.debug("Resource PIDs to be Bulk Exported: [{}]", myReadPids.stream().map(ResourcePersistentId::toString).collect(Collectors.joining(",")));
ourLog.debug("Resource PIDs to be Bulk Exported: {}", myExpandedMemberPids);
}
return myReadPids.iterator();
return myExpandedMemberPids.iterator();
}
/**
@ -243,6 +249,7 @@ public class GroupBulkItemReader extends BaseJpaBulkItemReader implements ItemRe
}
private void queryResourceTypeWithReferencesToPatients(Set<ResourcePersistentId> myReadPids, List<String> idChunk) {
//Build SP map
//First, inject the _typeFilters and _since from the export job
List<SearchParameterMap> expandedSpMaps = createSearchParameterMapsForResourceType();
@ -251,12 +258,16 @@ public class GroupBulkItemReader extends BaseJpaBulkItemReader implements ItemRe
//Since we are in a bulk job, we have to ensure the user didn't jam in a patient search param, since we need to manually set that.
validateSearchParameters(expandedSpMap);
// Now, further filter the query with patient references defined by the chunk of IDs we have.
filterSearchByResourceIds(idChunk, expandedSpMap);
// Fetch and cache a search builder for this resource type
ISearchBuilder searchBuilder = getSearchBuilderForLocalResourceType();
// Now, further filter the query with patient references defined by the chunk of IDs we have.
if (PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(myResourceType)) {
filterSearchByHasParam(idChunk, expandedSpMap);
} else {
filterSearchByResourceIds(idChunk, expandedSpMap);
}
//Execute query and all found pids to our local iterator.
IResultIterator resultIterator = searchBuilder.createQuery(expandedSpMap, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
while (resultIterator.hasNext()) {
@ -265,17 +276,41 @@ public class GroupBulkItemReader extends BaseJpaBulkItemReader implements ItemRe
}
}
/**
 * Narrows the search by adding a {@code _has} parameter holding one OR-ed entry per ID in the chunk.
 * Each entry is produced by {@link #buildHasParam(String)}.
 *
 * @param idChunk the chunk of expanded group member (patient) IDs to filter by
 * @param expandedSpMap the search parameter map that receives the {@code _has} parameter
 */
private void filterSearchByHasParam(List<String> idChunk, SearchParameterMap expandedSpMap) {
	HasOrListParam hasOrListParam = new HasOrListParam();
	// Iterable.forEach is sufficient here; no intermediate stream is needed.
	idChunk.forEach(id -> hasOrListParam.addOr(buildHasParam(id)));
	expandedSpMap.add("_has", hasOrListParam);
}
/**
 * Builds the {@code _has} entry that matches resources of the current type which are referenced by
 * the Patient with the given ID.
 *
 * @param theId the patient ID to match on
 * @return a HasParam targeting Patient, using the search parameter that corresponds to the current resource type
 * @throws IllegalArgumentException if the current resource type is neither Practitioner nor Organization
 */
private HasParam buildHasParam(String theId) {
	final String patientSearchParam;
	if ("Practitioner".equalsIgnoreCase(myResourceType)) {
		patientSearchParam = "general-practitioner";
	} else if ("Organization".equalsIgnoreCase(myResourceType)) {
		patientSearchParam = "organization";
	} else {
		throw new IllegalArgumentException(Msg.code(2077) + " We can't handle forward references onto type " + myResourceType);
	}
	return new HasParam("Patient", patientSearchParam, "_id", theId);
}
/**
 * Narrows the search to resources whose patient search parameter references one of the given IDs.
 *
 * @param idChunk the chunk of IDs to OR together as references
 * @param expandedSpMap the search parameter map that receives the reference filter
 */
private void filterSearchByResourceIds(List<String> idChunk, SearchParameterMap expandedSpMap) {
	ReferenceOrListParam patientReferences = new ReferenceOrListParam();
	for (String patientId : idChunk) {
		patientReferences.add(new ReferenceParam(patientId));
	}
	String patientSearchParamName = getPatientSearchParamForCurrentResourceType().getName();
	expandedSpMap.add(patientSearchParamName, patientReferences);
}
private RuntimeSearchParam validateSearchParameters(SearchParameterMap expandedSpMap) {
RuntimeSearchParam runtimeSearchParam = getPatientSearchParamForCurrentResourceType();
if (expandedSpMap.get(runtimeSearchParam.getName()) != null) {
throw new IllegalArgumentException(Msg.code(792) + String.format("Group Bulk Export manually modifies the Search Parameter called [%s], so you may not include this search parameter in your _typeFilter!", runtimeSearchParam.getName()));
private void validateSearchParameters(SearchParameterMap expandedSpMap) {
if (PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(myResourceType)) {
return;
} else {
RuntimeSearchParam runtimeSearchParam = getPatientSearchParamForCurrentResourceType();
if (expandedSpMap.get(runtimeSearchParam.getName()) != null) {
throw new IllegalArgumentException(Msg.code(792) + String.format("Group Bulk Export manually modifies the Search Parameter called [%s], so you may not include this search parameter in your _typeFilter!", runtimeSearchParam.getName()));
}
}
return runtimeSearchParam;
}
}

View File

@ -49,7 +49,9 @@ import org.hl7.fhir.r4.model.Group;
import org.hl7.fhir.r4.model.Immunization;
import org.hl7.fhir.r4.model.InstantType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Organization;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Practitioner;
import org.hl7.fhir.r4.model.Reference;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
@ -63,6 +65,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
@ -423,7 +426,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
// Fetch the job again
status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkExportJobStatusEnum.COMPLETE, status.getStatus());
assertEquals(5, status.getFiles().size());
assertEquals(7, status.getFiles().size());
// Iterate over the files
for (IBulkDataExportSvc.FileEntry next : status.getFiles()) {
@ -443,6 +446,12 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
} else if ("CareTeam".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"id\":\"CT0\""));
assertEquals(16, nextContents.split("\n").length);
} else if ("Practitioner".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"id\":\"PRACT0\""));
assertEquals(11, nextContents.split("\n").length);
} else if ("Organization".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"id\":\"ORG0\""));
assertEquals(11, nextContents.split("\n").length);
} else if ("Group".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"id\":\"G0\""));
assertEquals(1, nextContents.split("\n").length);
@ -702,7 +711,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
bulkDataExportOptions.setSince(null);
bulkDataExportOptions.setFilters(null);
bulkDataExportOptions.setGroupId(myPatientGroupId);
bulkDataExportOptions.setExpandMdm(true);
bulkDataExportOptions.setExpandMdm(false);
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
@ -727,6 +736,58 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(nextContents, is(containsString("IMM8")));
}
/**
 * Runs a Group export (MDM expansion off) for two forward-reference types plus Observation and
 * checks that each of the three output files contains the even-indexed resources.
 */
@Test
public void testGroupBatchJobFindsForwardReferencesIfNeeded() {
	createResources();

	// Create a bulk job
	BulkDataExportOptions exportOptions = new BulkDataExportOptions();
	exportOptions.setOutputFormat(null);
	exportOptions.setResourceTypes(Sets.newHashSet("Practitioner","Organization", "Observation"));
	exportOptions.setSince(null);
	exportOptions.setFilters(null);
	exportOptions.setGroupId(myPatientGroupId);
	exportOptions.setExpandMdm(false);
	//FIXME GGG Make sure this works with MDM Enabled as well.
	exportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);

	IBulkDataExportSvc.JobInfo submittedJob = myBulkDataExportSvc.submitJob(exportOptions);

	myBulkDataExportJobSchedulingHelper.startSubmittedJobs();
	awaitAllBulkJobCompletions();

	IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(submittedJob.getJobId());

	assertThat(jobInfo.getStatus(), equalTo(BulkExportJobStatusEnum.COMPLETE));
	assertThat(jobInfo.getFiles().size(), equalTo(3));

	// File order is asserted positionally: forward-reference types first, then Observation to
	// ensure _backwards_ references still work.
	String[] expectedTypes = {"Practitioner", "Organization", "Observation"};
	String[] expectedIdPrefixes = {"PRACT", "ORG", "OBS"};
	for (int fileIndex = 0; fileIndex < expectedTypes.length; fileIndex++) {
		String nextContents = getBinaryContents(jobInfo, fileIndex);
		assertThat(jobInfo.getFiles().get(fileIndex).getResourceType(), is(equalTo(expectedTypes[fileIndex])));
		// Suffixes 0, 2, 4, 6 and 8 are the ones expected in each file.
		for (int suffix = 0; suffix <= 8; suffix += 2) {
			assertThat(nextContents, is(containsString(expectedIdPrefixes[fileIndex] + suffix)));
		}
	}
}
@Test
public void testGroupBatchJobMdmExpansionIdentifiesGoldenResources() {
createResources();
@ -867,6 +928,14 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(nextContents, is(containsString("CT4")));
assertThat(nextContents, is(containsString("CT6")));
assertThat(nextContents, is(containsString("CT8")));
//These should be brought in via MDM.
assertThat(nextContents, is(containsString("CT1")));
assertThat(nextContents, is(containsString("CT3")));
assertThat(nextContents, is(containsString("CT5")));
assertThat(nextContents, is(containsString("CT7")));
assertThat(nextContents, is(containsString("CT9")));
}
@ -1209,13 +1278,23 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
private void createResources() {
SystemRequestDetails srd = SystemRequestDetails.newSystemRequestAllPartitions();
Group group = new Group();
group.setId("G0");
//Manually create a Practitioner
IIdType goldenPractId = createPractitionerWithIndex(999);
//Manually create an Organization
IIdType goldenOrgId = createOrganizationWithIndex(999);
//Manually create a golden record
Patient goldenPatient = new Patient();
goldenPatient.setId("PAT999");
SystemRequestDetails srd = SystemRequestDetails.newSystemRequestAllPartitions();
goldenPatient.setGeneralPractitioner(Collections.singletonList(new Reference(goldenPractId.toVersionless())));
goldenPatient.setManagingOrganization(new Reference(goldenOrgId.toVersionless()));
DaoMethodOutcome g1Outcome = myPatientDao.update(goldenPatient, srd);
Long goldenPid = runInTransaction(() -> myIdHelperService.getPidOrNull(g1Outcome.getResource()));
@ -1225,7 +1304,9 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createCareTeamWithIndex(999, g1Outcome.getId());
for (int i = 0; i < 10; i++) {
DaoMethodOutcome patientOutcome = createPatientWithIndex(i);
IIdType orgId = createOrganizationWithIndex(i);
IIdType practId = createPractitionerWithIndex(i);
DaoMethodOutcome patientOutcome = createPatientWithIndexAndGPAndManagingOrganization(i, practId, orgId);
IIdType patId = patientOutcome.getId().toUnqualifiedVersionless();
Long sourcePid = runInTransaction(() -> myIdHelperService.getPidOrNull(patientOutcome.getResource()));
@ -1254,7 +1335,9 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
//Create some nongroup patients MDM linked to a different golden resource. They shouldnt be included in the query.
for (int i = 0; i < 5; i++) {
int index = 1000 + i;
DaoMethodOutcome patientOutcome = createPatientWithIndex(index);
IIdType orgId = createOrganizationWithIndex(i);
IIdType practId = createPractitionerWithIndex(i);
DaoMethodOutcome patientOutcome = createPatientWithIndexAndGPAndManagingOrganization(index, practId, orgId);
IIdType patId = patientOutcome.getId().toUnqualifiedVersionless();
Long sourcePid = runInTransaction(() -> myIdHelperService.getPidOrNull(patientOutcome.getResource()));
linkToGoldenResource(goldenPid2, sourcePid);
@ -1271,12 +1354,26 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
}
private DaoMethodOutcome createPatientWithIndex(int i) {
/**
 * Stores a Practitioner whose ID is "PRACT" followed by the given index, using the default partition.
 *
 * @param theIndex the numeric suffix for the resource ID
 * @return the ID reported by the DAO update
 */
private IIdType createPractitionerWithIndex(int theIndex) {
	Practitioner practitioner = new Practitioner();
	practitioner.setId("PRACT" + theIndex);
	DaoMethodOutcome outcome = myPractitionerDao.update(practitioner, new SystemRequestDetails().setRequestPartitionId(RequestPartitionId.defaultPartition()));
	return outcome.getId();
}
/**
 * Stores an Organization whose ID is "ORG" followed by the given index, using the default partition.
 *
 * @param theIndex the numeric suffix for the resource ID
 * @return the ID reported by the DAO update
 */
private IIdType createOrganizationWithIndex(int theIndex) {
	Organization organization = new Organization();
	organization.setId("ORG" + theIndex);
	DaoMethodOutcome outcome = myOrganizationDao.update(organization, new SystemRequestDetails().setRequestPartitionId(RequestPartitionId.defaultPartition()));
	return outcome.getId();
}
/**
 * Stores a Patient with ID "PAT" + index that points at the given general practitioner and
 * managing organization, using the default partition.
 *
 * @param theIndex the numeric suffix used for the ID, family name and identifier; even values are MALE, odd values FEMALE
 * @param thePractId the Practitioner to reference as general practitioner
 * @param theOrgId the Organization to reference as managing organization
 * @return the outcome of the DAO update
 */
private DaoMethodOutcome createPatientWithIndexAndGPAndManagingOrganization(int theIndex, IIdType thePractId, IIdType theOrgId) {
	Patient patient = new Patient();
	patient.setId("PAT" + theIndex);
	patient.setGender(theIndex % 2 == 0 ? Enumerations.AdministrativeGender.MALE : Enumerations.AdministrativeGender.FEMALE);
	patient.addName().setFamily("FAM" + theIndex);
	patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + theIndex);
	// The forward references that the Group export follows for Organization and Practitioner.
	patient.setManagingOrganization(new Reference(theOrgId.toVersionless()));
	patient.setGeneralPractitioner(Collections.singletonList(new Reference(thePractId.toVersionless())));
	return myPatientDao.update(patient, new SystemRequestDetails().setRequestPartitionId(RequestPartitionId.defaultPartition()));
}

View File

@ -56,6 +56,7 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static ca.uhn.fhir.jpa.batch.config.BatchConstants.PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES;
import static org.slf4j.LoggerFactory.getLogger;
@ -135,6 +136,7 @@ public class BulkDataExportProvider {
private void validateResourceTypesAllContainPatientSearchParams(Set<String> theResourceTypes) {
if (theResourceTypes != null) {
List<String> badResourceTypes = theResourceTypes.stream()
.filter(resourceType -> !PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(resourceType))
.filter(resourceType -> !myBulkDataExportSvc.getPatientCompartmentResources().contains(resourceType))
.collect(Collectors.toList());