From 4226cae75201821f718dfbe0eb38bbafaf531d72 Mon Sep 17 00:00:00 2001 From: Tadgh Date: Fri, 9 Sep 2022 19:18:30 -0400 Subject: [PATCH] Broken bulk export (#4012) * Implement test * wip * Fixed a bug in bulk export * add changelog * typo * Switch out to debug logs --- .../6_2_0/4013-bulk-export-regression.yaml | 4 +++ .../export/svc/JpaBulkExportProcessor.java | 33 ++++++------------- .../uhn/fhir/jpa/bulk/BulkDataExportTest.java | 33 +++++++++++++++++-- .../jobs/export/FetchResourceIdsStep.java | 4 +++ .../jobs/services/Batch2JobRunnerImpl.java | 1 - .../jobs/export/FetchResourceIdsStepTest.java | 3 +- .../coordinator/JobCoordinatorImpl.java | 2 +- .../batch2/model/JobInstanceStartRequest.java | 10 ++++++ 8 files changed, 61 insertions(+), 29 deletions(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_2_0/4013-bulk-export-regression.yaml diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_2_0/4013-bulk-export-regression.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_2_0/4013-bulk-export-regression.yaml new file mode 100644 index 00000000000..f044558db26 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_2_0/4013-bulk-export-regression.yaml @@ -0,0 +1,4 @@ +--- +type: fix +issue: 4013 +title: "In HAPI-FHIR 6.1.0, a regression was introduced into bulk export causing exports beyond the first one to fail in strange ways. This has been corrected." diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/JpaBulkExportProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/JpaBulkExportProcessor.java index 1b11b1bb377..fbd38b19fe9 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/JpaBulkExportProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/JpaBulkExportProcessor.java @@ -106,10 +106,6 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor { @Autowired private MdmExpansionCacheSvc myMdmExpansionCacheSvc; - private final HashMap myResourceTypeToSearchBuilder = new HashMap<>(); - - private final HashMap myResourceTypeToFhirPath = new HashMap<>(); - private IFhirPath myFhirPath; @Transactional @@ -151,8 +147,10 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor { } } } else if (theParams.getExportStyle() == BulkDataExportOptions.ExportStyle.GROUP) { + ourLog.trace("About to expand a Group Bulk Export"); // Group if (resourceType.equalsIgnoreCase("Patient")) { + ourLog.info("Expanding Patients of a Group Bulk Export."); return getExpandedPatientIterator(theParams); } @@ -194,14 +192,10 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor { */ protected ISearchBuilder getSearchBuilderForLocalResourceType(ExportPIDIteratorParameters theParams) { String resourceType = theParams.getResourceType(); - if (!myResourceTypeToSearchBuilder.containsKey(resourceType)) { - IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceType); - RuntimeResourceDefinition def = myContext.getResourceDefinition(resourceType); - Class nextTypeClass = def.getImplementingClass(); - ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, resourceType, nextTypeClass); - myResourceTypeToSearchBuilder.put(resourceType, sb); - } - return myResourceTypeToSearchBuilder.get(resourceType); + IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceType); + RuntimeResourceDefinition def = myContext.getResourceDefinition(resourceType); + Class nextTypeClass = def.getImplementingClass(); + return mySearchBuilderFactory.newSearchBuilder(dao, resourceType, nextTypeClass); } protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType(String theResourceType) { @@ -220,9 +214,6 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor { annotateBackwardsReferences(resource); } } - - // is this necessary? - myResourceTypeToFhirPath.clear(); } /** @@ -258,6 +249,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor { private Iterator getExpandedPatientIterator(ExportPIDIteratorParameters theParameters) { List members = getMembersFromGroupWithFilter(theParameters); List ids = members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList()); + ourLog.debug("While extracting patients from a group, we found {} patients.", ids.size()); // Are bulk exports partition aware or care about partition at all? This does List pidsOrThrowException = myIdHelperService.getPidsOrThrowException(RequestPartitionId.allPartitions(), ids); @@ -298,6 +290,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor { HasOrListParam hasOrListParam = new HasOrListParam(); hasOrListParam.addOr(new HasParam("Group", "member", "_id", theParameters.getGroupId())); map.add(PARAM_HAS, hasOrListParam); + ourLog.debug("Searching for members of group {} with job id {} with map {}", theParameters.getGroupId(), theParameters.getJobId(), map); IResultIterator resultIterator = searchBuilder.createQuery(map, new SearchRuntimeDetails(null, theParameters.getJobId()), @@ -486,14 +479,8 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor { private Optional getPatientReference(IBaseResource iBaseResource) { String fhirPath; - String resourceType = iBaseResource.fhirType(); - if (myResourceTypeToFhirPath.containsKey(resourceType)) { - fhirPath = myResourceTypeToFhirPath.get(resourceType); - } else { - RuntimeSearchParam runtimeSearchParam = getRuntimeSearchParam(iBaseResource); - fhirPath = getPatientFhirPath(runtimeSearchParam); - myResourceTypeToFhirPath.put(resourceType, fhirPath); - } + RuntimeSearchParam runtimeSearchParam = getRuntimeSearchParam(iBaseResource); + fhirPath = getPatientFhirPath(runtimeSearchParam); if (iBaseResource.fhirType().equalsIgnoreCase("Patient")) { return Optional.of(iBaseResource.getIdElement().getIdPart()); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportTest.java index f507f327ee8..f1cadd1edcf 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportTest.java @@ -59,7 +59,7 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test { group.addMember().getEntity().setReference("Patient/PM"); myClient.update().resource(group).execute(); - varifyBulkExportResults("G", Sets.newHashSet("Patient?gender=female"), Collections.singletonList("\"PF\""), Collections.singletonList("\"PM\"")); + verifyBulkExportResults("G", Sets.newHashSet("Patient?gender=female"), Collections.singletonList("\"PF\""), Collections.singletonList("\"PM\"")); } @Test @@ -90,11 +90,38 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test { group.addMember().getEntity().setReference("Patient/PING2"); myClient.update().resource(group).execute(); - varifyBulkExportResults("G2", new HashSet<>(), List.of("\"PING1\"", "\"PING2\""), Collections.singletonList("\"PNING3\"")); + verifyBulkExportResults("G2", new HashSet<>(), List.of("\"PING1\"", "\"PING2\""), Collections.singletonList("\"PNING3\"")); + } + + @Test + public void testTwoBulkExportsInArow() { + // Create some resources + Patient patient = new Patient(); + patient.setId("PING1"); + patient.setGender(Enumerations.AdministrativeGender.FEMALE); + patient.setActive(true); + myClient.update().resource(patient).execute(); + + Group group = new Group(); + group.setId("Group/G2"); + group.setActive(true); + group.addMember().getEntity().setReference("Patient/PING1"); + myClient.update().resource(group).execute(); + myCaptureQueriesListener.clear(); + verifyBulkExportResults("G2", new HashSet<>(), List.of("\"PING1\""), Collections.singletonList("\"PNING3\"")); + myCaptureQueriesListener.logSelectQueries(); + ourLog.error("************"); + myCaptureQueriesListener.clear(); + try { + verifyBulkExportResults("G2", new HashSet<>(), List.of("\"PING1\""), Collections.singletonList("\"PNING3\"")); + } finally { + myCaptureQueriesListener.logSelectQueries(); + + } } - private void varifyBulkExportResults(String theGroupId, HashSet theFilters, List theContainedList, List theExcludedList) { + private void verifyBulkExportResults(String theGroupId, HashSet theFilters, List theContainedList, List theExcludedList) { BulkDataExportOptions options = new BulkDataExportOptions(); options.setResourceTypes(Sets.newHashSet("Patient")); options.setGroupId(new IdType("Group", theGroupId)); diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStep.java index 92b1bc75dae..5890de21803 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStep.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStep.java @@ -63,6 +63,7 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker pidIterator = myBulkExportProcessor.getResourcePidIterator(providerParams); List idsToSubmit = new ArrayList<>(); + if (!pidIterator.hasNext()) { + ourLog.warn("Bulk Export generated an iterator with no results!"); + } while (pidIterator.hasNext()) { ResourcePersistentId pid = pidIterator.next(); diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/services/Batch2JobRunnerImpl.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/services/Batch2JobRunnerImpl.java index a1bea23400d..59e7efa7926 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/services/Batch2JobRunnerImpl.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/services/Batch2JobRunnerImpl.java @@ -125,7 +125,6 @@ public class Batch2JobRunnerImpl implements IBatch2JobRunner { private Batch2JobStartResponse startBatch2BulkExportJob(BulkExportParameters theParameters) { JobInstanceStartRequest request = createStartRequest(theParameters); request.setParameters(BulkExportJobParameters.createFromExportJobParameters(theParameters)); - return myJobCoordinator.startInstance(request); } diff --git a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStepTest.java b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStepTest.java index 1e39aa098a6..0f0aac670f4 100644 --- a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStepTest.java +++ b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStepTest.java @@ -149,7 +149,8 @@ public class FetchResourceIdsStepTest { verify(myAppender, atLeastOnce()).doAppend(logCaptor.capture()); List events = logCaptor.getAllValues(); assertTrue(events.get(0).getMessage().contains("Starting BatchExport job")); - assertTrue(events.get(1).getFormattedMessage().contains("Submitted " + assertTrue(events.get(1).getMessage().contains("Running FetchResource")); + assertTrue(events.get(2).getFormattedMessage().contains("Submitted " + parameters.getResourceTypes().size() + " groups of ids for processing" )); diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java index f5bb4886cb8..4466e4df3a5 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java @@ -129,7 +129,7 @@ public class JobCoordinatorImpl implements IJobCoordinator { instance.setStatus(StatusEnum.QUEUED); String instanceId = myJobPersistence.storeNewInstance(instance); - ourLog.info("Stored new {} job {} with status {}", jobDefinition.getJobDefinitionId(), instanceId, instance.getStatus()); + ourLog.info("Stored new {} job {} with status {} and parameters {}", jobDefinition.getJobDefinitionId(), instanceId, instance.getStatus(), instance.getParameters()); BatchWorkChunk batchWorkChunk = BatchWorkChunk.firstChunk(jobDefinition, instanceId); String chunkId = myJobPersistence.storeWorkChunk(batchWorkChunk); diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobInstanceStartRequest.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobInstanceStartRequest.java index 6e735ecb402..414b2b47592 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobInstanceStartRequest.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobInstanceStartRequest.java @@ -89,7 +89,17 @@ public class JobInstanceStartRequest implements IModelJson { return myUseCache; } + public void setUseCache(boolean theUseCache) { myUseCache = theUseCache; } + + @Override + public String toString() { + return "JobInstanceStartRequest{" + + "myJobDefinitionId='" + myJobDefinitionId + '\'' + + ", myParameters='" + myParameters + '\'' + + ", myUseCache=" + myUseCache + + '}'; + } }