4080 support onc requirements for group bulk export (#4081)

* added failing tests

* added _include handling to the bulk export processor, and added support for different resource types in the bulk export fetch resource ids step

* added changelog

* fixed a duplication issue and added a null check

Co-authored-by: Steven Li <steven@smilecdr.com>
StevenXLi 2022-09-27 10:51:43 -04:00 committed by GitHub
parent f06f9dc90a
commit 9e1120a47a
7 changed files with 191 additions and 21 deletions

View File

@@ -0,0 +1,6 @@
---
type: add
issue: 4080
jira: SMILE-5197, SMILE-5198
title: "Previously, Group Bulk Export did not support the inclusion of resources referenced in the resources in the patient compartment.
This is now supported."
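A hedged illustration (not part of this commit's diff): with this change, a Group Bulk Export started the way the tests below do will also return resources referenced by the exported compartment resources, such as Practitioners, Organizations, Locations, and Devices. The classes and helpers here (BulkDataExportOptions, BulkExportUtils, myJobRunner) are the same ones exercised in the test file further down.

    // Sketch: mirrors the option setup used by the new tests in this commit
    BulkDataExportOptions options = new BulkDataExportOptions();
    options.setResourceTypes(Sets.newHashSet("Patient", "Encounter"));
    options.setGroupId(new IdType("Group", "G1"));
    options.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
    options.setOutputFormat(Constants.CT_FHIR_NDJSON);
    // Referenced resources (e.g. an Encounter's Practitioner and Location)
    // now appear in the export output alongside the Patients and Encounters
    myJobRunner.startNewJob(BulkExportUtils.createBulkExportJobParametersFromExportOptions(options));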

View File

@@ -44,6 +44,7 @@ import ca.uhn.fhir.jpa.util.QueryChunker;
import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
import ca.uhn.fhir.mdm.dao.IMdmLinkDao;
import ca.uhn.fhir.mdm.model.MdmPidTuple;
import ca.uhn.fhir.model.api.Include;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
@@ -65,7 +66,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Nonnull;
import javax.persistence.EntityManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -108,6 +111,9 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
@Autowired
private MdmExpansionCacheSvc myMdmExpansionCacheSvc;
@Autowired
private EntityManager myEntityManager;
private IFhirPath myFhirPath;
@Transactional
@@ -438,6 +444,13 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
while (resultIterator.hasNext()) {
myReadPids.add(resultIterator.next());
}
// add _include results to support the ONC requirements
Set<Include> includes = Collections.singleton(new Include("*", true));
SystemRequestDetails requestDetails = SystemRequestDetails.newSystemRequestAllPartitions();
Set<ResourcePersistentId> includeIds = searchBuilder.loadIncludes(myContext, myEntityManager, myReadPids, includes, false, expandedSpMap.getLastUpdated(), theParams.getJobId(), requestDetails, null);
// filter out Patient ids, which are already present in myReadPids, to avoid duplicates
myReadPids.addAll(includeIds.stream().filter((id) -> !id.getResourceType().equals("Patient")).collect(Collectors.toSet()));
}
}
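For context on the wildcard above: new Include("*", true) is HAPI's existing Include type with the recurse flag set, so references of included resources are followed as well (the search-parameter equivalent of _include=*:iterate). A minimal sketch, with illustrative names not taken from this commit:

    import ca.uhn.fhir.model.api.Include;

    // "*" follows every reference path on the matched resources; the
    // boolean constructor argument turns on recursive (:iterate) expansion
    Include wildcardInclude = new Include("*", true);
    assert wildcardInclude.isRecurse();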

View File

@@ -92,7 +92,6 @@ import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.StringUtil;
import ca.uhn.fhir.util.UrlUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Streams;
import com.healthmarketscience.sqlbuilder.Condition;
import org.apache.commons.lang3.Validate;
@@ -149,6 +148,9 @@ public class SearchBuilder implements ISearchBuilder {
private static final ResourcePersistentId NO_MORE = new ResourcePersistentId(-1L);
private static final String MY_TARGET_RESOURCE_PID = "myTargetResourcePid";
private static final String MY_SOURCE_RESOURCE_PID = "mySourceResourcePid";
private static final String MY_TARGET_RESOURCE_TYPE = "myTargetResourceType";
private static final String MY_SOURCE_RESOURCE_TYPE = "mySourceResourceType";
private static final String MY_TARGET_RESOURCE_VERSION = "myTargetResourceVersion";
public static final String RESOURCE_ID_ALIAS = "resource_id";
public static final String RESOURCE_VERSION_ALIAS = "resource_version";
@@ -991,8 +993,9 @@
/**
* Check if we can load the resources from Hibernate Search instead of the database.
* We assume this is faster.
* <p>
* Hibernate Search only stores the current version, and only if enabled.
*
* @param thePids the pids to check for versioned references
* @return can we fetch from Hibernate Search?
*/
@@ -1001,7 +1004,7 @@
return myDaoConfig.isStoreResourceInHSearchIndex() &&
myDaoConfig.isAdvancedHSearchIndexing() &&
// we don't support history
thePids.stream().noneMatch(p -> p.getVersion() != null) &&
// skip the complexity for metadata in dstu2
myContext.getVersion().getVersion().isEqualOrNewerThan(FhirVersionEnum.DSTU3);
}
@@ -1024,7 +1027,8 @@
/**
* THIS SHOULD RETURN HASHSET and not just Set because we add to it later
* so it can't be Collections.emptySet() or some such thing.
* The ResourcePersistentId returned will have resource type populated.
*/
@Override
public Set<ResourcePersistentId> loadIncludes(FhirContext theContext, EntityManager theEntityManager, Collection<ResourcePersistentId> theMatches, Collection<Include> theIncludes,
@@ -1037,6 +1041,7 @@
}
String searchPidFieldName = theReverseMode ? MY_TARGET_RESOURCE_PID : MY_SOURCE_RESOURCE_PID;
String findPidFieldName = theReverseMode ? MY_SOURCE_RESOURCE_PID : MY_TARGET_RESOURCE_PID;
String findResourceTypeFieldName = theReverseMode ? MY_SOURCE_RESOURCE_TYPE : MY_TARGET_RESOURCE_TYPE;
String findVersionFieldName = null;
if (!theReverseMode && myModelConfig.isRespectVersionsForSearchIncludes()) {
findVersionFieldName = MY_TARGET_RESOURCE_VERSION;
@@ -1077,6 +1082,7 @@
if (matchAll) {
StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("SELECT r.").append(findPidFieldName);
sqlBuilder.append(", r.").append(findResourceTypeFieldName);
if (findVersionFieldName != null) {
sqlBuilder.append(", r." + findVersionFieldName);
}
@@ -1117,16 +1123,18 @@
continue;
}
Long version = null;
Long resourceLink = (Long) ((Object[]) nextRow)[0];
String resourceType = (String) ((Object[]) nextRow)[1];
if (findVersionFieldName != null) {
version = (Long) ((Object[]) nextRow)[2];
}
if (resourceLink != null) {
ResourcePersistentId pid = new ResourcePersistentId(resourceLink, version);
pid.setResourceType(resourceType);
pidsToInclude.add(pid);
}
}
}
} else {
@@ -1179,16 +1187,16 @@
" WHERE r.src_path = :src_path AND " +
" r.target_resource_id IS NOT NULL AND " +
" r." + searchPidFieldSqlColumn + " IN (:target_pids) ");
if (targetResourceType != null) {
resourceIdBasedQuery.append(" AND r.target_resource_type = :target_resource_type ");
} else if (haveTargetTypesDefinedByParam) {
resourceIdBasedQuery.append(" AND r.target_resource_type in (:target_resource_types) ");
}
// Case 2:
String fieldsToLoadFromSpidxUriTable = "rUri.res_id";
// to match the fields loaded in union
if (fieldsToLoad.split(",").length > 1) {
for (int i = 0; i < fieldsToLoad.split(",").length - 1; i++) {
fieldsToLoadFromSpidxUriTable += ", NULL";
}
@@ -1200,10 +1208,10 @@
" r.target_resource_url = rUri.sp_uri AND " +
" rUri.sp_name = 'url' ");
if (targetResourceType != null) {
resourceUrlBasedQuery.append(" AND rUri.res_type = :target_resource_type ");
} else if (haveTargetTypesDefinedByParam) {
resourceUrlBasedQuery.append(" AND rUri.res_type IN (:target_resource_types) ");
}
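A hedged sketch of how a caller consumes loadIncludes now that resource types are populated. This mirrors the JpaBulkExportProcessor call earlier in this commit; matchedPids, lastUpdated, and jobId are illustrative stand-ins:

    Set<Include> includes = Collections.singleton(new Include("*", true));
    SystemRequestDetails requestDetails = SystemRequestDetails.newSystemRequestAllPartitions();
    Set<ResourcePersistentId> includeIds = searchBuilder.loadIncludes(
        myContext, myEntityManager, matchedPids, includes,
        false, lastUpdated, jobId, requestDetails, null);
    // Every returned pid now carries its type, so callers can partition by it
    Set<ResourcePersistentId> nonPatients = includeIds.stream()
        .filter(id -> !"Patient".equals(id.getResourceType()))
        .collect(Collectors.toSet());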

View File

@@ -12,12 +12,17 @@ import ca.uhn.fhir.util.JsonUtil;
import com.google.common.collect.Sets;
import org.hamcrest.Matchers;
import org.hl7.fhir.r4.model.Binary;
import org.hl7.fhir.r4.model.Device;
import org.hl7.fhir.r4.model.Encounter;
import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.Group;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Location;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Organization;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Practitioner;
import org.hl7.fhir.r4.model.Provenance;
import org.hl7.fhir.r4.model.Reference;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -268,6 +273,130 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
verifyBulkExportResults(options, List.of("\"P1\"", "\"" + obsId + "\"", "\"" + encId + "\"", "\"P2\"", "\"" + obsId2 + "\"", "\"" + encId2 + "\""), List.of("\"P3\"", "\"" + obsId3 + "\"", "\"" + encId3 + "\""));
}
@Test
public void testGroupBulkExportGroupIncludePractitionerOrganizationLocation_ShouldShowUp() {
// Create some resources
Practitioner practitioner = new Practitioner();
practitioner.setActive(true);
String practId = myClient.create().resource(practitioner).execute().getId().getIdPart();
Organization organization = new Organization();
organization.setActive(true);
String orgId = myClient.create().resource(organization).execute().getId().getIdPart();
organization = new Organization();
organization.setActive(true);
String orgId2 = myClient.create().resource(organization).execute().getId().getIdPart();
Location location = new Location();
location.setStatus(Location.LocationStatus.ACTIVE);
String locId = myClient.create().resource(location).execute().getId().getIdPart();
location = new Location();
location.setStatus(Location.LocationStatus.ACTIVE);
String locId2 = myClient.create().resource(location).execute().getId().getIdPart();
Patient patient = new Patient();
patient.setId("P1");
patient.setActive(true);
myClient.update().resource(patient).execute();
patient = new Patient();
patient.setId("P2");
patient.setActive(true);
myClient.update().resource(patient).execute();
Encounter encounter = new Encounter();
encounter.setStatus(Encounter.EncounterStatus.INPROGRESS);
encounter.setSubject(new Reference("Patient/P1"));
Encounter.EncounterParticipantComponent encounterParticipantComponent = new Encounter.EncounterParticipantComponent();
encounterParticipantComponent.setIndividual(new Reference("Practitioner/" + practId));
encounter.addParticipant(encounterParticipantComponent);
Encounter.EncounterLocationComponent encounterLocationComponent = new Encounter.EncounterLocationComponent();
encounterLocationComponent.setLocation(new Reference("Location/" + locId));
encounter.addLocation(encounterLocationComponent);
encounter.setServiceProvider(new Reference("Organization/" + orgId));
String encId = myClient.create().resource(encounter).execute().getId().getIdPart();
encounter = new Encounter();
encounter.setStatus(Encounter.EncounterStatus.INPROGRESS);
encounter.setSubject(new Reference("Patient/P2"));
encounterLocationComponent = new Encounter.EncounterLocationComponent();
encounterLocationComponent.setLocation(new Reference("Location/" + locId2));
encounter.addLocation(encounterLocationComponent);
String encId2 = myClient.create().resource(encounter).execute().getId().getIdPart();
Group group = new Group();
group.setId("Group/G1");
group.setActive(true);
group.addMember().getEntity().setReference("Patient/P1");
myClient.update().resource(group).execute();
// set the export options
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Patient", "Encounter"));
options.setGroupId(new IdType("Group", "G1"));
options.setFilters(new HashSet<>());
options.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
options.setOutputFormat(Constants.CT_FHIR_NDJSON);
verifyBulkExportResults(options, List.of("\"P1\"", "\"" + practId + "\"", "\"" + orgId + "\"", "\"" + encId + "\"", "\"" + locId + "\""), List.of("\"P2\"", "\"" + orgId2 + "\"", "\"" + encId2 + "\"", "\"" + locId2 + "\""));
}
@Test
public void testGroupBulkExportGroupIncludeDevice_ShouldShowUp() {
// Create some resources
Device device = new Device();
device.setStatus(Device.FHIRDeviceStatus.ACTIVE);
String devId = myClient.create().resource(device).execute().getId().getIdPart();
device = new Device();
device.setStatus(Device.FHIRDeviceStatus.ACTIVE);
String devId2 = myClient.create().resource(device).execute().getId().getIdPart();
device = new Device();
device.setStatus(Device.FHIRDeviceStatus.ACTIVE);
String devId3 = myClient.create().resource(device).execute().getId().getIdPart();
Patient patient = new Patient();
patient.setId("P1");
patient.setActive(true);
myClient.update().resource(patient).execute();
patient = new Patient();
patient.setId("P2");
patient.setActive(true);
myClient.update().resource(patient).execute();
Observation observation = new Observation();
observation.setStatus(Observation.ObservationStatus.AMENDED);
observation.setDevice(new Reference("Device/" + devId));
observation.setSubject(new Reference("Patient/P1"));
String obsId = myClient.create().resource(observation).execute().getId().getIdPart();
Provenance provenance = new Provenance();
provenance.addAgent().setWho(new Reference("Device/" + devId2));
provenance.addTarget(new Reference("Patient/P1"));
String provId = myClient.create().resource(provenance).execute().getId().getIdPart();
provenance = new Provenance();
provenance.addAgent().setWho(new Reference("Device/" + devId3));
provenance.addTarget(new Reference("Patient/P2"));
String provId2 = myClient.create().resource(provenance).execute().getId().getIdPart();
Group group = new Group();
group.setId("Group/G1");
group.setActive(true);
group.addMember().getEntity().setReference("Patient/P1");
myClient.update().resource(group).execute();
// set the export options
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Patient", "Observation", "Provenance"));
options.setGroupId(new IdType("Group", "G1"));
options.setFilters(new HashSet<>());
options.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
options.setOutputFormat(Constants.CT_FHIR_NDJSON);
verifyBulkExportResults(options, List.of("\"P1\"", "\"" + obsId + "\"", "\"" + provId + "\"", "\"" + devId + "\"", "\"" + devId2 + "\""), List.of("\"P2\"", "\"" + provId2 + "\"", "\"" + devId3 + "\""));
}
private void verifyBulkExportResults(BulkDataExportOptions theOptions, List<String> theContainedList, List<String> theExcludedList) {
Batch2JobStartResponse startResponse = myJobRunner.startNewJob(BulkExportUtils.createBulkExportJobParametersFromExportOptions(theOptions));

View File

@@ -31,10 +31,13 @@ import java.util.Optional;
/**
* This class is an abstraction for however primary keys are stored in the underlying storage engine. This might be
* a Long, a String, or something else.
* <p>
* myId is the only field that must be populated; the other fields may be populated for specific use cases.
*/
public class ResourcePersistentId {
private Object myId;
private Long myVersion;
private String myResourceType;
private IIdType myAssociatedResourceId;
public ResourcePersistentId(Object theId) {
@@ -113,6 +116,14 @@ public class ResourcePersistentId {
myVersion = theVersion;
}
public String getResourceType() {
return myResourceType;
}
public void setResourceType(String theResourceType) {
myResourceType = theResourceType;
}
public static List<Long> toLongList(Collection<ResourcePersistentId> thePids) {
List<Long> retVal = new ArrayList<>(thePids.size());
for (ResourcePersistentId next : thePids) {

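The new accessors are plain field access; a minimal sketch (values made up for illustration) of the pattern the bulk export code relies on:

    ResourcePersistentId pid = new ResourcePersistentId(123L);
    pid.setResourceType("Observation");
    // Lets downstream steps route the pid to the right DAO, and lets the
    // export code drop Patients that were already collected
    boolean keep = !"Patient".equals(pid.getResourceType()); // true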
View File

@@ -34,7 +34,6 @@ import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.interceptor.ResponseTerminologyTranslationSvc;
import org.hl7.fhir.instance.model.api.IBaseResource;
@@ -102,11 +101,11 @@ public class ExpandResourcesStep implements IJobStepWorker<BulkExportJobParamete
}
private List<IBaseResource> fetchAllResources(BulkExportIdList theIds) {
List<IBaseResource> resources = new ArrayList<>();
for (Id id : theIds.getIds()) {
String value = id.getId();
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(id.getResourceType());
// This should be a query, but we have PIDs, and we don't have a _pid search param. TODO GGG, figure out how to make this search by pid.
resources.add(dao.readByPid(new ResourcePersistentId(value)));
}

View File

@@ -81,7 +81,11 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobPa
while (pidIterator.hasNext()) {
ResourcePersistentId pid = pidIterator.next();
if (pid.getResourceType() != null) {
idsToSubmit.add(Id.getIdFromPID(pid, pid.getResourceType()));
} else {
idsToSubmit.add(Id.getIdFromPID(pid, resourceType));
}
// >= so that we know (with confidence)
// that every batch is <= 1000 items
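Taken together with the ResourcePersistentId change, the null check above routes each id by its own type when the include expansion populated one, and otherwise falls back to the resource type currently being exported. A compact sketch equivalent to the if/else above:

    // Include-loaded pids know their own type; compartment-search pids
    // fall back to the type this step is exporting
    String effectiveType = pid.getResourceType() != null ? pid.getResourceType() : resourceType;
    idsToSubmit.add(Id.getIdFromPID(pid, effectiveType));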