Merge pull request #2508 from hapifhir/issue-2499-identify-golden-resources-in-export

Issue 2499 identify golden resources in export
This commit is contained in:
Tadgh 2021-03-29 16:11:10 -04:00 committed by GitHub
commit 953dd408da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 478 additions and 54 deletions

View File

@ -112,6 +112,11 @@ public class HapiExtensions {
public static final String EXT_RESOURCE_PLACEHOLDER = "http://hapifhir.io/fhir/StructureDefinition/resource-placeholder";
/**
* URL for extension in a Group Bulk Export which identifies the golden patient of a given exported resource.
*/
public static final String ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL = "https://hapifhir.org/associated-patient-golden-resource/";
/**
* Non instantiable
*/
private HapiExtensions() {

View File

@ -23,6 +23,7 @@ package ca.uhn.fhir.util;
import ca.uhn.fhir.context.BaseRuntimeChildDefinition;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBase;
import org.hl7.fhir.instance.model.api.IBaseResource;
@ -31,6 +32,7 @@ import org.hl7.fhir.instance.model.api.IPrimitiveType;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
public class SearchParameterUtil {
@ -50,6 +52,62 @@ public class SearchParameterUtil {
return retVal;
}
/**
 * Resolves the single search parameter that ties the given resource type to a patient.
 * <p>
 * Candidates are tried in this order, and the first hit wins:
 * <ol>
 * <li>a search parameter named {@code patient}</li>
 * <li>a search parameter named {@code subject}</li>
 * <li>the only search parameter registered for the {@code Patient} compartment</li>
 * </ol>
 *
 * @param theFhirContext the context used to look up the resource definition
 * @param theResourceType the name of the resource type to inspect
 * @return the resolved search parameter, wrapped in an {@link Optional}
 * @throws IllegalArgumentException if the compartment fallback finds no candidate, or more than one
 */
public static Optional<RuntimeSearchParam> getOnlyPatientSearchParamForResourceType(FhirContext theFhirContext, String theResourceType) {
	RuntimeResourceDefinition resourceDefinition = theFhirContext.getResourceDefinition(theResourceType);

	RuntimeSearchParam byPatient = resourceDefinition.getSearchParam("patient");
	if (byPatient != null) {
		return Optional.of(byPatient);
	}

	RuntimeSearchParam bySubject = resourceDefinition.getSearchParam("subject");
	if (bySubject != null) {
		return Optional.of(bySubject);
	}

	// Last resort: whatever the Patient compartment declares (throws unless exactly one candidate exists).
	return Optional.ofNullable(getOnlyPatientCompartmentRuntimeSearchParam(resourceDefinition));
}
/**
 * Looks up the definition of the given resource type and returns the one search parameter registered
 * for its {@code Patient} compartment.
 *
 * @param theFhirContext the context used to look up the resource definition
 * @param theResourceType the name of the resource type to inspect
 * @return the only search parameter in the resource's {@code Patient} compartment
 * @throws IllegalArgumentException if the compartment has no search parameter, or more than one
 */
public static RuntimeSearchParam getOnlyPatientCompartmentRuntimeSearchParam(FhirContext theFhirContext, String theResourceType) {
	return getOnlyPatientCompartmentRuntimeSearchParam(theFhirContext.getResourceDefinition(theResourceType));
}
/**
 * Returns the one search parameter registered for the {@code Patient} compartment of the given
 * resource definition.
 *
 * @param runtimeResourceDefinition the resource definition to inspect
 * @return the only search parameter in the {@code Patient} compartment
 * @throws IllegalArgumentException if the compartment has no search parameter, or more than one
 */
public static RuntimeSearchParam getOnlyPatientCompartmentRuntimeSearchParam(RuntimeResourceDefinition runtimeResourceDefinition) {
	List<RuntimeSearchParam> candidates = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient");

	if (candidates == null || candidates.size() == 0) {
		throw new IllegalArgumentException(String.format("Resource type [%s] is not eligible for this type of export, as it contains no Patient compartment, and no `patient` or `subject` search parameter", runtimeResourceDefinition.getId()));
	}

	if (candidates.size() != 1) {
		// More than one candidate: there is no way to pick the right one.
		throw new IllegalArgumentException(String.format("Resource type %s has more than one Search Param which references a patient compartment. We are unable to disambiguate which patient search parameter we should be searching by.", runtimeResourceDefinition.getId()));
	}

	return candidates.get(0);
}
/**
 * Looks up the definition of the given resource type and returns every search parameter registered
 * for its {@code Patient} compartment.
 *
 * @param theFhirContext the context used to look up the resource definition
 * @param theResourceType the name of the resource type to inspect
 * @return the search parameters reported for the {@code Patient} compartment
 */
public static List<RuntimeSearchParam> getAllPatientCompartmentRuntimeSearchParams(FhirContext theFhirContext, String theResourceType) {
	return getAllPatientCompartmentRuntimeSearchParams(theFhirContext.getResourceDefinition(theResourceType));
}
/**
 * Returns whatever the resource definition reports as the search parameters of its {@code Patient} compartment.
 */
private static List<RuntimeSearchParam> getAllPatientCompartmentRuntimeSearchParams(RuntimeResourceDefinition theRuntimeResourceDefinition) {
	return theRuntimeResourceDefinition.getSearchParamsForCompartmentName("Patient");
}
@Nullable
public static String getCode(FhirContext theContext, IBaseResource theResource) {

View File

@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.batch;
* #L%
*/
import ca.uhn.fhir.jpa.batch.processors.GoldenResourceAnnotatingProcessor;
import ca.uhn.fhir.jpa.batch.processors.PidToIBaseResourceProcessor;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.context.annotation.Bean;
@ -34,4 +35,10 @@ public class CommonBatchJobConfig {
return new PidToIBaseResourceProcessor();
}
/**
 * Registers the {@link GoldenResourceAnnotatingProcessor} as a bean.
 * It is declared with {@code @StepScope}, matching the other processor bean in this configuration.
 */
@Bean
@StepScope
public GoldenResourceAnnotatingProcessor goldenResourceAnnotatingProcessor() {
return new GoldenResourceAnnotatingProcessor();
}
}

View File

@ -0,0 +1,147 @@
package ca.uhn.fhir.jpa.batch.processors;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.fhirpath.IFhirPath;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.bulk.job.BulkExportJobConfig;
import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc;
import ca.uhn.fhir.util.ExtensionUtil;
import ca.uhn.fhir.util.HapiExtensions;
import ca.uhn.fhir.util.SearchParameterUtil;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseExtension;
import org.hl7.fhir.instance.model.api.IBaseReference;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import java.util.List;
import java.util.Optional;
/**
 * Reusable Item Processor which, when MDM expansion is enabled, attaches an extension to outgoing resources.
 * The extension contains a reference to the golden resource patient of the patient the given resource
 * points at (e.g. Observation.subject, Immunization.patient, etc). For Patient resources, the resource's
 * own ID is used as the lookup key. Resources for which no golden resource is known are left untouched.
 */
public class GoldenResourceAnnotatingProcessor implements ItemProcessor<List<IBaseResource>, List<IBaseResource>> {
	private static final Logger ourLog = Logs.getBatchTroubleshootingLog();

	@Value("#{stepExecutionContext['resourceType']}")
	private String myResourceType;

	@Autowired
	private FhirContext myContext;

	@Autowired
	private MdmExpansionCacheSvc myMdmExpansionCacheSvc;

	@Value("#{jobParameters['" + BulkExportJobConfig.EXPAND_MDM_PARAMETER+ "'] ?: false}")
	private boolean myMdmEnabled;

	// Lazily resolved on the first call to process(), and only when MDM expansion is enabled.
	private RuntimeSearchParam myRuntimeSearchParam;
	private String myPatientFhirPath;
	private IFhirPath myFhirPath;

	/**
	 * Resolves the patient search parameter for the current resource type.
	 *
	 * @throws IllegalArgumentException if the resource type has no patient search parameter
	 */
	private void populateRuntimeSearchParam() {
		myRuntimeSearchParam = SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, myResourceType)
			.orElseThrow(() -> new IllegalArgumentException(
				String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", myResourceType)));
	}

	/**
	 * Annotates every resource in the list with its golden resource extension when MDM expansion is enabled.
	 *
	 * @param theIBaseResources the resources to annotate; modified in place
	 * @return the same list instance that was passed in
	 */
	@Override
	public List<IBaseResource> process(List<IBaseResource> theIBaseResources) throws Exception {
		//If MDM expansion is enabled, add this magic new extension, otherwise, return the resource as is.
		if (myMdmEnabled) {
			if (myRuntimeSearchParam == null) {
				populateRuntimeSearchParam();
			}
			if (myPatientFhirPath == null) {
				populatePatientFhirPath();
			}
			theIBaseResources.forEach(this::annotateClinicalResourceWithRelatedGoldenResourcePatient);
		}
		return theIBaseResources;
	}

	/**
	 * Adds the golden resource extension to a single resource, or logs an error if the resource's
	 * patient reference cannot be determined.
	 */
	private void annotateClinicalResourceWithRelatedGoldenResourcePatient(IBaseResource iBaseResource) {
		Optional<String> patientReference = getPatientReference(iBaseResource);
		if (patientReference.isPresent()) {
			addGoldenResourceExtension(iBaseResource, patientReference.get());
		} else {
			ourLog.error("Failed to find the patient reference information for resource {}. This is a bug, " +
				"as all resources which can be exported via Group Bulk Export must reference a patient.", iBaseResource);
		}
	}

	/**
	 * Determines the ID part of the patient that the given resource belongs to.
	 *
	 * @return the patient ID part, or an empty Optional if none could be determined
	 */
	private Optional<String> getPatientReference(IBaseResource iBaseResource) {
		//In the case of patient, we will just use the raw ID.
		if ("Patient".equalsIgnoreCase(myResourceType)) {
			// ofNullable: a patient without an ID takes the "no reference" path instead of throwing an NPE.
			return Optional.ofNullable(iBaseResource.getIdElement().getIdPart());
		}
		//Otherwise, we will perform evaluation of the fhirPath.
		Optional<IBaseReference> optionalReference = getFhirParser().evaluateFirst(iBaseResource, myPatientFhirPath, IBaseReference.class);
		return optionalReference.map(theIBaseReference -> theIBaseReference.getReferenceElement().getIdPart());
	}

	/**
	 * Looks up the golden resource of the given source patient and, if one is known, stores a reference
	 * to it in the golden resource extension of the resource.
	 */
	private void addGoldenResourceExtension(IBaseResource iBaseResource, String sourceResourceId) {
		String goldenResourceId = myMdmExpansionCacheSvc.getGoldenResourceId(sourceResourceId);
		if (StringUtils.isBlank(goldenResourceId)) {
			// No golden resource is known for this patient: do not leave a value-less extension on the resource.
			return;
		}
		IBaseExtension<?, ?> extension = ExtensionUtil.getOrCreateExtension(iBaseResource, HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL);
		ExtensionUtil.setExtension(myContext, extension, "reference", prefixPatient(goldenResourceId));
	}

	private String prefixPatient(String theResourceId) {
		return "Patient/" + theResourceId;
	}

	/**
	 * Returns the FHIRPath evaluator, creating it on first use.
	 */
	private IFhirPath getFhirParser() {
		if (myFhirPath == null) {
			myFhirPath = myContext.newFhirPath();
		}
		return myFhirPath;
	}

	/**
	 * Derives the FHIRPath expression used to extract the patient reference from the resolved search parameter.
	 *
	 * @return the (possibly truncated) path of the patient search parameter
	 */
	private String populatePatientFhirPath() {
		if (myPatientFhirPath == null) {
			String path = myRuntimeSearchParam.getPath();
			// GGG: Yes this is a stupid hack, but by default this runtime search param will return stuff like
			// Observation.subject.where(resolve() is Patient), which unfortunately our FHIRPath evaluator
			// doesn't play nicely with. So everything from the first ".where" onwards is dropped.
			int whereIndex = path.indexOf(".where");
			if (whereIndex >= 0) {
				path = path.substring(0, whereIndex);
			}
			myPatientFhirPath = path;
		}
		return myPatientFhirPath;
	}
}

View File

@ -50,6 +50,7 @@ public class PidToIBaseResourceProcessor implements ItemProcessor<List<ResourceP
@Autowired
private DaoRegistry myDaoRegistry;
@Value("#{stepExecutionContext['resourceType']}")
private String myResourceType;

View File

@ -36,6 +36,7 @@ import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.util.SearchParameterUtil;
import ca.uhn.fhir.util.UrlUtil;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
@ -179,47 +180,15 @@ public abstract class BaseBulkItemReader implements ItemReader<List<ResourcePers
}
/**
* Given the resource type, fetch its patient-based search parameter name
* 1. Attempt to find one called 'patient'
* 2. If that fails, find one called 'subject'
* 3. If that fails, find find by Patient Compartment.
* 3.1 If that returns >1 result, throw an error
* 3.2 If that returns 1 result, return it
*/
protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType() {
if (myPatientSearchParam == null) {
RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(myResourceType);
myPatientSearchParam = runtimeResourceDefinition.getSearchParam("patient");
if (myPatientSearchParam == null) {
myPatientSearchParam = runtimeResourceDefinition.getSearchParam("subject");
if (myPatientSearchParam == null) {
myPatientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition);
if (myPatientSearchParam == null) {
String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", myResourceType);
throw new IllegalArgumentException(errorMessage);
}
}
Optional<RuntimeSearchParam> onlyPatientSearchParamForResourceType = SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, myResourceType);
if (onlyPatientSearchParamForResourceType.isPresent()) {
myPatientSearchParam = onlyPatientSearchParamForResourceType.get();
} else {
}
}
return myPatientSearchParam;
}
/**
* Search the resource definition for a compartment named 'patient' and return its related Search Parameter.
*/
protected RuntimeSearchParam getRuntimeSearchParamByCompartment(RuntimeResourceDefinition runtimeResourceDefinition) {
RuntimeSearchParam patientSearchParam;
List<RuntimeSearchParam> searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient");
if (searchParams == null || searchParams.size() == 0) {
String errorMessage = String.format("Resource type [%s] is not eligible for this type of export, as it contains no Patient compartment, and no `patient` or `subject` search parameter", myResourceType);
throw new IllegalArgumentException(errorMessage);
} else if (searchParams.size() == 1) {
patientSearchParam = searchParams.get(0);
} else {
String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as we are unable to disambiguate which patient search parameter we should be searching by.", myResourceType);
throw new IllegalArgumentException(errorMessage);
}
return patientSearchParam;
}
}

View File

@ -21,8 +21,10 @@ package ca.uhn.fhir.jpa.bulk.job;
*/
import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
import ca.uhn.fhir.jpa.batch.processors.GoldenResourceAnnotatingProcessor;
import ca.uhn.fhir.jpa.batch.processors.PidToIBaseResourceProcessor;
import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc;
import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.batch.core.Job;
@ -32,13 +34,16 @@ import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import java.util.ArrayList;
import java.util.List;
/**
@ -64,11 +69,23 @@ public class BulkExportJobConfig {
@Autowired
private PidToIBaseResourceProcessor myPidToIBaseResourceProcessor;
@Autowired
private GoldenResourceAnnotatingProcessor myGoldenResourceAnnotatingProcessor;
/**
 * Registers a {@link BulkExportDaoSvc} bean.
 */
@Bean
public BulkExportDaoSvc bulkExportDaoSvc() {
return new BulkExportDaoSvc();
}
/**
 * Registers the {@link MdmExpansionCacheSvc} bean. It is declared {@code @Lazy} and {@code @JobScope}.
 */
@Bean
@Lazy
@JobScope
public MdmExpansionCacheSvc mdmExpansionCacheSvc() {
return new MdmExpansionCacheSvc();
}
@Bean
@Lazy
public Job bulkExportJob() {
@ -80,6 +97,18 @@ public class BulkExportJobConfig {
.build();
}
/**
 * Builds the composite processor used by the group export step. Its delegates are, in this order,
 * the PID-to-resource processor and the golden resource annotating processor.
 *
 * @return a composite processor taking lists of resource PIDs and producing lists of resources
 */
@Bean
@Lazy
@StepScope
public CompositeItemProcessor<List<ResourcePersistentId>, List<IBaseResource>> inflateResourceThenAnnotateWithGoldenResourceProcessor() {
	CompositeItemProcessor<List<ResourcePersistentId>, List<IBaseResource>> processor = new CompositeItemProcessor<>();
	// The delegates have different input/output types, hence the wildcard element type.
	List<ItemProcessor<?, ?>> delegates = new ArrayList<>();
	delegates.add(myPidToIBaseResourceProcessor);
	delegates.add(myGoldenResourceAnnotatingProcessor);
	processor.setDelegates(delegates);
	return processor;
}
@Bean
@Lazy
public Job groupBulkExportJob() {
@ -132,7 +161,7 @@ public class BulkExportJobConfig {
return myStepBuilderFactory.get("groupBulkExportGenerateResourceFilesStep")
.<List<ResourcePersistentId>, List<IBaseResource>> chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time.
.reader(groupBulkItemReader())
.processor(myPidToIBaseResourceProcessor)
.processor(inflateResourceThenAnnotateWithGoldenResourceProcessor())
.writer(resourceToFileWriter())
.listener(bulkExportGenerateResourceFilesStepListener())
.build();

View File

@ -27,7 +27,7 @@ import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.QueryChunker;
@ -36,6 +36,7 @@ import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.ReferenceOrListParam;
import ca.uhn.fhir.rest.param.ReferenceParam;
import com.google.common.collect.Multimaps;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
@ -45,6 +46,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -75,6 +77,8 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
private IdHelperService myIdHelperService;
@Autowired
private IMdmLinkDao myMdmLinkDao;
@Autowired
private MdmExpansionCacheSvc myMdmExpansionCacheSvc;
@Override
Iterator<ResourcePersistentId> getResourcePidIterator() {
@ -109,17 +113,20 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
* possibly expanded by MDM, and don't have to go and fetch other resource DAOs.
*/
private Iterator<ResourcePersistentId> getExpandedPatientIterator() {
Set<Long> patientPidsToExport = new HashSet<>();
List<String> members = getMembers();
List<IIdType> ids = members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList());
List<Long> pidsOrThrowException = myIdHelperService.getPidsOrThrowException(ids);
patientPidsToExport.addAll(pidsOrThrowException);
Set<Long> patientPidsToExport = new HashSet<>(pidsOrThrowException);
if (myMdmEnabled) {
IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
Long pidOrNull = myIdHelperService.getPidOrNull(group);
List<List<Long>> lists = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
lists.forEach(patientPidsToExport::addAll);
List<IMdmLinkDao.MdmPidTuple> goldenPidSourcePidTuple = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
goldenPidSourcePidTuple.forEach(tuple -> {
patientPidsToExport.add(tuple.getGoldenPid());
patientPidsToExport.add(tuple.getSourcePid());
});
populateMdmResourceCache(goldenPidSourcePidTuple);
}
List<ResourcePersistentId> resourcePersistentIds = patientPidsToExport
.stream()
@ -128,6 +135,45 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
return resourcePersistentIds.iterator();
}
/**
 * Fills the MDM expansion cache with a "source resource ID -> golden resource ID" lookup built from
 * the given tuples. Does nothing if the cache reports that it has already been populated.
 *
 * @param thePidTuples pairs of golden resource PID and source resource PID
 */
private void populateMdmResourceCache(List<IMdmLinkDao.MdmPidTuple> thePidTuples) {
	if (myMdmExpansionCacheSvc.hasBeenPopulated()) {
		return;
	}

	// Step 1: group the flat tuples by golden resource, e.g.
	//   gold-1 -> [1, 2]
	//   gold-2 -> [3, 4]
	Map<Long, Set<Long>> sourcePidsByGoldenPid = new HashMap<>();
	extract(thePidTuples, sourcePidsByGoldenPid);

	// Step 2: invert the grouping into a per-source lookup keyed by resource ID, e.g.
	//   1 -> gold-1
	//   2 -> gold-1
	//   3 -> gold-2
	//   4 -> gold-2
	Map<String, String> goldenIdBySourceId = new HashMap<>();
	for (Map.Entry<Long, Set<Long>> goldenEntry : sourcePidsByGoldenPid.entrySet()) {
		Long goldenPid = goldenEntry.getKey();
		// When no forced ID exists, the PID itself serves as the resource ID.
		String goldenResourceId = myIdHelperService.translatePidIdToForcedId(new ResourcePersistentId(goldenPid)).orElse(goldenPid.toString());

		Map<Long, Optional<String>> forcedIdsBySourcePid = myIdHelperService.translatePidsToForcedIds(goldenEntry.getValue());
		for (Map.Entry<Long, Optional<String>> sourceEntry : forcedIdsBySourcePid.entrySet()) {
			Optional<String> forcedId = sourceEntry.getValue();
			String sourceResourceId = forcedId.isPresent() ? forcedId.get() : sourceEntry.getKey().toString();
			goldenIdBySourceId.put(sourceResourceId, goldenResourceId);
		}
	}

	// Step 3: hand the finished lookup to the cache.
	myMdmExpansionCacheSvc.setCacheContents(goldenIdBySourceId);
}
/**
* Given the local myGroupId, read this group, and find all members' patient references.
* @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"]
@ -154,13 +200,19 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
//Attempt to perform MDM Expansion of membership
if (myMdmEnabled) {
List<List<Long>> goldenPidTargetPidTuple = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
List<IMdmLinkDao.MdmPidTuple> goldenPidTargetPidTuples = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
//Now lets translate these pids into resource IDs
Set<Long> uniquePids = new HashSet<>();
goldenPidTargetPidTuple.forEach(uniquePids::addAll);
goldenPidTargetPidTuples.forEach(tuple -> {
uniquePids.add(tuple.getGoldenPid());
uniquePids.add(tuple.getSourcePid());
});
Map<Long, Optional<String>> pidToForcedIdMap = myIdHelperService.translatePidsToForcedIds(uniquePids);
Map<Long, Set<Long>> goldenResourceToSourcePidMap = new HashMap<>();
extract(goldenPidTargetPidTuples, goldenResourceToSourcePidMap);
populateMdmResourceCache(goldenPidTargetPidTuples);
//If the result of the translation is an empty optional, it means there is no forced id, and we can use the PID as the resource ID.
Set<String> resolvedResourceIds = pidToForcedIdMap.entrySet().stream()
.map(entry -> entry.getValue().isPresent() ? entry.getValue().get() : entry.getKey().toString())
@ -176,6 +228,14 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
return expandedIds;
}
/**
 * Groups the source PIDs of the given tuples by their golden PID, adding them to the supplied map.
 *
 * @param theGoldenPidTargetPidTuples the golden PID / source PID pairs to group
 * @param theGoldenResourceToSourcePidMap the map to add to; a new set is created for each golden PID not yet present
 */
private void extract(List<IMdmLinkDao.MdmPidTuple> theGoldenPidTargetPidTuples, Map<Long, Set<Long>> theGoldenResourceToSourcePidMap) {
	theGoldenPidTargetPidTuples.forEach(tuple -> {
		Long goldenPid = tuple.getGoldenPid();
		Long sourcePid = tuple.getSourcePid();
		Set<Long> sourcePids = theGoldenResourceToSourcePidMap.computeIfAbsent(goldenPid, unused -> new HashSet<>());
		sourcePids.add(sourcePid);
	});
}
private void queryResourceTypeWithReferencesToPatients(Set<ResourcePersistentId> myReadPids, List<String> idChunk) {
//Build SP map
//First, inject the _typeFilters and _since from the export job

View File

@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
import ca.uhn.fhir.jpa.entity.MdmLink;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
@ -40,7 +41,7 @@ public interface IMdmLinkDao extends JpaRepository<MdmLink, Long> {
@Query("DELETE FROM MdmLink f WHERE (myGoldenResourcePid = :pid OR mySourcePid = :pid) AND myMatchResult <> :matchResult")
int deleteWithAnyReferenceToPidAndMatchResultNot(@Param("pid") Long thePid, @Param("matchResult") MdmMatchResultEnum theMatchResult);
@Query("SELECT ml2.myGoldenResourcePid, ml2.mySourcePid FROM MdmLink ml2 " +
@Query("SELECT ml2.myGoldenResourcePid as goldenPid, ml2.mySourcePid as sourcePid FROM MdmLink ml2 " +
"WHERE ml2.myMatchResult=:matchResult " +
"AND ml2.myGoldenResourcePid IN (" +
"SELECT ml.myGoldenResourcePid FROM MdmLink ml " +
@ -50,5 +51,11 @@ public interface IMdmLinkDao extends JpaRepository<MdmLink, Long> {
"AND hrl.mySourcePath='Group.member.entity' " +
"AND hrl.myTargetResourceType='Patient'" +
")")
List<List<Long>> expandPidsFromGroupPidGivenMatchResult(@Param("groupPid") Long theGroupPid, @Param("matchResult") MdmMatchResultEnum theMdmMatchResultEnum);
List<MdmPidTuple> expandPidsFromGroupPidGivenMatchResult(@Param("groupPid") Long theGroupPid, @Param("matchResult") MdmMatchResultEnum theMdmMatchResultEnum);
/**
 * One row of the result of {@code expandPidsFromGroupPidGivenMatchResult}: a golden resource PID
 * paired with a source resource PID. The accessor names correspond to the {@code goldenPid} and
 * {@code sourcePid} aliases in that query's SELECT clause.
 */
interface MdmPidTuple {
/** @return the value selected as {@code goldenPid} (the link's {@code myGoldenResourcePid}) */
Long getGoldenPid();
/** @return the value selected as {@code sourcePid} (the link's {@code mySourcePid}) */
Long getSourcePid();
}
}

View File

@ -0,0 +1,87 @@
package ca.uhn.fhir.jpa.dao.mdm;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static org.slf4j.LoggerFactory.getLogger;
/**
 * The purpose of this class is to share context between steps of a given GroupBulkExport job.
 *
 * This cache allows you to port state between reader/processor/writer. In this case, we are maintaining
 * a cache of Source Resource ID -> Golden Resource ID, so that we can annotate outgoing resources with their golden owner
 * if applicable.
 */
public class MdmExpansionCacheSvc {
	private static final Logger ourLog = getLogger(MdmExpansionCacheSvc.class);

	private final ConcurrentHashMap<String, String> mySourceToGoldenIdCache = new ConcurrentHashMap<>();

	// All golden resource IDs (the values of the cache above), kept separately so that "is this ID a golden
	// resource?" does not need a linear scan over the whole cache on every lookup.
	private final ConcurrentHashMap.KeySetView<String, Boolean> myGoldenResourceIds = ConcurrentHashMap.newKeySet();

	/**
	 * Lookup a given resource's golden resource ID in the cache. Note that if you pass this function the resource ID of a
	 * golden resource, it will just return itself.
	 *
	 * @param theSourceId the resource ID of the source resource, e.g. PAT123
	 * @return the resource ID of the associated golden resource, or null if the ID is null or not known to the cache
	 */
	public String getGoldenResourceId(String theSourceId) {
		if (ourLog.isDebugEnabled()) {
			ourLog.debug(buildLogMessage("About to lookup cached resource ID " + theSourceId));
		}
		if (theSourceId == null) {
			// ConcurrentHashMap rejects null keys; a missing ID simply has no golden resource.
			return null;
		}

		String goldenResourceId = mySourceToGoldenIdCache.get(theSourceId);

		//A golden resources' golden resource ID is itself.
		if (StringUtils.isBlank(goldenResourceId) && myGoldenResourceIds.contains(theSourceId)) {
			goldenResourceId = theSourceId;
		}
		return goldenResourceId;
	}

	private String buildLogMessage(String theMessage) {
		return buildLogMessage(theMessage, false);
	}

	/**
	 * Builds a log message, potentially enriched with the cache content.
	 *
	 * @param message The log message
	 * @param theAddCacheContentContent If true, will annotate the log message with the current cache contents.
	 *                                  The contents are also added whenever debug logging is enabled.
	 * @return a built log message, which may include the cache content.
	 */
	public String buildLogMessage(String message, boolean theAddCacheContentContent) {
		StringBuilder builder = new StringBuilder();
		builder.append(message);
		if (ourLog.isDebugEnabled() || theAddCacheContentContent) {
			builder.append("\n")
				.append("Current cache content is:")
				.append("\n");
			mySourceToGoldenIdCache.forEach((sourceId, goldenId) -> builder.append(sourceId).append(" -> ").append(goldenId).append("\n"));
		}
		return builder.toString();
	}

	/**
	 * Populate the cache. Has no effect if the cache already has content.
	 *
	 * @param theSourceResourceIdToGoldenResourceIdMap the source ID -> golden ID map to populate the cache with.
	 */
	public void setCacheContents(Map<String, String> theSourceResourceIdToGoldenResourceIdMap) {
		if (mySourceToGoldenIdCache.isEmpty()) {
			myGoldenResourceIds.addAll(theSourceResourceIdToGoldenResourceIdMap.values());
			mySourceToGoldenIdCache.putAll(theSourceResourceIdToGoldenResourceIdMap);
		}
	}

	/**
	 * Since this cache is used at @JobScope, we can skip a whole whack of expansions happening by simply checking
	 * if one of our child steps has populated the cache yet.
	 *
	 * @return true if the cache holds at least one entry
	 */
	public boolean hasBeenPopulated() {
		return !mySourceToGoldenIdCache.isEmpty();
	}
}

View File

@ -19,27 +19,28 @@ import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.entity.MdmLink;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.mdm.api.MdmLinkSourceEnum;
import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.util.HapiExtensions;
import ca.uhn.fhir.util.UrlUtil;
import com.google.common.base.Charsets;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.time.DateUtils;
import org.hamcrest.Matchers;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Binary;
import org.hl7.fhir.r4.model.CareTeam;
import org.hl7.fhir.r4.model.CodeableConcept;
import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.Extension;
import org.hl7.fhir.r4.model.Group;
import org.hl7.fhir.r4.model.Immunization;
import org.hl7.fhir.r4.model.InstantType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Reference;
import org.junit.jupiter.api.Test;
@ -55,7 +56,6 @@ import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
@ -639,6 +639,61 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(nextContents, is(containsString("IMM6")));
assertThat(nextContents, is(containsString("IMM8")));
}
/**
 * Runs a group export of Immunization and Patient resources with MDM expansion enabled, and verifies
 * that exported resources carry the golden resource extension pointing at Patient/PAT999.
 */
@Test
public void testGroupBatchJobMdmExpansionIdentifiesGoldenResources() throws Exception {
	createResources();

	// Create a bulk job
	BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
	bulkDataExportOptions.setOutputFormat(null);
	bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization", "Patient"));
	bulkDataExportOptions.setSince(null);
	bulkDataExportOptions.setFilters(null);
	bulkDataExportOptions.setGroupId(myPatientGroupId);
	bulkDataExportOptions.setExpandMdm(true);
	bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
	IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);

	myBulkDataExportSvc.buildExportFiles();
	awaitAllBulkJobCompletions();

	IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
	assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
	assertThat(jobInfo.getFiles().size(), equalTo(2));

	//Ensure that all immunizations refer to the golden resource via extension
	assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
	List<Immunization> immunizations = readBulkExportContentsIntoResources(getBinaryContents(jobInfo, 0), Immunization.class);
	for (Immunization immunization : immunizations) {
		if (immunization.getIdElement().getIdPart().equals("PAT999")) {
			//Skip the golden resource
			continue;
		}
		Extension extensionByUrl = immunization.getExtensionByUrl(HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL);
		String reference = ((Reference) extensionByUrl.getValue()).getReference();
		assertThat(reference, is(equalTo("Patient/PAT999")));
	}

	//Ensure the golden patient (PAT999) is annotated with a reference to itself.
	// NOTE(review): the original comment claimed all patients are checked, but only PAT999 passes the
	// filter below. Confirm whether the remaining exported patients should be asserted as well.
	assertThat(jobInfo.getFiles().get(1).getResourceType(), is(equalTo("Patient")));
	List<Patient> patients = readBulkExportContentsIntoResources(getBinaryContents(jobInfo, 1), Patient.class);
	for (Patient patient : patients) {
		if (!patient.getIdElement().getIdPart().equals("PAT999")) {
			continue;
		}
		Extension extensionByUrl = patient.getExtensionByUrl(HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL);
		String reference = ((Reference) extensionByUrl.getValue()).getReference();
		assertThat(reference, is(equalTo("Patient/PAT999")));
	}
}
/**
 * Parses newline-separated JSON content into resources, one resource per line.
 *
 * @param theContents the content to parse, with lines separated by {@code \n}
 * @param theClass the type every parsed resource is cast to
 * @return the parsed resources, in line order
 */
private <T extends IBaseResource> List<T> readBulkExportContentsIntoResources(String theContents, Class<T> theClass) {
	IParser jsonParser = myFhirCtx.newJsonParser();
	String[] lines = theContents.split("\n");
	return Stream.of(lines)
		.map(line -> theClass.cast(jsonParser.parseResource(line)))
		.collect(Collectors.toList());
}
@Test
public void testPatientLevelExportWorks() throws JobParametersInvalidException {
@ -1013,7 +1068,6 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
//Non-cached should all have unique IDs
List<String> jobIds = Stream.of(jobInfo5, jobInfo6, jobInfo7, jobInfo8, jobInfo9).map(IBulkDataExportSvc.JobInfo::getJobId).collect(Collectors.toList());
ourLog.info("ZOOP {}", String.join(", ", jobIds));
Set<String> uniqueJobIds = new HashSet<>(jobIds);
assertEquals(uniqueJobIds.size(), jobIds.size());

View File

@ -541,7 +541,6 @@ public class ResourceProviderR4ValueSetNoVerCSNoVerTest extends BaseResourceProv
ValueSet expanded = (ValueSet) respParam.getParameter().get(0).getResource();
String resp = myFhirCtx.newXmlParser().setPrettyPrint(true).encodeResourceToString(expanded);
ourLog.info("zoop");
ourLog.info(resp);
assertThat(resp, is(containsStringIgnoringCase("<code value=\"M\"/>")));

View File

@ -22,6 +22,7 @@ public class BatchJobConfig {
private StepBuilderFactory myStepBuilderFactory;
@Bean
public Job testJob() {
return myJobBuilderFactory.get("testJob")