Merge pull request #2439 from hapifhir/issue-2433-mdm-export

Support MDM expansion in Group Bulk Export
This commit is contained in:
Tadgh 2021-03-04 16:49:19 -05:00 committed by GitHub
commit c2877e84bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 677 additions and 261 deletions

View File

@ -230,6 +230,7 @@
</exclusions>
</dependency>
<!-- Patch Dependencies -->
<dependency>
<groupId>io.dogote</groupId>
@ -259,6 +260,7 @@
<artifactId>javax.annotation-api</artifactId>
</dependency>
<!-- Test Database -->
<dependency>
<groupId>org.apache.derby</groupId>

View File

@ -28,7 +28,9 @@ import org.springframework.context.annotation.Import;
//When you define a new batch job, add it here.
@Import({
CommonBatchJobConfig.class,
BulkExportJobConfig.class,})
BulkExportJobConfig.class
})
public class BatchJobsConfig {
//Empty config, as this is just an aggregator for all the various batch jobs defined around the system.
public static final String BULK_EXPORT_JOB_NAME = "bulkExportJob";
public static final String GROUP_BULK_EXPORT_JOB_NAME = "groupBulkExportJob";
}

View File

@ -0,0 +1,140 @@
package ca.uhn.fhir.jpa.bulk.job;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.util.UrlUtil;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public abstract class BaseBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@Value("#{stepExecutionContext['resourceType']}")
protected String myResourceType;
@Value("#{jobExecutionContext['" + BulkExportJobConfig.JOB_UUID_PARAMETER + "']}")
protected String myJobUUID;
@Value("#{jobParameters['" + BulkExportJobConfig.READ_CHUNK_PARAMETER + "']}")
protected Long myReadChunkSize;
@Autowired
protected DaoRegistry myDaoRegistry;
@Autowired
protected FhirContext myContext;
@Autowired
private IBulkExportJobDao myBulkExportJobDao;
@Autowired
protected SearchBuilderFactory mySearchBuilderFactory;
@Autowired
private MatchUrlService myMatchUrlService;
private ISearchBuilder mySearchBuilder;
private BulkExportJobEntity myJobEntity;
private RuntimeResourceDefinition myResourceDefinition;
private Iterator<ResourcePersistentId> myPidIterator;
/**
* Get and cache an ISearchBuilder for the given resource type this partition is responsible for.
*/
protected ISearchBuilder getSearchBuilderForLocalResourceType() {
if (mySearchBuilder == null) {
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(myResourceType);
RuntimeResourceDefinition def = myContext.getResourceDefinition(myResourceType);
Class<? extends IBaseResource> nextTypeClass = def.getImplementingClass();
mySearchBuilder = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, nextTypeClass);
}
return mySearchBuilder;
}
/**
* Generate the list of pids of all resources of the given myResourceType, which reference any group member of the given myGroupId.
* Store them in a member iterator.
*/
protected void loadResourcePids() {
//Initialize an array to hold the pids of the target resources to be exported.
myPidIterator = getResourcePidIterator();
}
abstract Iterator<ResourcePersistentId> getResourcePidIterator();
protected SearchParameterMap createSearchParameterMapForJob() {
BulkExportJobEntity jobEntity = getJobEntity();
RuntimeResourceDefinition theDef = getResourceDefinition();
SearchParameterMap map = new SearchParameterMap();
Map<String, String[]> requestUrl = UrlUtil.parseQueryStrings(jobEntity.getRequest());
String[] typeFilters = requestUrl.get(JpaConstants.PARAM_EXPORT_TYPE_FILTER);
if (typeFilters != null) {
Optional<String> filter = Arrays.stream(typeFilters).filter(t -> t.startsWith(myResourceType + "?")).findFirst();
if (filter.isPresent()) {
String matchUrl = filter.get();
map = myMatchUrlService.translateMatchUrl(matchUrl, theDef);
}
}
if (jobEntity.getSince() != null) {
map.setLastUpdated(new DateRangeParam(jobEntity.getSince(), null));
}
map.setLoadSynchronous(true);
return map;
}
protected RuntimeResourceDefinition getResourceDefinition() {
if (myResourceDefinition == null) {
myResourceDefinition = myContext.getResourceDefinition(myResourceType);
}
return myResourceDefinition;
}
protected BulkExportJobEntity getJobEntity() {
if (myJobEntity == null) {
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
if (jobOpt.isPresent()) {
myJobEntity = jobOpt.get();
} else {
String errorMessage = String.format("Job with UUID %s does not exist!", myJobUUID);
throw new IllegalStateException(errorMessage);
}
}
return myJobEntity;
}
@Override
public List<ResourcePersistentId> read() {
ourLog.info("Bulk export starting generation for batch export job: [{}] with resourceType [{}] and UUID [{}]", getJobEntity(), myResourceType, myJobUUID);
if (myPidIterator == null) {
loadResourcePids();
}
int count = 0;
List<ResourcePersistentId> outgoing = new ArrayList<>();
while (myPidIterator.hasNext() && count < myReadChunkSize) {
outgoing.add(myPidIterator.next());
count += 1;
}
return outgoing.size() == 0 ? null : outgoing;
}
}

View File

