Fix partition selection for system request details
This commit is contained in:
parent
d94611edf6
commit
7cabcbd772
|
@ -29,6 +29,8 @@ import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
|
|||
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
|
||||
import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc;
|
||||
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
|
||||
import ca.uhn.fhir.jpa.model.util.JpaConstants;
|
||||
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
|
||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
||||
import ca.uhn.fhir.jpa.util.QueryChunker;
|
||||
import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
|
||||
|
@ -178,13 +180,13 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
|
|||
* @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"]
|
||||
*/
|
||||
private List<String> getMembers() {
|
||||
IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
|
||||
SystemRequestDetails requestDetails = new SystemRequestDetails();
|
||||
requestDetails.setTenantId(JpaConstants.ALL_PARTITIONS_NAME);
|
||||
IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId), requestDetails);
|
||||
List<IPrimitiveType> evaluate = myContext.newFhirPath().evaluate(group, "member.entity.reference", IPrimitiveType.class);
|
||||
return evaluate.stream().map(IPrimitiveType::getValueAsString).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Given the local myGroupId, perform an expansion to retrieve all resource IDs of member patients.
|
||||
* if myMdmEnabled is set to true, we also reach out to the IMdmLinkDao to attempt to also expand it into matched
|
||||
|
@ -194,7 +196,9 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
|
|||
*/
|
||||
private Set<String> expandAllPatientPidsFromGroup() {
|
||||
Set<String> expandedIds = new HashSet<>();
|
||||
IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
|
||||
SystemRequestDetails requestDetails = new SystemRequestDetails();
|
||||
requestDetails.setTenantId(JpaConstants.ALL_PARTITIONS_NAME);
|
||||
IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId), new SystemRequestDetails());
|
||||
Long pidOrNull = myIdHelperService.getPidOrNull(group);
|
||||
|
||||
//Attempt to perform MDM Expansion of membership
|
||||
|
|
|
@ -27,6 +27,7 @@ import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
|
|||
import ca.uhn.fhir.jpa.batch.log.Logs;
|
||||
import ca.uhn.fhir.jpa.bulk.export.svc.BulkExportDaoSvc;
|
||||
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
|
||||
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
|
||||
import ca.uhn.fhir.parser.IParser;
|
||||
import ca.uhn.fhir.rest.api.Constants;
|
||||
import ca.uhn.fhir.util.BinaryUtil;
|
||||
|
@ -100,7 +101,7 @@ public class ResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
|
|||
IBaseBinary binary = BinaryUtil.newBinary(myFhirContext);
|
||||
binary.setContentType(Constants.CT_FHIR_NDJSON);
|
||||
binary.setContent(myOutputStream.toByteArray());
|
||||
DaoMethodOutcome outcome = myBinaryDao.create(binary);
|
||||
DaoMethodOutcome outcome = myBinaryDao.create(binary, new SystemRequestDetails());
|
||||
return outcome.getResource().getIdElement();
|
||||
}
|
||||
|
||||
|
|
|
@ -44,6 +44,7 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import static ca.uhn.fhir.jpa.model.util.JpaConstants.ALL_PARTITIONS_NAME;
|
||||
import static ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster.doCallHooks;
|
||||
import static ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster.doCallHooksAndReturnObject;
|
||||
import static ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster.hasHooks;
|
||||
|
@ -101,6 +102,19 @@ public class RequestPartitionHelperSvc implements IRequestPartitionHelperSvc {
|
|||
return RequestPartitionId.defaultPartition();
|
||||
}
|
||||
|
||||
//Shortcircuit and write system calls out to default partition.
|
||||
if (theRequest instanceof SystemRequestDetails) {
|
||||
if (theRequest.getTenantId() != null) {
|
||||
if (theRequest.getTenantId().equals(ALL_PARTITIONS_NAME)) {
|
||||
return RequestPartitionId.allPartitions();
|
||||
} else {
|
||||
return RequestPartitionId.fromPartitionName(theRequest.getTenantId());
|
||||
}
|
||||
} else {
|
||||
return RequestPartitionId.defaultPartition();
|
||||
}
|
||||
}
|
||||
|
||||
// Interceptor call: STORAGE_PARTITION_IDENTIFY_READ
|
||||
if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_READ, myInterceptorBroadcaster, theRequest)) {
|
||||
HookParams params = new HookParams()
|
||||
|
@ -129,6 +143,20 @@ public class RequestPartitionHelperSvc implements IRequestPartitionHelperSvc {
|
|||
|
||||
if (myPartitionSettings.isPartitioningEnabled()) {
|
||||
|
||||
//Shortcircuit and write system calls out to default partition.
|
||||
if (theRequest instanceof SystemRequestDetails) {
|
||||
if (theRequest.getTenantId() != null) {
|
||||
if (theRequest.getTenantId().equals(ALL_PARTITIONS_NAME)) {
|
||||
return RequestPartitionId.allPartitions();
|
||||
} else {
|
||||
return RequestPartitionId.fromPartitionName(theRequest.getTenantId());
|
||||
}
|
||||
} else {
|
||||
return RequestPartitionId.defaultPartition();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Interceptor call: STORAGE_PARTITION_IDENTIFY_CREATE
|
||||
HookParams params = new HookParams()
|
||||
.add(IBaseResource.class, theResource)
|
||||
|
|
|
@ -18,6 +18,7 @@ import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
|
|||
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
|
||||
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
|
||||
import ca.uhn.fhir.jpa.entity.MdmLink;
|
||||
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
|
||||
import ca.uhn.fhir.mdm.api.MdmLinkSourceEnum;
|
||||
import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
|
||||
import ca.uhn.fhir.parser.IParser;
|
||||
|
@ -494,7 +495,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
|
|||
Patient patient = new Patient();
|
||||
patient.setId("PAT" + i);
|
||||
patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i);
|
||||
myPatientDao.update(patient).getId().toUnqualifiedVersionless();
|
||||
myPatientDao.update(patient, new SystemRequestDetails()).getId().toUnqualifiedVersionless();
|
||||
}
|
||||
|
||||
// Create a bulk job
|
||||
|
@ -848,7 +849,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
|
|||
|
||||
public String getBinaryContents(IBulkDataExportSvc.JobInfo theJobInfo, int theIndex) {
|
||||
// Iterate over the files
|
||||
Binary nextBinary = myBinaryDao.read(theJobInfo.getFiles().get(theIndex).getResourceId());
|
||||
Binary nextBinary = myBinaryDao.read(theJobInfo.getFiles().get(theIndex).getResourceId(), new SystemRequestDetails());
|
||||
assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
|
||||
String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
|
||||
ourLog.info("Next contents for type {}:\n{}", nextBinary.getResourceType(), nextContents);
|
||||
|
@ -928,7 +929,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
|
|||
|
||||
|
||||
//Check Observation Content
|
||||
Binary observationExportContent = myBinaryDao.read(jobInfo.getFiles().get(1).getResourceId());
|
||||
Binary observationExportContent = myBinaryDao.read(jobInfo.getFiles().get(1).getResourceId(), new SystemRequestDetails());
|
||||
assertEquals(Constants.CT_FHIR_NDJSON, observationExportContent.getContentType());
|
||||
nextContents = new String(observationExportContent.getContent(), Constants.CHARSET_UTF8);
|
||||
ourLog.info("Next contents for type {}:\n{}", observationExportContent.getResourceType(), nextContents);
|
||||
|
@ -1061,7 +1062,47 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
|
|||
//Now if we create another one and ask for the cache, we should get the most-recently-insert entry.
|
||||
IBulkDataExportSvc.JobInfo jobInfo10 = myBulkDataExportSvc.submitJob(options, true);
|
||||
assertThat(jobInfo10.getJobId(), is(equalTo(jobInfo9.getJobId())));
|
||||
}
|
||||
@Test
|
||||
public void testBulkExportWritesToDEFAULTPartitionWhenPartitioningIsEnabled() {
|
||||
myPartitionSettings.setPartitioningEnabled(true);
|
||||
|
||||
createResources();
|
||||
|
||||
//Only get COVID-19 vaccinations
|
||||
Set<String> filters = new HashSet<>();
|
||||
filters.add("Immunization?vaccine-code=vaccines|COVID-19");
|
||||
|
||||
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
|
||||
bulkDataExportOptions.setOutputFormat(null);
|
||||
bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization"));
|
||||
bulkDataExportOptions.setSince(null);
|
||||
bulkDataExportOptions.setFilters(filters);
|
||||
bulkDataExportOptions.setGroupId(myPatientGroupId);
|
||||
bulkDataExportOptions.setExpandMdm(true);
|
||||
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
|
||||
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
|
||||
|
||||
myBulkDataExportSvc.buildExportFiles();
|
||||
awaitAllBulkJobCompletions();
|
||||
|
||||
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
|
||||
|
||||
assertThat(jobInfo.getStatus(), equalTo(BulkExportJobStatusEnum.COMPLETE));
|
||||
assertThat(jobInfo.getFiles().size(), equalTo(1));
|
||||
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
|
||||
|
||||
// Check immunization Content
|
||||
String nextContents = getBinaryContents(jobInfo, 0);
|
||||
|
||||
assertThat(nextContents, is(containsString("IMM1")));
|
||||
assertThat(nextContents, is(containsString("IMM3")));
|
||||
assertThat(nextContents, is(containsString("IMM5")));
|
||||
assertThat(nextContents, is(containsString("IMM7")));
|
||||
assertThat(nextContents, is(containsString("IMM9")));
|
||||
assertThat(nextContents, is(containsString("IMM999")));
|
||||
|
||||
assertThat(nextContents, is(not(containsString("Flu"))));
|
||||
}
|
||||
|
||||
private void createResources() {
|
||||
|
@ -1071,7 +1112,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
|
|||
//Manually create a golden record
|
||||
Patient goldenPatient = new Patient();
|
||||
goldenPatient.setId("PAT999");
|
||||
DaoMethodOutcome g1Outcome = myPatientDao.update(goldenPatient);
|
||||
DaoMethodOutcome g1Outcome = myPatientDao.update(goldenPatient, new SystemRequestDetails());
|
||||
Long goldenPid = myIdHelperService.getPidOrNull(g1Outcome.getResource());
|
||||
|
||||
//Create our golden records' data.
|
||||
|
@ -1098,12 +1139,12 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
|
|||
createCareTeamWithIndex(i, patId);
|
||||
}
|
||||
|
||||
myPatientGroupId = myGroupDao.update(group).getId();
|
||||
myPatientGroupId = myGroupDao.update(group, new SystemRequestDetails()).getId();
|
||||
|
||||
//Manually create another golden record
|
||||
Patient goldenPatient2 = new Patient();
|
||||
goldenPatient2.setId("PAT888");
|
||||
DaoMethodOutcome g2Outcome = myPatientDao.update(goldenPatient2);
|
||||
DaoMethodOutcome g2Outcome = myPatientDao.update(goldenPatient2, new SystemRequestDetails());
|
||||
Long goldenPid2 = myIdHelperService.getPidOrNull(g2Outcome.getResource());
|
||||
|
||||
//Create some nongroup patients MDM linked to a different golden resource. They shouldnt be included in the query.
|
||||
|
@ -1132,14 +1173,14 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
|
|||
patient.setGender(i % 2 == 0 ? Enumerations.AdministrativeGender.MALE : Enumerations.AdministrativeGender.FEMALE);
|
||||
patient.addName().setFamily("FAM" + i);
|
||||
patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i);
|
||||
return myPatientDao.update(patient);
|
||||
return myPatientDao.update(patient, new SystemRequestDetails());
|
||||
}
|
||||
|
||||
private void createCareTeamWithIndex(int i, IIdType patId) {
|
||||
CareTeam careTeam = new CareTeam();
|
||||
careTeam.setId("CT" + i);
|
||||
careTeam.setSubject(new Reference(patId)); // This maps to the "patient" search parameter on CareTeam
|
||||
myCareTeamDao.update(careTeam);
|
||||
myCareTeamDao.update(careTeam, new SystemRequestDetails());
|
||||
}
|
||||
|
||||
private void createImmunizationWithIndex(int i, IIdType patId) {
|
||||
|
@ -1157,7 +1198,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
|
|||
cc.addCoding().setSystem("vaccines").setCode("COVID-19");
|
||||
immunization.setVaccineCode(cc);
|
||||
}
|
||||
myImmunizationDao.update(immunization);
|
||||
myImmunizationDao.update(immunization, new SystemRequestDetails());
|
||||
}
|
||||
|
||||
private void createObservationWithIndex(int i, IIdType patId) {
|
||||
|
@ -1168,7 +1209,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
|
|||
if (patId != null) {
|
||||
obs.getSubject().setReference(patId.getValue());
|
||||
}
|
||||
myObservationDao.update(obs);
|
||||
myObservationDao.update(obs, new SystemRequestDetails());
|
||||
}
|
||||
|
||||
public void linkToGoldenResource(Long theGoldenPid, Long theSourcePid) {
|
||||
|
|
|
@ -218,6 +218,11 @@ public class JpaConstants {
|
|||
*/
|
||||
public static final String DEFAULT_PARTITION_NAME = "DEFAULT";
|
||||
|
||||
/**
|
||||
* The name of the collection of all partitions
|
||||
*/
|
||||
public static final String ALL_PARTITIONS_NAME = "ALL_PARTITIONS";
|
||||
|
||||
/**
|
||||
* Parameter for the $expand operation
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue