diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/HapiExtensions.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/HapiExtensions.java index 3d5d052ac78..f2cf7903407 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/HapiExtensions.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/HapiExtensions.java @@ -112,6 +112,11 @@ public class HapiExtensions { public static final String EXT_RESOURCE_PLACEHOLDER = "http://hapifhir.io/fhir/StructureDefinition/resource-placeholder"; /** + * URL for the extension, added during a Group Bulk Export, which identifies the golden patient of a given exported resource. + */ + public static final String ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL = "https://hapifhir.org/associated-patient-golden-resource/"; + + /** * Non instantiable */ private HapiExtensions() { diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java index 75d1c7c4b2a..c225af57251 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java @@ -23,6 +23,7 @@ package ca.uhn.fhir.util; import ca.uhn.fhir.context.BaseRuntimeChildDefinition; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.RuntimeResourceDefinition; +import ca.uhn.fhir.context.RuntimeSearchParam; import org.apache.commons.lang3.Validate; import org.hl7.fhir.instance.model.api.IBase; import org.hl7.fhir.instance.model.api.IBaseResource; @@ -31,6 +32,7 @@ import org.hl7.fhir.instance.model.api.IPrimitiveType; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; +import java.util.Optional; public class SearchParameterUtil { @@ -50,6 +52,62 @@ public class SearchParameterUtil { return retVal; } + /** + * Given the resource type, fetch its patient-based search parameter: + * 1. Attempt to find one called 'patient' + * 2. If that fails, find one called 'subject' + * 3. If that fails, find one via the Patient Compartment. + * 3.1 If that returns >1 result, throw an error + * 3.2 If that returns 1 result, return it + */ + public static Optional<RuntimeSearchParam> getOnlyPatientSearchParamForResourceType(FhirContext theFhirContext, String theResourceType) { + RuntimeSearchParam myPatientSearchParam = null; + RuntimeResourceDefinition runtimeResourceDefinition = theFhirContext.getResourceDefinition(theResourceType); + myPatientSearchParam = runtimeResourceDefinition.getSearchParam("patient"); + if (myPatientSearchParam == null) { + myPatientSearchParam = runtimeResourceDefinition.getSearchParam("subject"); + if (myPatientSearchParam == null) { + myPatientSearchParam = getOnlyPatientCompartmentRuntimeSearchParam(runtimeResourceDefinition); + } + } + return Optional.ofNullable(myPatientSearchParam); + } + + + /** + * Search the resource definition for a compartment named 'patient' and return its related Search Parameter.
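A rough usage sketch of these lookup helpers, with an assumed resource type and expected path (Observation declares a 'patient' search parameter, so the first step of the lookup order above wins):

    FhirContext ctx = FhirContext.forR4();
    RuntimeSearchParam searchParam = SearchParameterUtil
       .getOnlyPatientSearchParamForResourceType(ctx, "Observation")
       .orElseThrow(IllegalArgumentException::new);
    // searchParam.getPath() should be "Observation.subject.where(resolve() is Patient)"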
+ */ + public static RuntimeSearchParam getOnlyPatientCompartmentRuntimeSearchParam(FhirContext theFhirContext, String theResourceType) { + RuntimeResourceDefinition resourceDefinition = theFhirContext.getResourceDefinition(theResourceType); + return getOnlyPatientCompartmentRuntimeSearchParam(resourceDefinition); + } + + public static RuntimeSearchParam getOnlyPatientCompartmentRuntimeSearchParam(RuntimeResourceDefinition runtimeResourceDefinition) { + RuntimeSearchParam patientSearchParam; + List<RuntimeSearchParam> searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient"); + if (searchParams == null || searchParams.size() == 0) { + String errorMessage = String.format("Resource type [%s] is not eligible for this type of export, as it contains no Patient compartment, and no `patient` or `subject` search parameter", runtimeResourceDefinition.getId()); + throw new IllegalArgumentException(errorMessage); + } else if (searchParams.size() == 1) { + patientSearchParam = searchParams.get(0); + } else { + String errorMessage = String.format("Resource type [%s] has more than one search parameter that references the Patient compartment. We are unable to disambiguate which patient search parameter we should be searching by.", runtimeResourceDefinition.getId()); + throw new IllegalArgumentException(errorMessage); + } + return patientSearchParam; + } + + public static List<RuntimeSearchParam> getAllPatientCompartmentRuntimeSearchParams(FhirContext theFhirContext, String theResourceType) { + RuntimeResourceDefinition runtimeResourceDefinition = theFhirContext.getResourceDefinition(theResourceType); + return getAllPatientCompartmentRuntimeSearchParams(runtimeResourceDefinition); + + } + + private static List<RuntimeSearchParam> getAllPatientCompartmentRuntimeSearchParams(RuntimeResourceDefinition theRuntimeResourceDefinition) { + List<RuntimeSearchParam> searchParams = theRuntimeResourceDefinition.getSearchParamsForCompartmentName("Patient"); + return searchParams; + } + @Nullable public static String getCode(FhirContext theContext, IBaseResource theResource) { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/CommonBatchJobConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/CommonBatchJobConfig.java index 20da121d847..340c7be3393 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/CommonBatchJobConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/CommonBatchJobConfig.java @@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.batch; * #L% */ +import ca.uhn.fhir.jpa.batch.processors.GoldenResourceAnnotatingProcessor; import ca.uhn.fhir.jpa.batch.processors.PidToIBaseResourceProcessor; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.context.annotation.Bean; @@ -34,4 +35,10 @@ public class CommonBatchJobConfig { return new PidToIBaseResourceProcessor(); } + @Bean + @StepScope + public GoldenResourceAnnotatingProcessor goldenResourceAnnotatingProcessor() { + return new GoldenResourceAnnotatingProcessor(); + } + } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java new file mode 100644 index 00000000000..4566aa83af3 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java @@ -0,0 +1,147 @@ +package ca.uhn.fhir.jpa.batch.processors; + +/*- + * #%L + * HAPI FHIR JPA Server + * %% + *
Copyright (C) 2014 - 2021 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.context.RuntimeSearchParam; +import ca.uhn.fhir.fhirpath.IFhirPath; +import ca.uhn.fhir.jpa.batch.log.Logs; +import ca.uhn.fhir.jpa.bulk.job.BulkExportJobConfig; +import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc; +import ca.uhn.fhir.util.ExtensionUtil; +import ca.uhn.fhir.util.HapiExtensions; +import ca.uhn.fhir.util.SearchParameterUtil; +import org.apache.commons.lang3.StringUtils; +import org.hl7.fhir.instance.model.api.IBaseExtension; +import org.hl7.fhir.instance.model.api.IBaseReference; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.slf4j.Logger; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; + +import java.util.List; +import java.util.Optional; + +/** + * Reusable Item Processor which attaches an extension to any outgoing resource. The extension contains a reference + * to the golden resource of the given resource's patient (e.g. Observation.subject, Immunization.patient, etc.). + */ +public class GoldenResourceAnnotatingProcessor implements ItemProcessor<List<IBaseResource>, List<IBaseResource>> { + private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); + + @Value("#{stepExecutionContext['resourceType']}") + private String myResourceType; + + @Autowired + private FhirContext myContext; + + @Autowired + private MdmExpansionCacheSvc myMdmExpansionCacheSvc; + + @Value("#{jobParameters['" + BulkExportJobConfig.EXPAND_MDM_PARAMETER + "'] ?: false}") + private boolean myMdmEnabled; + + + private RuntimeSearchParam myRuntimeSearchParam; + + private String myPatientFhirPath; + + private IFhirPath myFhirPath; + + private void populateRuntimeSearchParam() { + Optional<RuntimeSearchParam> oPatientSearchParam = SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, myResourceType); + if (!oPatientSearchParam.isPresent()) { + String errorMessage = String.format("[%s] has no patient-based search parameters, so it is invalid for Group Bulk Export!", myResourceType); + throw new IllegalArgumentException(errorMessage); + } else { + myRuntimeSearchParam = oPatientSearchParam.get(); + } + } + + @Override + public List<IBaseResource> process(List<IBaseResource> theIBaseResources) throws Exception { + //If MDM expansion is enabled, annotate each resource with its golden patient extension; otherwise, return the resources as-is.
+ if (myMdmEnabled) { + if (myRuntimeSearchParam == null) { + populateRuntimeSearchParam(); + } + if (myPatientFhirPath == null) { + populatePatientFhirPath(); + } + theIBaseResources.forEach(this::annotateClinicalResourceWithRelatedGoldenResourcePatient); + } + return theIBaseResources; + } + + private void annotateClinicalResourceWithRelatedGoldenResourcePatient(IBaseResource iBaseResource) { + Optional<String> patientReference = getPatientReference(iBaseResource); + if (patientReference.isPresent()) { + addGoldenResourceExtension(iBaseResource, patientReference.get()); + } else { + ourLog.error("Failed to find the patient reference information for resource {}. This is a bug, " + + "as all resources which can be exported via Group Bulk Export must reference a patient.", iBaseResource); + } + } + + private Optional<String> getPatientReference(IBaseResource iBaseResource) { + //In the case of Patient, we just use the raw resource ID. + if (myResourceType.equalsIgnoreCase("Patient")) { + return Optional.of(iBaseResource.getIdElement().getIdPart()); + //Otherwise, we evaluate the patient FHIRPath against the resource. + } else { + Optional<IBaseReference> optionalReference = getFhirParser().evaluateFirst(iBaseResource, myPatientFhirPath, IBaseReference.class); + return optionalReference.map(theIBaseReference -> theIBaseReference.getReferenceElement().getIdPart()); + } + } + + private void addGoldenResourceExtension(IBaseResource iBaseResource, String sourceResourceId) { + String goldenResourceId = myMdmExpansionCacheSvc.getGoldenResourceId(sourceResourceId); + IBaseExtension<?, ?> extension = ExtensionUtil.getOrCreateExtension(iBaseResource, HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL); + if (!StringUtils.isBlank(goldenResourceId)) { + ExtensionUtil.setExtension(myContext, extension, "reference", prefixPatient(goldenResourceId)); + } + } + + private String prefixPatient(String theResourceId) { + return "Patient/" + theResourceId; + } + + private IFhirPath getFhirParser() { + if (myFhirPath == null) { + myFhirPath = myContext.newFhirPath(); + } + return myFhirPath; + } + + private String populatePatientFhirPath() { + if (myPatientFhirPath == null) { + myPatientFhirPath = myRuntimeSearchParam.getPath(); + // GGG: This is a workaround. By default this runtime search param returns a path like + // Observation.subject.where(resolve() is Patient), which our FHIRPath evaluator doesn't handle well, + // so we strip the .where(...) qualifier before evaluating.
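For instance, with an illustrative Observation path, the trimming performed just below behaves like this:

    String path = "Observation.subject.where(resolve() is Patient)";
    if (path.contains(".where")) {
       path = path.substring(0, path.indexOf(".where")); // now "Observation.subject"
    }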
+ if (myPatientFhirPath.contains(".where")) { + myPatientFhirPath = myPatientFhirPath.substring(0, myPatientFhirPath.indexOf(".where")); + } + } + return myPatientFhirPath; + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/PidToIBaseResourceProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/PidToIBaseResourceProcessor.java index b392eb2b264..0554b7da31d 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/PidToIBaseResourceProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/PidToIBaseResourceProcessor.java @@ -50,6 +50,7 @@ public class PidToIBaseResourceProcessor implements ItemProcessor1 result, throw an error - * 3.2 If that returns 1 result, return it - */ protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType() { if (myPatientSearchParam == null) { - RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(myResourceType); - myPatientSearchParam = runtimeResourceDefinition.getSearchParam("patient"); - if (myPatientSearchParam == null) { - myPatientSearchParam = runtimeResourceDefinition.getSearchParam("subject"); - if (myPatientSearchParam == null) { - myPatientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition); - if (myPatientSearchParam == null) { - String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", myResourceType); - throw new IllegalArgumentException(errorMessage); - } - } + Optional<RuntimeSearchParam> onlyPatientSearchParamForResourceType = SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, myResourceType); + if (onlyPatientSearchParamForResourceType.isPresent()) { + myPatientSearchParam = onlyPatientSearchParamForResourceType.get(); + } } return myPatientSearchParam; } - - /** - * Search the resource definition for a compartment named 'patient' and return its related Search Parameter.
- */ - protected RuntimeSearchParam getRuntimeSearchParamByCompartment(RuntimeResourceDefinition runtimeResourceDefinition) { - RuntimeSearchParam patientSearchParam; - List<RuntimeSearchParam> searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient"); - if (searchParams == null || searchParams.size() == 0) { - String errorMessage = String.format("Resource type [%s] is not eligible for this type of export, as it contains no Patient compartment, and no `patient` or `subject` search parameter", myResourceType); - throw new IllegalArgumentException(errorMessage); - } else if (searchParams.size() == 1) { - patientSearchParam = searchParams.get(0); - } else { - String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as we are unable to disambiguate which patient search parameter we should be searching by.", myResourceType); - throw new IllegalArgumentException(errorMessage); - } - return patientSearchParam; - } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java index a2b8d804e3b..22a11abcbdc 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java @@ -21,8 +21,10 @@ package ca.uhn.fhir.jpa.bulk.job; */ import ca.uhn.fhir.jpa.batch.BatchJobsConfig; +import ca.uhn.fhir.jpa.batch.processors.GoldenResourceAnnotatingProcessor; import ca.uhn.fhir.jpa.batch.processors.PidToIBaseResourceProcessor; import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc; +import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc; import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; import org.hl7.fhir.instance.model.api.IBaseResource; import org.springframework.batch.core.Job; @@ -32,13 +34,16 @@ import org.springframework.batch.core.configuration.annotation.JobBuilderFactory import org.springframework.batch.core.configuration.annotation.JobScope; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.support.CompositeItemProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Lazy; +import java.util.ArrayList; import java.util.List; /** @@ -64,11 +69,23 @@ public class BulkExportJobConfig { @Autowired private PidToIBaseResourceProcessor myPidToIBaseResourceProcessor; + @Autowired + private GoldenResourceAnnotatingProcessor myGoldenResourceAnnotatingProcessor; + @Bean public BulkExportDaoSvc bulkExportDaoSvc() { return new BulkExportDaoSvc(); } + + @Bean + @Lazy + @JobScope + public MdmExpansionCacheSvc mdmExpansionCacheSvc() { + return new MdmExpansionCacheSvc(); + } + + @Bean @Lazy public Job bulkExportJob() { @@ -80,6 +97,18 @@ public class BulkExportJobConfig { .build(); } + @Bean + @Lazy + @StepScope + public CompositeItemProcessor<List<ResourcePersistentId>, List<IBaseResource>> inflateResourceThenAnnotateWithGoldenResourceProcessor() { + CompositeItemProcessor<List<ResourcePersistentId>, List<IBaseResource>> processor = new CompositeItemProcessor<>(); + ArrayList<ItemProcessor> delegates = new ArrayList<>(); +
delegates.add(myPidToIBaseResourceProcessor); + delegates.add(myGoldenResourceAnnotatingProcessor); + processor.setDelegates(delegates); + return processor; + } + @Bean @Lazy public Job groupBulkExportJob() { @@ -132,7 +161,7 @@ public class BulkExportJobConfig { return myStepBuilderFactory.get("groupBulkExportGenerateResourceFilesStep") .<List<ResourcePersistentId>, List<IBaseResource>> chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time. .reader(groupBulkItemReader()) - .processor(myPidToIBaseResourceProcessor) + .processor(inflateResourceThenAnnotateWithGoldenResourceProcessor()) .writer(resourceToFileWriter()) .listener(bulkExportGenerateResourceFilesStepListener()) .build(); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java index dd03381fa83..30a7567776e 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java @@ -27,7 +27,7 @@ import ca.uhn.fhir.jpa.dao.IResultIterator; import ca.uhn.fhir.jpa.dao.ISearchBuilder; import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao; import ca.uhn.fhir.jpa.dao.index.IdHelperService; -import ca.uhn.fhir.jpa.entity.Search; +import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc; import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.util.QueryChunker; @@ -36,6 +36,7 @@ import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; import ca.uhn.fhir.rest.param.ReferenceOrListParam; import ca.uhn.fhir.rest.param.ReferenceParam; +import com.google.common.collect.Multimaps; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IPrimitiveType; @@ -45,6 +46,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -75,6 +77,8 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade private IdHelperService myIdHelperService; @Autowired private IMdmLinkDao myMdmLinkDao; + @Autowired + private MdmExpansionCacheSvc myMdmExpansionCacheSvc; @Override Iterator<ResourcePersistentId> getResourcePidIterator() { @@ -109,17 +113,20 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade * possibly expanded by MDM, and don't have to go and fetch other resource DAOs.
*/ private Iterator<ResourcePersistentId> getExpandedPatientIterator() { - Set<Long> patientPidsToExport = new HashSet<>(); List<String> members = getMembers(); List<IIdType> ids = members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList()); List<Long> pidsOrThrowException = myIdHelperService.getPidsOrThrowException(ids); - patientPidsToExport.addAll(pidsOrThrowException); + Set<Long> patientPidsToExport = new HashSet<>(pidsOrThrowException); if (myMdmEnabled) { IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId)); Long pidOrNull = myIdHelperService.getPidOrNull(group); - List<List<Long>> lists = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); - lists.forEach(patientPidsToExport::addAll); + List<IMdmLinkDao.MdmPidTuple> goldenPidSourcePidTuple = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); + goldenPidSourcePidTuple.forEach(tuple -> { + patientPidsToExport.add(tuple.getGoldenPid()); + patientPidsToExport.add(tuple.getSourcePid()); + }); + populateMdmResourceCache(goldenPidSourcePidTuple); } List<ResourcePersistentId> resourcePersistentIds = patientPidsToExport .stream() @@ -128,6 +135,45 @@ public class GroupBulkItemReade return resourcePersistentIds.iterator(); } + /** + * @param thePidTuples the golden resource PID / source PID tuples returned by the MDM link expansion + */ + private void populateMdmResourceCache(List<IMdmLinkDao.MdmPidTuple> thePidTuples) { + if (myMdmExpansionCacheSvc.hasBeenPopulated()) { + return; + } + //First, convert this zipped set of tuples to a map of + //{ + // patient/gold-1 -> [patient/1, patient/2] + // patient/gold-2 -> [patient/3, patient/4] + //} + Map<Long, Set<Long>> goldenResourceToSourcePidMap = new HashMap<>(); + extract(thePidTuples, goldenResourceToSourcePidMap); + + //Next, let's convert it to an inverted index for fast lookup + // { + // patient/1 -> patient/gold-1 + // patient/2 -> patient/gold-1 + // patient/3 -> patient/gold-2 + // patient/4 -> patient/gold-2 + // } + Map<String, String> sourceResourceIdToGoldenResourceIdMap = new HashMap<>(); + goldenResourceToSourcePidMap.forEach((key, value) -> { + String goldenResourceId = myIdHelperService.translatePidIdToForcedId(new ResourcePersistentId(key)).orElse(key.toString()); + Map<Long, Optional<String>> pidsToForcedIds = myIdHelperService.translatePidsToForcedIds(value); + + Set<String> sourceResourceIds = pidsToForcedIds.entrySet().stream() + .map(ent -> ent.getValue().isPresent() ? ent.getValue().get() : ent.getKey().toString()) + .collect(Collectors.toSet()); + + sourceResourceIds + .forEach(sourceResourceId -> sourceResourceIdToGoldenResourceIdMap.put(sourceResourceId, goldenResourceId)); + }); + + //Now that we have built our cached expansion, store it. + myMdmExpansionCacheSvc.setCacheContents(sourceResourceIdToGoldenResourceIdMap); + } + /** + * Given the local myGroupId, read this group, and find all members' patient references. + * @return A list of strings representing the Patient IDs of the members (e.g.
["P1", "P2", "P3"] @@ -154,13 +200,19 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade //Attempt to perform MDM Expansion of membership if (myMdmEnabled) { - List> goldenPidTargetPidTuple = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); + List goldenPidTargetPidTuples = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); //Now lets translate these pids into resource IDs Set uniquePids = new HashSet<>(); - goldenPidTargetPidTuple.forEach(uniquePids::addAll); - + goldenPidTargetPidTuples.forEach(tuple -> { + uniquePids.add(tuple.getGoldenPid()); + uniquePids.add(tuple.getSourcePid()); + }); Map> pidToForcedIdMap = myIdHelperService.translatePidsToForcedIds(uniquePids); + Map> goldenResourceToSourcePidMap = new HashMap<>(); + extract(goldenPidTargetPidTuples, goldenResourceToSourcePidMap); + populateMdmResourceCache(goldenPidTargetPidTuples); + //If the result of the translation is an empty optional, it means there is no forced id, and we can use the PID as the resource ID. Set resolvedResourceIds = pidToForcedIdMap.entrySet().stream() .map(entry -> entry.getValue().isPresent() ? entry.getValue().get() : entry.getKey().toString()) @@ -176,6 +228,14 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade return expandedIds; } + private void extract(List theGoldenPidTargetPidTuples, Map> theGoldenResourceToSourcePidMap) { + for (IMdmLinkDao.MdmPidTuple goldenPidTargetPidTuple : theGoldenPidTargetPidTuples) { + Long goldenPid = goldenPidTargetPidTuple.getGoldenPid(); + Long sourcePid = goldenPidTargetPidTuple.getSourcePid(); + theGoldenResourceToSourcePidMap.computeIfAbsent(goldenPid, key -> new HashSet<>()).add(sourcePid); + } + } + private void queryResourceTypeWithReferencesToPatients(Set myReadPids, List idChunk) { //Build SP map //First, inject the _typeFilters and _since from the export job diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IMdmLinkDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IMdmLinkDao.java index 7ad98e46e11..53814c503b7 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IMdmLinkDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IMdmLinkDao.java @@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.dao.data; import ca.uhn.fhir.mdm.api.MdmMatchResultEnum; import ca.uhn.fhir.jpa.entity.MdmLink; +import org.springframework.beans.factory.annotation.Value; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; @@ -40,7 +41,7 @@ public interface IMdmLinkDao extends JpaRepository { @Query("DELETE FROM MdmLink f WHERE (myGoldenResourcePid = :pid OR mySourcePid = :pid) AND myMatchResult <> :matchResult") int deleteWithAnyReferenceToPidAndMatchResultNot(@Param("pid") Long thePid, @Param("matchResult") MdmMatchResultEnum theMatchResult); - @Query("SELECT ml2.myGoldenResourcePid, ml2.mySourcePid FROM MdmLink ml2 " + + @Query("SELECT ml2.myGoldenResourcePid as goldenPid, ml2.mySourcePid as sourcePid FROM MdmLink ml2 " + "WHERE ml2.myMatchResult=:matchResult " + "AND ml2.myGoldenResourcePid IN (" + "SELECT ml.myGoldenResourcePid FROM MdmLink ml " + @@ -50,5 +51,11 @@ public interface IMdmLinkDao extends JpaRepository { "AND hrl.mySourcePath='Group.member.entity' " + "AND hrl.myTargetResourceType='Patient'" + ")") - List> 
expandPidsFromGroupPidGivenMatchResult(@Param("groupPid") Long theGroupPid, @Param("matchResult") MdmMatchResultEnum theMdmMatchResultEnum); + List<MdmPidTuple> expandPidsFromGroupPidGivenMatchResult(@Param("groupPid") Long theGroupPid, @Param("matchResult") MdmMatchResultEnum theMdmMatchResultEnum); + + interface MdmPidTuple { + Long getGoldenPid(); + Long getSourcePid(); + } + } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java new file mode 100644 index 00000000000..b4315677fec --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java @@ -0,0 +1,87 @@ +package ca.uhn.fhir.jpa.dao.mdm; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.slf4j.LoggerFactory.getLogger; + +/** + * The purpose of this class is to share context between steps of a given GroupBulkExport job. + * + * This cache allows you to port state between reader/processor/writer. In this case, we are maintaining + * a cache of Source Resource ID -> Golden Resource ID, so that we can annotate outgoing resources with their golden owner + * if applicable. + * + */ +public class MdmExpansionCacheSvc { + private static final Logger ourLog = getLogger(MdmExpansionCacheSvc.class); + + private final ConcurrentHashMap<String, String> mySourceToGoldenIdCache = new ConcurrentHashMap<>(); + + /** + * Look up a given resource's golden resource ID in the cache. Note that if you pass this function the resource ID of a + * golden resource, it will just return itself. + * + * @param theSourceId the resource ID of the source resource, e.g. PAT123 + * @return the resource ID of the associated golden resource. + */ + public String getGoldenResourceId(String theSourceId) { + ourLog.debug(buildLogMessage("About to look up cached resource ID " + theSourceId)); + String goldenResourceId = mySourceToGoldenIdCache.get(theSourceId); + + //A golden resource's golden resource ID is itself. + if (StringUtils.isBlank(goldenResourceId)) { + if (mySourceToGoldenIdCache.containsValue(theSourceId)) { + goldenResourceId = theSourceId; + } + } + return goldenResourceId; + } + + private String buildLogMessage(String theMessage) { + return buildLogMessage(theMessage, false); + } + + /** + * Builds a log message, potentially enriched with the cache content. + * + * @param message The log message + * @param theAddCacheContentContent If true, will annotate the log message with the current cache contents. + * @return a built log message, which may include the cache content. + */ + public String buildLogMessage(String message, boolean theAddCacheContentContent) { + StringBuilder builder = new StringBuilder(); + builder.append(message); + if (ourLog.isDebugEnabled() || theAddCacheContentContent) { + builder.append("\n") + .append("Current cache content is:") + .append("\n"); + mySourceToGoldenIdCache.entrySet().forEach(entry -> builder.append(entry.getKey()).append(" -> ").append(entry.getValue()).append("\n")); + return builder.toString(); + } + return builder.toString(); + + } + + /** + * Populate the cache + * + * @param theSourceResourceIdToGoldenResourceIdMap the source ID -> golden ID map to populate the cache with.
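A minimal sketch of the intended interaction with this cache (resource IDs are illustrative):

    MdmExpansionCacheSvc cache = new MdmExpansionCacheSvc();
    cache.setCacheContents(Collections.singletonMap("PAT123", "PAT999")); // source ID -> golden ID
    String golden = cache.getGoldenResourceId("PAT123"); // "PAT999"
    String self = cache.getGoldenResourceId("PAT999");   // also "PAT999": a golden resource maps to itself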
*/ + public void setCacheContents(Map<String, String> theSourceResourceIdToGoldenResourceIdMap) { + if (mySourceToGoldenIdCache.isEmpty()) { + this.mySourceToGoldenIdCache.putAll(theSourceResourceIdToGoldenResourceIdMap); + } + } + + /** + * Since this cache is used at @JobScope, we can skip many redundant expansions by simply checking + * whether one of our child steps has already populated the cache. + */ + public boolean hasBeenPopulated() { + return !mySourceToGoldenIdCache.isEmpty(); + } +} diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java index 404045a4ff8..58fd8a39f5b 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java @@ -19,27 +19,28 @@ import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity; import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity; import ca.uhn.fhir.jpa.entity.BulkExportJobEntity; import ca.uhn.fhir.jpa.entity.MdmLink; -import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.mdm.api.MdmLinkSourceEnum; import ca.uhn.fhir.mdm.api.MdmMatchResultEnum; +import ca.uhn.fhir.parser.IParser; import ca.uhn.fhir.rest.api.Constants; -import ca.uhn.fhir.rest.api.MethodOutcome; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; +import ca.uhn.fhir.util.HapiExtensions; import ca.uhn.fhir.util.UrlUtil; import com.google.common.base.Charsets; import com.google.common.collect.Sets; import org.apache.commons.lang3.time.DateUtils; import org.hamcrest.Matchers; +import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.r4.model.Binary; import org.hl7.fhir.r4.model.CareTeam; import org.hl7.fhir.r4.model.CodeableConcept; import org.hl7.fhir.r4.model.Enumerations; +import org.hl7.fhir.r4.model.Extension; import org.hl7.fhir.r4.model.Group; import org.hl7.fhir.r4.model.Immunization; import org.hl7.fhir.r4.model.InstantType; import org.hl7.fhir.r4.model.Observation; -import org.hl7.fhir.r4.model.Parameters; import org.hl7.fhir.r4.model.Patient; import org.hl7.fhir.r4.model.Reference; import org.junit.jupiter.api.Test; @@ -55,7 +56,6 @@ import org.springframework.batch.core.explore.JobExplorer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; -import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Date; @@ -639,6 +639,61 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { assertThat(nextContents, is(containsString("IMM6"))); assertThat(nextContents, is(containsString("IMM8"))); } + @Test + public void testGroupBatchJobMdmExpansionIdentifiesGoldenResources() throws Exception { + createResources(); + + // Create a bulk job + BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions(); + bulkDataExportOptions.setOutputFormat(null); + bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization", "Patient")); + bulkDataExportOptions.setSince(null); + bulkDataExportOptions.setFilters(null); + bulkDataExportOptions.setGroupId(myPatientGroupId); + bulkDataExportOptions.setExpandMdm(true); + bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP); + IBulkDataExportSvc.JobInfo jobDetails =
myBulkDataExportSvc.submitJob(bulkDataExportOptions); + + myBulkDataExportSvc.buildExportFiles(); + awaitAllBulkJobCompletions(); + + IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); + + assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE)); + assertThat(jobInfo.getFiles().size(), equalTo(2)); + assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); + + //Ensure that all immunizations refer to the golden resource via extension + assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); + List<Immunization> immunizations = readBulkExportContentsIntoResources(getBinaryContents(jobInfo, 0), Immunization.class); + immunizations + .stream().filter(immu -> !immu.getIdElement().getIdPart().equals("PAT999")) //Skip the golden resource + .forEach(immunization -> { + Extension extensionByUrl = immunization.getExtensionByUrl(HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL); + String reference = ((Reference) extensionByUrl.getValue()).getReference(); + assertThat(reference, is(equalTo("Patient/PAT999"))); + }); + + //Ensure all patients are linked to their golden resource. + assertThat(jobInfo.getFiles().get(1).getResourceType(), is(equalTo("Patient"))); + List<Patient> patients = readBulkExportContentsIntoResources(getBinaryContents(jobInfo, 1), Patient.class); + patients.stream() + .filter(patient -> patient.getIdElement().getIdPart().equals("PAT999")) + .forEach(patient -> { + Extension extensionByUrl = patient.getExtensionByUrl(HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL); + String reference = ((Reference) extensionByUrl.getValue()).getReference(); + assertThat(reference, is(equalTo("Patient/PAT999"))); + }); + + } + + private <T extends IBaseResource> List<T> readBulkExportContentsIntoResources(String theContents, Class<T> theClass) { + IParser iParser = myFhirCtx.newJsonParser(); + return Arrays.stream(theContents.split("\n")) + .map(iParser::parseResource) + .map(theClass::cast) + .collect(Collectors.toList()); + } @Test public void testPatientLevelExportWorks() throws JobParametersInvalidException { @@ -1013,7 +1068,6 @@ //Non-cached should all have unique IDs List<String> jobIds = Stream.of(jobInfo5, jobInfo6, jobInfo7, jobInfo8, jobInfo9).map(IBulkDataExportSvc.JobInfo::getJobId).collect(Collectors.toList()); - ourLog.info("ZOOP {}", String.join(", ", jobIds)); Set<String> uniqueJobIds = new HashSet<>(jobIds); assertEquals(uniqueJobIds.size(), jobIds.size()); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4ValueSetNoVerCSNoVerTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4ValueSetNoVerCSNoVerTest.java index 9a1794f65be..500eba52486 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4ValueSetNoVerCSNoVerTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4ValueSetNoVerCSNoVerTest.java @@ -541,7 +541,6 @@ public class ResourceProviderR4ValueSetNoVerCSNoVerTest extends BaseResourceProv ValueSet expanded = (ValueSet) respParam.getParameter().get(0).getResource(); String resp = myFhirCtx.newXmlParser().setPrettyPrint(true).encodeResourceToString(expanded); - ourLog.info("zoop"); ourLog.info(resp); assertThat(resp, is(containsStringIgnoringCase(""))); diff --git a/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/BatchJobConfig.java
b/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/BatchJobConfig.java index 3de13fe7a66..7acf3590585 100644 --- a/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/BatchJobConfig.java +++ b/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/BatchJobConfig.java @@ -22,6 +22,7 @@ public class BatchJobConfig { private StepBuilderFactory myStepBuilderFactory; + @Bean public Job testJob() { return myJobBuilderFactory.get("testJob")
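Taken together, these changes mean that every resource written by an MDM-expanded Group Bulk Export carries a reference to its golden patient. A consumer-side sketch mirroring the new test above (the ndjsonLine variable, holding one line of an exported file, is assumed):

    FhirContext ctx = FhirContext.forR4();
    IParser parser = ctx.newJsonParser();
    Immunization immunization = parser.parseResource(Immunization.class, ndjsonLine); // one exported NDJSON line
    Extension golden = immunization.getExtensionByUrl(HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL);
    String goldenPatient = ((Reference) golden.getValue()).getReference(); // e.g. "Patient/PAT999"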