Support MDM expansion in group bulk export, re-add capability to use _typeFilter in Bulk Export
This commit is contained in:
parent
53598c3bdc
commit
57c7d4f3eb
|
@ -31,30 +31,35 @@ import ca.uhn.fhir.jpa.dao.IResultIterator;
|
|||
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
|
||||
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
|
||||
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
|
||||
import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
|
||||
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
|
||||
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
|
||||
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
|
||||
import ca.uhn.fhir.jpa.model.util.JpaConstants;
|
||||
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
|
||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
||||
import ca.uhn.fhir.jpa.util.QueryChunker;
|
||||
import ca.uhn.fhir.model.api.Include;
|
||||
import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
|
||||
import ca.uhn.fhir.model.primitive.IdDt;
|
||||
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
|
||||
import ca.uhn.fhir.rest.param.DateRangeParam;
|
||||
import ca.uhn.fhir.rest.param.HasParam;
|
||||
import ca.uhn.fhir.rest.param.ReferenceOrListParam;
|
||||
import ca.uhn.fhir.rest.param.ReferenceParam;
|
||||
import ca.uhn.fhir.util.UrlUtil;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.instance.model.api.IPrimitiveType;
|
||||
import org.slf4j.Logger;
|
||||
import org.springframework.batch.item.ItemReader;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.persistence.EntityManager;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
|
||||
|
@ -69,6 +74,8 @@ public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId
|
|||
private String myJobUUID;
|
||||
@Value("#{jobParameters['" + BulkExportJobConfig.READ_CHUNK_PARAMETER + "']}")
|
||||
private Long myReadChunkSize;
|
||||
@Value("#{jobParameters['" + BulkExportJobConfig.EXPAND_MDM_PARAMETER+ "']}")
|
||||
private String myMdmEnabled;
|
||||
|
||||
@Autowired
|
||||
private IBulkExportJobDao myBulkExportJobDao;
|
||||
|
@ -79,88 +86,177 @@ public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId
|
|||
@Autowired
|
||||
private SearchBuilderFactory mySearchBuilderFactory;
|
||||
@Autowired
|
||||
private EntityManager myEntityManager;
|
||||
private IdHelperService myIdHelperService;
|
||||
@Autowired
|
||||
private IMdmLinkDao myMdmLinkDao;
|
||||
@Autowired
|
||||
private MatchUrlService myMatchUrlService;
|
||||
|
||||
private void loadResourcePids() {
|
||||
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
|
||||
if (!jobOpt.isPresent()) {
|
||||
ourLog.warn("Job appears to be deleted");
|
||||
return;
|
||||
}
|
||||
BulkExportJobEntity jobEntity = jobOpt.get();
|
||||
ourLog.info("Group Bulk export starting generation for batch export job: [{}] with resourceType [{}] and UUID [{}]", jobEntity, myResourceType, myJobUUID);
|
||||
private ISearchBuilder mySearchBuilder;
|
||||
private RuntimeSearchParam myPatientSearchParam;
|
||||
private BulkExportJobEntity myJobEntity;
|
||||
private RuntimeResourceDefinition myResourceDefinition;
|
||||
|
||||
//Fetch all the pids given the query.
|
||||
ISearchBuilder searchBuilder = getSearchBuilder();
|
||||
|
||||
//Build complex-ish _has query with a revincludes which allows lookup by group membership
|
||||
SearchParameterMap searchParameterMap = getSearchParameterMap(jobEntity);
|
||||
|
||||
IResultIterator resultIterator = searchBuilder.createQuery(
|
||||
searchParameterMap,
|
||||
new SearchRuntimeDetails(null, myJobUUID),
|
||||
null,
|
||||
RequestPartitionId.allPartitions()
|
||||
);
|
||||
|
||||
List<ResourcePersistentId> myReadPids = new ArrayList<>();
|
||||
while (resultIterator.hasNext()) {
|
||||
myReadPids.add(resultIterator.next());
|
||||
}
|
||||
|
||||
//Given that databases explode when you have an IN clause with >1000 resources, we use the QueryChunker to break this into multiple queries.
|
||||
List<ResourcePersistentId> revIncludePids = new ArrayList<>();
|
||||
QueryChunker<ResourcePersistentId> chunker = new QueryChunker<>();
|
||||
|
||||
chunker.chunk(myReadPids, pidChunk -> {
|
||||
revIncludePids.addAll(searchBuilder.loadIncludes(myContext, myEntityManager, pidChunk, searchParameterMap.getRevIncludes(), true, searchParameterMap.getLastUpdated(), myJobUUID, null));
|
||||
});
|
||||
|
||||
myPidIterator = revIncludePids.iterator();
|
||||
}
|
||||
|
||||
//For all group revinclude queries, you need to perform the search on the Patient DAO, which is why this is hardcoded here.
|
||||
private ISearchBuilder getSearchBuilder() {
|
||||
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao("Patient");
|
||||
RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient");
|
||||
Class<? extends IBaseResource> nextTypeClass = def.getImplementingClass();
|
||||
return mySearchBuilderFactory.newSearchBuilder(dao, "Patient", nextTypeClass);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private SearchParameterMap getSearchParameterMap(BulkExportJobEntity jobEntity) {
|
||||
SearchParameterMap searchParameterMap = new SearchParameterMap();
|
||||
searchParameterMap.add("_has", new HasParam("Group", "member", "_id", myGroupId));
|
||||
|
||||
String revIncludeString = buildRevIncludeString();
|
||||
searchParameterMap.addRevInclude(new Include(revIncludeString).toLocked());
|
||||
|
||||
if (jobEntity.getSince() != null) {
|
||||
searchParameterMap.setLastUpdated(new DateRangeParam(jobEntity.getSince(), null));
|
||||
}
|
||||
searchParameterMap.setLoadSynchronous(true);
|
||||
return searchParameterMap;
|
||||
/**
|
||||
* Given the local myGroupId, read this group, and find all members' patient references.
|
||||
* @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"]
|
||||
*/
|
||||
private List<String> getMembers() {
|
||||
IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
|
||||
List<IPrimitiveType> evaluate = myContext.newFhirPath().evaluate(group, "member.entity.reference", IPrimitiveType.class);
|
||||
return evaluate.stream().map(IPrimitiveType::getValueAsString).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Given the resource type of the job, fetch its patient compartment name, formatted for usage in an Include.
|
||||
* e.g. Immunization -> Immunization:patient
|
||||
* Given the local myGroupId, perform an expansion to retrieve all resource IDs of member patients.
|
||||
* if myMdmEnabled is set to true, we also reach out to the IMdmLinkDao to attempt to also expand it into matched
|
||||
* patients.
|
||||
*
|
||||
* @return A string which can be dropped directly into an Include.
|
||||
* @return a Set of Strings representing the resource IDs of all members of a group.
|
||||
*/
|
||||
private String buildRevIncludeString() {
|
||||
RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(myResourceType);
|
||||
RuntimeSearchParam patientSearchParam = runtimeResourceDefinition.getSearchParam("patient");
|
||||
if (patientSearchParam == null) {
|
||||
patientSearchParam = runtimeResourceDefinition.getSearchParam("subject");
|
||||
if (patientSearchParam == null) {
|
||||
patientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition);
|
||||
private Set<String> expandAllPatientPidsFromGroup() {
|
||||
Set<String> expandedIds = new HashSet<>();
|
||||
IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
|
||||
Long pidOrNull = myIdHelperService.getPidOrNull(group);
|
||||
|
||||
//Attempt to perform MDM Expansion of membership
|
||||
if (Boolean.valueOf(myMdmEnabled)) {
|
||||
List<List<Long>> goldenPidTargetPidTuple = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
|
||||
//Now lets translate these pids into resource IDs
|
||||
Set<Long> uniquePids = new HashSet<>();
|
||||
goldenPidTargetPidTuple.forEach(uniquePids::addAll);
|
||||
Map<Long, Optional<String>> longOptionalMap = myIdHelperService.translatePidsToForcedIds(uniquePids);
|
||||
expandedIds = longOptionalMap.values().stream().map(Optional::get).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
//Now manually add the members of the group (its possible even with mdm expansion that some members dont have MDM matches,
|
||||
//so would be otherwise skipped
|
||||
expandedIds.addAll(getMembers());
|
||||
|
||||
return expandedIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate the list of pids of all resources of the given myResourceType, which reference any group member of the given myGroupId.
|
||||
* Store them in a member iterator.
|
||||
*/
|
||||
private void loadResourcePids() {
|
||||
//Initialize an array to hold the pids of the target resources to be exported.
|
||||
List<ResourcePersistentId> myReadPids = new ArrayList<>();
|
||||
|
||||
//First lets expand the group so we get a list of all patient IDs of the group, and MDM-matched patient IDs of the group.
|
||||
Set<String> expandedMemberResourceIds = expandAllPatientPidsFromGroup();
|
||||
|
||||
|
||||
//Next, let's search for the target resources, with their correct patient references, chunked.
|
||||
//The results will be jammed into myReadPids
|
||||
QueryChunker<String> queryChunker = new QueryChunker<>();
|
||||
queryChunker.chunk(new ArrayList<>(expandedMemberResourceIds), 100, (idChunk) -> {
|
||||
queryTargetResourceWithReferencesToPatients(myReadPids, idChunk);
|
||||
});
|
||||
myPidIterator = myReadPids.iterator();
|
||||
}
|
||||
|
||||
private void queryTargetResourceWithReferencesToPatients(List<ResourcePersistentId> myReadPids, List<String> idChunk) {
|
||||
//Build SP map
|
||||
//First, inject the _typeFilters from the export job
|
||||
SearchParameterMap expandedSpMap = createSearchParameterMapFromTypeFilter();
|
||||
|
||||
//Reject any attempts for users to filter on the patient searchparameter, as we have to manually set it.
|
||||
RuntimeSearchParam runtimeSearchParam = getPatientSearchParam();
|
||||
if (expandedSpMap.get(runtimeSearchParam.getName()) != null) {
|
||||
throw new IllegalArgumentException(String.format("Group Bulk Export manually modifies the Search Parameter called [{}], so you may not include this search parameter in your _typeFilter!", runtimeSearchParam.getName()));
|
||||
}
|
||||
|
||||
ReferenceOrListParam orList = new ReferenceOrListParam();
|
||||
idChunk.forEach(id -> orList.add(new ReferenceParam(id)));
|
||||
expandedSpMap.add(runtimeSearchParam.getName(), orList);
|
||||
|
||||
// Fetch and cache a search builder for this resource type
|
||||
ISearchBuilder searchBuilder = getSearchBuilderForLocalResourceType();
|
||||
|
||||
//Execute query and all found pids to our local iterator.
|
||||
IResultIterator resultIterator = searchBuilder.createQuery(expandedSpMap, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
|
||||
while (resultIterator.hasNext()) {
|
||||
myReadPids.add(resultIterator.next());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get and cache an ISearchBuilder for the given resource type this partition is responsible for.
|
||||
*/
|
||||
private ISearchBuilder getSearchBuilderForLocalResourceType() {
|
||||
if (mySearchBuilder == null) {
|
||||
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(myResourceType);
|
||||
RuntimeResourceDefinition def = myContext.getResourceDefinition(myResourceType);
|
||||
Class<? extends IBaseResource> nextTypeClass = def.getImplementingClass();
|
||||
mySearchBuilder = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, nextTypeClass);
|
||||
}
|
||||
return mySearchBuilder;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given the resource type, fetch its patient-based search parameter name
|
||||
* 1. Attempt to find one called 'patient'
|
||||
* 2. If that fails, find one called 'subject'
|
||||
* 3. If that fails, find find by Patient Compartment.
|
||||
* 3.1 If that returns >1 result, throw an error
|
||||
* 3.2 If that returns 1 result, return it
|
||||
*/
|
||||
private RuntimeSearchParam getPatientSearchParam() {
|
||||
if (myPatientSearchParam == null) {
|
||||
RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(myResourceType);
|
||||
myPatientSearchParam = runtimeResourceDefinition.getSearchParam("patient");
|
||||
if (myPatientSearchParam == null) {
|
||||
myPatientSearchParam = runtimeResourceDefinition.getSearchParam("subject");
|
||||
if (myPatientSearchParam == null) {
|
||||
myPatientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition);
|
||||
if (myPatientSearchParam == null) {
|
||||
String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", myResourceType);
|
||||
throw new IllegalArgumentException(errorMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
String includeString = runtimeResourceDefinition.getName() + ":" + patientSearchParam.getName();
|
||||
return includeString;
|
||||
return myPatientSearchParam;
|
||||
}
|
||||
|
||||
private SearchParameterMap createSearchParameterMapFromTypeFilter() {
|
||||
SearchParameterMap map = new SearchParameterMap();
|
||||
Map<String, String[]> requestUrl = UrlUtil.parseQueryStrings(getJobEntity().getRequest());
|
||||
String[] typeFilters = requestUrl.get(JpaConstants.PARAM_EXPORT_TYPE_FILTER);
|
||||
if (typeFilters != null) {
|
||||
Optional<String> filter = Arrays.stream(typeFilters).filter(t -> t.startsWith(myResourceType + "?")).findFirst();
|
||||
if (filter.isPresent()) {
|
||||
String matchUrl = filter.get();
|
||||
map = myMatchUrlService.translateMatchUrl(matchUrl, getResourceDefinition());
|
||||
}
|
||||
}
|
||||
map.setLoadSynchronous(true);
|
||||
return map;
|
||||
}
|
||||
|
||||
private RuntimeResourceDefinition getResourceDefinition() {
|
||||
if (myResourceDefinition == null) {
|
||||
myResourceDefinition = myContext.getResourceDefinition(myResourceType);
|
||||
}
|
||||
return myResourceDefinition;
|
||||
}
|
||||
|
||||
private BulkExportJobEntity getJobEntity() {
|
||||
if (myJobEntity == null) {
|
||||
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
|
||||
if (jobOpt.isPresent()) {
|
||||
myJobEntity = jobOpt.get();
|
||||
} else {
|
||||
String errorMessage = String.format("Job with UUID %s does not exist!", myJobUUID);
|
||||
throw new IllegalStateException(errorMessage);
|
||||
}
|
||||
}
|
||||
return myJobEntity;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Search the resource definition for a compartment named 'patient' and return its related Search Parameter.
|
||||
*/
|
||||
|
@ -181,9 +277,13 @@ public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId
|
|||
|
||||
@Override
|
||||
public List<ResourcePersistentId> read() {
|
||||
|
||||
ourLog.info("Group Bulk export starting generation for batch export job: [{}] with resourceType [{}] and UUID [{}]", getJobEntity(), myResourceType, myJobUUID);
|
||||
|
||||
if (myPidIterator == null) {
|
||||
loadResourcePids();
|
||||
}
|
||||
|
||||
int count = 0;
|
||||
List<ResourcePersistentId> outgoing = new ArrayList<>();
|
||||
while (myPidIterator.hasNext() && count < myReadChunkSize) {
|
||||
|
@ -194,4 +294,5 @@ public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId
|
|||
return outgoing.size() == 0 ? null : outgoing;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -28,6 +28,8 @@ import org.springframework.data.jpa.repository.Query;
|
|||
import org.springframework.data.repository.query.Param;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Repository
|
||||
public interface IMdmLinkDao extends JpaRepository<MdmLink, Long> {
|
||||
@Modifying
|
||||
|
@ -37,4 +39,19 @@ public interface IMdmLinkDao extends JpaRepository<MdmLink, Long> {
|
|||
@Modifying
|
||||
@Query("DELETE FROM MdmLink f WHERE (myGoldenResourcePid = :pid OR mySourcePid = :pid) AND myMatchResult <> :matchResult")
|
||||
int deleteWithAnyReferenceToPidAndMatchResultNot(@Param("pid") Long thePid, @Param("matchResult") MdmMatchResultEnum theMatchResult);
|
||||
|
||||
@Query("SELECT f.mySourcePid FROM MdmLink f WHERE f.myMatchResult=:matchResult AND f.myGoldenResourcePid IN (:pids)")
|
||||
List<Long> expandGoldenResourcePids(@Param("pids")List<Long> theGoldenResourcePids, @Param("matchResult") MdmMatchResultEnum theMdmMatchResultEnum);
|
||||
|
||||
@Query("SELECT f.myGoldenResourcePid FROM MdmLink f WHERE f.mySourcePid IN (:pids)")
|
||||
List<Long> getGoldenResourcePids(@Param("pids") List<Long> theSourcePids);
|
||||
|
||||
@Query("SELECT ml2.myGoldenResourcePid, ml2.mySourcePid FROM MdmLink ml2 " +
|
||||
"WHERE ml2.myMatchResult=:matchResult " +
|
||||
"AND ml2.myGoldenResourcePid IN (" +
|
||||
"SELECT ml.myGoldenResourcePid FROM MdmLink ml " +
|
||||
"INNER JOIN ResourceLink hrl " +
|
||||
"ON hrl.myTargetResourcePid=ml.mySourcePid " +
|
||||
"AND hrl.mySourceResourcePid=:groupPid)")
|
||||
List<List<Long>> expandPidsFromGroupPidGivenMatchResult(@Param("groupPid") Long theGroupPid, @Param("matchResult") MdmMatchResultEnum theMdmMatchResultEnum);
|
||||
}
|
||||
|
|
|
@ -381,7 +381,6 @@ public class IdHelperService {
|
|||
}
|
||||
|
||||
public Map<Long, Optional<String>> translatePidsToForcedIds(Set<Long> thePids) {
|
||||
|
||||
Map<Long, Optional<String>> retVal = new HashMap<>(myMemoryCacheService.getAllPresent(MemoryCacheService.CacheEnum.FORCED_ID, thePids));
|
||||
|
||||
List<Long> remainingPids = thePids
|
||||
|
@ -434,6 +433,12 @@ public class IdHelperService {
|
|||
return resourcePersistentIds.get(0).getIdAsLong();
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
public List<Long> getPidsOrThrowException(List<IIdType> theIds) {
|
||||
List<ResourcePersistentId> resourcePersistentIds = this.resolveResourcePersistentIdsWithCache(RequestPartitionId.allPartitions(), theIds);
|
||||
return resourcePersistentIds.stream().map(ResourcePersistentId::getIdAsLong).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
public Long getPidOrThrowException(IAnyResource theResource) {
|
||||
Long retVal = (Long) theResource.getUserData(RESOURCE_PID);
|
||||
|
|
|
@ -35,8 +35,12 @@ import java.util.function.Consumer;
|
|||
public class QueryChunker<T> {
|
||||
|
||||
public void chunk(List<T> theInput, Consumer<List<T>> theBatchConsumer) {
|
||||
for (int i = 0; i < theInput.size(); i += SearchBuilder.getMaximumPageSize()) {
|
||||
int to = i + SearchBuilder.getMaximumPageSize();
|
||||
chunk(theInput, SearchBuilder.getMaximumPageSize(), theBatchConsumer);
|
||||
}
|
||||
|
||||
public void chunk(List<T> theInput, int theChunkSize, Consumer<List<T>> theBatchConsumer ) {
|
||||
for (int i = 0; i < theInput.size(); i += theChunkSize) {
|
||||
int to = i + theChunkSize;
|
||||
to = Math.min(to, theInput.size());
|
||||
List<T> batch = theInput.subList(i, to);
|
||||
theBatchConsumer.accept(batch);
|
||||
|
|
|
@ -68,6 +68,7 @@ import static org.hamcrest.CoreMatchers.containsString;
|
|||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
@ -614,7 +615,74 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
|
|||
createResources();
|
||||
|
||||
// Create a bulk job
|
||||
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(null, Sets.newHashSet("Immunization"), null, null, myPatientGroupId, true));
|
||||
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(null, Sets.newHashSet("Immunization", "Observation"), null, null, myPatientGroupId, true));
|
||||
|
||||
GroupBulkExportJobParametersBuilder paramBuilder = new GroupBulkExportJobParametersBuilder();
|
||||
paramBuilder.setGroupId(myPatientGroupId.getIdPart());
|
||||
paramBuilder.setMdm(true);
|
||||
paramBuilder.setJobUUID(jobDetails.getJobId());
|
||||
paramBuilder.setReadChunkSize(10L);
|
||||
|
||||
JobExecution jobExecution = myBatchJobSubmitter.runJob(myGroupBulkJob, paramBuilder.toJobParameters());
|
||||
|
||||
awaitJobCompletion(jobExecution);
|
||||
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
|
||||
|
||||
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
|
||||
assertThat(jobInfo.getFiles().size(), equalTo(2));
|
||||
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
|
||||
|
||||
// Check immunization Content
|
||||
Binary immunizationExportContent = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
|
||||
assertEquals(Constants.CT_FHIR_NDJSON, immunizationExportContent.getContentType());
|
||||
String nextContents = new String(immunizationExportContent.getContent(), Constants.CHARSET_UTF8);
|
||||
ourLog.info("Next contents for type {}:\n{}", immunizationExportContent.getResourceType(), nextContents);
|
||||
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
|
||||
assertThat(nextContents, is(containsString("IMM0")));
|
||||
assertThat(nextContents, is(containsString("IMM2")));
|
||||
assertThat(nextContents, is(containsString("IMM4")));
|
||||
assertThat(nextContents, is(containsString("IMM6")));
|
||||
assertThat(nextContents, is(containsString("IMM8")));
|
||||
assertThat(nextContents, is(containsString("IMM1")));
|
||||
assertThat(nextContents, is(containsString("IMM3")));
|
||||
assertThat(nextContents, is(containsString("IMM5")));
|
||||
assertThat(nextContents, is(containsString("IMM7")));
|
||||
assertThat(nextContents, is(containsString("IMM9")));
|
||||
assertThat(nextContents, is(containsString("IMM999")));
|
||||
|
||||
|
||||
//Check Observation Content
|
||||
Binary observationExportContent = myBinaryDao.read(jobInfo.getFiles().get(1).getResourceId());
|
||||
assertEquals(Constants.CT_FHIR_NDJSON, observationExportContent.getContentType());
|
||||
nextContents = new String(observationExportContent.getContent(), Constants.CHARSET_UTF8);
|
||||
ourLog.info("Next contents for type {}:\n{}", observationExportContent.getResourceType(), nextContents);
|
||||
assertThat(jobInfo.getFiles().get(1).getResourceType(), is(equalTo("Observation")));
|
||||
assertThat(nextContents, is(containsString("OBS0")));
|
||||
assertThat(nextContents, is(containsString("OBS2")));
|
||||
assertThat(nextContents, is(containsString("OBS4")));
|
||||
assertThat(nextContents, is(containsString("OBS6")));
|
||||
assertThat(nextContents, is(containsString("OBS8")));
|
||||
assertThat(nextContents, is(containsString("OBS1")));
|
||||
assertThat(nextContents, is(containsString("OBS3")));
|
||||
assertThat(nextContents, is(containsString("OBS5")));
|
||||
assertThat(nextContents, is(containsString("OBS7")));
|
||||
assertThat(nextContents, is(containsString("OBS9")));
|
||||
assertThat(nextContents, is(containsString("OBS999")));
|
||||
|
||||
//Ensure that we didn't over-include into non-group-members data.
|
||||
assertThat(nextContents, is(not(containsString("OBS1000"))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGroupBulkExportSupportsTypeFilters() throws JobParametersInvalidException {
|
||||
createResources();
|
||||
Set<String> filters = new HashSet<>();
|
||||
|
||||
//Only get COVID-19 vaccinations
|
||||
filters.add("Immunization?vaccine-code=vaccines|COVID-19");
|
||||
|
||||
GroupBulkDataExportOptions groupBulkDataExportOptions = new GroupBulkDataExportOptions(null, Sets.newHashSet("Immunization"), null, filters, myPatientGroupId, true );
|
||||
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(groupBulkDataExportOptions);
|
||||
|
||||
GroupBulkExportJobParametersBuilder paramBuilder = new GroupBulkExportJobParametersBuilder();
|
||||
paramBuilder.setGroupId(myPatientGroupId.getIdPart());
|
||||
|
@ -631,24 +699,21 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
|
|||
assertThat(jobInfo.getFiles().size(), equalTo(1));
|
||||
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
|
||||
|
||||
// Iterate over the files
|
||||
Binary nextBinary = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
|
||||
assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
|
||||
String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
|
||||
ourLog.info("Next contents for type {}:\n{}", nextBinary.getResourceType(), nextContents);
|
||||
// Check immunization Content
|
||||
Binary immunizationExportContent = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
|
||||
assertEquals(Constants.CT_FHIR_NDJSON, immunizationExportContent.getContentType());
|
||||
String nextContents = new String(immunizationExportContent.getContent(), Constants.CHARSET_UTF8);
|
||||
ourLog.info("Next contents for type {}:\n{}", immunizationExportContent.getResourceType(), nextContents);
|
||||
|
||||
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
|
||||
assertThat(nextContents, is(containsString("IMM0")));
|
||||
assertThat(nextContents, is(containsString("IMM2")));
|
||||
assertThat(nextContents, is(containsString("IMM4")));
|
||||
assertThat(nextContents, is(containsString("IMM6")));
|
||||
assertThat(nextContents, is(containsString("IMM8")));
|
||||
assertThat(nextContents, is(containsString("IMM1")));
|
||||
assertThat(nextContents, is(containsString("IMM3")));
|
||||
assertThat(nextContents, is(containsString("IMM5")));
|
||||
assertThat(nextContents, is(containsString("IMM7")));
|
||||
assertThat(nextContents, is(containsString("IMM9")));
|
||||
assertThat(nextContents, is(containsString("IMM999")));
|
||||
|
||||
assertThat(nextContents, is(not(containsString("Flu"))));
|
||||
}
|
||||
|
||||
private void awaitJobCompletion(JobExecution theJobExecution) {
|
||||
|
@ -683,7 +748,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
|
|||
|
||||
//Link the patient to the golden resource
|
||||
linkToGoldenResource(goldenPid, sourcePid);
|
||||
|
||||
|
||||
//Only add half the patients to the group.
|
||||
if (i % 2 == 0 ) {
|
||||
group.addMember().setEntity(new Reference(patId));
|
||||
|
@ -695,6 +760,24 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
|
|||
createCareTeamWithIndex(i, patId);
|
||||
}
|
||||
myPatientGroupId = myGroupDao.update(group).getId();
|
||||
|
||||
//Manually create a golden record
|
||||
Patient goldenPatient2 = new Patient();
|
||||
goldenPatient2.setId("PAT888");
|
||||
DaoMethodOutcome g2Outcome = myPatientDao.update(goldenPatient2);
|
||||
Long goldenPid2 = myIdHelperService.getPidOrNull(g2Outcome.getResource());
|
||||
|
||||
//Create some nongroup patients MDM linked to a different golden resource. They shouldnt be included in the query.
|
||||
for (int i = 1000; i < 1005; i++) {
|
||||
DaoMethodOutcome patientOutcome = createPatientWithIndex(i);
|
||||
IIdType patId = patientOutcome.getId().toUnqualifiedVersionless();
|
||||
Long sourcePid = myIdHelperService.getPidOrNull(patientOutcome.getResource());
|
||||
linkToGoldenResource(goldenPid2, sourcePid);
|
||||
createObservationWithIndex(i, patId);
|
||||
createImmunizationWithIndex(i, patId);
|
||||
createCareTeamWithIndex(i, patId);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private DaoMethodOutcome createPatientWithIndex(int i) {
|
||||
|
|
|
@ -73,12 +73,10 @@ public class TestR4Config extends BaseJavaConfigR4 {
|
|||
return new CircularQueueCaptureQueriesListener();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public DataSource dataSource() {
|
||||
BasicDataSource retVal = new BasicDataSource() {
|
||||
|
||||
|
||||
@Override
|
||||
public Connection getConnection() {
|
||||
ConnectionWrapper retVal;
|
||||
|
|
|
@ -28,6 +28,7 @@ import ca.uhn.fhir.mdm.model.CanonicalEID;
|
|||
import ca.uhn.fhir.util.FhirTerser;
|
||||
import org.hl7.fhir.instance.model.api.IBase;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Collection;
|
||||
|
@ -38,8 +39,10 @@ import java.util.stream.Collectors;
|
|||
import java.util.stream.Stream;
|
||||
|
||||
import static ca.uhn.fhir.mdm.util.GoldenResourceHelper.FIELD_NAME_IDENTIFIER;
|
||||
import static org.slf4j.LoggerFactory.getLogger;
|
||||
|
||||
public final class TerserUtil {
|
||||
private static final Logger ourLog = getLogger(TerserUtil.class);
|
||||
|
||||
public static final Collection<String> IDS_AND_META_EXCLUDES =
|
||||
Collections.unmodifiableSet(Stream.of("id", "identifier", "meta").collect(Collectors.toSet()));
|
||||
|
@ -105,6 +108,23 @@ public final class TerserUtil {
|
|||
}
|
||||
return !(resourceIdentifier.getAccessor().getValues(theResource).isEmpty());
|
||||
}
|
||||
/**
|
||||
* get the Values of a specified field.
|
||||
*
|
||||
* @param theFhirContext Context holding resource definition
|
||||
* @param theResource Resource to check if the specified field is set
|
||||
* @param theFieldName name of the field to check
|
||||
* @return Returns true if field exists and has any values set, and false otherwise
|
||||
*/
|
||||
public static List<IBase> getValues(FhirContext theFhirContext, IBaseResource theResource, String theFieldName) {
|
||||
RuntimeResourceDefinition resourceDefinition = theFhirContext.getResourceDefinition(theResource);
|
||||
BaseRuntimeChildDefinition resourceIdentifier = resourceDefinition.getChildByName(theFieldName);
|
||||
if (resourceIdentifier == null) {
|
||||
ourLog.info("There is no field named {} in Resource {}", theFieldName, resourceDefinition.getName());
|
||||
return null;
|
||||
}
|
||||
return resourceIdentifier.getAccessor().getValues(theResource);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clones specified composite field (collection). Composite field values must confirm to the collections
|
||||
|
|
Loading…
Reference in New Issue