Refactors, tidying. Improving docs

This commit is contained in:
Tadgh 2021-03-26 15:54:27 -04:00
parent f7eee5985e
commit bee33f763a
7 changed files with 144 additions and 66 deletions

View File

@ -32,6 +32,7 @@ import org.hl7.fhir.instance.model.api.IPrimitiveType;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
public class SearchParameterUtil {
@ -59,27 +60,29 @@ public class SearchParameterUtil {
* 3.1 If that returns >1 result, throw an error
* 3.2 If that returns 1 result, return it
*/
public static RuntimeSearchParam getPatientSearchParamForResourceType(FhirContext theFhirContext, String theResourceType) {
public static Optional<RuntimeSearchParam> getOnlyPatientSearchParamForResourceType(FhirContext theFhirContext, String theResourceType) {
RuntimeSearchParam myPatientSearchParam = null;
RuntimeResourceDefinition runtimeResourceDefinition = theFhirContext.getResourceDefinition(theResourceType);
myPatientSearchParam = runtimeResourceDefinition.getSearchParam("patient");
if (myPatientSearchParam == null) {
myPatientSearchParam = runtimeResourceDefinition.getSearchParam("subject");
if (myPatientSearchParam == null) {
myPatientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition);
if (myPatientSearchParam == null) {
String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", theResourceType);
throw new IllegalArgumentException(errorMessage);
}
myPatientSearchParam = getOnlyPatientCompartmentRuntimeSearchParam(runtimeResourceDefinition);
}
}
return myPatientSearchParam;
return Optional.ofNullable(myPatientSearchParam);
}
/**
* Search the resource definition for a compartment named 'patient' and return its related Search Parameter.
*/
public static RuntimeSearchParam getRuntimeSearchParamByCompartment(RuntimeResourceDefinition runtimeResourceDefinition) {
public static RuntimeSearchParam getOnlyPatientCompartmentRuntimeSearchParam(FhirContext theFhirContext, String theResourceType) {
RuntimeResourceDefinition resourceDefinition = theFhirContext.getResourceDefinition(theResourceType);
return getOnlyPatientCompartmentRuntimeSearchParam(resourceDefinition);
}
public static RuntimeSearchParam getOnlyPatientCompartmentRuntimeSearchParam(RuntimeResourceDefinition runtimeResourceDefinition) {
RuntimeSearchParam patientSearchParam;
List<RuntimeSearchParam> searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient");
if (searchParams == null || searchParams.size() == 0) {
@ -88,11 +91,21 @@ public class SearchParameterUtil {
} else if (searchParams.size() == 1) {
patientSearchParam = searchParams.get(0);
} else {
String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as we are unable to disambiguate which patient search parameter we should be searching by.", runtimeResourceDefinition.getId());
String errorMessage = String.format("Resource type %s has more than one Search Param which references a patient compartment. We are unable to disambiguate which patient search parameter we should be searching by.", runtimeResourceDefinition.getId());
throw new IllegalArgumentException(errorMessage);
}
return patientSearchParam;
}
public static List<RuntimeSearchParam> getAllPatientCompartmentRuntimeSearchParams(FhirContext theFhirContext, String theResourceType) {
RuntimeResourceDefinition runtimeResourceDefinition = theFhirContext.getResourceDefinition(theResourceType);
return getAllPatientCompartmentRuntimeSearchParams(runtimeResourceDefinition);
}
private static List<RuntimeSearchParam> getAllPatientCompartmentRuntimeSearchParams(RuntimeResourceDefinition theRuntimeResourceDefinition) {
List<RuntimeSearchParam> patient = theRuntimeResourceDefinition.getSearchParamsForCompartmentName("Patient");
return patient;
}

View File

@ -41,7 +41,8 @@ import java.util.List;
import java.util.Optional;
/**
* Reusable Item Processor which converts ResourcePersistentIds to their IBaseResources
* Reusable Item Processor which attaches an extension to any outgoing resource. This extension will contain a resource
* reference to the golden resource patient of the given resources' patient. (e.g. Observation.subject, Immunization.patient, etc)
*/
public class GoldenResourceAnnotatingProcessor implements ItemProcessor<List<IBaseResource>, List<IBaseResource>> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@ -59,25 +60,40 @@ public class GoldenResourceAnnotatingProcessor implements ItemProcessor<List<IBa
@Value("#{jobParameters['" + BulkExportJobConfig.EXPAND_MDM_PARAMETER+ "'] ?: false}")
private boolean myMdmEnabled;
private RuntimeSearchParam myRuntimeSearchParam;
private String myPatientFhirPath;
private IFhirPath myFhirPath;
private void populateRuntimeSearchParam() {
Optional<RuntimeSearchParam> oPatientSearchParam= SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, myResourceType);
if (!oPatientSearchParam.isPresent()) {
String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", myResourceType);
throw new IllegalArgumentException(errorMessage);
} else {
myRuntimeSearchParam = oPatientSearchParam.get();
}
}
@Override
public List<IBaseResource> process(List<IBaseResource> theIBaseResources) throws Exception {
//If MDM expansion is enabled, add this magic new extension, otherwise, return the resource as is.
if (myMdmEnabled) {
if (myRuntimeSearchParam == null) {
myRuntimeSearchParam = SearchParameterUtil.getPatientSearchParamForResourceType(myContext, myResourceType);
populateRuntimeSearchParam();
}
String path = runtimeSearchParamFhirPath();
theIBaseResources.forEach(iBaseResource -> annotateClinicalResourceWithRelatedGoldenResourcePatient(path, iBaseResource));
if (myPatientFhirPath == null) {
populatePatientFhirPath();
}
theIBaseResources.forEach(this::annotateClinicalResourceWithRelatedGoldenResourcePatient);
}
return theIBaseResources;
}
private void annotateClinicalResourceWithRelatedGoldenResourcePatient(String path, IBaseResource iBaseResource) {
Optional<String> patientReference = getPatientReference(path, iBaseResource);
private void annotateClinicalResourceWithRelatedGoldenResourcePatient(IBaseResource iBaseResource) {
Optional<String> patientReference = getPatientReference(iBaseResource);
if (patientReference.isPresent()) {
addGoldenResourceExtension(iBaseResource, patientReference.get());
} else {
@ -85,26 +101,24 @@ public class GoldenResourceAnnotatingProcessor implements ItemProcessor<List<IBa
}
}
private Optional<String> getPatientReference(String path, IBaseResource iBaseResource) {
private Optional<String> getPatientReference(IBaseResource iBaseResource) {
//In the case of patient, we will just use the raw ID.
if (myResourceType.equalsIgnoreCase("Patient")) {
return Optional.of(iBaseResource.getIdElement().getIdPart());
//Otherwise, we will perform evaluation of the fhirPath.
} else {
Optional<IBaseReference> optionalReference = getFhirParser().evaluateFirst(iBaseResource, path, IBaseReference.class);
Optional<IBaseReference> optionalReference = getFhirParser().evaluateFirst(iBaseResource, myPatientFhirPath, IBaseReference.class);
return optionalReference.map(theIBaseReference -> theIBaseReference.getReferenceElement().getIdPart());
}
}
private void addGoldenResourceExtension(IBaseResource iBaseResource, String sourceResourceId) {
String goldenResourceId = myMdmExpansionCacheSvc.getGoldenResourceId(sourceResourceId);
IBaseExtension<?, ?> extension = ExtensionUtil.getOrCreateExtension(iBaseResource, ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL);
if (!StringUtils.isBlank(goldenResourceId)) {
IBaseExtension<?, ?> extension = ExtensionUtil.getOrCreateExtension(iBaseResource, ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL);
ExtensionUtil.setExtension(myContext, extension, "reference", prefixPatient(goldenResourceId));
} else {
IBaseExtension<?, ?> extension = ExtensionUtil.getOrCreateExtension(iBaseResource, "failed-to-associated-golden-id");
ExtensionUtil.setExtension(myContext, extension, "string", myMdmExpansionCacheSvc.buildLog("not working!", true));
ExtensionUtil.setExtension(myContext, extension, "string", "This patient has no matched golden resource.");
}
}
@ -119,12 +133,12 @@ public class GoldenResourceAnnotatingProcessor implements ItemProcessor<List<IBa
return myFhirPath;
}
private String runtimeSearchParamFhirPath() {
private String populatePatientFhirPath() {
if (myPatientFhirPath == null) {
myRuntimeSearchParam = SearchParameterUtil.getPatientSearchParamForResourceType(myContext, myResourceType);
myPatientFhirPath = myRuntimeSearchParam.getPath();
//Yes this is a stupid hack, but by default this runtime search param will return stuff like
// Observation.subject.where(resolve() is Patient) which unfortunately our FHIRpath evaluator doesn't play nicely with.
// GGG: Yes this is a stupid hack, but by default this runtime search param will return stuff like
// Observation.subject.where(resolve() is Patient) which unfortunately our FHIRpath evaluator doesn't play nicely with
// our FHIRPath evaluator.
if (myPatientFhirPath.contains(".where")) {
myPatientFhirPath = myPatientFhirPath.substring(0, myPatientFhirPath.indexOf(".where"));
}

View File

@ -180,17 +180,14 @@ public abstract class BaseBulkItemReader implements ItemReader<List<ResourcePers
}
/**
* Given the resource type, fetch its patient-based search parameter name
* 1. Attempt to find one called 'patient'
* 2. If that fails, find one called 'subject'
* 3. If that fails, find find by Patient Compartment.
* 3.1 If that returns >1 result, throw an error
* 3.2 If that returns 1 result, return it
*/
protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType() {
if (myPatientSearchParam == null) {
myPatientSearchParam = SearchParameterUtil.getPatientSearchParamForResourceType(myContext, myResourceType);
Optional<RuntimeSearchParam> onlyPatientSearchParamForResourceType = SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, myResourceType);
if (onlyPatientSearchParamForResourceType.isPresent()) {
myPatientSearchParam = onlyPatientSearchParamForResourceType.get();
} else {
}
}
return myPatientSearchParam;
}

View File

@ -112,17 +112,17 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
* possibly expanded by MDM, and don't have to go and fetch other resource DAOs.
*/
private Iterator<ResourcePersistentId> getExpandedPatientIterator() {
Set<Long> patientPidsToExport = new HashSet<>();
List<String> members = getMembers();
List<IIdType> ids = members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList());
List<Long> pidsOrThrowException = myIdHelperService.getPidsOrThrowException(ids);
patientPidsToExport.addAll(pidsOrThrowException);
Set<Long> patientPidsToExport = new HashSet<>(pidsOrThrowException);
if (myMdmEnabled) {
IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
Long pidOrNull = myIdHelperService.getPidOrNull(group);
List<List<Long>> lists = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
lists.forEach(patientPidsToExport::addAll);
List<List<Long>> goldenPidSourcePidTuple = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
goldenPidSourcePidTuple.forEach(patientPidsToExport::addAll);
populateMdmResourceCache(goldenPidSourcePidTuple);
}
List<ResourcePersistentId> resourcePersistentIds = patientPidsToExport
.stream()
@ -131,6 +131,53 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
return resourcePersistentIds.iterator();
}
/**
* @param thePidTuples
*/
private void populateMdmResourceCache(List<List<Long>> thePidTuples) {
if (myMdmExpansionCacheSvc.hasBeenPopulated()) {
return;
}
//First, convert this zipped set of tuples to a map of
//{
// patient/gold-1 -> [patient/1, patient/2]
// patient/gold-2 -> [patient/3, patient/4]
//}
Map<Long, Set<Long>> goldenResourceToSourcePidMap = new HashMap<>();
for (List<Long> goldenPidTargetPidTuple : thePidTuples) {
Long goldenPid = goldenPidTargetPidTuple.get(0);
Long sourcePid = goldenPidTargetPidTuple.get(1);
if(!goldenResourceToSourcePidMap.containsKey(goldenPid)) {
goldenResourceToSourcePidMap.put(goldenPid, new HashSet<>());
}
goldenResourceToSourcePidMap.get(goldenPid).add(sourcePid);
}
//Next, lets convert it to an inverted index for fast lookup
// {
// patient/1 -> patient/gold-1
// patient/2 -> patient/gold-1
// patient/3 -> patient/gold-2
// patient/4 -> patient/gold-2
// }
Map<String, String> sourceResourceIdToGoldenResourceIdMap = new HashMap<>();
goldenResourceToSourcePidMap.forEach((key, value) -> {
String goldenResourceId = myIdHelperService.translatePidIdToForcedId(new ResourcePersistentId(key)).orElse(key.toString());
Map<Long, Optional<String>> pidsToForcedIds = myIdHelperService.translatePidsToForcedIds(value);
Set<String> sourceResourceIds = pidsToForcedIds.entrySet().stream()
.map(ent -> ent.getValue().isPresent() ? ent.getValue().get() : ent.getKey().toString())
.collect(Collectors.toSet());
sourceResourceIds
.forEach(sourceResourceId -> sourceResourceIdToGoldenResourceIdMap.put(sourceResourceId, goldenResourceId));
});
//Now that we have built our cached expansion, store it.
myMdmExpansionCacheSvc.setCacheContents(sourceResourceIdToGoldenResourceIdMap);
}
/**
* Given the local myGroupId, read this group, and find all members' patient references.
* @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"]
@ -173,8 +220,7 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
}
goldenResourceToSourcePidMap.get(goldenPid).add(sourcePid);
}
populateMdmResourceCacheIfNeeded(goldenResourceToSourcePidMap);
populateMdmResourceCache(goldenPidTargetPidTuples);
//If the result of the translation is an empty optional, it means there is no forced id, and we can use the PID as the resource ID.
Set<String> resolvedResourceIds = pidToForcedIdMap.entrySet().stream()
@ -192,25 +238,7 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
}
private void populateMdmResourceCacheIfNeeded(Map<Long, Set<Long>> goldenResourceToSourcePidMap) {
if (myMdmExpansionCacheSvc.hasBeenPopulated()) {
return;
}
Map<String, String> sourceResourceIdToGoldenResourceIdMap = new HashMap<>();
goldenResourceToSourcePidMap.entrySet()
.forEach(entry -> {
Long key = entry.getKey();
String goldenResourceId = myIdHelperService.translatePidIdToForcedId(new ResourcePersistentId(key)).orElse(key.toString());
Map<Long, Optional<String>> pidsToForcedIds = myIdHelperService.translatePidsToForcedIds(entry.getValue());
Set<String> sourceResourceIds = pidsToForcedIds.entrySet().stream()
.map(ent -> ent.getValue().isPresent() ? ent.getValue().get() : ent.getKey().toString())
.collect(Collectors.toSet());
sourceResourceIds
.forEach(sourceResourceId -> sourceResourceIdToGoldenResourceIdMap.put(sourceResourceId, goldenResourceId));
});
myMdmExpansionCacheSvc.setCacheContents(sourceResourceIdToGoldenResourceIdMap);
}
private void queryResourceTypeWithReferencesToPatients(Set<ResourcePersistentId> myReadPids, List<String> idChunk) {

View File

@ -1,5 +1,6 @@
package ca.uhn.fhir.jpa.dao.mdm;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import java.util.Map;
@ -7,20 +8,47 @@ import java.util.concurrent.ConcurrentHashMap;
import static org.slf4j.LoggerFactory.getLogger;
/**
* The purpose of this class is to share context between steps of a given GroupBulkExport job.
*
* This cache allows you to port state between reader/processor/writer. In this case, we are maintaining
* a cache of Source Resource ID -> Golden Resource ID, so that we can annotate outgoing resources with their golden owner
* if applicable.
*
*/
public class MdmExpansionCacheSvc {
private static final Logger ourLog = getLogger(MdmExpansionCacheSvc.class);
private ConcurrentHashMap<String, String> mySourceToGoldenIdCache = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, String> mySourceToGoldenIdCache = new ConcurrentHashMap<>();
public String getGoldenResourceId(String theSourceId) {
ourLog.info(buildLog("About to lookup cached resource ID " + theSourceId, true));
return mySourceToGoldenIdCache.get(theSourceId);
ourLog.debug(buildLogMessage("About to lookup cached resource ID " + theSourceId));
String goldenResourceId = mySourceToGoldenIdCache.get(theSourceId);
//A golden resources' golden resource ID is itself.
if (StringUtils.isBlank(goldenResourceId)) {
if (mySourceToGoldenIdCache.containsValue(theSourceId)) {
goldenResourceId = theSourceId;
}
}
return goldenResourceId;
}
public String buildLog(String message, boolean theShowContent) {
public String buildLogMessage(String theMessage) {
return buildLogMessage(theMessage, false);
}
/**
* Builds a log message, potentially enriched with the cache content.
*
* @param message The log message
* @param theAddCacheContentContent If true, will annotate the log message with the current cache contents.
* @return a built log message, which may include the cache content.
*/
public String buildLogMessage(String message, boolean theAddCacheContentContent) {
StringBuilder builder = new StringBuilder();
builder.append(message);
if (ourLog.isDebugEnabled() || theShowContent) {
if (ourLog.isDebugEnabled() || theAddCacheContentContent) {
builder.append("\n")
.append("Current cache content is:")
.append("\n");

View File

@ -1052,7 +1052,6 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
//Non-cached should all have unique IDs
List<String> jobIds = Stream.of(jobInfo5, jobInfo6, jobInfo7, jobInfo8, jobInfo9).map(IBulkDataExportSvc.JobInfo::getJobId).collect(Collectors.toList());
ourLog.info("ZOOP {}", String.join(", ", jobIds));
Set<String> uniqueJobIds = new HashSet<>(jobIds);
assertEquals(uniqueJobIds.size(), jobIds.size());

View File

@ -541,7 +541,6 @@ public class ResourceProviderR4ValueSetNoVerCSNoVerTest extends BaseResourceProv
ValueSet expanded = (ValueSet) respParam.getParameter().get(0).getResource();
String resp = myFhirCtx.newXmlParser().setPrettyPrint(true).encodeResourceToString(expanded);
ourLog.info("zoop");
ourLog.info(resp);
assertThat(resp, is(containsStringIgnoringCase("<code value=\"M\"/>")));