Add annotating processor and composite processor
This commit is contained in:
parent
315dc3fcb7
commit
4d3b021f12
|
@ -23,6 +23,7 @@ package ca.uhn.fhir.util;
|
||||||
import ca.uhn.fhir.context.BaseRuntimeChildDefinition;
|
import ca.uhn.fhir.context.BaseRuntimeChildDefinition;
|
||||||
import ca.uhn.fhir.context.FhirContext;
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
import ca.uhn.fhir.context.RuntimeResourceDefinition;
|
import ca.uhn.fhir.context.RuntimeResourceDefinition;
|
||||||
|
import ca.uhn.fhir.context.RuntimeSearchParam;
|
||||||
import org.apache.commons.lang3.Validate;
|
import org.apache.commons.lang3.Validate;
|
||||||
import org.hl7.fhir.instance.model.api.IBase;
|
import org.hl7.fhir.instance.model.api.IBase;
|
||||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
|
@ -50,6 +51,50 @@ public class SearchParameterUtil {
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given the resource type, fetch its patient-based search parameter name
|
||||||
|
* 1. Attempt to find one called 'patient'
|
||||||
|
* 2. If that fails, find one called 'subject'
|
||||||
|
* 3. If that fails, find find by Patient Compartment.
|
||||||
|
* 3.1 If that returns >1 result, throw an error
|
||||||
|
* 3.2 If that returns 1 result, return it
|
||||||
|
*/
|
||||||
|
public static RuntimeSearchParam getPatientSearchParamForResourceType(FhirContext theFhirContext, String theResourceType) {
|
||||||
|
RuntimeSearchParam myPatientSearchParam = null;
|
||||||
|
RuntimeResourceDefinition runtimeResourceDefinition = theFhirContext.getResourceDefinition(theResourceType);
|
||||||
|
myPatientSearchParam = runtimeResourceDefinition.getSearchParam("patient");
|
||||||
|
if (myPatientSearchParam == null) {
|
||||||
|
myPatientSearchParam = runtimeResourceDefinition.getSearchParam("subject");
|
||||||
|
if (myPatientSearchParam == null) {
|
||||||
|
myPatientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition);
|
||||||
|
if (myPatientSearchParam == null) {
|
||||||
|
String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", theResourceType);
|
||||||
|
throw new IllegalArgumentException(errorMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return myPatientSearchParam;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Search the resource definition for a compartment named 'patient' and return its related Search Parameter.
|
||||||
|
*/
|
||||||
|
public static RuntimeSearchParam getRuntimeSearchParamByCompartment(RuntimeResourceDefinition runtimeResourceDefinition) {
|
||||||
|
RuntimeSearchParam patientSearchParam;
|
||||||
|
List<RuntimeSearchParam> searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient");
|
||||||
|
if (searchParams == null || searchParams.size() == 0) {
|
||||||
|
String errorMessage = String.format("Resource type [%s] is not eligible for this type of export, as it contains no Patient compartment, and no `patient` or `subject` search parameter", runtimeResourceDefinition.getId());
|
||||||
|
throw new IllegalArgumentException(errorMessage);
|
||||||
|
} else if (searchParams.size() == 1) {
|
||||||
|
patientSearchParam = searchParams.get(0);
|
||||||
|
} else {
|
||||||
|
String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as we are unable to disambiguate which patient search parameter we should be searching by.", runtimeResourceDefinition.getId());
|
||||||
|
throw new IllegalArgumentException(errorMessage);
|
||||||
|
}
|
||||||
|
return patientSearchParam;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
public static String getCode(FhirContext theContext, IBaseResource theResource) {
|
public static String getCode(FhirContext theContext, IBaseResource theResource) {
|
||||||
|
|
|
@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.batch;
|
||||||
* #L%
|
* #L%
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
import ca.uhn.fhir.jpa.batch.processors.GoldenResourceAnnotatingProcessor;
|
||||||
import ca.uhn.fhir.jpa.batch.processors.PidToIBaseResourceProcessor;
|
import ca.uhn.fhir.jpa.batch.processors.PidToIBaseResourceProcessor;
|
||||||
import org.springframework.batch.core.configuration.annotation.StepScope;
|
import org.springframework.batch.core.configuration.annotation.StepScope;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
|
@ -34,4 +35,10 @@ public class CommonBatchJobConfig {
|
||||||
return new PidToIBaseResourceProcessor();
|
return new PidToIBaseResourceProcessor();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@StepScope
|
||||||
|
public GoldenResourceAnnotatingProcessor goldenResourceAnnotatingProcessor() {
|
||||||
|
return new GoldenResourceAnnotatingProcessor();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,98 @@
|
||||||
|
package ca.uhn.fhir.jpa.batch.processors;
|
||||||
|
|
||||||
|
/*-
|
||||||
|
* #%L
|
||||||
|
* HAPI FHIR JPA Server
|
||||||
|
* %%
|
||||||
|
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
|
||||||
|
* %%
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
* #L%
|
||||||
|
*/
|
||||||
|
|
||||||
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
|
import ca.uhn.fhir.context.RuntimeSearchParam;
|
||||||
|
import ca.uhn.fhir.fhirpath.IFhirPath;
|
||||||
|
import ca.uhn.fhir.jpa.batch.log.Logs;
|
||||||
|
import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc;
|
||||||
|
import ca.uhn.fhir.util.ExtensionUtil;
|
||||||
|
import ca.uhn.fhir.util.SearchParameterUtil;
|
||||||
|
import org.hl7.fhir.instance.model.api.IBase;
|
||||||
|
import org.hl7.fhir.instance.model.api.IBaseReference;
|
||||||
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
|
import org.hl7.fhir.instance.model.api.IIdType;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.springframework.batch.item.ItemProcessor;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reusable Item Processor which converts ResourcePersistentIds to their IBaseResources
|
||||||
|
*/
|
||||||
|
public class GoldenResourceAnnotatingProcessor implements ItemProcessor<List<IBaseResource>, List<IBaseResource>> {
|
||||||
|
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
|
||||||
|
public static final String ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL = "https://hapifhir.org/associated-patient-golden-resource/";
|
||||||
|
|
||||||
|
@Value("#{stepExecutionContext['resourceType']}")
|
||||||
|
private String myResourceType;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private FhirContext myContext;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private MdmExpansionCacheSvc myMdmExpansionCacheSvc;
|
||||||
|
|
||||||
|
private RuntimeSearchParam myRuntimeSearchParam;
|
||||||
|
private IFhirPath myFhirPath;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<IBaseResource> process(List<IBaseResource> theIBaseResources) throws Exception {
|
||||||
|
if (myRuntimeSearchParam == null) {
|
||||||
|
myRuntimeSearchParam = SearchParameterUtil.getPatientSearchParamForResourceType(myContext, myResourceType);
|
||||||
|
}
|
||||||
|
String path = runtimeSearchParamFhirPath();
|
||||||
|
theIBaseResources
|
||||||
|
.forEach(iBaseResource -> annotateResourceWithRelatedGoldenResourcePatient(path, iBaseResource));
|
||||||
|
return theIBaseResources;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void annotateResourceWithRelatedGoldenResourcePatient(String path, IBaseResource iBaseResource) {
|
||||||
|
Optional<IBaseReference> evaluate = getFhirParser().evaluateFirst(iBaseResource, path, IBaseReference.class);
|
||||||
|
if (evaluate.isPresent()) {
|
||||||
|
String sourceResourceId = evaluate.get().getReferenceElement().getIdPart();
|
||||||
|
ExtensionUtil.setExtension(myContext, iBaseResource, ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL, myMdmExpansionCacheSvc.getGoldenResourceId(sourceResourceId));
|
||||||
|
} else {
|
||||||
|
ourLog.warn("Failed to find the patient compartment information for resource {}", iBaseResource);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private IFhirPath getFhirParser() {
|
||||||
|
if (myFhirPath == null) {
|
||||||
|
myFhirPath = myContext.newFhirPath();
|
||||||
|
}
|
||||||
|
return myFhirPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String runtimeSearchParamFhirPath() {
|
||||||
|
if (myRuntimeSearchParam == null) {
|
||||||
|
myRuntimeSearchParam = SearchParameterUtil.getPatientSearchParamForResourceType(myContext, myResourceType);
|
||||||
|
}
|
||||||
|
return myRuntimeSearchParam.getPath();
|
||||||
|
}
|
||||||
|
}
|
|
@ -50,6 +50,7 @@ public class PidToIBaseResourceProcessor implements ItemProcessor<List<ResourceP
|
||||||
@Autowired
|
@Autowired
|
||||||
private DaoRegistry myDaoRegistry;
|
private DaoRegistry myDaoRegistry;
|
||||||
|
|
||||||
|
|
||||||
@Value("#{stepExecutionContext['resourceType']}")
|
@Value("#{stepExecutionContext['resourceType']}")
|
||||||
private String myResourceType;
|
private String myResourceType;
|
||||||
|
|
||||||
|
|
|
@ -36,6 +36,7 @@ import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
|
||||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
||||||
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
|
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
|
||||||
import ca.uhn.fhir.rest.param.DateRangeParam;
|
import ca.uhn.fhir.rest.param.DateRangeParam;
|
||||||
|
import ca.uhn.fhir.util.SearchParameterUtil;
|
||||||
import ca.uhn.fhir.util.UrlUtil;
|
import ca.uhn.fhir.util.UrlUtil;
|
||||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -189,37 +190,8 @@ public abstract class BaseBulkItemReader implements ItemReader<List<ResourcePers
|
||||||
*/
|
*/
|
||||||
protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType() {
|
protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType() {
|
||||||
if (myPatientSearchParam == null) {
|
if (myPatientSearchParam == null) {
|
||||||
RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(myResourceType);
|
myPatientSearchParam = SearchParameterUtil.getPatientSearchParamForResourceType(myContext, myResourceType);
|
||||||
myPatientSearchParam = runtimeResourceDefinition.getSearchParam("patient");
|
|
||||||
if (myPatientSearchParam == null) {
|
|
||||||
myPatientSearchParam = runtimeResourceDefinition.getSearchParam("subject");
|
|
||||||
if (myPatientSearchParam == null) {
|
|
||||||
myPatientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition);
|
|
||||||
if (myPatientSearchParam == null) {
|
|
||||||
String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", myResourceType);
|
|
||||||
throw new IllegalArgumentException(errorMessage);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return myPatientSearchParam;
|
return myPatientSearchParam;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Search the resource definition for a compartment named 'patient' and return its related Search Parameter.
|
|
||||||
*/
|
|
||||||
protected RuntimeSearchParam getRuntimeSearchParamByCompartment(RuntimeResourceDefinition runtimeResourceDefinition) {
|
|
||||||
RuntimeSearchParam patientSearchParam;
|
|
||||||
List<RuntimeSearchParam> searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient");
|
|
||||||
if (searchParams == null || searchParams.size() == 0) {
|
|
||||||
String errorMessage = String.format("Resource type [%s] is not eligible for this type of export, as it contains no Patient compartment, and no `patient` or `subject` search parameter", myResourceType);
|
|
||||||
throw new IllegalArgumentException(errorMessage);
|
|
||||||
} else if (searchParams.size() == 1) {
|
|
||||||
patientSearchParam = searchParams.get(0);
|
|
||||||
} else {
|
|
||||||
String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as we are unable to disambiguate which patient search parameter we should be searching by.", myResourceType);
|
|
||||||
throw new IllegalArgumentException(errorMessage);
|
|
||||||
}
|
|
||||||
return patientSearchParam;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,8 +21,10 @@ package ca.uhn.fhir.jpa.bulk.job;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
|
import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
|
||||||
|
import ca.uhn.fhir.jpa.batch.processors.GoldenResourceAnnotatingProcessor;
|
||||||
import ca.uhn.fhir.jpa.batch.processors.PidToIBaseResourceProcessor;
|
import ca.uhn.fhir.jpa.batch.processors.PidToIBaseResourceProcessor;
|
||||||
import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc;
|
import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc;
|
||||||
|
import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc;
|
||||||
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
|
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
|
||||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
import org.springframework.batch.core.Job;
|
import org.springframework.batch.core.Job;
|
||||||
|
@ -32,13 +34,16 @@ import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
|
||||||
import org.springframework.batch.core.configuration.annotation.JobScope;
|
import org.springframework.batch.core.configuration.annotation.JobScope;
|
||||||
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
|
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
|
||||||
import org.springframework.batch.core.configuration.annotation.StepScope;
|
import org.springframework.batch.core.configuration.annotation.StepScope;
|
||||||
|
import org.springframework.batch.item.ItemProcessor;
|
||||||
import org.springframework.batch.item.ItemReader;
|
import org.springframework.batch.item.ItemReader;
|
||||||
import org.springframework.batch.item.ItemWriter;
|
import org.springframework.batch.item.ItemWriter;
|
||||||
|
import org.springframework.batch.item.support.CompositeItemProcessor;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.context.annotation.Lazy;
|
import org.springframework.context.annotation.Lazy;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -64,11 +69,23 @@ public class BulkExportJobConfig {
|
||||||
@Autowired
|
@Autowired
|
||||||
private PidToIBaseResourceProcessor myPidToIBaseResourceProcessor;
|
private PidToIBaseResourceProcessor myPidToIBaseResourceProcessor;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private GoldenResourceAnnotatingProcessor myGoldenResourceAnnotatingProcessor;
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public BulkExportDaoSvc bulkExportDaoSvc() {
|
public BulkExportDaoSvc bulkExportDaoSvc() {
|
||||||
return new BulkExportDaoSvc();
|
return new BulkExportDaoSvc();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@Lazy
|
||||||
|
@StepScope
|
||||||
|
public MdmExpansionCacheSvc mdmExpansionCacheSvc() {
|
||||||
|
return new MdmExpansionCacheSvc();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
@Lazy
|
@Lazy
|
||||||
public Job bulkExportJob() {
|
public Job bulkExportJob() {
|
||||||
|
@ -80,6 +97,18 @@ public class BulkExportJobConfig {
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@Lazy
|
||||||
|
@StepScope
|
||||||
|
public CompositeItemProcessor<List<ResourcePersistentId>, List<IBaseResource>> inflateResourceThenAnnotateWithGoldenResourceProcessor() {
|
||||||
|
CompositeItemProcessor processor = new CompositeItemProcessor<>();
|
||||||
|
ArrayList<ItemProcessor> delegates = new ArrayList<>();
|
||||||
|
delegates.add(myPidToIBaseResourceProcessor);
|
||||||
|
delegates.add(myGoldenResourceAnnotatingProcessor);
|
||||||
|
processor.setDelegates(delegates);
|
||||||
|
return processor;
|
||||||
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
@Lazy
|
@Lazy
|
||||||
public Job groupBulkExportJob() {
|
public Job groupBulkExportJob() {
|
||||||
|
@ -132,7 +161,9 @@ public class BulkExportJobConfig {
|
||||||
return myStepBuilderFactory.get("groupBulkExportGenerateResourceFilesStep")
|
return myStepBuilderFactory.get("groupBulkExportGenerateResourceFilesStep")
|
||||||
.<List<ResourcePersistentId>, List<IBaseResource>> chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time.
|
.<List<ResourcePersistentId>, List<IBaseResource>> chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time.
|
||||||
.reader(groupBulkItemReader())
|
.reader(groupBulkItemReader())
|
||||||
.processor(myPidToIBaseResourceProcessor)
|
// .processor(myPidToIBaseResourceProcessor)
|
||||||
|
// .processor(myGoldenResourceAnnotatingProcessor)
|
||||||
|
.processor(inflateResourceThenAnnotateWithGoldenResourceProcessor())
|
||||||
.writer(resourceToFileWriter())
|
.writer(resourceToFileWriter())
|
||||||
.listener(bulkExportGenerateResourceFilesStepListener())
|
.listener(bulkExportGenerateResourceFilesStepListener())
|
||||||
.build();
|
.build();
|
||||||
|
|
|
@ -27,7 +27,7 @@ import ca.uhn.fhir.jpa.dao.IResultIterator;
|
||||||
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
|
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
|
||||||
import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
|
import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
|
||||||
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
|
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
|
||||||
import ca.uhn.fhir.jpa.entity.Search;
|
import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc;
|
||||||
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
|
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
|
||||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
||||||
import ca.uhn.fhir.jpa.util.QueryChunker;
|
import ca.uhn.fhir.jpa.util.QueryChunker;
|
||||||
|
@ -45,6 +45,8 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -75,6 +77,8 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
|
||||||
private IdHelperService myIdHelperService;
|
private IdHelperService myIdHelperService;
|
||||||
@Autowired
|
@Autowired
|
||||||
private IMdmLinkDao myMdmLinkDao;
|
private IMdmLinkDao myMdmLinkDao;
|
||||||
|
@Autowired
|
||||||
|
private MdmExpansionCacheSvc myMdmExpansionCacheSvc;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
Iterator<ResourcePersistentId> getResourcePidIterator() {
|
Iterator<ResourcePersistentId> getResourcePidIterator() {
|
||||||
|
@ -119,6 +123,7 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
|
||||||
IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
|
IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
|
||||||
Long pidOrNull = myIdHelperService.getPidOrNull(group);
|
Long pidOrNull = myIdHelperService.getPidOrNull(group);
|
||||||
List<List<Long>> lists = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
|
List<List<Long>> lists = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
|
||||||
|
|
||||||
lists.forEach(patientPidsToExport::addAll);
|
lists.forEach(patientPidsToExport::addAll);
|
||||||
}
|
}
|
||||||
List<ResourcePersistentId> resourcePersistentIds = patientPidsToExport
|
List<ResourcePersistentId> resourcePersistentIds = patientPidsToExport
|
||||||
|
@ -154,13 +159,44 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
|
||||||
|
|
||||||
//Attempt to perform MDM Expansion of membership
|
//Attempt to perform MDM Expansion of membership
|
||||||
if (myMdmEnabled) {
|
if (myMdmEnabled) {
|
||||||
List<List<Long>> goldenPidTargetPidTuple = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
|
List<List<Long>> goldenPidTargetPidTuples = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
|
||||||
//Now lets translate these pids into resource IDs
|
//Now lets translate these pids into resource IDs
|
||||||
Set<Long> uniquePids = new HashSet<>();
|
|
||||||
goldenPidTargetPidTuple.forEach(uniquePids::addAll);
|
|
||||||
|
|
||||||
|
System.out.println("ZOOPER");
|
||||||
|
goldenPidTargetPidTuples.stream()
|
||||||
|
.map(Object::toString)
|
||||||
|
.forEach(list -> System.out.println(String.join(", ", list)));
|
||||||
|
Set<Long> uniquePids = new HashSet<>();
|
||||||
|
goldenPidTargetPidTuples.forEach(uniquePids::addAll);
|
||||||
Map<Long, Optional<String>> pidToForcedIdMap = myIdHelperService.translatePidsToForcedIds(uniquePids);
|
Map<Long, Optional<String>> pidToForcedIdMap = myIdHelperService.translatePidsToForcedIds(uniquePids);
|
||||||
|
|
||||||
|
Map<Long, Set<Long>> goldenResourceToSourcePidMap = new HashMap<>();
|
||||||
|
for (List<Long> goldenPidTargetPidTuple : goldenPidTargetPidTuples) {
|
||||||
|
Long goldenPid = goldenPidTargetPidTuple.get(0);
|
||||||
|
Long sourcePid = goldenPidTargetPidTuple.get(1);
|
||||||
|
|
||||||
|
if(!goldenResourceToSourcePidMap.containsKey(goldenPid)) {
|
||||||
|
goldenResourceToSourcePidMap.put(goldenPid, new HashSet<>());
|
||||||
|
}
|
||||||
|
goldenResourceToSourcePidMap.get(goldenPid).add(sourcePid);
|
||||||
|
}
|
||||||
|
Map<String, String> sourceResourceIdToGoldenResourceIdMap = new HashMap<>();
|
||||||
|
|
||||||
|
goldenResourceToSourcePidMap.entrySet()
|
||||||
|
.forEach(entry -> {
|
||||||
|
Long key = entry.getKey();
|
||||||
|
String goldenResourceId = myIdHelperService.translatePidIdToForcedId(new ResourcePersistentId(key)).orElse(key.toString());
|
||||||
|
|
||||||
|
Map<Long, Optional<String>> pidsToForcedIds = myIdHelperService.translatePidsToForcedIds(entry.getValue());
|
||||||
|
Set<String> sourceResourceIds = pidsToForcedIds.entrySet().stream()
|
||||||
|
.map(ent -> ent.getValue().isPresent() ? ent.getValue().get() : ent.getKey().toString())
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
|
sourceResourceIds
|
||||||
|
.forEach(sourceResourceId -> sourceResourceIdToGoldenResourceIdMap.put(sourceResourceId, goldenResourceId));
|
||||||
|
});
|
||||||
|
myMdmExpansionCacheSvc.setCacheContents(sourceResourceIdToGoldenResourceIdMap);
|
||||||
|
|
||||||
//If the result of the translation is an empty optional, it means there is no forced id, and we can use the PID as the resource ID.
|
//If the result of the translation is an empty optional, it means there is no forced id, and we can use the PID as the resource ID.
|
||||||
Set<String> resolvedResourceIds = pidToForcedIdMap.entrySet().stream()
|
Set<String> resolvedResourceIds = pidToForcedIdMap.entrySet().stream()
|
||||||
.map(entry -> entry.getValue().isPresent() ? entry.getValue().get() : entry.getKey().toString())
|
.map(entry -> entry.getValue().isPresent() ? entry.getValue().get() : entry.getKey().toString())
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
package ca.uhn.fhir.jpa.dao.mdm;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.slf4j.LoggerFactory.getLogger;
|
||||||
|
|
||||||
|
public class MdmExpansionCacheSvc {
|
||||||
|
private static final Logger ourLog = getLogger(MdmExpansionCacheSvc.class);
|
||||||
|
|
||||||
|
private Map<String, String> mySourceToGoldenIdCache = new HashMap<>();
|
||||||
|
|
||||||
|
public String getGoldenResourceId(String theSourceId) {
|
||||||
|
ourLog.info(buildLog("About to lookup cached resource ID " + theSourceId, true));
|
||||||
|
return mySourceToGoldenIdCache.get(theSourceId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String buildLog(String message, boolean theShowContent) {
|
||||||
|
StringBuilder builder = new StringBuilder();
|
||||||
|
builder.append(message);
|
||||||
|
if (ourLog.isDebugEnabled() || theShowContent) {
|
||||||
|
builder.append("\n")
|
||||||
|
.append("Current cache content is:")
|
||||||
|
.append("\n");
|
||||||
|
mySourceToGoldenIdCache.entrySet().stream().forEach(entry -> builder.append(entry.getKey()).append(" -> ").append(entry.getValue()).append("\n"));
|
||||||
|
return builder.toString();
|
||||||
|
}
|
||||||
|
return builder.toString();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCacheContents(Map<String, String> theSourceResourceIdToGoldenResourceIdMap) {
|
||||||
|
this.mySourceToGoldenIdCache = theSourceResourceIdToGoldenResourceIdMap;
|
||||||
|
}
|
||||||
|
}
|
|
@ -22,6 +22,7 @@ public class BatchJobConfig {
|
||||||
private StepBuilderFactory myStepBuilderFactory;
|
private StepBuilderFactory myStepBuilderFactory;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public Job testJob() {
|
public Job testJob() {
|
||||||
return myJobBuilderFactory.get("testJob")
|
return myJobBuilderFactory.get("testJob")
|
||||||
|
|
Loading…
Reference in New Issue