Broken bulk export (#4012)

* Implement test

* wip

* Fixed a bug in bulk export

* add changelog

* typo

* Switch out to debug logs
Tadgh 2022-09-09 19:18:30 -04:00 committed by GitHub
parent 306bb8b64a
commit 4226cae752
8 changed files with 61 additions and 29 deletions


@@ -0,0 +1,4 @@
+---
+type: fix
+issue: 4013
+title: "In HAPI-FHIR 6.1.0, a regression was introduced into bulk export causing exports beyond the first one to fail in strange ways. This has been corrected."
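For context on what actually regressed: JpaBulkExportProcessor is a singleton Spring bean, and (as the next diff shows) it memoized per-export helpers in instance-level HashMaps, so state from the first export leaked into every later one. A minimal, self-contained sketch of that failure mode follows; the SingletonExportService and its StringBuilder cache are hypothetical stand-ins, not HAPI classes.

import java.util.HashMap;
import java.util.Map;

// Sketch of the anti-pattern: a singleton service that caches per-job
// helpers in an instance field. The first job populates the cache; every
// later job silently reuses a helper still bound to the first job's state.
class SingletonExportService {
	// Shared across ALL jobs, because the service itself is a singleton.
	private final Map<String, StringBuilder> cache = new HashMap<>();

	String export(String jobId, String resourceType) {
		// computeIfAbsent only runs for the first job that sees this type;
		// job 2 gets a builder that still holds job 1's output.
		StringBuilder sb = cache.computeIfAbsent(resourceType, t -> new StringBuilder());
		sb.append(jobId).append(" exported ").append(resourceType).append("; ");
		return sb.toString();
	}
}

public class StaleCacheDemo {
	public static void main(String[] args) {
		SingletonExportService service = new SingletonExportService();
		System.out.println(service.export("job-1", "Patient")); // job-1 exported Patient;
		// The second export is polluted by the first one's leftover state:
		System.out.println(service.export("job-2", "Patient")); // job-1 ... job-2 ...
	}
}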


@@ -106,10 +106,6 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
 	@Autowired
 	private MdmExpansionCacheSvc myMdmExpansionCacheSvc;
-
-	private final HashMap<String, ISearchBuilder> myResourceTypeToSearchBuilder = new HashMap<>();
-
-	private final HashMap<String, String> myResourceTypeToFhirPath = new HashMap<>();

 	private IFhirPath myFhirPath;

 	@Transactional
@@ -151,8 +147,10 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
 				}
 			}
 		} else if (theParams.getExportStyle() == BulkDataExportOptions.ExportStyle.GROUP) {
+			ourLog.trace("About to expand a Group Bulk Export");
 			// Group
 			if (resourceType.equalsIgnoreCase("Patient")) {
+				ourLog.info("Expanding Patients of a Group Bulk Export.");
 				return getExpandedPatientIterator(theParams);
 			}
@@ -194,14 +192,10 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
 	 */
 	protected ISearchBuilder getSearchBuilderForLocalResourceType(ExportPIDIteratorParameters theParams) {
 		String resourceType = theParams.getResourceType();
-		if (!myResourceTypeToSearchBuilder.containsKey(resourceType)) {
-			IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceType);
-			RuntimeResourceDefinition def = myContext.getResourceDefinition(resourceType);
-			Class<? extends IBaseResource> nextTypeClass = def.getImplementingClass();
-			ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, resourceType, nextTypeClass);
-			myResourceTypeToSearchBuilder.put(resourceType, sb);
-		}
-		return myResourceTypeToSearchBuilder.get(resourceType);
+		IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceType);
+		RuntimeResourceDefinition def = myContext.getResourceDefinition(resourceType);
+		Class<? extends IBaseResource> nextTypeClass = def.getImplementingClass();
+		return mySearchBuilderFactory.newSearchBuilder(dao, resourceType, nextTypeClass);
 	}

 	protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType(String theResourceType) {
@@ -220,9 +214,6 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
 				annotateBackwardsReferences(resource);
 			}
 		}
-
-		// is this necessary?
-		myResourceTypeToFhirPath.clear();
 	}

 	/**
@@ -258,6 +249,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
 	private Iterator<ResourcePersistentId> getExpandedPatientIterator(ExportPIDIteratorParameters theParameters) {
 		List<String> members = getMembersFromGroupWithFilter(theParameters);
 		List<IIdType> ids = members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList());
+		ourLog.debug("While extracting patients from a group, we found {} patients.", ids.size());

 		// Are bulk exports partition aware or care about partition at all? This does
 		List<ResourcePersistentId> pidsOrThrowException = myIdHelperService.getPidsOrThrowException(RequestPartitionId.allPartitions(), ids);
@@ -298,6 +290,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
 		HasOrListParam hasOrListParam = new HasOrListParam();
 		hasOrListParam.addOr(new HasParam("Group", "member", "_id", theParameters.getGroupId()));
 		map.add(PARAM_HAS, hasOrListParam);
+		ourLog.debug("Searching for members of group {} with job id {} with map {}", theParameters.getGroupId(), theParameters.getJobId(), map);

 		IResultIterator resultIterator = searchBuilder.createQuery(map,
 			new SearchRuntimeDetails(null, theParameters.getJobId()),
@@ -486,14 +479,8 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
 	private Optional<String> getPatientReference(IBaseResource iBaseResource) {
 		String fhirPath;
 		String resourceType = iBaseResource.fhirType();
-		if (myResourceTypeToFhirPath.containsKey(resourceType)) {
-			fhirPath = myResourceTypeToFhirPath.get(resourceType);
-		} else {
-			RuntimeSearchParam runtimeSearchParam = getRuntimeSearchParam(iBaseResource);
-			fhirPath = getPatientFhirPath(runtimeSearchParam);
-			myResourceTypeToFhirPath.put(resourceType, fhirPath);
-		}
+		RuntimeSearchParam runtimeSearchParam = getRuntimeSearchParam(iBaseResource);
+		fhirPath = getPatientFhirPath(runtimeSearchParam);

 		if (iBaseResource.fhirType().equalsIgnoreCase("Patient")) {
 			return Optional.of(iBaseResource.getIdElement().getIdPart());
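Both removals above follow the same principle: a singleton bean should not memoize per-job helpers in instance fields. If construction is cheap, build a fresh helper per call; if a cache is genuinely worth it, let the caller own one that lives no longer than a single job. A sketch of those two safe shapes, with hypothetical names rather than HAPI's API:

import java.util.HashMap;
import java.util.Map;

// The singleton owns no mutable per-job state.
class StatelessExportService {

	// Option 1: a fresh helper per call, so nothing can leak between jobs.
	StringBuilder newBuilderFor(String resourceType) {
		return new StringBuilder("query for " + resourceType);
	}

	// Option 2: a cache owned by the caller and scoped to one job.
	StringBuilder builderFor(String resourceType, Map<String, StringBuilder> jobScopedCache) {
		return jobScopedCache.computeIfAbsent(resourceType, this::newBuilderFor);
	}
}

public class JobScopedCacheDemo {
	public static void main(String[] args) {
		StatelessExportService service = new StatelessExportService();
		// Each job brings its own cache, so reuse is safe within a job only.
		Map<String, StringBuilder> job1Cache = new HashMap<>();
		System.out.println(service.builderFor("Patient", job1Cache));
		Map<String, StringBuilder> job2Cache = new HashMap<>();
		System.out.println(service.builderFor("Patient", job2Cache)); // independent of job 1
	}
}

The commit takes the first option: a SearchBuilder was evidently judged cheap enough to construct relative to the runtime of a bulk export that dropping the cache outright is simpler than scoping it per job.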


@@ -59,7 +59,7 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
 		group.addMember().getEntity().setReference("Patient/PM");
 		myClient.update().resource(group).execute();

-		varifyBulkExportResults("G", Sets.newHashSet("Patient?gender=female"), Collections.singletonList("\"PF\""), Collections.singletonList("\"PM\""));
+		verifyBulkExportResults("G", Sets.newHashSet("Patient?gender=female"), Collections.singletonList("\"PF\""), Collections.singletonList("\"PM\""));
 	}

 	@Test
@@ -90,11 +90,38 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
 		group.addMember().getEntity().setReference("Patient/PING2");
 		myClient.update().resource(group).execute();

-		varifyBulkExportResults("G2", new HashSet<>(), List.of("\"PING1\"", "\"PING2\""), Collections.singletonList("\"PNING3\""));
+		verifyBulkExportResults("G2", new HashSet<>(), List.of("\"PING1\"", "\"PING2\""), Collections.singletonList("\"PNING3\""));
 	}

+	@Test
+	public void testTwoBulkExportsInARow() {
+		// Create some resources
+		Patient patient = new Patient();
+		patient.setId("PING1");
+		patient.setGender(Enumerations.AdministrativeGender.FEMALE);
+		patient.setActive(true);
+		myClient.update().resource(patient).execute();
+
+		Group group = new Group();
+		group.setId("Group/G2");
+		group.setActive(true);
+		group.addMember().getEntity().setReference("Patient/PING1");
+		myClient.update().resource(group).execute();
+
+		myCaptureQueriesListener.clear();
+		verifyBulkExportResults("G2", new HashSet<>(), List.of("\"PING1\""), Collections.singletonList("\"PNING3\""));
+		myCaptureQueriesListener.logSelectQueries();
+		ourLog.error("************");
+		myCaptureQueriesListener.clear();
+		try {
+			verifyBulkExportResults("G2", new HashSet<>(), List.of("\"PING1\""), Collections.singletonList("\"PNING3\""));
+		} finally {
+			myCaptureQueriesListener.logSelectQueries();
+		}
+	}
+
-	private void varifyBulkExportResults(String theGroupId, HashSet<String> theFilters, List<String> theContainedList, List<String> theExcludedList) {
+	private void verifyBulkExportResults(String theGroupId, HashSet<String> theFilters, List<String> theContainedList, List<String> theExcludedList) {
 		BulkDataExportOptions options = new BulkDataExportOptions();
 		options.setResourceTypes(Sets.newHashSet("Patient"));
 		options.setGroupId(new IdType("Group", theGroupId));
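The new test encodes the general regression pattern used here: run the same operation twice against the same singleton and require the second pass to behave like the first, so any state leaked by run one shows up as a failure in run two. A generic JUnit 5 sketch of that shape, with a hypothetical ExportService standing in for the real fixture:

import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;

// Run-twice regression test: the second invocation must match the first.
class RunTwiceRegressionTest {

	@Test
	void secondRunBehavesLikeFirst() {
		ExportService service = new ExportService();
		String first = service.export("Patient");
		String second = service.export("Patient");
		// Any state leaked by run 1 shows up as a difference here.
		assertEquals(first, second);
	}

	// Minimal stand-in for the service under test.
	static class ExportService {
		String export(String resourceType) {
			return "exported " + resourceType;
		}
	}
}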


@@ -63,6 +63,7 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobPa
 		providerParams.setExportStyle(params.getExportStyle());
 		providerParams.setGroupId(params.getGroupId());
 		providerParams.setExpandMdm(params.isExpandMdm());
+		ourLog.info("Running FetchResourceIdsStep with params: {}", providerParams);

 		int submissionCount = 0;
 		try {
@@ -73,6 +74,9 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobPa
 			Iterator<ResourcePersistentId> pidIterator = myBulkExportProcessor.getResourcePidIterator(providerParams);
 			List<Id> idsToSubmit = new ArrayList<>();

+			if (!pidIterator.hasNext()) {
+				ourLog.warn("Bulk Export generated an iterator with no results!");
+			}
 			while (pidIterator.hasNext()) {
 				ResourcePersistentId pid = pidIterator.next();


@@ -125,7 +125,6 @@ public class Batch2JobRunnerImpl implements IBatch2JobRunner {
 	private Batch2JobStartResponse startBatch2BulkExportJob(BulkExportParameters theParameters) {
 		JobInstanceStartRequest request = createStartRequest(theParameters);
 		request.setParameters(BulkExportJobParameters.createFromExportJobParameters(theParameters));

 		return myJobCoordinator.startInstance(request);
 	}


@@ -149,7 +149,8 @@ public class FetchResourceIdsStepTest {
 		verify(myAppender, atLeastOnce()).doAppend(logCaptor.capture());
 		List<ILoggingEvent> events = logCaptor.getAllValues();
 		assertTrue(events.get(0).getMessage().contains("Starting BatchExport job"));
-		assertTrue(events.get(1).getFormattedMessage().contains("Submitted "
+		assertTrue(events.get(1).getMessage().contains("Running FetchResource"));
+		assertTrue(events.get(2).getFormattedMessage().contains("Submitted "
 			+ parameters.getResourceTypes().size()
 			+ " groups of ids for processing"
 		));


@@ -129,7 +129,7 @@ public class JobCoordinatorImpl implements IJobCoordinator {
 		instance.setStatus(StatusEnum.QUEUED);
 		String instanceId = myJobPersistence.storeNewInstance(instance);

-		ourLog.info("Stored new {} job {} with status {}", jobDefinition.getJobDefinitionId(), instanceId, instance.getStatus());
+		ourLog.info("Stored new {} job {} with status {} and parameters {}", jobDefinition.getJobDefinitionId(), instanceId, instance.getStatus(), instance.getParameters());

 		BatchWorkChunk batchWorkChunk = BatchWorkChunk.firstChunk(jobDefinition, instanceId);
 		String chunkId = myJobPersistence.storeWorkChunk(batchWorkChunk);


@@ -89,7 +89,17 @@ public class JobInstanceStartRequest implements IModelJson {
 		return myUseCache;
 	}

 	public void setUseCache(boolean theUseCache) {
 		myUseCache = theUseCache;
 	}
+
+	@Override
+	public String toString() {
+		return "JobInstanceStartRequest{" +
+			"myJobDefinitionId='" + myJobDefinitionId + '\'' +
+			", myParameters='" + myParameters + '\'' +
+			", myUseCache=" + myUseCache +
+			'}';
+	}
 }