@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.bulk.job;
* #L%
*/
import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
import ca.uhn.fhir.jpa.batch.processors.PidToIBaseResourceProcessor;
import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
@ -46,8 +47,10 @@ import java.util.List;
*/
@Configuration
public class BulkExportJobConfig {
public static final String JOB_UUID_PARAMETER = "jobUUID";
public static final String READ_CHUNK_PARAMETER = "readChunkSize";
public static final String EXPAND_MDM_PARAMETER = "expandMdm";
public static final String GROUP_ID_PARAMETER = "groupId";
public static final String RESOURCE_TYPES_PARAMETER = "resourceTypes";
public static final int CHUNK_SIZE = 100;
@ -69,7 +72,7 @@ public class BulkExportJobConfig {
@Bean
@Lazy
public Job bulkExportJob() {
return myJobBuilderFactory.get("bulkExportJob")
return myJobBuilderFactory.get(BatchJobsConfig.BULK_EXPORT_JOB_NAME)
.validator(bulkJobParameterValidator())
.start(createBulkExportEntityStep())
.next(partitionStep())
@ -80,7 +83,7 @@ public class BulkExportJobConfig {
@Bean
@Lazy
public Job groupBulkExportJob() {
return myJobBuilderFactory.get("groupBulkExportJob")
return myJobBuilderFactory.get(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME)
.validator(groupBulkJobParameterValidator())
.validator(bulkJobParameterValidator())
.start(createBulkExportEntityStep())

View File

@ -20,15 +20,12 @@ package ca.uhn.fhir.jpa.bulk.job;
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
@ -38,11 +35,8 @@ import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.util.UrlUtil;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import java.util.ArrayList;
import java.util.Arrays;
@ -51,89 +45,28 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
public class BulkItemReader implements ItemReader<List<ResourcePersistentId>> {
/**
* Basic Bulk Export implementation which simply reads all type filters and applies them, along with the _since param
* on a given resource type.
*/
public class BulkItemReader extends BaseBulkItemReader {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
Iterator<ResourcePersistentId> myPidIterator;
@Value("#{jobParameters['" + BulkExportJobConfig.READ_CHUNK_PARAMETER + "']}")
private Long myReadChunkSize;
@Value("#{jobExecutionContext['"+ BulkExportJobConfig.JOB_UUID_PARAMETER+"']}")
private String myJobUUID;
@Value("#{stepExecutionContext['resourceType']}")
private String myResourceType;
@Autowired
private IBulkExportJobDao myBulkExportJobDao;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private FhirContext myContext;
@Autowired
private SearchBuilderFactory mySearchBuilderFactory;
@Autowired
private MatchUrlService myMatchUrlService;
private void loadResourcePids() {
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
if (!jobOpt.isPresent()) {
ourLog.warn("Job appears to be deleted");
return;
}
BulkExportJobEntity jobEntity = jobOpt.get();
ourLog.info("Bulk export starting generation for batch export job: {}", jobEntity);
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(myResourceType);
@Override
Iterator<ResourcePersistentId> getResourcePidIterator() {
ourLog.info("Bulk export assembling export of type {} for job {}", myResourceType, myJobUUID);
RuntimeResourceDefinition def = myContext.getResourceDefinition(myResourceType);
Class<? extends IBaseResource> nextTypeClass = def.getImplementingClass();
ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, nextTypeClass);
SearchParameterMap map = createSearchParameterMapFromTypeFilter(jobEntity, def);
if (jobEntity.getSince() != null) {
map.setLastUpdated(new DateRangeParam(jobEntity.getSince(), null));
}
map.setLoadSynchronous(true);
SearchParameterMap map = createSearchParameterMapForJob();
ISearchBuilder sb = getSearchBuilderForLocalResourceType();
IResultIterator myResultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
List<ResourcePersistentId> myReadPids = new ArrayList<>();
while (myResultIterator.hasNext()) {
myReadPids.add(myResultIterator.next());
}
myPidIterator = myReadPids.iterator();
return myReadPids.iterator();
}
private SearchParameterMap createSearchParameterMapFromTypeFilter(BulkExportJobEntity theJobEntity, RuntimeResourceDefinition theDef) {
SearchParameterMap map = new SearchParameterMap();
Map<String, String[]> requestUrl = UrlUtil.parseQueryStrings(theJobEntity.getRequest());
String[] typeFilters = requestUrl.get(JpaConstants.PARAM_EXPORT_TYPE_FILTER);
if (typeFilters != null) {
Optional<String> filter = Arrays.stream(typeFilters).filter(t -> t.startsWith(myResourceType + "?")).findFirst();
if (filter.isPresent()) {
String matchUrl = filter.get();
map = myMatchUrlService.translateMatchUrl(matchUrl, theDef);
}
}
return map;
}
@Override
public List<ResourcePersistentId> read() {
if (myPidIterator == null) {
loadResourcePids();
}
int count = 0;
List<ResourcePersistentId> outgoing = new ArrayList<>();
while (myPidIterator.hasNext() && count < myReadChunkSize) {
outgoing.add(myPidIterator.next());
count += 1;
}
return outgoing.size() == 0 ? null : outgoing;
}
}

View File

@ -5,4 +5,9 @@ public class GroupBulkExportJobParametersBuilder extends BulkExportJobParameters
this.addString(BulkExportJobConfig.GROUP_ID_PARAMETER, theGroupId);
return this;
}
public GroupBulkExportJobParametersBuilder setMdm(boolean theMdm) {
this.addString(BulkExportJobConfig.EXPAND_MDM_PARAMETER, String.valueOf(theMdm));
return this;
}
}

View File

@ -20,145 +20,219 @@ package ca.uhn.fhir.jpa.bulk.job;
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.QueryChunker;
import ca.uhn.fhir.model.api.Include;
import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.rest.param.HasParam;
import ca.uhn.fhir.util.UrlUtil;
import ca.uhn.fhir.rest.param.ReferenceOrListParam;
import ca.uhn.fhir.rest.param.ReferenceParam;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import javax.annotation.Nonnull;
import javax.persistence.EntityManager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
/**
* Bulk Item reader for the Group Bulk Export job.
* Instead of performing a normal query on the resource type using type filters, we instead
*
* 1. Get the group ID defined for this job
* 2. Expand its membership so we get references to all patients in the group
* 3. Optionally further expand that into all MDM-matched Patients (including golden resources)
* 4. Then perform normal bulk export, filtered so that only results that refer to members are returned.
*/
public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
Iterator<ResourcePersistentId> myPidIterator;
public static final int QUERY_CHUNK_SIZE = 100;
@Value("#{stepExecutionContext['resourceType']}")
private String myResourceType;
@Value("#{jobParameters['" + BulkExportJobConfig.GROUP_ID_PARAMETER + "']}")
private String myGroupId;
@Value("#{jobExecutionContext['"+ BulkExportJobConfig.JOB_UUID_PARAMETER+"']}")
private String myJobUUID;
@Value("#{jobParameters['" + BulkExportJobConfig.READ_CHUNK_PARAMETER + "']}")
private Long myReadChunkSize;
@Value("#{jobParameters['" + BulkExportJobConfig.EXPAND_MDM_PARAMETER+ "'] ?: false}")
private boolean myMdmEnabled;
@Autowired
private IBulkExportJobDao myBulkExportJobDao;
private IdHelperService myIdHelperService;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private FhirContext myContext;
@Autowired
private SearchBuilderFactory mySearchBuilderFactory;
@Autowired
private EntityManager myEntityManager;
private IMdmLinkDao myMdmLinkDao;
private void loadResourcePids() {
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
if (!jobOpt.isPresent()) {
ourLog.warn("Job appears to be deleted");
return;
}
BulkExportJobEntity jobEntity = jobOpt.get();
ourLog.info("Group Bulk export starting generation for batch export job: [{}] with resourceType [{}] and UUID [{}]", jobEntity, myResourceType, myJobUUID);
//Fetch all the pids given the query.
ISearchBuilder searchBuilder = getSearchBuilder();
//Build complex-ish _has query with a revincludes which allows lookup by group membership
SearchParameterMap searchParameterMap = getSearchParameterMap(jobEntity);
IResultIterator resultIterator = searchBuilder.createQuery(
searchParameterMap,
new SearchRuntimeDetails(null, myJobUUID),
null,
RequestPartitionId.allPartitions()
);
private RuntimeSearchParam myPatientSearchParam;
@Override
Iterator<ResourcePersistentId> getResourcePidIterator() {
List<ResourcePersistentId> myReadPids = new ArrayList<>();
while (resultIterator.hasNext()) {
myReadPids.add(resultIterator.next());
//Short circuit out if we detect we are attempting to extract patients
if (myResourceType.equalsIgnoreCase("Patient")) {
return getExpandedPatientIterator();
}
//Given that databases explode when you have an IN clause with >1000 resources, we use the QueryChunker to break this into multiple queries.
List<ResourcePersistentId> revIncludePids = new ArrayList<>();
QueryChunker<ResourcePersistentId> chunker = new QueryChunker<>();
//First lets expand the group so we get a list of all patient IDs of the group, and MDM-matched patient IDs of the group.
Set<String> expandedMemberResourceIds = expandAllPatientPidsFromGroup();
if (ourLog.isDebugEnabled()) {
ourLog.debug("Group/{} has been expanded to members:[{}]", myGroupId, String.join(",", expandedMemberResourceIds));
}
chunker.chunk(myReadPids, pidChunk -> {
revIncludePids.addAll(searchBuilder.loadIncludes(myContext, myEntityManager, pidChunk, searchParameterMap.getRevIncludes(), true, searchParameterMap.getLastUpdated(), myJobUUID, null));
//Next, let's search for the target resources, with their correct patient references, chunked.
//The results will be jammed into myReadPids
QueryChunker<String> queryChunker = new QueryChunker<>();
queryChunker.chunk(new ArrayList<>(expandedMemberResourceIds), QUERY_CHUNK_SIZE, (idChunk) -> {
queryResourceTypeWithReferencesToPatients(myReadPids, idChunk);
});
myPidIterator = revIncludePids.iterator();
if (ourLog.isDebugEnabled()) {
ourLog.debug("Resource PIDs to be Bulk Exported: [{}]", myReadPids.stream().map(ResourcePersistentId::toString).collect(Collectors.joining(",")));
}
//For all group revinclude queries, you need to perform the search on the Patient DAO, which is why this is hardcoded here.
private ISearchBuilder getSearchBuilder() {
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao("Patient");
RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient");
Class<? extends IBaseResource> nextTypeClass = def.getImplementingClass();
return mySearchBuilderFactory.newSearchBuilder(dao, "Patient", nextTypeClass);
}
@Nonnull
private SearchParameterMap getSearchParameterMap(BulkExportJobEntity jobEntity) {
SearchParameterMap searchParameterMap = new SearchParameterMap();
searchParameterMap.add("_has", new HasParam("Group", "member", "_id", myGroupId));
String revIncludeString = buildRevIncludeString();
searchParameterMap.addRevInclude(new Include(revIncludeString).toLocked());
if (jobEntity.getSince() != null) {
searchParameterMap.setLastUpdated(new DateRangeParam(jobEntity.getSince(), null));
}
searchParameterMap.setLoadSynchronous(true);
return searchParameterMap;
return myReadPids.iterator();
}
/**
* Given the resource type of the job, fetch its patient compartment name, formatted for usage in an Include.
* e.g. Immunization -> Immunization:patient
*
* @return A string which can be dropped directly into an Include.
* In case we are doing a Group Bulk Export and resourceType `Patient` is requested, we can just return the group members,
* possibly expanded by MDM, and don't have to go and fetch other resource DAOs.
*/
private String buildRevIncludeString() {
private Iterator<ResourcePersistentId> getExpandedPatientIterator() {
Set<Long> patientPidsToExport = new HashSet<>();
//This gets all member pids
List<String> members = getMembers();
List<IIdType> ids = members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList());
List<Long> pidsOrThrowException = myIdHelperService.getPidsOrThrowException(ids);
patientPidsToExport.addAll(pidsOrThrowException);
if (myMdmEnabled) {
IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
Long pidOrNull = myIdHelperService.getPidOrNull(group);
List<List<Long>> lists = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
lists.forEach(patientPidsToExport::addAll);
}
List<ResourcePersistentId> resourcePersistentIds = patientPidsToExport
.stream()
.map(ResourcePersistentId::new)
.collect(Collectors.toList());
return resourcePersistentIds.iterator();
}
/**
* Given the local myGroupId, read this group, and find all members' patient references.
* @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"]
*/
private List<String> getMembers() {
IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
List<IPrimitiveType> evaluate = myContext.newFhirPath().evaluate(group, "member.entity.reference", IPrimitiveType.class);
return evaluate.stream().map(IPrimitiveType::getValueAsString).collect(Collectors.toList());
}
/**
* Given the local myGroupId, perform an expansion to retrieve all resource IDs of member patients.
* if myMdmEnabled is set to true, we also reach out to the IMdmLinkDao to attempt to also expand it into matched
* patients.
*
* @return a Set of Strings representing the resource IDs of all members of a group.
*/
private Set<String> expandAllPatientPidsFromGroup() {
Set<String> expandedIds = new HashSet<>();
IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
Long pidOrNull = myIdHelperService.getPidOrNull(group);
//Attempt to perform MDM Expansion of membership
if (myMdmEnabled) {
List<List<Long>> goldenPidTargetPidTuple = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
//Now lets translate these pids into resource IDs
Set<Long> uniquePids = new HashSet<>();
goldenPidTargetPidTuple.forEach(uniquePids::addAll);
Map<Long, Optional<String>> longOptionalMap = myIdHelperService.translatePidsToForcedIds(uniquePids);
expandedIds = longOptionalMap.values().stream().map(Optional::get).collect(Collectors.toSet());
}
//Now manually add the members of the group (its possible even with mdm expansion that some members dont have MDM matches,
//so would be otherwise skipped
expandedIds.addAll(getMembers());
return expandedIds;
}
private void queryResourceTypeWithReferencesToPatients(List<ResourcePersistentId> myReadPids, List<String> idChunk) {
//Build SP map
//First, inject the _typeFilters and _since from the export job
SearchParameterMap expandedSpMap = createSearchParameterMapForJob();
//Since we are in a bulk job, we have to ensure the user didn't jam in a patient search param, since we need to manually set that.
validateSearchParameters(expandedSpMap);
// Now, further filter the query with patient references defined by the chunk of IDs we have.
filterSearchByResourceIds(idChunk, expandedSpMap);
// Fetch and cache a search builder for this resource type
ISearchBuilder searchBuilder = getSearchBuilderForLocalResourceType();
//Execute query and all found pids to our local iterator.
IResultIterator resultIterator = searchBuilder.createQuery(expandedSpMap, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
while (resultIterator.hasNext()) {
myReadPids.add(resultIterator.next());
}
}
private void filterSearchByResourceIds(List<String> idChunk, SearchParameterMap expandedSpMap) {
ReferenceOrListParam orList = new ReferenceOrListParam();
idChunk.forEach(id -> orList.add(new ReferenceParam(id)));
expandedSpMap.add(getPatientSearchParam().getName(), orList);
}
private RuntimeSearchParam validateSearchParameters(SearchParameterMap expandedSpMap) {
RuntimeSearchParam runtimeSearchParam = getPatientSearchParam();
if (expandedSpMap.get(runtimeSearchParam.getName()) != null) {
throw new IllegalArgumentException(String.format("Group Bulk Export manually modifies the Search Parameter called [%s], so you may not include this search parameter in your _typeFilter!", runtimeSearchParam.getName()));
}
return runtimeSearchParam;
}
/**
* Given the resource type, fetch its patient-based search parameter name
* 1. Attempt to find one called 'patient'
* 2. If that fails, find one called 'subject'
* 3. If that fails, find find by Patient Compartment.
* 3.1 If that returns >1 result, throw an error
* 3.2 If that returns 1 result, return it
*/
private RuntimeSearchParam getPatientSearchParam() {
if (myPatientSearchParam == null) {
RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(myResourceType);
RuntimeSearchParam patientSearchParam = runtimeResourceDefinition.getSearchParam("patient");
if (patientSearchParam == null) {
patientSearchParam = runtimeResourceDefinition.getSearchParam("subject");
if (patientSearchParam == null) {
patientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition);
myPatientSearchParam = runtimeResourceDefinition.getSearchParam("patient");
if (myPatientSearchParam == null) {
myPatientSearchParam = runtimeResourceDefinition.getSearchParam("subject");
if (myPatientSearchParam == null) {
myPatientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition);
if (myPatientSearchParam == null) {
String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", myResourceType);
throw new IllegalArgumentException(errorMessage);
}
}
String includeString = runtimeResourceDefinition.getName() + ":" + patientSearchParam.getName();
return includeString;
}
}
return myPatientSearchParam;
}
/**
@ -178,20 +252,4 @@ public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId
}
return patientSearchParam;
}
@Override
public List<ResourcePersistentId> read() {
if (myPidIterator == null) {
loadResourcePids();
}
int count = 0;
List<ResourcePersistentId> outgoing = new ArrayList<>();
while (myPidIterator.hasNext() && count < myReadChunkSize) {
outgoing.add(myPidIterator.next());
count += 1;
}
return outgoing.size() == 0 ? null : outgoing;
}
}

View File

@ -189,6 +189,10 @@ public class BulkDataExportProvider {
@IdParam IIdType theIdParam,
@OperationParam(name = JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theOutputFormat,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theType,
@OperationParam(name = JpaConstants.PARAM_EXPORT_SINCE, min = 0, max = 1, typeName = "instant") IPrimitiveType<Date> theSince,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theTypeFilter,
@OperationParam(name = JpaConstants.PARAM_EXPORT_MDM, min = 0, max = 1, typeName = "boolean") IPrimitiveType<Boolean> theMdm,
ServletRequestDetails theRequestDetails
) {
@ -207,10 +211,20 @@ public class BulkDataExportProvider {
//TODO GGG eventually, we will support these things.
Set<String> filters = null;
Date since = null;
boolean theMdm = false;
IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(outputFormat, resourceTypes, since, filters, theIdParam, theMdm));
Date since = null;
if (theSince != null) {
since = theSince.getValue();
}
boolean mdm = false;
if (theMdm != null) {
mdm = theMdm.getValue();
}
if (theTypeFilter != null) {
filters = ArrayUtil.commaSeparatedListToCleanSet(theTypeFilter.getValueAsString());
}
IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(outputFormat, resourceTypes, since, filters, theIdParam, mdm));
String serverBase = getServerBase(theRequestDetails);
String pollLocation = serverBase + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" + JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + outcome.getJobId();

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions;
@ -105,11 +106,11 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
private IBatchJobSubmitter myJobSubmitter;
@Autowired
@Qualifier("bulkExportJob")
@Qualifier(BatchJobsConfig.BULK_EXPORT_JOB_NAME)
private org.springframework.batch.core.Job myBulkExportJob;
@Autowired
@Qualifier("groupBulkExportJob")
@Qualifier(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME)
private org.springframework.batch.core.Job myGroupBulkExportJob;
private final int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR);
@ -138,9 +139,8 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
BulkExportJobEntity bulkExportJobEntity = jobToProcessOpt.get();
String jobUuid = bulkExportJobEntity.getJobId();
String theGroupId = getGroupIdIfPresent(bulkExportJobEntity.getRequest());
try {
processJob(jobUuid, theGroupId);
processJob(bulkExportJobEntity);
} catch (Exception e) {
ourLog.error("Failure while preparing bulk export extract", e);
myTxTemplate.execute(t -> {
@ -156,15 +156,17 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
}
}
private String getGroupIdIfPresent(String theRequestString) {
private String getQueryParameterIfPresent(String theRequestString, String theParameter) {
Map<String, String[]> stringMap = UrlUtil.parseQueryString(theRequestString);
if (stringMap != null) {
String[] strings = stringMap.get(JpaConstants.PARAM_EXPORT_GROUP_ID);
String[] strings = stringMap.get(theParameter);
if (strings != null) {
return String.join(",", strings);
}
}
return null;
}
@ -215,7 +217,8 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
}
private void processJob(String theJobUuid, String theGroupId) {
private void processJob(BulkExportJobEntity theBulkExportJobEntity) {
String theJobUuid = theBulkExportJobEntity.getJobId();
JobParametersBuilder parameters = new JobParametersBuilder()
.addString(BulkExportJobConfig.JOB_UUID_PARAMETER, theJobUuid)
.addLong(BulkExportJobConfig.READ_CHUNK_PARAMETER, READ_CHUNK_SIZE);
@ -223,8 +226,8 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
ourLog.info("Submitting bulk export job {} to job scheduler", theJobUuid);
try {
if (!StringUtils.isBlank(theGroupId)) {
parameters.addString(BulkExportJobConfig.GROUP_ID_PARAMETER, theGroupId);
if (isGroupBulkJob(theBulkExportJobEntity)) {
enhanceBulkParametersWithGroupParameters(theBulkExportJobEntity, parameters);
myJobSubmitter.runJob(myGroupBulkExportJob, parameters.toJobParameters());
} else {
myJobSubmitter.runJob(myBulkExportJob, parameters.toJobParameters());
@ -234,6 +237,17 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
}
}
private void enhanceBulkParametersWithGroupParameters(BulkExportJobEntity theBulkExportJobEntity, JobParametersBuilder theParameters) {
String theGroupId = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_GROUP_ID);
String expandMdm = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_MDM);
theParameters.addString(BulkExportJobConfig.GROUP_ID_PARAMETER, theGroupId);
theParameters.addString(BulkExportJobConfig.EXPAND_MDM_PARAMETER, expandMdm);
}
private boolean isGroupBulkJob(BulkExportJobEntity theBulkExportJobEntity) {
return getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_GROUP_ID) != null;
}
@SuppressWarnings("unchecked")
private IFhirResourceDao<IBaseBinary> getBinaryDao() {
return myDaoRegistry.getResourceDao("Binary");
@ -284,8 +298,7 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
if (theBulkDataExportOptions instanceof GroupBulkDataExportOptions) {
GroupBulkDataExportOptions groupOptions = (GroupBulkDataExportOptions) theBulkDataExportOptions;
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_GROUP_ID).append("=").append(groupOptions.getGroupId().getValue());
//TODO GGG eventually we will support this
// requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_MDM).append("=").append(groupOptions.isMdm());
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_MDM).append("=").append(groupOptions.isMdm());
}
String request = requestBuilder.toString();

View File

@ -28,6 +28,8 @@ import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
import java.util.List;
@Repository
public interface IMdmLinkDao extends JpaRepository<MdmLink, Long> {
@Modifying
@ -37,4 +39,16 @@ public interface IMdmLinkDao extends JpaRepository<MdmLink, Long> {
@Modifying
@Query("DELETE FROM MdmLink f WHERE (myGoldenResourcePid = :pid OR mySourcePid = :pid) AND myMatchResult <> :matchResult")
int deleteWithAnyReferenceToPidAndMatchResultNot(@Param("pid") Long thePid, @Param("matchResult") MdmMatchResultEnum theMatchResult);
@Query("SELECT ml2.myGoldenResourcePid, ml2.mySourcePid FROM MdmLink ml2 " +
"WHERE ml2.myMatchResult=:matchResult " +
"AND ml2.myGoldenResourcePid IN (" +
"SELECT ml.myGoldenResourcePid FROM MdmLink ml " +
"INNER JOIN ResourceLink hrl " +
"ON hrl.myTargetResourcePid=ml.mySourcePid " +
"AND hrl.mySourceResourcePid=:groupPid " +
"AND hrl.mySourcePath='Group.member.entity' " +
"AND hrl.myTargetResourceType='Patient'" +
")")
List<List<Long>> expandPidsFromGroupPidGivenMatchResult(@Param("groupPid") Long theGroupPid, @Param("matchResult") MdmMatchResultEnum theMdmMatchResultEnum);
}

View File

@ -381,7 +381,6 @@ public class IdHelperService {
}
public Map<Long, Optional<String>> translatePidsToForcedIds(Set<Long> thePids) {
Map<Long, Optional<String>> retVal = new HashMap<>(myMemoryCacheService.getAllPresent(MemoryCacheService.CacheEnum.FORCED_ID, thePids));
List<Long> remainingPids = thePids
@ -434,6 +433,12 @@ public class IdHelperService {
return resourcePersistentIds.get(0).getIdAsLong();
}
@Nonnull
public List<Long> getPidsOrThrowException(List<IIdType> theIds) {
List<ResourcePersistentId> resourcePersistentIds = this.resolveResourcePersistentIdsWithCache(RequestPartitionId.allPartitions(), theIds);
return resourcePersistentIds.stream().map(ResourcePersistentId::getIdAsLong).collect(Collectors.toList());
}
@Nonnull
public Long getPidOrThrowException(IAnyResource theResource) {
Long retVal = (Long) theResource.getUserData(RESOURCE_PID);

View File

@ -35,8 +35,12 @@ import java.util.function.Consumer;
public class QueryChunker<T> {
public void chunk(List<T> theInput, Consumer<List<T>> theBatchConsumer) {
for (int i = 0; i < theInput.size(); i += SearchBuilder.getMaximumPageSize()) {
int to = i + SearchBuilder.getMaximumPageSize();
chunk(theInput, SearchBuilder.getMaximumPageSize(), theBatchConsumer);
}
public void chunk(List<T> theInput, int theChunkSize, Consumer<List<T>> theBatchConsumer ) {
for (int i = 0; i < theInput.size(); i += theChunkSize) {
int to = i + theChunkSize;
to = Math.min(to, theInput.size());
List<T> batch = theInput.subList(i, to);
theBatchConsumer.accept(batch);

View File

@ -49,6 +49,8 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -306,11 +308,14 @@ public class BulkDataExportProviderTest {
InstantType now = InstantType.now();
Parameters input = new Parameters();
StringType obsTypeFilter = new StringType("Observation?code=OBSCODE,DiagnosticReport?code=DRCODE");
input.addParameter(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, new StringType(Constants.CT_FHIR_NDJSON));
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE, new StringType("Observation, DiagnosticReport"));
input.addParameter(JpaConstants.PARAM_EXPORT_SINCE, now);
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE_FILTER, new StringType("Observation?code=OBSCODE,DiagnosticReport?code=DRCODE"));
input.addParameter(JpaConstants.PARAM_EXPORT_MDM, true);
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE_FILTER, obsTypeFilter);
ourLog.info(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(input));
@ -329,11 +334,9 @@ public class BulkDataExportProviderTest {
GroupBulkDataExportOptions options = myGroupBulkDataExportOptionsCaptor.getValue();
assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat());
assertThat(options.getResourceTypes(), containsInAnyOrder("Observation", "DiagnosticReport"));
//TODO GGG eventually, we will support since in group exports
assertThat(options.getSince(), nullValue());
//TODO GGG eventually, we will support filters in group exports
assertThat(options.getFilters(), nullValue());
assertThat(options.getSince(), notNullValue());
assertThat(options.getFilters(), notNullValue());
assertEquals(GROUP_ID, options.getGroupId().getValue());
assertFalse(options.isMdm());
assertThat(options.isMdm(), is(equalTo(true)));
}
}

View File

@ -2,6 +2,8 @@ package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions;
@ -16,6 +18,9 @@ import ca.uhn.fhir.jpa.dao.r4.BaseJpaR4Test;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.entity.MdmLink;
import ca.uhn.fhir.mdm.api.MdmLinkSourceEnum;
import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.util.UrlUtil;
@ -62,6 +67,7 @@ import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
@ -84,11 +90,11 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
private JobExplorer myJobExplorer;
@Autowired
@Qualifier("bulkExportJob")
@Qualifier(BatchJobsConfig.BULK_EXPORT_JOB_NAME)
private Job myBulkJob;
@Autowired
@Qualifier("groupBulkExportJob")
@Qualifier(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME)
private Job myGroupBulkJob;
private IIdType myPatientGroupId;
@ -291,10 +297,10 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
if ("Patient".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"value\":\"PAT1\"}"));
assertEquals(5, nextContents.split("\n").length); // Only female patients
assertEquals(7, nextContents.split("\n").length); // Only female patients
} else if ("Observation".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"subject\":{\"reference\":\"Patient/PAT0\"}}\n"));
assertEquals(10, nextContents.split("\n").length);
assertEquals(16, nextContents.split("\n").length);
} else {
fail(next.getResourceType());
}
@ -344,16 +350,16 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
ourLog.info("Next contents for type {}:\n{}", next.getResourceType(), nextContents);
if ("Patient".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"value\":\"PAT0\""));
assertEquals(10, nextContents.split("\n").length);
assertEquals(17, nextContents.split("\n").length);
} else if ("Observation".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"subject\":{\"reference\":\"Patient/PAT0\"}}\n"));
assertEquals(10, nextContents.split("\n").length);
assertEquals(16, nextContents.split("\n").length);
}else if ("Immunization".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"patient\":{\"reference\":\"Patient/PAT0\"}}\n"));
assertEquals(10, nextContents.split("\n").length);
assertEquals(16, nextContents.split("\n").length);
} else if ("CareTeam".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"id\":\"CT0\""));
assertEquals(10, nextContents.split("\n").length);
assertEquals(16, nextContents.split("\n").length);
} else if ("Group".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"id\":\"G0\""));
assertEquals(1, nextContents.split("\n").length);
@ -485,7 +491,8 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
public void awaitAllBulkJobCompletions() {
List<JobInstance> bulkExport = myJobExplorer.findJobInstancesByJobName("bulkExportJob", 0, 100);
List<JobInstance> bulkExport = myJobExplorer.findJobInstancesByJobName(BatchJobsConfig.BULK_EXPORT_JOB_NAME, 0, 100);
bulkExport.addAll(myJobExplorer.findJobInstancesByJobName(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME, 0, 100));
if (bulkExport.isEmpty()) {
fail("There are no bulk export jobs running!");
}
@ -601,7 +608,133 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
} catch (JobParametersInvalidException e) {
// good
}
}
@Test
public void testMdmExpansionSuccessfullyExtractsPatients() throws JobParametersInvalidException {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(null, Sets.newHashSet("Patient"), null, null, myPatientGroupId, true));
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(1));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Patient")));
Binary patientExportContent = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, patientExportContent.getContentType());
String nextContents = new String(patientExportContent.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", patientExportContent.getResourceType(), nextContents);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Patient")));
//Output contains The entire group, plus the Mdm expansion, plus the golden resource
assertEquals(11, nextContents.split("\n").length);
}
@Test
public void testMdmExpansionWorksForGroupExportOnMatchedPatients() throws JobParametersInvalidException {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(null, Sets.newHashSet("Immunization", "Observation"), null, null, myPatientGroupId, true));
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Observation,Immunization&_groupId=" + myPatientGroupId +"&_mdm=true", jobInfo.getRequest());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(2));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Check immunization Content
Binary immunizationExportContent = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, immunizationExportContent.getContentType());
String nextContents = new String(immunizationExportContent.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", immunizationExportContent.getResourceType(), nextContents);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
assertThat(nextContents, is(containsString("IMM0")));
assertThat(nextContents, is(containsString("IMM2")));
assertThat(nextContents, is(containsString("IMM4")));
assertThat(nextContents, is(containsString("IMM6")));
assertThat(nextContents, is(containsString("IMM8")));
assertThat(nextContents, is(containsString("IMM1")));
assertThat(nextContents, is(containsString("IMM3")));
assertThat(nextContents, is(containsString("IMM5")));
assertThat(nextContents, is(containsString("IMM7")));
assertThat(nextContents, is(containsString("IMM9")));
assertThat(nextContents, is(containsString("IMM999")));
//Check Observation Content
Binary observationExportContent = myBinaryDao.read(jobInfo.getFiles().get(1).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, observationExportContent.getContentType());
nextContents = new String(observationExportContent.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", observationExportContent.getResourceType(), nextContents);
assertThat(jobInfo.getFiles().get(1).getResourceType(), is(equalTo("Observation")));
assertThat(nextContents, is(containsString("OBS0")));
assertThat(nextContents, is(containsString("OBS2")));
assertThat(nextContents, is(containsString("OBS4")));
assertThat(nextContents, is(containsString("OBS6")));
assertThat(nextContents, is(containsString("OBS8")));
assertThat(nextContents, is(containsString("OBS1")));
assertThat(nextContents, is(containsString("OBS3")));
assertThat(nextContents, is(containsString("OBS5")));
assertThat(nextContents, is(containsString("OBS7")));
assertThat(nextContents, is(containsString("OBS9")));
assertThat(nextContents, is(containsString("OBS999")));
//Ensure that we didn't over-include into non-group-members data.
assertThat(nextContents, is(not(containsString("OBS1000"))));
}
@Test
public void testGroupBulkExportSupportsTypeFilters() throws JobParametersInvalidException {
createResources();
Set<String> filters = new HashSet<>();
//Only get COVID-19 vaccinations
filters.add("Immunization?vaccine-code=vaccines|COVID-19");
GroupBulkDataExportOptions groupBulkDataExportOptions = new GroupBulkDataExportOptions(null, Sets.newHashSet("Immunization"), null, filters, myPatientGroupId, true );
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(groupBulkDataExportOptions);
GroupBulkExportJobParametersBuilder paramBuilder = new GroupBulkExportJobParametersBuilder();
paramBuilder.setGroupId(myPatientGroupId.getIdPart());
paramBuilder.setMdm(true);
paramBuilder.setJobUUID(jobDetails.getJobId());
paramBuilder.setReadChunkSize(10L);
JobExecution jobExecution = myBatchJobSubmitter.runJob(myGroupBulkJob, paramBuilder.toJobParameters());
awaitJobCompletion(jobExecution);
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(1));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Check immunization Content
Binary immunizationExportContent = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, immunizationExportContent.getContentType());
String nextContents = new String(immunizationExportContent.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", immunizationExportContent.getResourceType(), nextContents);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
assertThat(nextContents, is(containsString("IMM1")));
assertThat(nextContents, is(containsString("IMM3")));
assertThat(nextContents, is(containsString("IMM5")));
assertThat(nextContents, is(containsString("IMM7")));
assertThat(nextContents, is(containsString("IMM9")));
assertThat(nextContents, is(containsString("IMM999")));
assertThat(nextContents, is(not(containsString("Flu"))));
}
private void awaitJobCompletion(JobExecution theJobExecution) {
@ -615,25 +748,76 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
private void createResources() {
Group group = new Group();
group.setId("G0");
//Manually create a golden record
Patient goldenPatient = new Patient();
goldenPatient.setId("PAT999");
DaoMethodOutcome g1Outcome = myPatientDao.update(goldenPatient);
Long goldenPid = myIdHelperService.getPidOrNull(g1Outcome.getResource());
//Create our golden records' data.
createObservationWithIndex(999, g1Outcome.getId());
createImmunizationWithIndex(999, g1Outcome.getId());
createCareTeamWithIndex(999, g1Outcome.getId());
//Lets create an observation and an immunization for our golden patient.
for (int i = 0; i < 10; i++) {
Patient patient = new Patient();
patient.setId("PAT" + i);
patient.setGender(i % 2 == 0 ? Enumerations.AdministrativeGender.MALE : Enumerations.AdministrativeGender.FEMALE);
patient.addName().setFamily("FAM" + i);
patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i);
IIdType patId = myPatientDao.update(patient).getId().toUnqualifiedVersionless();
DaoMethodOutcome patientOutcome = createPatientWithIndex(i);
IIdType patId = patientOutcome.getId().toUnqualifiedVersionless();
Long sourcePid = myIdHelperService.getPidOrNull(patientOutcome.getResource());
//Link the patient to the golden resource
linkToGoldenResource(goldenPid, sourcePid);
//Only add half the patients to the group.
if (i % 2 == 0 ) {
group.addMember().setEntity(new Reference(patId));
}
Observation obs = new Observation();
obs.setId("OBS" + i);
obs.addIdentifier().setSystem("SYS").setValue("VAL" + i);
obs.setStatus(Observation.ObservationStatus.FINAL);
obs.getSubject().setReference(patId.getValue());
myObservationDao.update(obs);
//Create data
createObservationWithIndex(i, patId);
createImmunizationWithIndex(i, patId);
createCareTeamWithIndex(i, patId);
}
myPatientGroupId = myGroupDao.update(group).getId();
//Manually create another golden record
Patient goldenPatient2 = new Patient();
goldenPatient2.setId("PAT888");
DaoMethodOutcome g2Outcome = myPatientDao.update(goldenPatient2);
Long goldenPid2 = myIdHelperService.getPidOrNull(g2Outcome.getResource());
//Create some nongroup patients MDM linked to a different golden resource. They shouldnt be included in the query.
for (int i = 1000; i < 1005; i++) {
DaoMethodOutcome patientOutcome = createPatientWithIndex(i);
IIdType patId = patientOutcome.getId().toUnqualifiedVersionless();
Long sourcePid = myIdHelperService.getPidOrNull(patientOutcome.getResource());
linkToGoldenResource(goldenPid2, sourcePid);
createObservationWithIndex(i, patId);
createImmunizationWithIndex(i, patId);
createCareTeamWithIndex(i, patId);
}
}
private DaoMethodOutcome createPatientWithIndex(int i) {
Patient patient = new Patient();
patient.setId("PAT" + i);
patient.setGender(i % 2 == 0 ? Enumerations.AdministrativeGender.MALE : Enumerations.AdministrativeGender.FEMALE);
patient.addName().setFamily("FAM" + i);
patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i);
DaoMethodOutcome patientOutcome = myPatientDao.update(patient);
return patientOutcome;
}
private void createCareTeamWithIndex(int i, IIdType patId) {
CareTeam careTeam = new CareTeam();
careTeam.setId("CT" + i);
careTeam.setSubject(new Reference(patId)); // This maps to the "patient" search parameter on CareTeam
myCareTeamDao.update(careTeam);
}
private void createImmunizationWithIndex(int i, IIdType patId) {
Immunization immunization = new Immunization();
immunization.setId("IMM" + i);
immunization.setPatient(new Reference(patId));
@ -647,14 +831,29 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
immunization.setVaccineCode(cc);
}
myImmunizationDao.update(immunization);
CareTeam careTeam = new CareTeam();
careTeam.setId("CT" + i);
careTeam.setSubject(new Reference(patId)); // This maps to the "patient" search parameter on CareTeam
myCareTeamDao.update(careTeam);
}
myPatientGroupId = myGroupDao.update(group).getId();
private void createObservationWithIndex(int i, IIdType patId) {
Observation obs = new Observation();
obs.setId("OBS" + i);
obs.addIdentifier().setSystem("SYS").setValue("VAL" + i);
obs.setStatus(Observation.ObservationStatus.FINAL);
obs.getSubject().setReference(patId.getValue());
myObservationDao.update(obs);
}
public void linkToGoldenResource(Long theGoldenPid, Long theSourcePid) {
MdmLink mdmLink = new MdmLink();
mdmLink.setCreated(new Date());
mdmLink.setMdmSourceType("Patient");
mdmLink.setGoldenResourcePid(theGoldenPid);
mdmLink.setSourcePid(theSourcePid);
mdmLink.setMatchResult(MdmMatchResultEnum.MATCH);
mdmLink.setHadToCreateNewGoldenResource(false);
mdmLink.setEidMatch(false);
mdmLink.setLinkSource(MdmLinkSourceEnum.MANUAL);
mdmLink.setUpdated(new Date());
mdmLink.setVersion("1");
myMdmLinkDao.save(mdmLink);
}
}

View File

@ -73,12 +73,10 @@ public class TestR4Config extends BaseJavaConfigR4 {
return new CircularQueueCaptureQueriesListener();
}
@Bean
public DataSource dataSource() {
BasicDataSource retVal = new BasicDataSource() {
@Override
public Connection getConnection() {
ConnectionWrapper retVal;

View File

@ -22,6 +22,7 @@ import ca.uhn.fhir.jpa.config.TestR4Config;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;
import ca.uhn.fhir.jpa.dao.data.IForcedIdDao;
import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
import ca.uhn.fhir.jpa.dao.data.IPartitionDao;
import ca.uhn.fhir.jpa.dao.data.IResourceHistoryTableDao;
import ca.uhn.fhir.jpa.dao.data.IResourceHistoryTagDao;
@ -479,11 +480,13 @@ public abstract class BaseJpaR4Test extends BaseJpaTest implements ITestDataBuil
@Autowired
private IBulkDataExportSvc myBulkDataExportSvc;
@Autowired
private IdHelperService myIdHelperService;
protected IdHelperService myIdHelperService;
@Autowired
protected IBatchJobSubmitter myBatchJobSubmitter;
@Autowired
protected ValidationSettings myValidationSettings;
@Autowired
protected IMdmLinkDao myMdmLinkDao;
@AfterEach()
public void afterCleanupDao() {
@ -549,6 +552,7 @@ public abstract class BaseJpaR4Test extends BaseJpaTest implements ITestDataBuil
@AfterEach
public void afterPurgeDatabase() {
myMdmLinkDao.deleteAll();
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc);
}

View File

@ -187,13 +187,12 @@ public class JpaConstants {
/**
* The [id] of the group when $export is called on /Group/[id]/$export
*/
public static final Object PARAM_EXPORT_GROUP_ID = "_groupId";
public static final String PARAM_EXPORT_GROUP_ID = "_groupId";
/**
* TODO GGG eventually we will support this.
* Whether mdm should be performed on group export items to expand the group items to linked items before performing the export
*/
// public static final String PARAM_EXPORT_MDM = "_mdm";
public static final String PARAM_EXPORT_MDM = "_mdm";
/**
* Parameter for delete to indicate the deleted resources should also be expunged

View File

@ -28,6 +28,7 @@ import ca.uhn.fhir.mdm.model.CanonicalEID;
import ca.uhn.fhir.util.FhirTerser;
import org.hl7.fhir.instance.model.api.IBase;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import java.lang.reflect.Method;
import java.util.Collection;
@ -38,8 +39,10 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static ca.uhn.fhir.mdm.util.GoldenResourceHelper.FIELD_NAME_IDENTIFIER;
import static org.slf4j.LoggerFactory.getLogger;
public final class TerserUtil {
private static final Logger ourLog = getLogger(TerserUtil.class);
public static final Collection<String> IDS_AND_META_EXCLUDES =
Collections.unmodifiableSet(Stream.of("id", "identifier", "meta").collect(Collectors.toSet()));
@ -105,6 +108,23 @@ public final class TerserUtil {
}
return !(resourceIdentifier.getAccessor().getValues(theResource).isEmpty());
}
/**
* get the Values of a specified field.
*
* @param theFhirContext Context holding resource definition
* @param theResource Resource to check if the specified field is set
* @param theFieldName name of the field to check
* @return Returns true if field exists and has any values set, and false otherwise
*/
public static List<IBase> getValues(FhirContext theFhirContext, IBaseResource theResource, String theFieldName) {
RuntimeResourceDefinition resourceDefinition = theFhirContext.getResourceDefinition(theResource);
BaseRuntimeChildDefinition resourceIdentifier = resourceDefinition.getChildByName(theFieldName);
if (resourceIdentifier == null) {
ourLog.info("There is no field named {} in Resource {}", theFieldName, resourceDefinition.getName());
return null;
}
return resourceIdentifier.getAccessor().getValues(theResource);
}
/**
* Clones specified composite field (collection). Composite field values must confirm to the collections