From 7cabcbd772a604317a19e73d40c3cfe7c106ac42 Mon Sep 17 00:00:00 2001 From: Tadgh Date: Fri, 16 Apr 2021 11:12:40 -0400 Subject: [PATCH] Fix partition selection for system request details --- .../bulk/export/job/GroupBulkItemReader.java | 12 ++-- .../bulk/export/job/ResourceToFileWriter.java | 3 +- .../partition/RequestPartitionHelperSvc.java | 30 ++++++++- .../jpa/bulk/BulkDataExportSvcImplR4Test.java | 61 ++++++++++++++++--- .../uhn/fhir/jpa/model/util/JpaConstants.java | 5 ++ 5 files changed, 95 insertions(+), 16 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/GroupBulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/GroupBulkItemReader.java index 3a10fec2aae..acac73df135 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/GroupBulkItemReader.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/GroupBulkItemReader.java @@ -29,6 +29,8 @@ import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao; import ca.uhn.fhir.jpa.dao.index.IdHelperService; import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc; import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; +import ca.uhn.fhir.jpa.model.util.JpaConstants; +import ca.uhn.fhir.jpa.partition.SystemRequestDetails; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.util.QueryChunker; import ca.uhn.fhir.mdm.api.MdmMatchResultEnum; @@ -178,13 +180,13 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade * @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"] */ private List getMembers() { - IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId)); + SystemRequestDetails requestDetails = new SystemRequestDetails(); + requestDetails.setTenantId(JpaConstants.ALL_PARTITIONS_NAME); + IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId), requestDetails); List evaluate = myContext.newFhirPath().evaluate(group, "member.entity.reference", IPrimitiveType.class); return evaluate.stream().map(IPrimitiveType::getValueAsString).collect(Collectors.toList()); } - - /** * Given the local myGroupId, perform an expansion to retrieve all resource IDs of member patients. * if myMdmEnabled is set to true, we also reach out to the IMdmLinkDao to attempt to also expand it into matched @@ -194,7 +196,9 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade */ private Set expandAllPatientPidsFromGroup() { Set expandedIds = new HashSet<>(); - IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId)); + SystemRequestDetails requestDetails = new SystemRequestDetails(); + requestDetails.setTenantId(JpaConstants.ALL_PARTITIONS_NAME); + IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId), new SystemRequestDetails()); Long pidOrNull = myIdHelperService.getPidOrNull(group); //Attempt to perform MDM Expansion of membership diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/ResourceToFileWriter.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/ResourceToFileWriter.java index 8b4ebe7e86a..df362a79f19 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/ResourceToFileWriter.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/ResourceToFileWriter.java @@ -27,6 +27,7 @@ import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome; import ca.uhn.fhir.jpa.batch.log.Logs; import ca.uhn.fhir.jpa.bulk.export.svc.BulkExportDaoSvc; import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity; +import ca.uhn.fhir.jpa.partition.SystemRequestDetails; import ca.uhn.fhir.parser.IParser; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.util.BinaryUtil; @@ -100,7 +101,7 @@ public class ResourceToFileWriter implements ItemWriter> { IBaseBinary binary = BinaryUtil.newBinary(myFhirContext); binary.setContentType(Constants.CT_FHIR_NDJSON); binary.setContent(myOutputStream.toByteArray()); - DaoMethodOutcome outcome = myBinaryDao.create(binary); + DaoMethodOutcome outcome = myBinaryDao.create(binary, new SystemRequestDetails()); return outcome.getResource().getIdElement(); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/partition/RequestPartitionHelperSvc.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/partition/RequestPartitionHelperSvc.java index 37da051117a..94d360f1b31 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/partition/RequestPartitionHelperSvc.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/partition/RequestPartitionHelperSvc.java @@ -44,6 +44,7 @@ import java.util.HashSet; import java.util.List; import java.util.Objects; +import static ca.uhn.fhir.jpa.model.util.JpaConstants.ALL_PARTITIONS_NAME; import static ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster.doCallHooks; import static ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster.doCallHooksAndReturnObject; import static ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster.hasHooks; @@ -101,6 +102,19 @@ public class RequestPartitionHelperSvc implements IRequestPartitionHelperSvc { return RequestPartitionId.defaultPartition(); } + //Shortcircuit and write system calls out to default partition. + if (theRequest instanceof SystemRequestDetails) { + if (theRequest.getTenantId() != null) { + if (theRequest.getTenantId().equals(ALL_PARTITIONS_NAME)) { + return RequestPartitionId.allPartitions(); + } else { + return RequestPartitionId.fromPartitionName(theRequest.getTenantId()); + } + } else { + return RequestPartitionId.defaultPartition(); + } + } + // Interceptor call: STORAGE_PARTITION_IDENTIFY_READ if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_READ, myInterceptorBroadcaster, theRequest)) { HookParams params = new HookParams() @@ -129,7 +143,21 @@ public class RequestPartitionHelperSvc implements IRequestPartitionHelperSvc { if (myPartitionSettings.isPartitioningEnabled()) { - // Interceptor call: STORAGE_PARTITION_IDENTIFY_CREATE + //Shortcircuit and write system calls out to default partition. + if (theRequest instanceof SystemRequestDetails) { + if (theRequest.getTenantId() != null) { + if (theRequest.getTenantId().equals(ALL_PARTITIONS_NAME)) { + return RequestPartitionId.allPartitions(); + } else { + return RequestPartitionId.fromPartitionName(theRequest.getTenantId()); + } + } else { + return RequestPartitionId.defaultPartition(); + } + } + + + // Interceptor call: STORAGE_PARTITION_IDENTIFY_CREATE HookParams params = new HookParams() .add(IBaseResource.class, theResource) .add(RequestDetails.class, theRequest) diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java index fdc92d090e9..902e5019e98 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java @@ -18,6 +18,7 @@ import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity; import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity; import ca.uhn.fhir.jpa.entity.BulkExportJobEntity; import ca.uhn.fhir.jpa.entity.MdmLink; +import ca.uhn.fhir.jpa.partition.SystemRequestDetails; import ca.uhn.fhir.mdm.api.MdmLinkSourceEnum; import ca.uhn.fhir.mdm.api.MdmMatchResultEnum; import ca.uhn.fhir.parser.IParser; @@ -494,7 +495,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test { Patient patient = new Patient(); patient.setId("PAT" + i); patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i); - myPatientDao.update(patient).getId().toUnqualifiedVersionless(); + myPatientDao.update(patient, new SystemRequestDetails()).getId().toUnqualifiedVersionless(); } // Create a bulk job @@ -848,7 +849,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test { public String getBinaryContents(IBulkDataExportSvc.JobInfo theJobInfo, int theIndex) { // Iterate over the files - Binary nextBinary = myBinaryDao.read(theJobInfo.getFiles().get(theIndex).getResourceId()); + Binary nextBinary = myBinaryDao.read(theJobInfo.getFiles().get(theIndex).getResourceId(), new SystemRequestDetails()); assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType()); String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8); ourLog.info("Next contents for type {}:\n{}", nextBinary.getResourceType(), nextContents); @@ -928,7 +929,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test { //Check Observation Content - Binary observationExportContent = myBinaryDao.read(jobInfo.getFiles().get(1).getResourceId()); + Binary observationExportContent = myBinaryDao.read(jobInfo.getFiles().get(1).getResourceId(), new SystemRequestDetails()); assertEquals(Constants.CT_FHIR_NDJSON, observationExportContent.getContentType()); nextContents = new String(observationExportContent.getContent(), Constants.CHARSET_UTF8); ourLog.info("Next contents for type {}:\n{}", observationExportContent.getResourceType(), nextContents); @@ -1061,7 +1062,47 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test { //Now if we create another one and ask for the cache, we should get the most-recently-insert entry. IBulkDataExportSvc.JobInfo jobInfo10 = myBulkDataExportSvc.submitJob(options, true); assertThat(jobInfo10.getJobId(), is(equalTo(jobInfo9.getJobId()))); + } + @Test + public void testBulkExportWritesToDEFAULTPartitionWhenPartitioningIsEnabled() { + myPartitionSettings.setPartitioningEnabled(true); + createResources(); + + //Only get COVID-19 vaccinations + Set filters = new HashSet<>(); + filters.add("Immunization?vaccine-code=vaccines|COVID-19"); + + BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions(); + bulkDataExportOptions.setOutputFormat(null); + bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization")); + bulkDataExportOptions.setSince(null); + bulkDataExportOptions.setFilters(filters); + bulkDataExportOptions.setGroupId(myPatientGroupId); + bulkDataExportOptions.setExpandMdm(true); + bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP); + IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); + + myBulkDataExportSvc.buildExportFiles(); + awaitAllBulkJobCompletions(); + + IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); + + assertThat(jobInfo.getStatus(), equalTo(BulkExportJobStatusEnum.COMPLETE)); + assertThat(jobInfo.getFiles().size(), equalTo(1)); + assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); + + // Check immunization Content + String nextContents = getBinaryContents(jobInfo, 0); + + assertThat(nextContents, is(containsString("IMM1"))); + assertThat(nextContents, is(containsString("IMM3"))); + assertThat(nextContents, is(containsString("IMM5"))); + assertThat(nextContents, is(containsString("IMM7"))); + assertThat(nextContents, is(containsString("IMM9"))); + assertThat(nextContents, is(containsString("IMM999"))); + + assertThat(nextContents, is(not(containsString("Flu")))); } private void createResources() { @@ -1071,7 +1112,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test { //Manually create a golden record Patient goldenPatient = new Patient(); goldenPatient.setId("PAT999"); - DaoMethodOutcome g1Outcome = myPatientDao.update(goldenPatient); + DaoMethodOutcome g1Outcome = myPatientDao.update(goldenPatient, new SystemRequestDetails()); Long goldenPid = myIdHelperService.getPidOrNull(g1Outcome.getResource()); //Create our golden records' data. @@ -1098,12 +1139,12 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test { createCareTeamWithIndex(i, patId); } - myPatientGroupId = myGroupDao.update(group).getId(); + myPatientGroupId = myGroupDao.update(group, new SystemRequestDetails()).getId(); //Manually create another golden record Patient goldenPatient2 = new Patient(); goldenPatient2.setId("PAT888"); - DaoMethodOutcome g2Outcome = myPatientDao.update(goldenPatient2); + DaoMethodOutcome g2Outcome = myPatientDao.update(goldenPatient2, new SystemRequestDetails()); Long goldenPid2 = myIdHelperService.getPidOrNull(g2Outcome.getResource()); //Create some nongroup patients MDM linked to a different golden resource. They shouldnt be included in the query. @@ -1132,14 +1173,14 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test { patient.setGender(i % 2 == 0 ? Enumerations.AdministrativeGender.MALE : Enumerations.AdministrativeGender.FEMALE); patient.addName().setFamily("FAM" + i); patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i); - return myPatientDao.update(patient); + return myPatientDao.update(patient, new SystemRequestDetails()); } private void createCareTeamWithIndex(int i, IIdType patId) { CareTeam careTeam = new CareTeam(); careTeam.setId("CT" + i); careTeam.setSubject(new Reference(patId)); // This maps to the "patient" search parameter on CareTeam - myCareTeamDao.update(careTeam); + myCareTeamDao.update(careTeam, new SystemRequestDetails()); } private void createImmunizationWithIndex(int i, IIdType patId) { @@ -1157,7 +1198,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test { cc.addCoding().setSystem("vaccines").setCode("COVID-19"); immunization.setVaccineCode(cc); } - myImmunizationDao.update(immunization); + myImmunizationDao.update(immunization, new SystemRequestDetails()); } private void createObservationWithIndex(int i, IIdType patId) { @@ -1168,7 +1209,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test { if (patId != null) { obs.getSubject().setReference(patId.getValue()); } - myObservationDao.update(obs); + myObservationDao.update(obs, new SystemRequestDetails()); } public void linkToGoldenResource(Long theGoldenPid, Long theSourcePid) { diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java index feb1fbf5bd2..2d7c10fa796 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java @@ -218,6 +218,11 @@ public class JpaConstants { */ public static final String DEFAULT_PARTITION_NAME = "DEFAULT"; + /** + * The name of the collection of all partitions + */ + public static final String ALL_PARTITIONS_NAME = "ALL_PARTITIONS"; + /** * Parameter for the $expand operation */