Fix ForcedId criteria query (#4195)
* Fix ForcedId criteria query * wip parameterizing tests * Refactor almost all APIs to use ResourcePersistentId. Convert to fhir resource id when necessary (at search time) * run tests in ANY id mode * Test fix Co-authored-by: juan.marchionatto <juan.marchionatto@smilecdr.com> Co-authored-by: Tadgh <garygrantgraham@gmail.com> Co-authored-by: jamesagnew <jamesagnew@gmail.com>
This commit is contained in:
parent
e1e33df38c
commit
3d9dfd4f08
|
@ -0,0 +1,4 @@
|
|||
---
|
||||
type: fix
|
||||
issue: 4194
|
||||
title: "Bulk Group export was failing to export Patient resources when Client ID mode was set to: ANY. This has been fixed"
|
|
@ -215,6 +215,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
|
|||
if (theResourceType.equalsIgnoreCase("Patient")) {
|
||||
ourLog.info("Expanding Patients of a Group Bulk Export.");
|
||||
pids = getExpandedPatientList(theParams);
|
||||
ourLog.info("Obtained {} PIDs", pids.size());
|
||||
} else if (theResourceType.equalsIgnoreCase("Group")) {
|
||||
pids = getSingletonGroupList(theParams);
|
||||
} else {
|
||||
|
@ -225,7 +226,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
|
|||
|
||||
private Set<ResourcePersistentId> getRelatedResourceTypePids(ExportPIDIteratorParameters theParams, RuntimeResourceDefinition theDef) {
|
||||
Set<ResourcePersistentId> pids = new HashSet<>();
|
||||
Set<String> expandedMemberResourceIds = expandAllPatientPidsFromGroup(theParams);
|
||||
Set<ResourcePersistentId> expandedMemberResourceIds = expandAllPatientPidsFromGroup(theParams);
|
||||
assert expandedMemberResourceIds != null && !expandedMemberResourceIds.isEmpty();
|
||||
if (ourLog.isDebugEnabled()) {
|
||||
ourLog.debug("{} has been expanded to members:[{}]", theParams.getGroupId(), expandedMemberResourceIds);
|
||||
|
@ -233,8 +234,8 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
|
|||
|
||||
//Next, let's search for the target resources, with their correct patient references, chunked.
|
||||
//The results will be jammed into myReadPids
|
||||
QueryChunker<String> queryChunker = new QueryChunker<>();
|
||||
queryChunker.chunk(new ArrayList<>(expandedMemberResourceIds), QUERY_CHUNK_SIZE, (idChunk) -> {
|
||||
QueryChunker<ResourcePersistentId> queryChunker = new QueryChunker<>();
|
||||
queryChunker.chunk(expandedMemberResourceIds, QUERY_CHUNK_SIZE, (idChunk) -> {
|
||||
queryResourceTypeWithReferencesToPatients(pids, idChunk, theParams, theDef);
|
||||
});
|
||||
return pids;
|
||||
|
@ -307,12 +308,13 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
|
|||
* possibly expanded by MDM, and don't have to go and fetch other resource DAOs.
|
||||
*/
|
||||
private Set<ResourcePersistentId> getExpandedPatientList(ExportPIDIteratorParameters theParameters) {
|
||||
List<String> members = getMembersFromGroupWithFilter(theParameters);
|
||||
List<ResourcePersistentId> members = getMembersFromGroupWithFilter(theParameters);
|
||||
List<IIdType> ids = members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList());
|
||||
ourLog.debug("While extracting patients from a group, we found {} patients.", ids.size());
|
||||
|
||||
ourLog.info("While extracting patients from a group, we found {} patients.", ids.size());
|
||||
ourLog.info("Found patients: {}", ids.stream().map(id -> id.getValue()).collect(Collectors.joining(", ")));
|
||||
// Are bulk exports partition aware or care about partition at all? This does
|
||||
List<ResourcePersistentId> pidsOrThrowException = myIdHelperService.getPidsOrThrowException(RequestPartitionId.allPartitions(), ids);
|
||||
|
||||
List<ResourcePersistentId> pidsOrThrowException = members;
|
||||
Set<ResourcePersistentId> patientPidsToExport = new HashSet<>(pidsOrThrowException);
|
||||
|
||||
if (theParameters.isExpandMdm()) {
|
||||
|
@ -334,9 +336,10 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
|
|||
*
|
||||
* @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"]
|
||||
*/
|
||||
private List<String> getMembersFromGroupWithFilter(ExportPIDIteratorParameters theParameters) {
|
||||
private List<ResourcePersistentId> getMembersFromGroupWithFilter(ExportPIDIteratorParameters theParameters) {
|
||||
RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient");
|
||||
List<String> pids = new ArrayList<>();
|
||||
List<ResourcePersistentId> resPids = new ArrayList<>();
|
||||
|
||||
List<SearchParameterMap> maps = myBulkExportHelperSvc.createSearchParameterMapsForResourceType(def, theParameters);
|
||||
|
||||
|
@ -349,11 +352,12 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
|
|||
new SearchRuntimeDetails(null, theParameters.getJobId()),
|
||||
null,
|
||||
RequestPartitionId.allPartitions());
|
||||
|
||||
while (resultIterator.hasNext()) {
|
||||
pids.add(resultIterator.next().toString());
|
||||
resPids.add(resultIterator.next());
|
||||
}
|
||||
}
|
||||
return pids;
|
||||
return resPids;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -415,9 +419,14 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
|
|||
}
|
||||
|
||||
private void queryResourceTypeWithReferencesToPatients(Set<ResourcePersistentId> myReadPids,
|
||||
List<String> idChunk,
|
||||
List<ResourcePersistentId> resourcePersistentIdChunk,
|
||||
ExportPIDIteratorParameters theParams,
|
||||
RuntimeResourceDefinition theDef) {
|
||||
|
||||
//Convert Resource Persistent IDs to actual client IDs.
|
||||
Set<ResourcePersistentId> pidSet = new HashSet<>(resourcePersistentIdChunk);
|
||||
Set<String> resourceIds = myIdHelperService.translatePidsToFhirResourceIds(pidSet);
|
||||
|
||||
//Build SP map
|
||||
//First, inject the _typeFilters and _since from the export job
|
||||
List<SearchParameterMap> expandedSpMaps = myBulkExportHelperSvc.createSearchParameterMapsForResourceType(theDef, theParams);
|
||||
|
@ -431,9 +440,9 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
|
|||
|
||||
// Now, further filter the query with patient references defined by the chunk of IDs we have.
|
||||
if (PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(theParams.getResourceType())) {
|
||||
filterSearchByHasParam(idChunk, expandedSpMap, theParams);
|
||||
filterSearchByHasParam(resourceIds, expandedSpMap, theParams);
|
||||
} else {
|
||||
filterSearchByResourceIds(idChunk, expandedSpMap, theParams);
|
||||
filterSearchByResourceIds(resourceIds, expandedSpMap, theParams);
|
||||
}
|
||||
|
||||
//Execute query and all found pids to our local iterator.
|
||||
|
@ -461,7 +470,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
|
|||
* @param expandedSpMap
|
||||
* @param theParams
|
||||
*/
|
||||
private void filterSearchByResourceIds(List<String> idChunk, SearchParameterMap expandedSpMap, ExportPIDIteratorParameters theParams) {
|
||||
private void filterSearchByResourceIds(Set<String> idChunk, SearchParameterMap expandedSpMap, ExportPIDIteratorParameters theParams) {
|
||||
ReferenceOrListParam orList = new ReferenceOrListParam();
|
||||
idChunk.forEach(id -> orList.add(new ReferenceParam(id)));
|
||||
RuntimeSearchParam patientSearchParamForCurrentResourceType = getPatientSearchParamForCurrentResourceType(theParams.getResourceType());
|
||||
|
@ -472,17 +481,17 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
|
|||
* @param idChunk
|
||||
* @param expandedSpMap
|
||||
*/
|
||||
private void filterSearchByHasParam(List<String> idChunk, SearchParameterMap expandedSpMap, ExportPIDIteratorParameters theParams) {
|
||||
private void filterSearchByHasParam(Set<String> idChunk, SearchParameterMap expandedSpMap, ExportPIDIteratorParameters theParams) {
|
||||
HasOrListParam hasOrListParam = new HasOrListParam();
|
||||
idChunk.stream().forEach(id -> hasOrListParam.addOr(buildHasParam(id, theParams.getResourceType())));
|
||||
expandedSpMap.add("_has", hasOrListParam);
|
||||
}
|
||||
|
||||
private HasParam buildHasParam(String theId, String theResourceType) {
|
||||
private HasParam buildHasParam(String theResourceId, String theResourceType) {
|
||||
if ("Practitioner".equalsIgnoreCase(theResourceType)) {
|
||||
return new HasParam("Patient", "general-practitioner", "_id", theId);
|
||||
return new HasParam("Patient", "general-practitioner", "_id", theResourceId);
|
||||
} else if ("Organization".equalsIgnoreCase(theResourceType)) {
|
||||
return new HasParam("Patient", "organization", "_id", theId);
|
||||
return new HasParam("Patient", "organization", "_id", theResourceId);
|
||||
} else {
|
||||
throw new IllegalArgumentException(Msg.code(2077) + " We can't handle forward references onto type " + theResourceType);
|
||||
}
|
||||
|
@ -495,14 +504,27 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
|
|||
*
|
||||
* @return a Set of Strings representing the resource IDs of all members of a group.
|
||||
*/
|
||||
private Set<String> expandAllPatientPidsFromGroup(ExportPIDIteratorParameters theParams) {
|
||||
Set<String> expandedIds = new HashSet<>();
|
||||
private Set<ResourcePersistentId> expandAllPatientPidsFromGroup(ExportPIDIteratorParameters theParams) {
|
||||
Set<ResourcePersistentId> expandedIds = new HashSet<>();
|
||||
SystemRequestDetails requestDetails = SystemRequestDetails.newSystemRequestAllPartitions();
|
||||
IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(theParams.getGroupId()), requestDetails);
|
||||
ResourcePersistentId pidOrNull = myIdHelperService.getPidOrNull(RequestPartitionId.allPartitions(), group);
|
||||
|
||||
//Attempt to perform MDM Expansion of membership
|
||||
if (theParams.isExpandMdm()) {
|
||||
expandedIds.addAll(performMembershipExpansionViaMdmTable(pidOrNull));
|
||||
}
|
||||
|
||||
//Now manually add the members of the group (its possible even with mdm expansion that some members dont have MDM matches,
|
||||
//so would be otherwise skipped
|
||||
List<ResourcePersistentId> membersFromGroupWithFilter = getMembersFromGroupWithFilter(theParams);
|
||||
ourLog.debug("Group with ID [{}] has been expanded to: {}", theParams.getGroupId(), membersFromGroupWithFilter);
|
||||
expandedIds.addAll(membersFromGroupWithFilter);
|
||||
|
||||
return expandedIds;
|
||||
}
|
||||
|
||||
private Set<ResourcePersistentId> performMembershipExpansionViaMdmTable(ResourcePersistentId pidOrNull) {
|
||||
List<MdmPidTuple> goldenPidTargetPidTuples = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
|
||||
//Now lets translate these pids into resource IDs
|
||||
Set<ResourcePersistentId> uniquePids = new HashSet<>();
|
||||
|
@ -516,18 +538,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
|
|||
extract(goldenPidTargetPidTuples, goldenResourceToSourcePidMap);
|
||||
populateMdmResourceCache(goldenPidTargetPidTuples);
|
||||
|
||||
//If the result of the translation is an empty optional, it means there is no forced id, and we can use the PID as the resource ID.
|
||||
Set<String> resolvedResourceIds = pidToForcedIdMap.getResolvedResourceIds();
|
||||
expandedIds.addAll(resolvedResourceIds);
|
||||
}
|
||||
|
||||
//Now manually add the members of the group (its possible even with mdm expansion that some members dont have MDM matches,
|
||||
//so would be otherwise skipped
|
||||
List<String> membersFromGroupWithFilter = getMembersFromGroupWithFilter(theParams);
|
||||
ourLog.debug("Group with ID [{}] has been expanded to: {}", theParams.getGroupId(), membersFromGroupWithFilter);
|
||||
expandedIds.addAll(membersFromGroupWithFilter);
|
||||
|
||||
return expandedIds;
|
||||
return uniquePids;
|
||||
}
|
||||
|
||||
/* Mdm Expansion */
|
||||
|
|
|
@ -273,9 +273,6 @@ public class JpaBulkExportProcessorTest {
|
|||
IFhirResourceDao<?> mockDao = mock(IFhirResourceDao.class);
|
||||
ISearchBuilder searchBuilder = mock(ISearchBuilder.class);
|
||||
|
||||
// when
|
||||
when(myIdHelperService.getPidsOrThrowException(any(), anyList()))
|
||||
.thenReturn(pids);
|
||||
// from getMembersFromGroupWithFilter
|
||||
when(myBulkExportHelperService.createSearchParameterMapsForResourceType(any(RuntimeResourceDefinition.class), eq(parameters)))
|
||||
.thenReturn(Collections.singletonList(new SearchParameterMap()));
|
||||
|
|
|
@ -62,6 +62,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
|||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
|
||||
|
||||
public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(BulkExportUseCaseTest.class);
|
||||
|
||||
|
@ -71,6 +73,7 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
|
|||
@Autowired
|
||||
private IJobPersistence myJobPersistence;
|
||||
|
||||
|
||||
@Nested
|
||||
public class SpecConformanceTests {
|
||||
@Test
|
||||
|
@ -114,6 +117,7 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
|
|||
|
||||
@Nested
|
||||
public class SystemBulkExportTests {
|
||||
|
||||
@Test
|
||||
public void testBinariesAreStreamedWithRespectToAcceptHeader() throws IOException {
|
||||
int patientCount = 5;
|
||||
|
@ -228,6 +232,8 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Nested
|
||||
public class PatientBulkExportTests {
|
||||
|
||||
|
@ -268,6 +274,7 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Nested
|
||||
public class GroupBulkExportTests {
|
||||
@Test
|
||||
|
@ -702,6 +709,53 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
|
|||
assertThat(typeToContents.get("Observation"), containsString("obs-included-0"));
|
||||
assertThat(typeToContents.get("Observation"), containsString("obs-included-999"));
|
||||
}
|
||||
|
||||
@Nested
|
||||
public class WithClientIdStrategyEnumANY {
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
myDaoConfig.setResourceClientIdStrategy(DaoConfig.ClientIdStrategyEnum.ANY);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void tearDown() {
|
||||
myDaoConfig.setResourceClientIdStrategy(DaoConfig.ClientIdStrategyEnum.ALPHANUMERIC);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGroupExportPatientOnly() {
|
||||
Patient patient = new Patient();
|
||||
patient.setId("PING1");
|
||||
patient.setGender(Enumerations.AdministrativeGender.FEMALE);
|
||||
patient.setActive(true);
|
||||
myClient.update().resource(patient).execute();
|
||||
|
||||
//Other patient not in group
|
||||
Patient patient2 = new Patient();
|
||||
patient2.setId("POG2");
|
||||
patient2.setGender(Enumerations.AdministrativeGender.FEMALE);
|
||||
patient2.setActive(true);
|
||||
myClient.update().resource(patient2).execute();
|
||||
|
||||
Group group = new Group();
|
||||
group.setId("Group/G2");
|
||||
group.setActive(true);
|
||||
group.addMember().getEntity().setReference("Patient/PING1");
|
||||
myClient.update().resource(group).execute();
|
||||
|
||||
HashSet<String> resourceTypes = Sets.newHashSet("Patient");
|
||||
BulkExportJobResults bulkExportJobResults = startGroupBulkExportJobAndAwaitCompletion(resourceTypes, new HashSet<>(), "G2");
|
||||
|
||||
Map<String, List<IBaseResource>> typeToResources = convertJobResultsToResources(bulkExportJobResults);
|
||||
assertThat(typeToResources.get("Patient"), hasSize(1));
|
||||
|
||||
Map<String, String> typeToContents = convertJobResultsToStringContents(bulkExportJobResults);
|
||||
assertThat(typeToContents.get("Patient"), containsString("PING1"));
|
||||
assertThat(typeToContents.get("Patient"), not(containsString("POG2")));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private Map<String, String> convertJobResultsToStringContents(BulkExportJobResults theResults) {
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
package ca.uhn.fhir.jpa.bulk;
|
||||
|
||||
import ca.uhn.fhir.batch2.api.IJobPersistence;
|
||||
import ca.uhn.fhir.batch2.model.JobInstance;
|
||||
import ca.uhn.fhir.jpa.api.config.DaoConfig;
|
||||
import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
|
||||
import ca.uhn.fhir.jpa.api.svc.IBatch2JobRunner;
|
||||
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
|
||||
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportResponseJson;
|
||||
import ca.uhn.fhir.jpa.provider.r4.BaseResourceProviderR4Test;
|
||||
import ca.uhn.fhir.jpa.util.BulkExportUtils;
|
||||
import ca.uhn.fhir.parser.IParser;
|
||||
import ca.uhn.fhir.rest.api.Constants;
|
||||
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
|
||||
import ca.uhn.fhir.util.JsonUtil;
|
||||
import ca.uhn.fhir.util.SearchParameterUtil;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.commons.io.Charsets;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.http.Header;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.r4.model.*;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Nested;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.*;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.CoreMatchers.notNullValue;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
|
||||
public class BulkExportUseCaseTestAnyMode extends BulkExportUseCaseTest {
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(BulkExportUseCaseTestAnyMode.class);
|
||||
|
||||
|
||||
@BeforeEach
|
||||
public void setup() {
|
||||
myDaoConfig.setResourceClientIdStrategy(DaoConfig.ClientIdStrategyEnum.ANY);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void tearDown() {
|
||||
myDaoConfig.setResourceClientIdStrategy(new DaoConfig().getResourceClientIdStrategy());
|
||||
}
|
||||
|
||||
}
|
|
@ -79,7 +79,7 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobPa
|
|||
providerParams.setResourceType(resourceType);
|
||||
|
||||
// filters are the filters for searching
|
||||
ourLog.info("Running FetchResourceIdsStep with params: {}", providerParams);
|
||||
ourLog.info("Running FetchResourceIdsStep for resource type: {} with params: {}", resourceType, providerParams);
|
||||
Iterator<ResourcePersistentId> pidIterator = myBulkExportProcessor.getResourcePidIterator(providerParams);
|
||||
List<Id> idsToSubmit = new ArrayList<>();
|
||||
|
||||
|
|
Loading…
Reference in New Issue