From 315dc3fcb7cdc74317f5ae8495d662f2ee772fb7 Mon Sep 17 00:00:00 2001 From: Tadgh Date: Wed, 24 Mar 2021 12:50:29 -0400 Subject: [PATCH 01/12] start with failing test --- .../jpa/bulk/BulkDataExportSvcImplR4Test.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java index 404045a4ff8..86d78abd8ae 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java @@ -639,6 +639,39 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { assertThat(nextContents, is(containsString("IMM6"))); assertThat(nextContents, is(containsString("IMM8"))); } + @Test + public void testGroupBatchJobMdmExpansionIdentifiesGoldenResources() throws Exception { + createResources(); + + // Create a bulk job + BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions(); + bulkDataExportOptions.setOutputFormat(null); + bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization")); + bulkDataExportOptions.setSince(null); + bulkDataExportOptions.setFilters(null); + bulkDataExportOptions.setGroupId(myPatientGroupId); + bulkDataExportOptions.setExpandMdm(true); + bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP); + IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); + + + myBulkDataExportSvc.buildExportFiles(); + awaitAllBulkJobCompletions(); + + IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); + + assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE)); + assertThat(jobInfo.getFiles().size(), equalTo(1)); + assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); + + // Iterate over the files + String nextContents = getBinaryContents(jobInfo, 0); + + assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); + + assertThat(nextContents, is(containsString("subject_golden_resource"))); + + } @Test public void testPatientLevelExportWorks() throws JobParametersInvalidException { From 4d3b021f127f4e9cfcdeea3f372c58a5ee3b29e7 Mon Sep 17 00:00:00 2001 From: Tadgh Date: Thu, 25 Mar 2021 13:53:41 -0400 Subject: [PATCH 02/12] Add annotating processor and composite processor --- .../ca/uhn/fhir/util/SearchParameterUtil.java | 45 +++++++++ .../fhir/jpa/batch/CommonBatchJobConfig.java | 7 ++ .../GoldenResourceAnnotatingProcessor.java | 98 +++++++++++++++++++ .../PidToIBaseResourceProcessor.java | 1 + .../fhir/jpa/bulk/job/BaseBulkItemReader.java | 32 +----- .../jpa/bulk/job/BulkExportJobConfig.java | 33 ++++++- .../jpa/bulk/job/GroupBulkItemReader.java | 44 ++++++++- .../jpa/dao/mdm/MdmExpansionCacheSvc.java | 37 +++++++ .../fhir/jpa/batch/config/BatchJobConfig.java | 1 + 9 files changed, 263 insertions(+), 35 deletions(-) create mode 100644 hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java create mode 100644 hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java index 75d1c7c4b2a..047525705e5 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java @@ -23,6 +23,7 @@ package ca.uhn.fhir.util; import ca.uhn.fhir.context.BaseRuntimeChildDefinition; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.RuntimeResourceDefinition; +import ca.uhn.fhir.context.RuntimeSearchParam; import org.apache.commons.lang3.Validate; import org.hl7.fhir.instance.model.api.IBase; import org.hl7.fhir.instance.model.api.IBaseResource; @@ -50,6 +51,50 @@ public class SearchParameterUtil { return retVal; } + /** + * Given the resource type, fetch its patient-based search parameter name + * 1. Attempt to find one called 'patient' + * 2. If that fails, find one called 'subject' + * 3. If that fails, find find by Patient Compartment. + * 3.1 If that returns >1 result, throw an error + * 3.2 If that returns 1 result, return it + */ + public static RuntimeSearchParam getPatientSearchParamForResourceType(FhirContext theFhirContext, String theResourceType) { + RuntimeSearchParam myPatientSearchParam = null; + RuntimeResourceDefinition runtimeResourceDefinition = theFhirContext.getResourceDefinition(theResourceType); + myPatientSearchParam = runtimeResourceDefinition.getSearchParam("patient"); + if (myPatientSearchParam == null) { + myPatientSearchParam = runtimeResourceDefinition.getSearchParam("subject"); + if (myPatientSearchParam == null) { + myPatientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition); + if (myPatientSearchParam == null) { + String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", theResourceType); + throw new IllegalArgumentException(errorMessage); + } + } + } + return myPatientSearchParam; + } + + /** + * Search the resource definition for a compartment named 'patient' and return its related Search Parameter. + */ + public static RuntimeSearchParam getRuntimeSearchParamByCompartment(RuntimeResourceDefinition runtimeResourceDefinition) { + RuntimeSearchParam patientSearchParam; + List searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient"); + if (searchParams == null || searchParams.size() == 0) { + String errorMessage = String.format("Resource type [%s] is not eligible for this type of export, as it contains no Patient compartment, and no `patient` or `subject` search parameter", runtimeResourceDefinition.getId()); + throw new IllegalArgumentException(errorMessage); + } else if (searchParams.size() == 1) { + patientSearchParam = searchParams.get(0); + } else { + String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as we are unable to disambiguate which patient search parameter we should be searching by.", runtimeResourceDefinition.getId()); + throw new IllegalArgumentException(errorMessage); + } + return patientSearchParam; + + } + @Nullable public static String getCode(FhirContext theContext, IBaseResource theResource) { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/CommonBatchJobConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/CommonBatchJobConfig.java index 20da121d847..340c7be3393 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/CommonBatchJobConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/CommonBatchJobConfig.java @@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.batch; * #L% */ +import ca.uhn.fhir.jpa.batch.processors.GoldenResourceAnnotatingProcessor; import ca.uhn.fhir.jpa.batch.processors.PidToIBaseResourceProcessor; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.context.annotation.Bean; @@ -34,4 +35,10 @@ public class CommonBatchJobConfig { return new PidToIBaseResourceProcessor(); } + @Bean + @StepScope + public GoldenResourceAnnotatingProcessor goldenResourceAnnotatingProcessor() { + return new GoldenResourceAnnotatingProcessor(); + } + } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java new file mode 100644 index 00000000000..c578a71b34f --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java @@ -0,0 +1,98 @@ +package ca.uhn.fhir.jpa.batch.processors; + +/*- + * #%L + * HAPI FHIR JPA Server + * %% + * Copyright (C) 2014 - 2021 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.context.RuntimeSearchParam; +import ca.uhn.fhir.fhirpath.IFhirPath; +import ca.uhn.fhir.jpa.batch.log.Logs; +import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc; +import ca.uhn.fhir.util.ExtensionUtil; +import ca.uhn.fhir.util.SearchParameterUtil; +import org.hl7.fhir.instance.model.api.IBase; +import org.hl7.fhir.instance.model.api.IBaseReference; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IIdType; +import org.slf4j.Logger; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; + +import java.util.List; +import java.util.Optional; + +/** + * Reusable Item Processor which converts ResourcePersistentIds to their IBaseResources + */ +public class GoldenResourceAnnotatingProcessor implements ItemProcessor, List> { + private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); + public static final String ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL = "https://hapifhir.org/associated-patient-golden-resource/"; + + @Value("#{stepExecutionContext['resourceType']}") + private String myResourceType; + + @Autowired + private FhirContext myContext; + + @Autowired + private MdmExpansionCacheSvc myMdmExpansionCacheSvc; + + private RuntimeSearchParam myRuntimeSearchParam; + private IFhirPath myFhirPath; + + + + + @Override + public List process(List theIBaseResources) throws Exception { + if (myRuntimeSearchParam == null) { + myRuntimeSearchParam = SearchParameterUtil.getPatientSearchParamForResourceType(myContext, myResourceType); + } + String path = runtimeSearchParamFhirPath(); + theIBaseResources + .forEach(iBaseResource -> annotateResourceWithRelatedGoldenResourcePatient(path, iBaseResource)); + return theIBaseResources; + } + + private void annotateResourceWithRelatedGoldenResourcePatient(String path, IBaseResource iBaseResource) { + Optional evaluate = getFhirParser().evaluateFirst(iBaseResource, path, IBaseReference.class); + if (evaluate.isPresent()) { + String sourceResourceId = evaluate.get().getReferenceElement().getIdPart(); + ExtensionUtil.setExtension(myContext, iBaseResource, ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL, myMdmExpansionCacheSvc.getGoldenResourceId(sourceResourceId)); + } else { + ourLog.warn("Failed to find the patient compartment information for resource {}", iBaseResource); + } + } + + private IFhirPath getFhirParser() { + if (myFhirPath == null) { + myFhirPath = myContext.newFhirPath(); + } + return myFhirPath; + } + + private String runtimeSearchParamFhirPath() { + if (myRuntimeSearchParam == null) { + myRuntimeSearchParam = SearchParameterUtil.getPatientSearchParamForResourceType(myContext, myResourceType); + } + return myRuntimeSearchParam.getPath(); + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/PidToIBaseResourceProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/PidToIBaseResourceProcessor.java index b392eb2b264..0554b7da31d 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/PidToIBaseResourceProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/PidToIBaseResourceProcessor.java @@ -50,6 +50,7 @@ public class PidToIBaseResourceProcessor implements ItemProcessor searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient"); - if (searchParams == null || searchParams.size() == 0) { - String errorMessage = String.format("Resource type [%s] is not eligible for this type of export, as it contains no Patient compartment, and no `patient` or `subject` search parameter", myResourceType); - throw new IllegalArgumentException(errorMessage); - } else if (searchParams.size() == 1) { - patientSearchParam = searchParams.get(0); - } else { - String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as we are unable to disambiguate which patient search parameter we should be searching by.", myResourceType); - throw new IllegalArgumentException(errorMessage); - } - return patientSearchParam; - } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java index a2b8d804e3b..527ed620a76 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java @@ -21,8 +21,10 @@ package ca.uhn.fhir.jpa.bulk.job; */ import ca.uhn.fhir.jpa.batch.BatchJobsConfig; +import ca.uhn.fhir.jpa.batch.processors.GoldenResourceAnnotatingProcessor; import ca.uhn.fhir.jpa.batch.processors.PidToIBaseResourceProcessor; import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc; +import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc; import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; import org.hl7.fhir.instance.model.api.IBaseResource; import org.springframework.batch.core.Job; @@ -32,13 +34,16 @@ import org.springframework.batch.core.configuration.annotation.JobBuilderFactory import org.springframework.batch.core.configuration.annotation.JobScope; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.support.CompositeItemProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Lazy; +import java.util.ArrayList; import java.util.List; /** @@ -64,11 +69,23 @@ public class BulkExportJobConfig { @Autowired private PidToIBaseResourceProcessor myPidToIBaseResourceProcessor; + @Autowired + private GoldenResourceAnnotatingProcessor myGoldenResourceAnnotatingProcessor; + @Bean public BulkExportDaoSvc bulkExportDaoSvc() { return new BulkExportDaoSvc(); } + + @Bean + @Lazy + @StepScope + public MdmExpansionCacheSvc mdmExpansionCacheSvc() { + return new MdmExpansionCacheSvc(); + } + + @Bean @Lazy public Job bulkExportJob() { @@ -80,6 +97,18 @@ public class BulkExportJobConfig { .build(); } + @Bean + @Lazy + @StepScope + public CompositeItemProcessor, List> inflateResourceThenAnnotateWithGoldenResourceProcessor() { + CompositeItemProcessor processor = new CompositeItemProcessor<>(); + ArrayList delegates = new ArrayList<>(); + delegates.add(myPidToIBaseResourceProcessor); + delegates.add(myGoldenResourceAnnotatingProcessor); + processor.setDelegates(delegates); + return processor; + } + @Bean @Lazy public Job groupBulkExportJob() { @@ -132,7 +161,9 @@ public class BulkExportJobConfig { return myStepBuilderFactory.get("groupBulkExportGenerateResourceFilesStep") ., List> chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time. .reader(groupBulkItemReader()) - .processor(myPidToIBaseResourceProcessor) +// .processor(myPidToIBaseResourceProcessor) +// .processor(myGoldenResourceAnnotatingProcessor) + .processor(inflateResourceThenAnnotateWithGoldenResourceProcessor()) .writer(resourceToFileWriter()) .listener(bulkExportGenerateResourceFilesStepListener()) .build(); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java index dd03381fa83..d53afe0bacd 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java @@ -27,7 +27,7 @@ import ca.uhn.fhir.jpa.dao.IResultIterator; import ca.uhn.fhir.jpa.dao.ISearchBuilder; import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao; import ca.uhn.fhir.jpa.dao.index.IdHelperService; -import ca.uhn.fhir.jpa.entity.Search; +import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc; import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.util.QueryChunker; @@ -45,6 +45,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -75,6 +77,8 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade private IdHelperService myIdHelperService; @Autowired private IMdmLinkDao myMdmLinkDao; + @Autowired + private MdmExpansionCacheSvc myMdmExpansionCacheSvc; @Override Iterator getResourcePidIterator() { @@ -119,6 +123,7 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId)); Long pidOrNull = myIdHelperService.getPidOrNull(group); List> lists = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); + lists.forEach(patientPidsToExport::addAll); } List resourcePersistentIds = patientPidsToExport @@ -154,13 +159,44 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade //Attempt to perform MDM Expansion of membership if (myMdmEnabled) { - List> goldenPidTargetPidTuple = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); + List> goldenPidTargetPidTuples = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); //Now lets translate these pids into resource IDs - Set uniquePids = new HashSet<>(); - goldenPidTargetPidTuple.forEach(uniquePids::addAll); + System.out.println("ZOOPER"); + goldenPidTargetPidTuples.stream() + .map(Object::toString) + .forEach(list -> System.out.println(String.join(", ", list))); + Set uniquePids = new HashSet<>(); + goldenPidTargetPidTuples.forEach(uniquePids::addAll); Map> pidToForcedIdMap = myIdHelperService.translatePidsToForcedIds(uniquePids); + Map> goldenResourceToSourcePidMap = new HashMap<>(); + for (List goldenPidTargetPidTuple : goldenPidTargetPidTuples) { + Long goldenPid = goldenPidTargetPidTuple.get(0); + Long sourcePid = goldenPidTargetPidTuple.get(1); + + if(!goldenResourceToSourcePidMap.containsKey(goldenPid)) { + goldenResourceToSourcePidMap.put(goldenPid, new HashSet<>()); + } + goldenResourceToSourcePidMap.get(goldenPid).add(sourcePid); + } + Map sourceResourceIdToGoldenResourceIdMap = new HashMap<>(); + + goldenResourceToSourcePidMap.entrySet() + .forEach(entry -> { + Long key = entry.getKey(); + String goldenResourceId = myIdHelperService.translatePidIdToForcedId(new ResourcePersistentId(key)).orElse(key.toString()); + + Map> pidsToForcedIds = myIdHelperService.translatePidsToForcedIds(entry.getValue()); + Set sourceResourceIds = pidsToForcedIds.entrySet().stream() + .map(ent -> ent.getValue().isPresent() ? ent.getValue().get() : ent.getKey().toString()) + .collect(Collectors.toSet()); + + sourceResourceIds + .forEach(sourceResourceId -> sourceResourceIdToGoldenResourceIdMap.put(sourceResourceId, goldenResourceId)); + }); + myMdmExpansionCacheSvc.setCacheContents(sourceResourceIdToGoldenResourceIdMap); + //If the result of the translation is an empty optional, it means there is no forced id, and we can use the PID as the resource ID. Set resolvedResourceIds = pidToForcedIdMap.entrySet().stream() .map(entry -> entry.getValue().isPresent() ? entry.getValue().get() : entry.getKey().toString()) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java new file mode 100644 index 00000000000..b923cc282a2 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java @@ -0,0 +1,37 @@ +package ca.uhn.fhir.jpa.dao.mdm; + +import org.slf4j.Logger; + +import java.util.HashMap; +import java.util.Map; + +import static org.slf4j.LoggerFactory.getLogger; + +public class MdmExpansionCacheSvc { + private static final Logger ourLog = getLogger(MdmExpansionCacheSvc.class); + + private Map mySourceToGoldenIdCache = new HashMap<>(); + + public String getGoldenResourceId(String theSourceId) { + ourLog.info(buildLog("About to lookup cached resource ID " + theSourceId, true)); + return mySourceToGoldenIdCache.get(theSourceId); + } + + public String buildLog(String message, boolean theShowContent) { + StringBuilder builder = new StringBuilder(); + builder.append(message); + if (ourLog.isDebugEnabled() || theShowContent) { + builder.append("\n") + .append("Current cache content is:") + .append("\n"); + mySourceToGoldenIdCache.entrySet().stream().forEach(entry -> builder.append(entry.getKey()).append(" -> ").append(entry.getValue()).append("\n")); + return builder.toString(); + } + return builder.toString(); + + } + + public void setCacheContents(Map theSourceResourceIdToGoldenResourceIdMap) { + this.mySourceToGoldenIdCache = theSourceResourceIdToGoldenResourceIdMap; + } +} diff --git a/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/BatchJobConfig.java b/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/BatchJobConfig.java index 3de13fe7a66..7acf3590585 100644 --- a/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/BatchJobConfig.java +++ b/hapi-fhir-jpaserver-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/BatchJobConfig.java @@ -22,6 +22,7 @@ public class BatchJobConfig { private StepBuilderFactory myStepBuilderFactory; + @Bean public Job testJob() { return myJobBuilderFactory.get("testJob") From cc1861e87691f3539c3e7b2ff4ce7613ac547c8a Mon Sep 17 00:00:00 2001 From: Tadgh Date: Thu, 25 Mar 2021 15:48:03 -0400 Subject: [PATCH 03/12] Update test. --- .../GoldenResourceAnnotatingProcessor.java | 65 ++++++++++++++----- .../jpa/bulk/job/BulkExportJobConfig.java | 2 +- .../jpa/bulk/job/GroupBulkItemReader.java | 45 ++++++------- .../jpa/dao/mdm/MdmExpansionCacheSvc.java | 12 +++- .../jpa/bulk/BulkDataExportSvcImplR4Test.java | 17 +++-- 5 files changed, 95 insertions(+), 46 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java index c578a71b34f..70d732e3326 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java @@ -24,13 +24,14 @@ import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.RuntimeSearchParam; import ca.uhn.fhir.fhirpath.IFhirPath; import ca.uhn.fhir.jpa.batch.log.Logs; +import ca.uhn.fhir.jpa.bulk.job.BulkExportJobConfig; import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc; import ca.uhn.fhir.util.ExtensionUtil; import ca.uhn.fhir.util.SearchParameterUtil; -import org.hl7.fhir.instance.model.api.IBase; +import org.apache.commons.lang3.StringUtils; +import org.hl7.fhir.instance.model.api.IBaseExtension; import org.hl7.fhir.instance.model.api.IBaseReference; import org.hl7.fhir.instance.model.api.IBaseResource; -import org.hl7.fhir.instance.model.api.IIdType; import org.slf4j.Logger; import org.springframework.batch.item.ItemProcessor; import org.springframework.beans.factory.annotation.Autowired; @@ -55,7 +56,11 @@ public class GoldenResourceAnnotatingProcessor implements ItemProcessor process(List theIBaseResources) throws Exception { - if (myRuntimeSearchParam == null) { - myRuntimeSearchParam = SearchParameterUtil.getPatientSearchParamForResourceType(myContext, myResourceType); + if (myMdmEnabled) { + if (myRuntimeSearchParam == null) { + myRuntimeSearchParam = SearchParameterUtil.getPatientSearchParamForResourceType(myContext, myResourceType); + } + String path = runtimeSearchParamFhirPath(); + theIBaseResources.forEach(iBaseResource -> annotateClinicalResourceWithRelatedGoldenResourcePatient(path, iBaseResource)); } - String path = runtimeSearchParamFhirPath(); - theIBaseResources - .forEach(iBaseResource -> annotateResourceWithRelatedGoldenResourcePatient(path, iBaseResource)); + return theIBaseResources; } - private void annotateResourceWithRelatedGoldenResourcePatient(String path, IBaseResource iBaseResource) { - Optional evaluate = getFhirParser().evaluateFirst(iBaseResource, path, IBaseReference.class); - if (evaluate.isPresent()) { - String sourceResourceId = evaluate.get().getReferenceElement().getIdPart(); - ExtensionUtil.setExtension(myContext, iBaseResource, ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL, myMdmExpansionCacheSvc.getGoldenResourceId(sourceResourceId)); + private void annotateClinicalResourceWithRelatedGoldenResourcePatient(String path, IBaseResource iBaseResource) { + Optional patientReference = getPatientReference(path, iBaseResource); + if (patientReference.isPresent()) { + addGoldenResourceExtension(iBaseResource, patientReference.get()); } else { - ourLog.warn("Failed to find the patient compartment information for resource {}", iBaseResource); + ourLog.warn("Failed to find the patient reference information for resource {}", iBaseResource); } } + private Optional getPatientReference(String path, IBaseResource iBaseResource) { + //In the case of patient, we will just use the raw ID. + if (myResourceType.equalsIgnoreCase("Patient")) { + return Optional.of(iBaseResource.getIdElement().getIdPart()); + //Otherwise, we will perform evaluation of the fhirPath. + } else { + Optional optionalReference = getFhirParser().evaluateFirst(iBaseResource, path, IBaseReference.class); + return optionalReference.map(theIBaseReference -> theIBaseReference.getReferenceElement().getIdPart()); + } + } + + private void addGoldenResourceExtension(IBaseResource iBaseResource, String sourceResourceId) { + String goldenResourceId = myMdmExpansionCacheSvc.getGoldenResourceId(sourceResourceId); + if (!StringUtils.isBlank(goldenResourceId)) { + IBaseExtension extension = ExtensionUtil.getOrCreateExtension(iBaseResource, ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL); + ExtensionUtil.setExtension(myContext, extension, "reference", prefixPatient(goldenResourceId)); + } + } + + private String prefixPatient(String theResourceId) { + return "Patient/" + theResourceId; + } + private IFhirPath getFhirParser() { if (myFhirPath == null) { myFhirPath = myContext.newFhirPath(); @@ -90,9 +119,15 @@ public class GoldenResourceAnnotatingProcessor implements ItemProcessor> lists = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); - lists.forEach(patientPidsToExport::addAll); } List resourcePersistentIds = patientPidsToExport @@ -161,11 +159,6 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade if (myMdmEnabled) { List> goldenPidTargetPidTuples = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); //Now lets translate these pids into resource IDs - - System.out.println("ZOOPER"); - goldenPidTargetPidTuples.stream() - .map(Object::toString) - .forEach(list -> System.out.println(String.join(", ", list))); Set uniquePids = new HashSet<>(); goldenPidTargetPidTuples.forEach(uniquePids::addAll); Map> pidToForcedIdMap = myIdHelperService.translatePidsToForcedIds(uniquePids); @@ -180,22 +173,8 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade } goldenResourceToSourcePidMap.get(goldenPid).add(sourcePid); } - Map sourceResourceIdToGoldenResourceIdMap = new HashMap<>(); + populateMdmResourceCacheIfNeeded(goldenResourceToSourcePidMap); - goldenResourceToSourcePidMap.entrySet() - .forEach(entry -> { - Long key = entry.getKey(); - String goldenResourceId = myIdHelperService.translatePidIdToForcedId(new ResourcePersistentId(key)).orElse(key.toString()); - - Map> pidsToForcedIds = myIdHelperService.translatePidsToForcedIds(entry.getValue()); - Set sourceResourceIds = pidsToForcedIds.entrySet().stream() - .map(ent -> ent.getValue().isPresent() ? ent.getValue().get() : ent.getKey().toString()) - .collect(Collectors.toSet()); - - sourceResourceIds - .forEach(sourceResourceId -> sourceResourceIdToGoldenResourceIdMap.put(sourceResourceId, goldenResourceId)); - }); - myMdmExpansionCacheSvc.setCacheContents(sourceResourceIdToGoldenResourceIdMap); //If the result of the translation is an empty optional, it means there is no forced id, and we can use the PID as the resource ID. Set resolvedResourceIds = pidToForcedIdMap.entrySet().stream() @@ -212,6 +191,28 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade return expandedIds; } + private void populateMdmResourceCacheIfNeeded(Map> goldenResourceToSourcePidMap) { + if (myMdmExpansionCacheSvc.hasBeenPopulated()) { + return; + } + Map sourceResourceIdToGoldenResourceIdMap = new HashMap<>(); + + goldenResourceToSourcePidMap.entrySet() + .forEach(entry -> { + Long key = entry.getKey(); + String goldenResourceId = myIdHelperService.translatePidIdToForcedId(new ResourcePersistentId(key)).orElse(key.toString()); + + Map> pidsToForcedIds = myIdHelperService.translatePidsToForcedIds(entry.getValue()); + Set sourceResourceIds = pidsToForcedIds.entrySet().stream() + .map(ent -> ent.getValue().isPresent() ? ent.getValue().get() : ent.getKey().toString()) + .collect(Collectors.toSet()); + + sourceResourceIds + .forEach(sourceResourceId -> sourceResourceIdToGoldenResourceIdMap.put(sourceResourceId, goldenResourceId)); + }); + myMdmExpansionCacheSvc.setCacheContents(sourceResourceIdToGoldenResourceIdMap); + } + private void queryResourceTypeWithReferencesToPatients(Set myReadPids, List idChunk) { //Build SP map //First, inject the _typeFilters and _since from the export job diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java index b923cc282a2..6e80ba7e58b 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java @@ -2,15 +2,15 @@ package ca.uhn.fhir.jpa.dao.mdm; import org.slf4j.Logger; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import static org.slf4j.LoggerFactory.getLogger; public class MdmExpansionCacheSvc { private static final Logger ourLog = getLogger(MdmExpansionCacheSvc.class); - private Map mySourceToGoldenIdCache = new HashMap<>(); + private ConcurrentHashMap mySourceToGoldenIdCache = new ConcurrentHashMap<>(); public String getGoldenResourceId(String theSourceId) { ourLog.info(buildLog("About to lookup cached resource ID " + theSourceId, true)); @@ -32,6 +32,12 @@ public class MdmExpansionCacheSvc { } public void setCacheContents(Map theSourceResourceIdToGoldenResourceIdMap) { - this.mySourceToGoldenIdCache = theSourceResourceIdToGoldenResourceIdMap; + if (mySourceToGoldenIdCache.isEmpty()) { + this.mySourceToGoldenIdCache.putAll(theSourceResourceIdToGoldenResourceIdMap); + } + } + + public boolean hasBeenPopulated() { + return !mySourceToGoldenIdCache.isEmpty(); } } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java index 86d78abd8ae..0e1ea8e3453 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java @@ -6,6 +6,7 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome; import ca.uhn.fhir.jpa.batch.BatchJobsConfig; import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; +import ca.uhn.fhir.jpa.batch.processors.GoldenResourceAnnotatingProcessor; import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions; import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.bulk.job.BulkExportJobParametersBuilder; @@ -646,7 +647,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { // Create a bulk job BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions(); bulkDataExportOptions.setOutputFormat(null); - bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization")); + bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization", "Observation", "Patient")); bulkDataExportOptions.setSince(null); bulkDataExportOptions.setFilters(null); bulkDataExportOptions.setGroupId(myPatientGroupId); @@ -654,22 +655,28 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP); IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); - myBulkDataExportSvc.buildExportFiles(); awaitAllBulkJobCompletions(); IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE)); - assertThat(jobInfo.getFiles().size(), equalTo(1)); + assertThat(jobInfo.getFiles().size(), equalTo(3)); assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); // Iterate over the files + String nextContents = getBinaryContents(jobInfo, 0); - assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); + assertThat(nextContents, is(containsString(GoldenResourceAnnotatingProcessor.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL))); - assertThat(nextContents, is(containsString("subject_golden_resource"))); + nextContents = getBinaryContents(jobInfo, 1); + assertThat(jobInfo.getFiles().get(1).getResourceType(), is(equalTo("Observation"))); + assertThat(nextContents, is(containsString(GoldenResourceAnnotatingProcessor.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL))); + + nextContents = getBinaryContents(jobInfo, 2); + assertThat(jobInfo.getFiles().get(2).getResourceType(), is(equalTo("Patient"))); + assertThat(nextContents, is(containsString(GoldenResourceAnnotatingProcessor.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL))); } From a1a3e65106b3fb4433647f408814815973b30cd4 Mon Sep 17 00:00:00 2001 From: Tadgh Date: Fri, 26 Mar 2021 10:20:41 -0400 Subject: [PATCH 04/12] remove dead whitespace --- .../batch/processors/GoldenResourceAnnotatingProcessor.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java index 70d732e3326..11685f118c3 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java @@ -63,9 +63,6 @@ public class GoldenResourceAnnotatingProcessor implements ItemProcessor process(List theIBaseResources) throws Exception { if (myMdmEnabled) { From f7eee5985e9e06d3bb65b682e36946bb3d9c822f Mon Sep 17 00:00:00 2001 From: Tadgh Date: Fri, 26 Mar 2021 11:58:52 -0400 Subject: [PATCH 05/12] Add some logging to figure out CI failure --- .../batch/processors/GoldenResourceAnnotatingProcessor.java | 4 ++++ .../ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java | 1 - 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java index 11685f118c3..0fb7db8c32c 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java @@ -101,6 +101,10 @@ public class GoldenResourceAnnotatingProcessor implements ItemProcessor extension = ExtensionUtil.getOrCreateExtension(iBaseResource, ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL); ExtensionUtil.setExtension(myContext, extension, "reference", prefixPatient(goldenResourceId)); + } else { + IBaseExtension extension = ExtensionUtil.getOrCreateExtension(iBaseResource, "failed-to-associated-golden-id"); + ExtensionUtil.setExtension(myContext, extension, "string", myMdmExpansionCacheSvc.buildLog("not working!", true)); + } } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java index 0e1ea8e3453..e44f0703c6f 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java @@ -677,7 +677,6 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { nextContents = getBinaryContents(jobInfo, 2); assertThat(jobInfo.getFiles().get(2).getResourceType(), is(equalTo("Patient"))); assertThat(nextContents, is(containsString(GoldenResourceAnnotatingProcessor.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL))); - } @Test From bee33f763a5016b9588449b8d386bcc98f898962 Mon Sep 17 00:00:00 2001 From: Tadgh Date: Fri, 26 Mar 2021 15:54:27 -0400 Subject: [PATCH 06/12] Refactors, tidying. Improving docs --- .../ca/uhn/fhir/util/SearchParameterUtil.java | 31 +++++--- .../GoldenResourceAnnotatingProcessor.java | 48 +++++++----- .../fhir/jpa/bulk/job/BaseBulkItemReader.java | 15 ++-- .../jpa/bulk/job/GroupBulkItemReader.java | 76 +++++++++++++------ .../jpa/dao/mdm/MdmExpansionCacheSvc.java | 38 ++++++++-- .../jpa/bulk/BulkDataExportSvcImplR4Test.java | 1 - ...rceProviderR4ValueSetNoVerCSNoVerTest.java | 1 - 7 files changed, 144 insertions(+), 66 deletions(-) diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java index 047525705e5..c225af57251 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java @@ -32,6 +32,7 @@ import org.hl7.fhir.instance.model.api.IPrimitiveType; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; +import java.util.Optional; public class SearchParameterUtil { @@ -59,27 +60,29 @@ public class SearchParameterUtil { * 3.1 If that returns >1 result, throw an error * 3.2 If that returns 1 result, return it */ - public static RuntimeSearchParam getPatientSearchParamForResourceType(FhirContext theFhirContext, String theResourceType) { + public static Optional getOnlyPatientSearchParamForResourceType(FhirContext theFhirContext, String theResourceType) { RuntimeSearchParam myPatientSearchParam = null; RuntimeResourceDefinition runtimeResourceDefinition = theFhirContext.getResourceDefinition(theResourceType); myPatientSearchParam = runtimeResourceDefinition.getSearchParam("patient"); if (myPatientSearchParam == null) { myPatientSearchParam = runtimeResourceDefinition.getSearchParam("subject"); if (myPatientSearchParam == null) { - myPatientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition); - if (myPatientSearchParam == null) { - String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", theResourceType); - throw new IllegalArgumentException(errorMessage); - } + myPatientSearchParam = getOnlyPatientCompartmentRuntimeSearchParam(runtimeResourceDefinition); } } - return myPatientSearchParam; + return Optional.ofNullable(myPatientSearchParam); } + /** * Search the resource definition for a compartment named 'patient' and return its related Search Parameter. */ - public static RuntimeSearchParam getRuntimeSearchParamByCompartment(RuntimeResourceDefinition runtimeResourceDefinition) { + public static RuntimeSearchParam getOnlyPatientCompartmentRuntimeSearchParam(FhirContext theFhirContext, String theResourceType) { + RuntimeResourceDefinition resourceDefinition = theFhirContext.getResourceDefinition(theResourceType); + return getOnlyPatientCompartmentRuntimeSearchParam(resourceDefinition); + } + + public static RuntimeSearchParam getOnlyPatientCompartmentRuntimeSearchParam(RuntimeResourceDefinition runtimeResourceDefinition) { RuntimeSearchParam patientSearchParam; List searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient"); if (searchParams == null || searchParams.size() == 0) { @@ -88,11 +91,21 @@ public class SearchParameterUtil { } else if (searchParams.size() == 1) { patientSearchParam = searchParams.get(0); } else { - String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as we are unable to disambiguate which patient search parameter we should be searching by.", runtimeResourceDefinition.getId()); + String errorMessage = String.format("Resource type %s has more than one Search Param which references a patient compartment. We are unable to disambiguate which patient search parameter we should be searching by.", runtimeResourceDefinition.getId()); throw new IllegalArgumentException(errorMessage); } return patientSearchParam; + } + public static List getAllPatientCompartmentRuntimeSearchParams(FhirContext theFhirContext, String theResourceType) { + RuntimeResourceDefinition runtimeResourceDefinition = theFhirContext.getResourceDefinition(theResourceType); + return getAllPatientCompartmentRuntimeSearchParams(runtimeResourceDefinition); + + } + + private static List getAllPatientCompartmentRuntimeSearchParams(RuntimeResourceDefinition theRuntimeResourceDefinition) { + List patient = theRuntimeResourceDefinition.getSearchParamsForCompartmentName("Patient"); + return patient; } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java index 0fb7db8c32c..0dc489a7f36 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java @@ -41,7 +41,8 @@ import java.util.List; import java.util.Optional; /** - * Reusable Item Processor which converts ResourcePersistentIds to their IBaseResources + * Reusable Item Processor which attaches an extension to any outgoing resource. This extension will contain a resource + * reference to the golden resource patient of the given resources' patient. (e.g. Observation.subject, Immunization.patient, etc) */ public class GoldenResourceAnnotatingProcessor implements ItemProcessor, List> { private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); @@ -59,25 +60,40 @@ public class GoldenResourceAnnotatingProcessor implements ItemProcessor oPatientSearchParam= SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, myResourceType); + if (!oPatientSearchParam.isPresent()) { + String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", myResourceType); + throw new IllegalArgumentException(errorMessage); + } else { + myRuntimeSearchParam = oPatientSearchParam.get(); + } + } + @Override public List process(List theIBaseResources) throws Exception { + //If MDM expansion is enabled, add this magic new extension, otherwise, return the resource as is. if (myMdmEnabled) { if (myRuntimeSearchParam == null) { - myRuntimeSearchParam = SearchParameterUtil.getPatientSearchParamForResourceType(myContext, myResourceType); + populateRuntimeSearchParam(); } - String path = runtimeSearchParamFhirPath(); - theIBaseResources.forEach(iBaseResource -> annotateClinicalResourceWithRelatedGoldenResourcePatient(path, iBaseResource)); + if (myPatientFhirPath == null) { + populatePatientFhirPath(); + } + theIBaseResources.forEach(this::annotateClinicalResourceWithRelatedGoldenResourcePatient); } - return theIBaseResources; } - private void annotateClinicalResourceWithRelatedGoldenResourcePatient(String path, IBaseResource iBaseResource) { - Optional patientReference = getPatientReference(path, iBaseResource); + private void annotateClinicalResourceWithRelatedGoldenResourcePatient(IBaseResource iBaseResource) { + Optional patientReference = getPatientReference(iBaseResource); if (patientReference.isPresent()) { addGoldenResourceExtension(iBaseResource, patientReference.get()); } else { @@ -85,26 +101,24 @@ public class GoldenResourceAnnotatingProcessor implements ItemProcessor getPatientReference(String path, IBaseResource iBaseResource) { + private Optional getPatientReference(IBaseResource iBaseResource) { //In the case of patient, we will just use the raw ID. if (myResourceType.equalsIgnoreCase("Patient")) { return Optional.of(iBaseResource.getIdElement().getIdPart()); //Otherwise, we will perform evaluation of the fhirPath. } else { - Optional optionalReference = getFhirParser().evaluateFirst(iBaseResource, path, IBaseReference.class); + Optional optionalReference = getFhirParser().evaluateFirst(iBaseResource, myPatientFhirPath, IBaseReference.class); return optionalReference.map(theIBaseReference -> theIBaseReference.getReferenceElement().getIdPart()); } } private void addGoldenResourceExtension(IBaseResource iBaseResource, String sourceResourceId) { String goldenResourceId = myMdmExpansionCacheSvc.getGoldenResourceId(sourceResourceId); + IBaseExtension extension = ExtensionUtil.getOrCreateExtension(iBaseResource, ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL); if (!StringUtils.isBlank(goldenResourceId)) { - IBaseExtension extension = ExtensionUtil.getOrCreateExtension(iBaseResource, ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL); ExtensionUtil.setExtension(myContext, extension, "reference", prefixPatient(goldenResourceId)); } else { - IBaseExtension extension = ExtensionUtil.getOrCreateExtension(iBaseResource, "failed-to-associated-golden-id"); - ExtensionUtil.setExtension(myContext, extension, "string", myMdmExpansionCacheSvc.buildLog("not working!", true)); - + ExtensionUtil.setExtension(myContext, extension, "string", "This patient has no matched golden resource."); } } @@ -119,12 +133,12 @@ public class GoldenResourceAnnotatingProcessor implements ItemProcessor1 result, throw an error - * 3.2 If that returns 1 result, return it - */ protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType() { if (myPatientSearchParam == null) { - myPatientSearchParam = SearchParameterUtil.getPatientSearchParamForResourceType(myContext, myResourceType); + Optional onlyPatientSearchParamForResourceType = SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, myResourceType); + if (onlyPatientSearchParamForResourceType.isPresent()) { + myPatientSearchParam = onlyPatientSearchParamForResourceType.get(); + } else { + + } } return myPatientSearchParam; } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java index a00e54a18a6..f90978a9d26 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java @@ -112,17 +112,17 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade * possibly expanded by MDM, and don't have to go and fetch other resource DAOs. */ private Iterator getExpandedPatientIterator() { - Set patientPidsToExport = new HashSet<>(); List members = getMembers(); List ids = members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList()); List pidsOrThrowException = myIdHelperService.getPidsOrThrowException(ids); - patientPidsToExport.addAll(pidsOrThrowException); + Set patientPidsToExport = new HashSet<>(pidsOrThrowException); if (myMdmEnabled) { IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId)); Long pidOrNull = myIdHelperService.getPidOrNull(group); - List> lists = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); - lists.forEach(patientPidsToExport::addAll); + List> goldenPidSourcePidTuple = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); + goldenPidSourcePidTuple.forEach(patientPidsToExport::addAll); + populateMdmResourceCache(goldenPidSourcePidTuple); } List resourcePersistentIds = patientPidsToExport .stream() @@ -131,6 +131,53 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade return resourcePersistentIds.iterator(); } + /** + * @param thePidTuples + */ + private void populateMdmResourceCache(List> thePidTuples) { + if (myMdmExpansionCacheSvc.hasBeenPopulated()) { + return; + } + //First, convert this zipped set of tuples to a map of + //{ + // patient/gold-1 -> [patient/1, patient/2] + // patient/gold-2 -> [patient/3, patient/4] + //} + Map> goldenResourceToSourcePidMap = new HashMap<>(); + for (List goldenPidTargetPidTuple : thePidTuples) { + Long goldenPid = goldenPidTargetPidTuple.get(0); + Long sourcePid = goldenPidTargetPidTuple.get(1); + + if(!goldenResourceToSourcePidMap.containsKey(goldenPid)) { + goldenResourceToSourcePidMap.put(goldenPid, new HashSet<>()); + } + goldenResourceToSourcePidMap.get(goldenPid).add(sourcePid); + } + + //Next, lets convert it to an inverted index for fast lookup + // { + // patient/1 -> patient/gold-1 + // patient/2 -> patient/gold-1 + // patient/3 -> patient/gold-2 + // patient/4 -> patient/gold-2 + // } + Map sourceResourceIdToGoldenResourceIdMap = new HashMap<>(); + goldenResourceToSourcePidMap.forEach((key, value) -> { + String goldenResourceId = myIdHelperService.translatePidIdToForcedId(new ResourcePersistentId(key)).orElse(key.toString()); + Map> pidsToForcedIds = myIdHelperService.translatePidsToForcedIds(value); + + Set sourceResourceIds = pidsToForcedIds.entrySet().stream() + .map(ent -> ent.getValue().isPresent() ? ent.getValue().get() : ent.getKey().toString()) + .collect(Collectors.toSet()); + + sourceResourceIds + .forEach(sourceResourceId -> sourceResourceIdToGoldenResourceIdMap.put(sourceResourceId, goldenResourceId)); + }); + + //Now that we have built our cached expansion, store it. + myMdmExpansionCacheSvc.setCacheContents(sourceResourceIdToGoldenResourceIdMap); + } + /** * Given the local myGroupId, read this group, and find all members' patient references. * @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"] @@ -173,8 +220,7 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade } goldenResourceToSourcePidMap.get(goldenPid).add(sourcePid); } - populateMdmResourceCacheIfNeeded(goldenResourceToSourcePidMap); - + populateMdmResourceCache(goldenPidTargetPidTuples); //If the result of the translation is an empty optional, it means there is no forced id, and we can use the PID as the resource ID. Set resolvedResourceIds = pidToForcedIdMap.entrySet().stream() @@ -192,25 +238,7 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade } private void populateMdmResourceCacheIfNeeded(Map> goldenResourceToSourcePidMap) { - if (myMdmExpansionCacheSvc.hasBeenPopulated()) { - return; - } - Map sourceResourceIdToGoldenResourceIdMap = new HashMap<>(); - goldenResourceToSourcePidMap.entrySet() - .forEach(entry -> { - Long key = entry.getKey(); - String goldenResourceId = myIdHelperService.translatePidIdToForcedId(new ResourcePersistentId(key)).orElse(key.toString()); - - Map> pidsToForcedIds = myIdHelperService.translatePidsToForcedIds(entry.getValue()); - Set sourceResourceIds = pidsToForcedIds.entrySet().stream() - .map(ent -> ent.getValue().isPresent() ? ent.getValue().get() : ent.getKey().toString()) - .collect(Collectors.toSet()); - - sourceResourceIds - .forEach(sourceResourceId -> sourceResourceIdToGoldenResourceIdMap.put(sourceResourceId, goldenResourceId)); - }); - myMdmExpansionCacheSvc.setCacheContents(sourceResourceIdToGoldenResourceIdMap); } private void queryResourceTypeWithReferencesToPatients(Set myReadPids, List idChunk) { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java index 6e80ba7e58b..19bc25dd810 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java @@ -1,5 +1,6 @@ package ca.uhn.fhir.jpa.dao.mdm; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import java.util.Map; @@ -7,20 +8,47 @@ import java.util.concurrent.ConcurrentHashMap; import static org.slf4j.LoggerFactory.getLogger; +/** + * The purpose of this class is to share context between steps of a given GroupBulkExport job. + * + * This cache allows you to port state between reader/processor/writer. In this case, we are maintaining + * a cache of Source Resource ID -> Golden Resource ID, so that we can annotate outgoing resources with their golden owner + * if applicable. + * + */ public class MdmExpansionCacheSvc { private static final Logger ourLog = getLogger(MdmExpansionCacheSvc.class); - private ConcurrentHashMap mySourceToGoldenIdCache = new ConcurrentHashMap<>(); + private final ConcurrentHashMap mySourceToGoldenIdCache = new ConcurrentHashMap<>(); public String getGoldenResourceId(String theSourceId) { - ourLog.info(buildLog("About to lookup cached resource ID " + theSourceId, true)); - return mySourceToGoldenIdCache.get(theSourceId); + ourLog.debug(buildLogMessage("About to lookup cached resource ID " + theSourceId)); + String goldenResourceId = mySourceToGoldenIdCache.get(theSourceId); + + //A golden resources' golden resource ID is itself. + if (StringUtils.isBlank(goldenResourceId)) { + if (mySourceToGoldenIdCache.containsValue(theSourceId)) { + goldenResourceId = theSourceId; + } + } + return goldenResourceId; } - public String buildLog(String message, boolean theShowContent) { + public String buildLogMessage(String theMessage) { + return buildLogMessage(theMessage, false); + } + + /** + * Builds a log message, potentially enriched with the cache content. + * + * @param message The log message + * @param theAddCacheContentContent If true, will annotate the log message with the current cache contents. + * @return a built log message, which may include the cache content. + */ + public String buildLogMessage(String message, boolean theAddCacheContentContent) { StringBuilder builder = new StringBuilder(); builder.append(message); - if (ourLog.isDebugEnabled() || theShowContent) { + if (ourLog.isDebugEnabled() || theAddCacheContentContent) { builder.append("\n") .append("Current cache content is:") .append("\n"); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java index e44f0703c6f..a6d1ec08287 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java @@ -1052,7 +1052,6 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { //Non-cached should all have unique IDs List jobIds = Stream.of(jobInfo5, jobInfo6, jobInfo7, jobInfo8, jobInfo9).map(IBulkDataExportSvc.JobInfo::getJobId).collect(Collectors.toList()); - ourLog.info("ZOOP {}", String.join(", ", jobIds)); Set uniqueJobIds = new HashSet<>(jobIds); assertEquals(uniqueJobIds.size(), jobIds.size()); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4ValueSetNoVerCSNoVerTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4ValueSetNoVerCSNoVerTest.java index 9a1794f65be..500eba52486 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4ValueSetNoVerCSNoVerTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4ValueSetNoVerCSNoVerTest.java @@ -541,7 +541,6 @@ public class ResourceProviderR4ValueSetNoVerCSNoVerTest extends BaseResourceProv ValueSet expanded = (ValueSet) respParam.getParameter().get(0).getResource(); String resp = myFhirCtx.newXmlParser().setPrettyPrint(true).encodeResourceToString(expanded); - ourLog.info("zoop"); ourLog.info(resp); assertThat(resp, is(containsStringIgnoringCase(""))); From a3fbd3e1a1817b3883de6092ec5e3238d9651523 Mon Sep 17 00:00:00 2001 From: Tadgh Date: Fri, 26 Mar 2021 16:00:06 -0400 Subject: [PATCH 07/12] Hotter docs --- .../java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java | 2 -- .../java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java | 4 ---- .../java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java | 7 +++++++ 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java index 8a495496f23..22a11abcbdc 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java @@ -161,8 +161,6 @@ public class BulkExportJobConfig { return myStepBuilderFactory.get("groupBulkExportGenerateResourceFilesStep") ., List> chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time. .reader(groupBulkItemReader()) -// .processor(myPidToIBaseResourceProcessor) -// .processor(myGoldenResourceAnnotatingProcessor) .processor(inflateResourceThenAnnotateWithGoldenResourceProcessor()) .writer(resourceToFileWriter()) .listener(bulkExportGenerateResourceFilesStepListener()) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java index f90978a9d26..a5515667469 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java @@ -237,10 +237,6 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade return expandedIds; } - private void populateMdmResourceCacheIfNeeded(Map> goldenResourceToSourcePidMap) { - - } - private void queryResourceTypeWithReferencesToPatients(Set myReadPids, List idChunk) { //Build SP map //First, inject the _typeFilters and _since from the export job diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java index 19bc25dd810..7ad00390ae8 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java @@ -21,6 +21,13 @@ public class MdmExpansionCacheSvc { private final ConcurrentHashMap mySourceToGoldenIdCache = new ConcurrentHashMap<>(); + /** + * Lookup a given resource's golden resource ID in the cache. Note that if you pass this function the resource ID of a + * golden resource, it will just return itself. + * + * @param theSourceId the resource ID of the source resource ,e.g. PAT123 + * @return the resource ID of the associated golden resource. + */ public String getGoldenResourceId(String theSourceId) { ourLog.debug(buildLogMessage("About to lookup cached resource ID " + theSourceId)); String goldenResourceId = mySourceToGoldenIdCache.get(theSourceId); From e8f49f2fba86fa371852eb55d9558c704d664338 Mon Sep 17 00:00:00 2001 From: Tadgh Date: Fri, 26 Mar 2021 16:01:29 -0400 Subject: [PATCH 08/12] More docs --- .../ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java index 7ad00390ae8..b4315677fec 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/mdm/MdmExpansionCacheSvc.java @@ -41,7 +41,7 @@ public class MdmExpansionCacheSvc { return goldenResourceId; } - public String buildLogMessage(String theMessage) { + private String buildLogMessage(String theMessage) { return buildLogMessage(theMessage, false); } @@ -66,12 +66,21 @@ public class MdmExpansionCacheSvc { } + /** + * Populate the cache + * + * @param theSourceResourceIdToGoldenResourceIdMap the source ID -> golden ID map to populate the cache with. + */ public void setCacheContents(Map theSourceResourceIdToGoldenResourceIdMap) { if (mySourceToGoldenIdCache.isEmpty()) { this.mySourceToGoldenIdCache.putAll(theSourceResourceIdToGoldenResourceIdMap); } } + /** + * Since this cache is used at @JobScope, we can skip a whole whack of expansions happening by simply checking + * if one of our child steps has populated the cache yet. . + */ public boolean hasBeenPopulated() { return !mySourceToGoldenIdCache.isEmpty(); } From 002d3fcf3c25ee781f2fba7acd697157834d4b8b Mon Sep 17 00:00:00 2001 From: Tadgh Date: Mon, 29 Mar 2021 11:07:23 -0400 Subject: [PATCH 09/12] Migrate constant --- .../main/java/ca/uhn/fhir/util/HapiExtensions.java | 5 +++++ .../GoldenResourceAnnotatingProcessor.java | 4 ++-- .../fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java | 12 ++++-------- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/HapiExtensions.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/HapiExtensions.java index 3d5d052ac78..f2cf7903407 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/HapiExtensions.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/HapiExtensions.java @@ -112,6 +112,11 @@ public class HapiExtensions { public static final String EXT_RESOURCE_PLACEHOLDER = "http://hapifhir.io/fhir/StructureDefinition/resource-placeholder"; /** + * URL for extension in a Group Bulk Export which identifies the golden patient of a given exported resource. + */ + public static final String ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL = "https://hapifhir.org/associated-patient-golden-resource/"; + + /** * Non instantiable */ private HapiExtensions() { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java index 0dc489a7f36..2013cce4af7 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java @@ -27,6 +27,7 @@ import ca.uhn.fhir.jpa.batch.log.Logs; import ca.uhn.fhir.jpa.bulk.job.BulkExportJobConfig; import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc; import ca.uhn.fhir.util.ExtensionUtil; +import ca.uhn.fhir.util.HapiExtensions; import ca.uhn.fhir.util.SearchParameterUtil; import org.apache.commons.lang3.StringUtils; import org.hl7.fhir.instance.model.api.IBaseExtension; @@ -46,7 +47,6 @@ import java.util.Optional; */ public class GoldenResourceAnnotatingProcessor implements ItemProcessor, List> { private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); - public static final String ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL = "https://hapifhir.org/associated-patient-golden-resource/"; @Value("#{stepExecutionContext['resourceType']}") private String myResourceType; @@ -114,7 +114,7 @@ public class GoldenResourceAnnotatingProcessor implements ItemProcessor extension = ExtensionUtil.getOrCreateExtension(iBaseResource, ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL); + IBaseExtension extension = ExtensionUtil.getOrCreateExtension(iBaseResource, HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL); if (!StringUtils.isBlank(goldenResourceId)) { ExtensionUtil.setExtension(myContext, extension, "reference", prefixPatient(goldenResourceId)); } else { diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java index a6d1ec08287..3145072d546 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java @@ -6,7 +6,6 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome; import ca.uhn.fhir.jpa.batch.BatchJobsConfig; import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; -import ca.uhn.fhir.jpa.batch.processors.GoldenResourceAnnotatingProcessor; import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions; import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.bulk.job.BulkExportJobParametersBuilder; @@ -20,12 +19,11 @@ import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity; import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity; import ca.uhn.fhir.jpa.entity.BulkExportJobEntity; import ca.uhn.fhir.jpa.entity.MdmLink; -import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.mdm.api.MdmLinkSourceEnum; import ca.uhn.fhir.mdm.api.MdmMatchResultEnum; import ca.uhn.fhir.rest.api.Constants; -import ca.uhn.fhir.rest.api.MethodOutcome; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; +import ca.uhn.fhir.util.HapiExtensions; import ca.uhn.fhir.util.UrlUtil; import com.google.common.base.Charsets; import com.google.common.collect.Sets; @@ -40,7 +38,6 @@ import org.hl7.fhir.r4.model.Group; import org.hl7.fhir.r4.model.Immunization; import org.hl7.fhir.r4.model.InstantType; import org.hl7.fhir.r4.model.Observation; -import org.hl7.fhir.r4.model.Parameters; import org.hl7.fhir.r4.model.Patient; import org.hl7.fhir.r4.model.Reference; import org.junit.jupiter.api.Test; @@ -56,7 +53,6 @@ import org.springframework.batch.core.explore.JobExplorer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; -import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Date; @@ -668,15 +664,15 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { String nextContents = getBinaryContents(jobInfo, 0); assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); - assertThat(nextContents, is(containsString(GoldenResourceAnnotatingProcessor.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL))); + assertThat(nextContents, is(containsString(HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL))); nextContents = getBinaryContents(jobInfo, 1); assertThat(jobInfo.getFiles().get(1).getResourceType(), is(equalTo("Observation"))); - assertThat(nextContents, is(containsString(GoldenResourceAnnotatingProcessor.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL))); + assertThat(nextContents, is(containsString(HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL))); nextContents = getBinaryContents(jobInfo, 2); assertThat(jobInfo.getFiles().get(2).getResourceType(), is(equalTo("Patient"))); - assertThat(nextContents, is(containsString(GoldenResourceAnnotatingProcessor.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL))); + assertThat(nextContents, is(containsString(HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL))); } @Test From c46617fefa2568d08223d28299f1174c60a5ad95 Mon Sep 17 00:00:00 2001 From: Tadgh Date: Mon, 29 Mar 2021 11:38:52 -0400 Subject: [PATCH 10/12] Use dynamic projection --- .../GoldenResourceAnnotatingProcessor.java | 2 -- .../jpa/bulk/job/GroupBulkItemReader.java | 28 +++++++++++-------- .../ca/uhn/fhir/jpa/dao/data/IMdmLinkDao.java | 11 ++++++-- 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java index 2013cce4af7..0fb79d6505d 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java @@ -117,8 +117,6 @@ public class GoldenResourceAnnotatingProcessor implements ItemProcessor extension = ExtensionUtil.getOrCreateExtension(iBaseResource, HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL); if (!StringUtils.isBlank(goldenResourceId)) { ExtensionUtil.setExtension(myContext, extension, "reference", prefixPatient(goldenResourceId)); - } else { - ExtensionUtil.setExtension(myContext, extension, "string", "This patient has no matched golden resource."); } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java index a5515667469..cbd27398a49 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java @@ -120,8 +120,11 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade if (myMdmEnabled) { IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId)); Long pidOrNull = myIdHelperService.getPidOrNull(group); - List> goldenPidSourcePidTuple = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); - goldenPidSourcePidTuple.forEach(patientPidsToExport::addAll); + List goldenPidSourcePidTuple = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); + goldenPidSourcePidTuple.forEach(tuple -> { + patientPidsToExport.add(tuple.getGoldenPid()); + patientPidsToExport.add(tuple.getSourcePid()); + }); populateMdmResourceCache(goldenPidSourcePidTuple); } List resourcePersistentIds = patientPidsToExport @@ -134,7 +137,7 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade /** * @param thePidTuples */ - private void populateMdmResourceCache(List> thePidTuples) { + private void populateMdmResourceCache(List thePidTuples) { if (myMdmExpansionCacheSvc.hasBeenPopulated()) { return; } @@ -144,9 +147,9 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade // patient/gold-2 -> [patient/3, patient/4] //} Map> goldenResourceToSourcePidMap = new HashMap<>(); - for (List goldenPidTargetPidTuple : thePidTuples) { - Long goldenPid = goldenPidTargetPidTuple.get(0); - Long sourcePid = goldenPidTargetPidTuple.get(1); + for (IMdmLinkDao.MdmPidTuple goldenPidTargetPidTuple : thePidTuples) { + Long goldenPid = goldenPidTargetPidTuple.getGoldenPid(); + Long sourcePid = goldenPidTargetPidTuple.getSourcePid(); if(!goldenResourceToSourcePidMap.containsKey(goldenPid)) { goldenResourceToSourcePidMap.put(goldenPid, new HashSet<>()); @@ -204,16 +207,19 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade //Attempt to perform MDM Expansion of membership if (myMdmEnabled) { - List> goldenPidTargetPidTuples = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); + List goldenPidTargetPidTuples = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); //Now lets translate these pids into resource IDs Set uniquePids = new HashSet<>(); - goldenPidTargetPidTuples.forEach(uniquePids::addAll); + goldenPidTargetPidTuples.forEach(tuple -> { + uniquePids.add(tuple.getGoldenPid()); + uniquePids.add(tuple.getSourcePid()); + }); Map> pidToForcedIdMap = myIdHelperService.translatePidsToForcedIds(uniquePids); Map> goldenResourceToSourcePidMap = new HashMap<>(); - for (List goldenPidTargetPidTuple : goldenPidTargetPidTuples) { - Long goldenPid = goldenPidTargetPidTuple.get(0); - Long sourcePid = goldenPidTargetPidTuple.get(1); + for (IMdmLinkDao.MdmPidTuple goldenPidTargetPidTuple : goldenPidTargetPidTuples) { + Long goldenPid = goldenPidTargetPidTuple.getGoldenPid(); + Long sourcePid = goldenPidTargetPidTuple.getSourcePid(); if(!goldenResourceToSourcePidMap.containsKey(goldenPid)) { goldenResourceToSourcePidMap.put(goldenPid, new HashSet<>()); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IMdmLinkDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IMdmLinkDao.java index 7ad98e46e11..53814c503b7 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IMdmLinkDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IMdmLinkDao.java @@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.dao.data; import ca.uhn.fhir.mdm.api.MdmMatchResultEnum; import ca.uhn.fhir.jpa.entity.MdmLink; +import org.springframework.beans.factory.annotation.Value; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; @@ -40,7 +41,7 @@ public interface IMdmLinkDao extends JpaRepository { @Query("DELETE FROM MdmLink f WHERE (myGoldenResourcePid = :pid OR mySourcePid = :pid) AND myMatchResult <> :matchResult") int deleteWithAnyReferenceToPidAndMatchResultNot(@Param("pid") Long thePid, @Param("matchResult") MdmMatchResultEnum theMatchResult); - @Query("SELECT ml2.myGoldenResourcePid, ml2.mySourcePid FROM MdmLink ml2 " + + @Query("SELECT ml2.myGoldenResourcePid as goldenPid, ml2.mySourcePid as sourcePid FROM MdmLink ml2 " + "WHERE ml2.myMatchResult=:matchResult " + "AND ml2.myGoldenResourcePid IN (" + "SELECT ml.myGoldenResourcePid FROM MdmLink ml " + @@ -50,5 +51,11 @@ public interface IMdmLinkDao extends JpaRepository { "AND hrl.mySourcePath='Group.member.entity' " + "AND hrl.myTargetResourceType='Patient'" + ")") - List> expandPidsFromGroupPidGivenMatchResult(@Param("groupPid") Long theGroupPid, @Param("matchResult") MdmMatchResultEnum theMdmMatchResultEnum); + List expandPidsFromGroupPidGivenMatchResult(@Param("groupPid") Long theGroupPid, @Param("matchResult") MdmMatchResultEnum theMdmMatchResultEnum); + + interface MdmPidTuple { + Long getGoldenPid(); + Long getSourcePid(); + } + } From 19810db65de467f2e27e1ea41669ef7126b1d913 Mon Sep 17 00:00:00 2001 From: Tadgh Date: Mon, 29 Mar 2021 14:55:41 -0400 Subject: [PATCH 11/12] Test refactor. address review comments --- .../GoldenResourceAnnotatingProcessor.java | 3 +- .../jpa/bulk/job/GroupBulkItemReader.java | 29 +++++-------- .../jpa/bulk/BulkDataExportSvcImplR4Test.java | 42 ++++++++++++++----- 3 files changed, 44 insertions(+), 30 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java index 0fb79d6505d..4566aa83af3 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/GoldenResourceAnnotatingProcessor.java @@ -97,7 +97,8 @@ public class GoldenResourceAnnotatingProcessor implements ItemProcessor [patient/3, patient/4] //} Map> goldenResourceToSourcePidMap = new HashMap<>(); - for (IMdmLinkDao.MdmPidTuple goldenPidTargetPidTuple : thePidTuples) { - Long goldenPid = goldenPidTargetPidTuple.getGoldenPid(); - Long sourcePid = goldenPidTargetPidTuple.getSourcePid(); - - if(!goldenResourceToSourcePidMap.containsKey(goldenPid)) { - goldenResourceToSourcePidMap.put(goldenPid, new HashSet<>()); - } - goldenResourceToSourcePidMap.get(goldenPid).add(sourcePid); - } + extract(thePidTuples, goldenResourceToSourcePidMap); //Next, lets convert it to an inverted index for fast lookup // { @@ -217,15 +210,7 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade Map> pidToForcedIdMap = myIdHelperService.translatePidsToForcedIds(uniquePids); Map> goldenResourceToSourcePidMap = new HashMap<>(); - for (IMdmLinkDao.MdmPidTuple goldenPidTargetPidTuple : goldenPidTargetPidTuples) { - Long goldenPid = goldenPidTargetPidTuple.getGoldenPid(); - Long sourcePid = goldenPidTargetPidTuple.getSourcePid(); - - if(!goldenResourceToSourcePidMap.containsKey(goldenPid)) { - goldenResourceToSourcePidMap.put(goldenPid, new HashSet<>()); - } - goldenResourceToSourcePidMap.get(goldenPid).add(sourcePid); - } + extract(goldenPidTargetPidTuples, goldenResourceToSourcePidMap); populateMdmResourceCache(goldenPidTargetPidTuples); //If the result of the translation is an empty optional, it means there is no forced id, and we can use the PID as the resource ID. @@ -243,6 +228,14 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade return expandedIds; } + private void extract(List theGoldenPidTargetPidTuples, Map> theGoldenResourceToSourcePidMap) { + for (IMdmLinkDao.MdmPidTuple goldenPidTargetPidTuple : theGoldenPidTargetPidTuples) { + Long goldenPid = goldenPidTargetPidTuple.getGoldenPid(); + Long sourcePid = goldenPidTargetPidTuple.getSourcePid(); + theGoldenResourceToSourcePidMap.computeIfAbsent(goldenPid, key -> new HashSet<>()).add(sourcePid); + } + } + private void queryResourceTypeWithReferencesToPatients(Set myReadPids, List idChunk) { //Build SP map //First, inject the _typeFilters and _since from the export job diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java index 3145072d546..b1cc427546e 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java @@ -21,6 +21,7 @@ import ca.uhn.fhir.jpa.entity.BulkExportJobEntity; import ca.uhn.fhir.jpa.entity.MdmLink; import ca.uhn.fhir.mdm.api.MdmLinkSourceEnum; import ca.uhn.fhir.mdm.api.MdmMatchResultEnum; +import ca.uhn.fhir.parser.IParser; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.util.HapiExtensions; @@ -29,11 +30,13 @@ import com.google.common.base.Charsets; import com.google.common.collect.Sets; import org.apache.commons.lang3.time.DateUtils; import org.hamcrest.Matchers; +import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.r4.model.Binary; import org.hl7.fhir.r4.model.CareTeam; import org.hl7.fhir.r4.model.CodeableConcept; import org.hl7.fhir.r4.model.Enumerations; +import org.hl7.fhir.r4.model.Extension; import org.hl7.fhir.r4.model.Group; import org.hl7.fhir.r4.model.Immunization; import org.hl7.fhir.r4.model.InstantType; @@ -643,7 +646,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { // Create a bulk job BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions(); bulkDataExportOptions.setOutputFormat(null); - bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization", "Observation", "Patient")); + bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization", "Patient")); bulkDataExportOptions.setSince(null); bulkDataExportOptions.setFilters(null); bulkDataExportOptions.setGroupId(myPatientGroupId); @@ -660,19 +663,36 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { assertThat(jobInfo.getFiles().size(), equalTo(3)); assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); - // Iterate over the files - - String nextContents = getBinaryContents(jobInfo, 0); + //Ensure that all immunizations refer to the golden resource via extension assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); - assertThat(nextContents, is(containsString(HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL))); + List immunizations = readBulkExportContentsIntoResources(getBinaryContents(jobInfo, 0), Immunization.class); + immunizations + .stream().filter(immu -> !immu.getIdElement().getIdPart().equals("PAT999"))//Skip the golden resource + .forEach(immunization -> { + Extension extensionByUrl = immunization.getExtensionByUrl(HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL); + String reference = ((Reference) extensionByUrl.getValue()).getReference(); + assertThat(reference, is(equalTo("Patient/PAT999"))); + }); - nextContents = getBinaryContents(jobInfo, 1); - assertThat(jobInfo.getFiles().get(1).getResourceType(), is(equalTo("Observation"))); - assertThat(nextContents, is(containsString(HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL))); + //Ensure all patients are linked to their golden resource. + assertThat(jobInfo.getFiles().get(1).getResourceType(), is(equalTo("Patient"))); + List patients = readBulkExportContentsIntoResources(getBinaryContents(jobInfo, 2), Patient.class); + patients.stream() + .filter(patient -> patient.getIdElement().getIdPart().equals("PAT999")) + .forEach(patient -> { + Extension extensionByUrl = patient.getExtensionByUrl(HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL); + String reference = ((Reference) extensionByUrl.getValue()).getReference(); + assertThat(reference, is(equalTo("Patient/PAT999"))); + }); - nextContents = getBinaryContents(jobInfo, 2); - assertThat(jobInfo.getFiles().get(2).getResourceType(), is(equalTo("Patient"))); - assertThat(nextContents, is(containsString(HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL))); + } + + private List readBulkExportContentsIntoResources(String theContents, Class theClass) { + IParser iParser = myFhirCtx.newJsonParser(); + return Arrays.stream(theContents.split("\n")) + .map(iParser::parseResource) + .map(theClass::cast) + .collect(Collectors.toList()); } @Test From 432579c591e45f0a9d147d74c5dd203e23ebe55c Mon Sep 17 00:00:00 2001 From: Tadgh Date: Mon, 29 Mar 2021 15:00:20 -0400 Subject: [PATCH 12/12] Fix test error --- .../ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java index b1cc427546e..58fd8a39f5b 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java @@ -660,7 +660,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE)); - assertThat(jobInfo.getFiles().size(), equalTo(3)); + assertThat(jobInfo.getFiles().size(), equalTo(2)); assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); //Ensure that all immunizations refer to the golden resource via extension @@ -676,7 +676,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { //Ensure all patients are linked to their golden resource. assertThat(jobInfo.getFiles().get(1).getResourceType(), is(equalTo("Patient"))); - List patients = readBulkExportContentsIntoResources(getBinaryContents(jobInfo, 2), Patient.class); + List patients = readBulkExportContentsIntoResources(getBinaryContents(jobInfo, 1), Patient.class); patients.stream() .filter(patient -> patient.getIdElement().getIdPart().equals("PAT999")) .forEach(patient -> {