From 9e1120a47a05d650dfe42c6816681f148366fff5 Mon Sep 17 00:00:00 2001 From: StevenXLi Date: Tue, 27 Sep 2022 10:51:43 -0400 Subject: [PATCH] 4080 support onc requirements for group bulk export (#4081) * added failing tests * added include to bulk export processor, added support for different resource types in bulk export fetch id step * added changelog * fixed duplication issue, and added null check Co-authored-by: Steven Li --- ...nc-requirements-for-group-bulk-export.yaml | 6 + .../export/svc/JpaBulkExportProcessor.java | 13 ++ .../jpa/search/builder/SearchBuilder.java | 44 +++--- .../uhn/fhir/jpa/bulk/BulkDataExportTest.java | 129 ++++++++++++++++++ .../server/storage/ResourcePersistentId.java | 11 ++ .../jobs/export/ExpandResourcesStep.java | 3 +- .../jobs/export/FetchResourceIdsStep.java | 6 +- 7 files changed, 191 insertions(+), 21 deletions(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_2_0/4080-support-onc-requirements-for-group-bulk-export.yaml diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_2_0/4080-support-onc-requirements-for-group-bulk-export.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_2_0/4080-support-onc-requirements-for-group-bulk-export.yaml new file mode 100644 index 00000000000..1e2ba0d0f31 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_2_0/4080-support-onc-requirements-for-group-bulk-export.yaml @@ -0,0 +1,6 @@ +--- +type: add +issue: 4080 +jira: SMILE-5197, SMILE-5198 +title: "Previously, Group Bulk Export did not support the inclusion of resources referenced in the resources in the patient compartment. +This is now supported." diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/JpaBulkExportProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/JpaBulkExportProcessor.java index a7360256ce2..0797cea05db 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/JpaBulkExportProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/JpaBulkExportProcessor.java @@ -44,6 +44,7 @@ import ca.uhn.fhir.jpa.util.QueryChunker; import ca.uhn.fhir.mdm.api.MdmMatchResultEnum; import ca.uhn.fhir.mdm.dao.IMdmLinkDao; import ca.uhn.fhir.mdm.model.MdmPidTuple; +import ca.uhn.fhir.model.api.Include; import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions; import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; @@ -65,7 +66,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Nonnull; +import javax.persistence.EntityManager; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -108,6 +111,9 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor { @Autowired private MdmExpansionCacheSvc myMdmExpansionCacheSvc; + @Autowired + private EntityManager myEntityManager; + private IFhirPath myFhirPath; @Transactional @@ -438,6 +444,13 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor { while (resultIterator.hasNext()) { myReadPids.add(resultIterator.next()); } + + // add _include to results to support ONC + Set includes = Collections.singleton(new Include("*", true)); + SystemRequestDetails requestDetails = SystemRequestDetails.newSystemRequestAllPartitions(); + Set includeIds = searchBuilder.loadIncludes(myContext, myEntityManager, myReadPids, includes, false, expandedSpMap.getLastUpdated(), theParams.getJobId(), requestDetails, null); + // gets rid of the Patient duplicates + myReadPids.addAll(includeIds.stream().filter((id) -> !id.getResourceType().equals("Patient")).collect(Collectors.toSet())); } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/SearchBuilder.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/SearchBuilder.java index ad6436459bf..841db5923af 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/SearchBuilder.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/SearchBuilder.java @@ -92,7 +92,6 @@ import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; import ca.uhn.fhir.util.StopWatch; import ca.uhn.fhir.util.StringUtil; import ca.uhn.fhir.util.UrlUtil; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Streams; import com.healthmarketscience.sqlbuilder.Condition; import org.apache.commons.lang3.Validate; @@ -149,6 +148,9 @@ public class SearchBuilder implements ISearchBuilder { private static final ResourcePersistentId NO_MORE = new ResourcePersistentId(-1L); private static final String MY_TARGET_RESOURCE_PID = "myTargetResourcePid"; private static final String MY_SOURCE_RESOURCE_PID = "mySourceResourcePid"; + private static final String MY_TARGET_RESOURCE_TYPE = "myTargetResourceType"; + + private static final String MY_SOURCE_RESOURCE_TYPE = "mySourceResourceType"; private static final String MY_TARGET_RESOURCE_VERSION = "myTargetResourceVersion"; public static final String RESOURCE_ID_ALIAS = "resource_id"; public static final String RESOURCE_VERSION_ALIAS = "resource_version"; @@ -290,7 +292,7 @@ public class SearchBuilder implements ISearchBuilder { @SuppressWarnings("ConstantConditions") @Override public Long createCountQuery(SearchParameterMap theParams, String theSearchUuid, - RequestDetails theRequest, @Nonnull RequestPartitionId theRequestPartitionId) { + RequestDetails theRequest, @Nonnull RequestPartitionId theRequestPartitionId) { assert theRequestPartitionId != null; assert TransactionSynchronizationManager.isActualTransactionActive(); @@ -342,7 +344,7 @@ public class SearchBuilder implements ISearchBuilder { } private List createQuery(SearchParameterMap theParams, SortSpec sort, Integer theOffset, Integer theMaximumResults, boolean theCountOnlyFlag, RequestDetails theRequest, - SearchRuntimeDetails theSearchRuntimeDetails) { + SearchRuntimeDetails theSearchRuntimeDetails) { ArrayList queries = new ArrayList<>(); @@ -383,7 +385,7 @@ public class SearchBuilder implements ISearchBuilder { !fulltextExecutor.hasNext() || // Our hibernate search query doesn't respect partitions yet (!myPartitionSettings.isPartitioningEnabled() && - // were there AND terms left? Then we still need the db. + // were there AND terms left? Then we still need the db. theParams.isEmpty() && // not every param is a param. :-( theParams.getNearDistanceParam() == null && @@ -991,8 +993,9 @@ public class SearchBuilder implements ISearchBuilder { /** * Check if we can load the resources from Hibernate Search instead of the database. * We assume this is faster. - * + *

* Hibernate Search only stores the current version, and only if enabled. + * * @param thePids the pids to check for versioned references * @return can we fetch from Hibernate Search? */ @@ -1001,7 +1004,7 @@ public class SearchBuilder implements ISearchBuilder { return myDaoConfig.isStoreResourceInHSearchIndex() && myDaoConfig.isAdvancedHSearchIndexing() && // we don't support history - thePids.stream().noneMatch(p->p.getVersion()!=null) && + thePids.stream().noneMatch(p -> p.getVersion() != null) && // skip the complexity for metadata in dstu2 myContext.getVersion().getVersion().isEqualOrNewerThan(FhirVersionEnum.DSTU3); } @@ -1024,7 +1027,8 @@ public class SearchBuilder implements ISearchBuilder { /** * THIS SHOULD RETURN HASHSET and not just Set because we add to it later - * so it can't be Collections.emptySet() or some such thing + * so it can't be Collections.emptySet() or some such thing. + * The ResourcePersistentId returned will have resource type populated. */ @Override public Set loadIncludes(FhirContext theContext, EntityManager theEntityManager, Collection theMatches, Collection theIncludes, @@ -1037,6 +1041,7 @@ public class SearchBuilder implements ISearchBuilder { } String searchPidFieldName = theReverseMode ? MY_TARGET_RESOURCE_PID : MY_SOURCE_RESOURCE_PID; String findPidFieldName = theReverseMode ? MY_SOURCE_RESOURCE_PID : MY_TARGET_RESOURCE_PID; + String findResourceTypeFieldName = theReverseMode ? MY_SOURCE_RESOURCE_TYPE : MY_TARGET_RESOURCE_TYPE; String findVersionFieldName = null; if (!theReverseMode && myModelConfig.isRespectVersionsForSearchIncludes()) { findVersionFieldName = MY_TARGET_RESOURCE_VERSION; @@ -1077,6 +1082,7 @@ public class SearchBuilder implements ISearchBuilder { if (matchAll) { StringBuilder sqlBuilder = new StringBuilder(); sqlBuilder.append("SELECT r.").append(findPidFieldName); + sqlBuilder.append(", r.").append(findResourceTypeFieldName); if (findVersionFieldName != null) { sqlBuilder.append(", r." + findVersionFieldName); } @@ -1117,16 +1123,18 @@ public class SearchBuilder implements ISearchBuilder { continue; } - Long resourceLink; Long version = null; + Long resourceLink = (Long) ((Object[]) nextRow)[0]; + String resourceType = (String) ((Object[]) nextRow)[1]; if (findVersionFieldName != null) { - resourceLink = (Long) ((Object[]) nextRow)[0]; - version = (Long) ((Object[]) nextRow)[1]; - } else { - resourceLink = (Long) nextRow; + version = (Long) ((Object[]) nextRow)[2]; } - pidsToInclude.add(new ResourcePersistentId(resourceLink, version)); + if (resourceLink != null) { + ResourcePersistentId pid = new ResourcePersistentId(resourceLink, version); + pid.setResourceType(resourceType); + pidsToInclude.add(pid); + } } } } else { @@ -1179,16 +1187,16 @@ public class SearchBuilder implements ISearchBuilder { " WHERE r.src_path = :src_path AND " + " r.target_resource_id IS NOT NULL AND " + " r." + searchPidFieldSqlColumn + " IN (:target_pids) "); - if(targetResourceType != null) { + if (targetResourceType != null) { resourceIdBasedQuery.append(" AND r.target_resource_type = :target_resource_type "); - } else if(haveTargetTypesDefinedByParam) { + } else if (haveTargetTypesDefinedByParam) { resourceIdBasedQuery.append(" AND r.target_resource_type in (:target_resource_types) "); } // Case 2: String fieldsToLoadFromSpidxUriTable = "rUri.res_id"; // to match the fields loaded in union - if(fieldsToLoad.split(",").length > 1) { + if (fieldsToLoad.split(",").length > 1) { for (int i = 0; i < fieldsToLoad.split(",").length - 1; i++) { fieldsToLoadFromSpidxUriTable += ", NULL"; } @@ -1200,10 +1208,10 @@ public class SearchBuilder implements ISearchBuilder { " r.target_resource_url = rUri.sp_uri AND " + " rUri.sp_name = 'url' "); - if(targetResourceType != null) { + if (targetResourceType != null) { resourceUrlBasedQuery.append(" AND rUri.res_type = :target_resource_type "); - } else if(haveTargetTypesDefinedByParam) { + } else if (haveTargetTypesDefinedByParam) { resourceUrlBasedQuery.append(" AND rUri.res_type IN (:target_resource_types) "); } diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportTest.java index 894079226df..3e321236a21 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportTest.java @@ -12,12 +12,17 @@ import ca.uhn.fhir.util.JsonUtil; import com.google.common.collect.Sets; import org.hamcrest.Matchers; import org.hl7.fhir.r4.model.Binary; +import org.hl7.fhir.r4.model.Device; import org.hl7.fhir.r4.model.Encounter; import org.hl7.fhir.r4.model.Enumerations; import org.hl7.fhir.r4.model.Group; import org.hl7.fhir.r4.model.IdType; +import org.hl7.fhir.r4.model.Location; import org.hl7.fhir.r4.model.Observation; +import org.hl7.fhir.r4.model.Organization; import org.hl7.fhir.r4.model.Patient; +import org.hl7.fhir.r4.model.Practitioner; +import org.hl7.fhir.r4.model.Provenance; import org.hl7.fhir.r4.model.Reference; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -268,6 +273,130 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test { verifyBulkExportResults(options, List.of("\"P1\"", "\"" + obsId + "\"", "\"" + encId + "\"", "\"P2\"", "\"" + obsId2 + "\"", "\"" + encId2 + "\""), List.of("\"P3\"", "\"" + obsId3 + "\"", "\"" + encId3 + "\"")); } + @Test + public void testGroupBulkExportGroupIncludePractitionerOrganizationLocation_ShouldShowUp() { + // Create some resources + Practitioner practitioner = new Practitioner(); + practitioner.setActive(true); + String practId = myClient.create().resource(practitioner).execute().getId().getIdPart(); + + Organization organization = new Organization(); + organization.setActive(true); + String orgId = myClient.create().resource(organization).execute().getId().getIdPart(); + + organization = new Organization(); + organization.setActive(true); + String orgId2 = myClient.create().resource(organization).execute().getId().getIdPart(); + + Location location = new Location(); + location.setStatus(Location.LocationStatus.ACTIVE); + String locId = myClient.create().resource(location).execute().getId().getIdPart(); + + location = new Location(); + location.setStatus(Location.LocationStatus.ACTIVE); + String locId2 = myClient.create().resource(location).execute().getId().getIdPart(); + + Patient patient = new Patient(); + patient.setId("P1"); + patient.setActive(true); + myClient.update().resource(patient).execute(); + + patient = new Patient(); + patient.setId("P2"); + patient.setActive(true); + myClient.update().resource(patient).execute(); + + Encounter encounter = new Encounter(); + encounter.setStatus(Encounter.EncounterStatus.INPROGRESS); + encounter.setSubject(new Reference("Patient/P1")); + Encounter.EncounterParticipantComponent encounterParticipantComponent = new Encounter.EncounterParticipantComponent(); + encounterParticipantComponent.setIndividual(new Reference("Practitioner/" + practId)); + encounter.addParticipant(encounterParticipantComponent); + Encounter.EncounterLocationComponent encounterLocationComponent = new Encounter.EncounterLocationComponent(); + encounterLocationComponent.setLocation(new Reference("Location/" + locId)); + encounter.addLocation(encounterLocationComponent); + encounter.setServiceProvider(new Reference("Organization/" + orgId)); + String encId = myClient.create().resource(encounter).execute().getId().getIdPart(); + + encounter = new Encounter(); + encounter.setStatus(Encounter.EncounterStatus.INPROGRESS); + encounter.setSubject(new Reference("Patient/P2")); + encounterLocationComponent.setLocation(new Reference("Location/" + locId2)); + String encId2 = myClient.create().resource(encounter).execute().getId().getIdPart(); + + Group group = new Group(); + group.setId("Group/G1"); + group.setActive(true); + group.addMember().getEntity().setReference("Patient/P1"); + myClient.update().resource(group).execute(); + + // set the export options + BulkDataExportOptions options = new BulkDataExportOptions(); + options.setResourceTypes(Sets.newHashSet("Patient", "Encounter")); + options.setGroupId(new IdType("Group", "G1")); + options.setFilters(new HashSet<>()); + options.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP); + options.setOutputFormat(Constants.CT_FHIR_NDJSON); + verifyBulkExportResults(options, List.of("\"P1\"", "\"" + practId + "\"", "\"" + orgId + "\"", "\"" + encId + "\"", "\"" + locId + "\""), List.of("\"P2\"", "\"" + orgId2 + "\"", "\"" + encId2 + "\"", "\"" + locId2 + "\"")); + } + + @Test + public void testGroupBulkExportGroupIncludeDevice_ShouldShowUp() { + // Create some resources + Device device = new Device(); + device.setStatus(Device.FHIRDeviceStatus.ACTIVE); + String devId = myClient.create().resource(device).execute().getId().getIdPart(); + + device = new Device(); + device.setStatus(Device.FHIRDeviceStatus.ACTIVE); + String devId2 = myClient.create().resource(device).execute().getId().getIdPart(); + + device = new Device(); + device.setStatus(Device.FHIRDeviceStatus.ACTIVE); + String devId3 = myClient.create().resource(device).execute().getId().getIdPart(); + + Patient patient = new Patient(); + patient.setId("P1"); + patient.setActive(true); + myClient.update().resource(patient).execute(); + + patient = new Patient(); + patient.setId("P2"); + patient.setActive(true); + myClient.update().resource(patient).execute(); + + Observation observation = new Observation(); + observation.setStatus(Observation.ObservationStatus.AMENDED); + observation.setDevice(new Reference("Device/" + devId)); + observation.setSubject(new Reference("Patient/P1")); + String obsId = myClient.create().resource(observation).execute().getId().getIdPart(); + + Provenance provenance = new Provenance(); + provenance.addAgent().setWho(new Reference("Device/" + devId2)); + provenance.addTarget(new Reference("Patient/P1")); + String provId = myClient.create().resource(provenance).execute().getId().getIdPart(); + + provenance = new Provenance(); + provenance.addAgent().setWho(new Reference("Device/" + devId3)); + provenance.addTarget(new Reference("Patient/P2")); + String provId2 = myClient.create().resource(provenance).execute().getId().getIdPart(); + + Group group = new Group(); + group.setId("Group/G1"); + group.setActive(true); + group.addMember().getEntity().setReference("Patient/P1"); + myClient.update().resource(group).execute(); + + // set the export options + BulkDataExportOptions options = new BulkDataExportOptions(); + options.setResourceTypes(Sets.newHashSet("Patient", "Observation", "Provenance")); + options.setGroupId(new IdType("Group", "G1")); + options.setFilters(new HashSet<>()); + options.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP); + options.setOutputFormat(Constants.CT_FHIR_NDJSON); + verifyBulkExportResults(options, List.of("\"P1\"", "\"" + obsId + "\"", "\"" + provId + "\"", "\"" + devId + "\"", "\"" + devId2 + "\""), List.of("\"P2\"", "\"" + provId2 + "\"", "\"" + devId3 + "\"")); + } + private void verifyBulkExportResults(BulkDataExportOptions theOptions, List theContainedList, List theExcludedList) { Batch2JobStartResponse startResponse = myJobRunner.startNewJob(BulkExportUtils.createBulkExportJobParametersFromExportOptions(theOptions)); diff --git a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/api/server/storage/ResourcePersistentId.java b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/api/server/storage/ResourcePersistentId.java index 1677467f659..43bb7a10125 100644 --- a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/api/server/storage/ResourcePersistentId.java +++ b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/api/server/storage/ResourcePersistentId.java @@ -31,10 +31,13 @@ import java.util.Optional; /** * This class is an abstraction for however primary keys are stored in the underlying storage engine. This might be * a Long, a String, or something else. + * + * @param myId This is the only required field that needs to be populated, other fields can be populated for specific use cases. */ public class ResourcePersistentId { private Object myId; private Long myVersion; + private String myResourceType; private IIdType myAssociatedResourceId; public ResourcePersistentId(Object theId) { @@ -113,6 +116,14 @@ public class ResourcePersistentId { myVersion = theVersion; } + public String getResourceType() { + return myResourceType; + } + + public void setResourceType(String theResourceType) { + myResourceType = theResourceType; + } + public static List toLongList(Collection thePids) { List retVal = new ArrayList<>(thePids.size()); for (ResourcePersistentId next : thePids) { diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourcesStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourcesStep.java index 5d76e2e4da1..75f1a696ce0 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourcesStep.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourcesStep.java @@ -34,7 +34,6 @@ import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor; import ca.uhn.fhir.parser.IParser; -import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; import ca.uhn.fhir.rest.server.interceptor.ResponseTerminologyTranslationSvc; import org.hl7.fhir.instance.model.api.IBaseResource; @@ -102,11 +101,11 @@ public class ExpandResourcesStep implements IJobStepWorker fetchAllResources(BulkExportIdList theIds) { - IFhirResourceDao dao = myDaoRegistry.getResourceDao(theIds.getResourceType()); List resources = new ArrayList<>(); for (Id id : theIds.getIds()) { String value = id.getId(); + IFhirResourceDao dao = myDaoRegistry.getResourceDao(id.getResourceType()); // This should be a query, but we have PIDs, and we don't have a _pid search param. TODO GGG, figure out how to make this search by pid. resources.add(dao.readByPid(new ResourcePersistentId(value))); } diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStep.java index be4432b42c8..6238ca12e6b 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStep.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStep.java @@ -81,7 +81,11 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker= so that we know (with confidence) // that every batch is <= 1000 items