From bee33f763a5016b9588449b8d386bcc98f898962 Mon Sep 17 00:00:00 2001 From: Tadgh Date: Fri, 26 Mar 2021 15:54:27 -0400 Subject: [PATCH] Refactors, tidying. Improving docs --- .../ca/uhn/fhir/util/SearchParameterUtil.java | 31 +++++--- .../GoldenResourceAnnotatingProcessor.java | 48 +++++++----- .../fhir/jpa/bulk/job/BaseBulkItemReader.java | 15 ++-- .../jpa/bulk/job/GroupBulkItemReader.java | 76 +++++++++++++------ .../jpa/dao/mdm/MdmExpansionCacheSvc.java | 38 ++++++++-- .../jpa/bulk/BulkDataExportSvcImplR4Test.java | 1 - ...rceProviderR4ValueSetNoVerCSNoVerTest.java | 1 - 7 files changed, 144 insertions(+), 66 deletions(-) diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java index 047525705e5..c225af57251 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java @@ -32,6 +32,7 @@ import org.hl7.fhir.instance.model.api.IPrimitiveType; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; +import java.util.Optional; public class SearchParameterUtil { @@ -59,27 +60,29 @@ public class SearchParameterUtil { * 3.1 If that returns >1 result, throw an error * 3.2 If that returns 1 result, return it */ - public static RuntimeSearchParam getPatientSearchParamForResourceType(FhirContext theFhirContext, String theResourceType) { + public static Optional getOnlyPatientSearchParamForResourceType(FhirContext theFhirContext, String theResourceType) { RuntimeSearchParam myPatientSearchParam = null; RuntimeResourceDefinition runtimeResourceDefinition = theFhirContext.getResourceDefinition(theResourceType); myPatientSearchParam = runtimeResourceDefinition.getSearchParam("patient"); if (myPatientSearchParam == null) { myPatientSearchParam = runtimeResourceDefinition.getSearchParam("subject"); if (myPatientSearchParam == null) { - myPatientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition); - if (myPatientSearchParam == null) { - String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", theResourceType); - throw new IllegalArgumentException(errorMessage); - } + myPatientSearchParam = getOnlyPatientCompartmentRuntimeSearchParam(runtimeResourceDefinition); } } - return myPatientSearchParam; + return Optional.ofNullable(myPatientSearchParam); } + /** * Search the resource definition for a compartment named 'patient' and return its related Search Parameter. */ - public static RuntimeSearchParam getRuntimeSearchParamByCompartment(RuntimeResourceDefinition runtimeResourceDefinition) { + public static RuntimeSearchParam getOnlyPatientCompartmentRuntimeSearchParam(FhirContext theFhirContext, String theResourceType) { + RuntimeResourceDefinition resourceDefinition = theFhirContext.getResourceDefinition(theResourceType); + return getOnlyPatientCompartmentRuntimeSearchParam(resourceDefinition); + } + + public static RuntimeSearchParam getOnlyPatientCompartmentRuntimeSearchParam(RuntimeResourceDefinition runtimeResourceDefinition) { RuntimeSearchParam patientSearchParam; List searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient"); if (searchParams == null || searchParams.size() == 0) { @@ -88,11 +91,21 @@ public class SearchParameterUtil { } else if (searchParams.size() == 1) { patientSearchParam = searchParams.get(0); } else { - String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as we are unable to disambiguate which patient search parameter we should be searching by.", runtimeResourceDefinition.getId()); + String errorMessage = String.format("Resource type %s has more than one Search Param which references a patient compartment. We are unable to disambiguate which patient search parameter we should be searching by.", runtimeResourceDefinition.getId()); throw new IllegalArgumentException(errorMessage); } return patientSearchParam; + } + public static List getAllPatientCompartmentRuntimeSearchParams(FhirContext theFhirContext, String theResourceType) { + RuntimeResourceDefinition runtimeResourceDefinition = theFhirContext.getResourceDefinition(theResourceType); + return getAllPatientCompartmentRuntimeSearchParams(runtimeResourceDefinition); + + } + + private static List getAllPatientCompartmentRuntimeSearchParams(RuntimeResourceDefinition theRuntimeResourceDefinition) { + List patient = theRuntimeResourceDefinition.getSearchParamsForCompartmentName("Patient"); + return patient; } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java index 0fb7db8c32c..0dc489a7f36 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java @@ -41,7 +41,8 @@ import java.util.List; import java.util.Optional; /** - * Reusable Item Processor which converts ResourcePersistentIds to their IBaseResources + * Reusable Item Processor which attaches an extension to any outgoing resource. This extension will contain a resource + * reference to the golden resource patient of the given resources' patient. (e.g. Observation.subject, Immunization.patient, etc) */ public class GoldenResourceAnnotatingProcessor implements ItemProcessor, List> { private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); @@ -59,25 +60,40 @@ public class GoldenResourceAnnotatingProcessor implements ItemProcessor oPatientSearchParam= SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, myResourceType); + if (!oPatientSearchParam.isPresent()) { + String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", myResourceType); + throw new IllegalArgumentException(errorMessage); + } else { + myRuntimeSearchParam = oPatientSearchParam.get(); + } + } + @Override public List process(List theIBaseResources) throws Exception { + //If MDM expansion is enabled, add this magic new extension, otherwise, return the resource as is. if (myMdmEnabled) { if (myRuntimeSearchParam == null) { - myRuntimeSearchParam = SearchParameterUtil.getPatientSearchParamForResourceType(myContext, myResourceType); + populateRuntimeSearchParam(); } - String path = runtimeSearchParamFhirPath(); - theIBaseResources.forEach(iBaseResource -> annotateClinicalResourceWithRelatedGoldenResourcePatient(path, iBaseResource)); + if (myPatientFhirPath == null) { + populatePatientFhirPath(); + } + theIBaseResources.forEach(this::annotateClinicalResourceWithRelatedGoldenResourcePatient); } - return theIBaseResources; } - private void annotateClinicalResourceWithRelatedGoldenResourcePatient(String path, IBaseResource iBaseResource) { - Optional patientReference = getPatientReference(path, iBaseResource); + private void annotateClinicalResourceWithRelatedGoldenResourcePatient(IBaseResource iBaseResource) { + Optional patientReference = getPatientReference(iBaseResource); if (patientReference.isPresent()) { addGoldenResourceExtension(iBaseResource, patientReference.get()); } else { @@ -85,26 +101,24 @@ public class GoldenResourceAnnotatingProcessor implements ItemProcessor getPatientReference(String path, IBaseResource iBaseResource) { + private Optional getPatientReference(IBaseResource iBaseResource) { //In the case of patient, we will just use the raw ID. if (myResourceType.equalsIgnoreCase("Patient")) { return Optional.of(iBaseResource.getIdElement().getIdPart()); //Otherwise, we will perform evaluation of the fhirPath. } else { - Optional optionalReference = getFhirParser().evaluateFirst(iBaseResource, path, IBaseReference.class); + Optional optionalReference = getFhirParser().evaluateFirst(iBaseResource, myPatientFhirPath, IBaseReference.class); return optionalReference.map(theIBaseReference -> theIBaseReference.getReferenceElement().getIdPart()); } } private void addGoldenResourceExtension(IBaseResource iBaseResource, String sourceResourceId) { String goldenResourceId = myMdmExpansionCacheSvc.getGoldenResourceId(sourceResourceId); + IBaseExtension extension = ExtensionUtil.getOrCreateExtension(iBaseResource, ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL); if (!StringUtils.isBlank(goldenResourceId)) { - IBaseExtension extension = ExtensionUtil.getOrCreateExtension(iBaseResource, ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL); ExtensionUtil.setExtension(myContext, extension, "reference", prefixPatient(goldenResourceId)); } else { - IBaseExtension extension = ExtensionUtil.getOrCreateExtension(iBaseResource, "failed-to-associated-golden-id"); - ExtensionUtil.setExtension(myContext, extension, "string", myMdmExpansionCacheSvc.buildLog("not working!", true)); - + ExtensionUtil.setExtension(myContext, extension, "string", "This patient has no matched golden resource."); } } @@ -119,12 +133,12 @@ public class GoldenResourceAnnotatingProcessor implements ItemProcessor1 result, throw an error - * 3.2 If that returns 1 result, return it - */ protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType() { if (myPatientSearchParam == null) { - myPatientSearchParam = SearchParameterUtil.getPatientSearchParamForResourceType(myContext, myResourceType); + Optional onlyPatientSearchParamForResourceType = SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, myResourceType); + if (onlyPatientSearchParamForResourceType.isPresent()) { + myPatientSearchParam = onlyPatientSearchParamForResourceType.get(); + } else { + + } } return myPatientSearchParam; } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java index a00e54a18a6..f90978a9d26 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java @@ -112,17 +112,17 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade * possibly expanded by MDM, and don't have to go and fetch other resource DAOs. */ private Iterator getExpandedPatientIterator() { - Set patientPidsToExport = new HashSet<>(); List members = getMembers(); List ids = members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList()); List pidsOrThrowException = myIdHelperService.getPidsOrThrowException(ids); - patientPidsToExport.addAll(pidsOrThrowException); + Set patientPidsToExport = new HashSet<>(pidsOrThrowException); if (myMdmEnabled) { IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId)); Long pidOrNull = myIdHelperService.getPidOrNull(group); - List> lists = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); - lists.forEach(patientPidsToExport::addAll); + List> goldenPidSourcePidTuple = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); + goldenPidSourcePidTuple.forEach(patientPidsToExport::addAll); + populateMdmResourceCache(goldenPidSourcePidTuple); } List resourcePersistentIds = patientPidsToExport .stream() @@ -131,6 +131,53 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade return resourcePersistentIds.iterator(); } + /** + * @param thePidTuples + */ + private void populateMdmResourceCache(List> thePidTuples) { + if (myMdmExpansionCacheSvc.hasBeenPopulated()) { + return; + } + //First, convert this zipped set of tuples to a map of + //{ + // patient/gold-1 -> [patient/1, patient/2] + // patient/gold-2 -> [patient/3, patient/4] + //} + Map> goldenResourceToSourcePidMap = new HashMap<>(); + for (List goldenPidTargetPidTuple : thePidTuples) { + Long goldenPid = goldenPidTargetPidTuple.get(0); + Long sourcePid = goldenPidTargetPidTuple.get(1); + + if(!goldenResourceToSourcePidMap.containsKey(goldenPid)) { + goldenResourceToSourcePidMap.put(goldenPid, new HashSet<>()); + } + goldenResourceToSourcePidMap.get(goldenPid).add(sourcePid); + } + + //Next, lets convert it to an inverted index for fast lookup + // { + // patient/1 -> patient/gold-1 + // patient/2 -> patient/gold-1 + // patient/3 -> patient/gold-2 + // patient/4 -> patient/gold-2 + // } + Map sourceResourceIdToGoldenResourceIdMap = new HashMap<>(); + goldenResourceToSourcePidMap.forEach((key, value) -> { + String goldenResourceId = myIdHelperService.translatePidIdToForcedId(new ResourcePersistentId(key)).orElse(key.toString()); + Map> pidsToForcedIds = myIdHelperService.translatePidsToForcedIds(value); + + Set sourceResourceIds = pidsToForcedIds.entrySet().stream() + .map(ent -> ent.getValue().isPresent() ? ent.getValue().get() : ent.getKey().toString()) + .collect(Collectors.toSet()); + + sourceResourceIds + .forEach(sourceResourceId -> sourceResourceIdToGoldenResourceIdMap.put(sourceResourceId, goldenResourceId)); + }); + + //Now that we have built our cached expansion, store it. + myMdmExpansionCacheSvc.setCacheContents(sourceResourceIdToGoldenResourceIdMap); + } + /** * Given the local myGroupId, read this group, and find all members' patient references. * @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"] @@ -173,8 +220,7 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade } goldenResourceToSourcePidMap.get(goldenPid).add(sourcePid); } - populateMdmResourceCacheIfNeeded(goldenResourceToSourcePidMap); - + populateMdmResourceCache(goldenPidTargetPidTuples); //If the result of the translation is an empty optional, it means there is no forced id, and we can use the PID as the resource ID. Set resolvedResourceIds = pidToForcedIdMap.entrySet().stream() @@ -192,25 +238,7 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade } private void populateMdmResourceCacheIfNeeded(Map> goldenResourceToSourcePidMap) { - if (myMdmExpansionCacheSvc.hasBeenPopulated()) { - return; - } - Map sourceResourceIdToGoldenResourceIdMap = new HashMap<>(); - goldenResourceToSourcePidMap.entrySet() - .forEach(entry -> { - Long key = entry.getKey(); - String goldenResourceId = myIdHelperService.translatePidIdToForcedId(new ResourcePersistentId(key)).orElse(key.toString()); - - Map> pidsToForcedIds = myIdHelperService.translatePidsToForcedIds(entry.getValue()); - Set sourceResourceIds = pidsToForcedIds.entrySet().stream() - .map(ent -> ent.getValue().isPresent() ? ent.getValue().get() : ent.getKey().toString()) - .collect(Collectors.toSet()); - - sourceResourceIds - .forEach(sourceResourceId -> sourceResourceIdToGoldenResourceIdMap.put(sourceResourceId, goldenResourceId)); - }); - myMdmExpansionCacheSvc.setCacheContents(sourceResourceIdToGoldenResourceIdMap); } private void queryResourceTypeWithReferencesToPatients(Set myReadPids, List idChunk) { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java index 6e80ba7e58b..19bc25dd810 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java @@ -1,5 +1,6 @@ package ca.uhn.fhir.jpa.dao.mdm; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import java.util.Map; @@ -7,20 +8,47 @@ import java.util.concurrent.ConcurrentHashMap; import static org.slf4j.LoggerFactory.getLogger; +/** + * The purpose of this class is to share context between steps of a given GroupBulkExport job. + * + * This cache allows you to port state between reader/processor/writer. In this case, we are maintaining + * a cache of Source Resource ID -> Golden Resource ID, so that we can annotate outgoing resources with their golden owner + * if applicable. + * + */ public class MdmExpansionCacheSvc { private static final Logger ourLog = getLogger(MdmExpansionCacheSvc.class); - private ConcurrentHashMap mySourceToGoldenIdCache = new ConcurrentHashMap<>(); + private final ConcurrentHashMap mySourceToGoldenIdCache = new ConcurrentHashMap<>(); public String getGoldenResourceId(String theSourceId) { - ourLog.info(buildLog("About to lookup cached resource ID " + theSourceId, true)); - return mySourceToGoldenIdCache.get(theSourceId); + ourLog.debug(buildLogMessage("About to lookup cached resource ID " + theSourceId)); + String goldenResourceId = mySourceToGoldenIdCache.get(theSourceId); + + //A golden resources' golden resource ID is itself. + if (StringUtils.isBlank(goldenResourceId)) { + if (mySourceToGoldenIdCache.containsValue(theSourceId)) { + goldenResourceId = theSourceId; + } + } + return goldenResourceId; } - public String buildLog(String message, boolean theShowContent) { + public String buildLogMessage(String theMessage) { + return buildLogMessage(theMessage, false); + } + + /** + * Builds a log message, potentially enriched with the cache content. + * + * @param message The log message + * @param theAddCacheContentContent If true, will annotate the log message with the current cache contents. + * @return a built log message, which may include the cache content. + */ + public String buildLogMessage(String message, boolean theAddCacheContentContent) { StringBuilder builder = new StringBuilder(); builder.append(message); - if (ourLog.isDebugEnabled() || theShowContent) { + if (ourLog.isDebugEnabled() || theAddCacheContentContent) { builder.append("\n") .append("Current cache content is:") .append("\n"); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java index e44f0703c6f..a6d1ec08287 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java @@ -1052,7 +1052,6 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { //Non-cached should all have unique IDs List jobIds = Stream.of(jobInfo5, jobInfo6, jobInfo7, jobInfo8, jobInfo9).map(IBulkDataExportSvc.JobInfo::getJobId).collect(Collectors.toList()); - ourLog.info("ZOOP {}", String.join(", ", jobIds)); Set uniqueJobIds = new HashSet<>(jobIds); assertEquals(uniqueJobIds.size(), jobIds.size()); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4ValueSetNoVerCSNoVerTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4ValueSetNoVerCSNoVerTest.java index 9a1794f65be..500eba52486 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4ValueSetNoVerCSNoVerTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4ValueSetNoVerCSNoVerTest.java @@ -541,7 +541,6 @@ public class ResourceProviderR4ValueSetNoVerCSNoVerTest extends BaseResourceProv ValueSet expanded = (ValueSet) respParam.getParameter().get(0).getResource(); String resp = myFhirCtx.newXmlParser().setPrettyPrint(true).encodeResourceToString(expanded); - ourLog.info("zoop"); ourLog.info(resp); assertThat(resp, is(containsStringIgnoringCase("")));