Partition management for expired jobs

Tadgh 2021-04-16 11:55:12 -04:00
parent 8035d51e48
commit fad32aa636
3 changed files with 14 additions and 9 deletions

GroupBulkItemReader.java

@@ -29,7 +29,6 @@ import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
 import ca.uhn.fhir.jpa.dao.index.IdHelperService;
 import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc;
 import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
-import ca.uhn.fhir.jpa.model.util.JpaConstants;
 import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
 import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
 import ca.uhn.fhir.jpa.util.QueryChunker;
@@ -56,6 +55,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
+
+import static ca.uhn.fhir.jpa.model.util.JpaConstants.ALL_PARTITIONS_NAME;
 
 /**
  * Bulk Item reader for the Group Bulk Export job.
  * Instead of performing a normal query on the resource type using type filters, we instead
@@ -120,7 +121,9 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
 		Set<Long> patientPidsToExport = new HashSet<>(pidsOrThrowException);
 		if (myMdmEnabled) {
-			IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
+			SystemRequestDetails srd = new SystemRequestDetails();
+			srd.setTenantId(ALL_PARTITIONS_NAME);
+			IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId), srd);
 			Long pidOrNull = myIdHelperService.getPidOrNull(group);
 			List<IMdmLinkDao.MdmPidTuple> goldenPidSourcePidTuple = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
 			goldenPidSourcePidTuple.forEach(tuple -> {
@@ -181,7 +184,7 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
 	 */
 	private List<String> getMembers() {
 		SystemRequestDetails requestDetails = new SystemRequestDetails();
-		requestDetails.setTenantId(JpaConstants.ALL_PARTITIONS_NAME);
+		requestDetails.setTenantId(ALL_PARTITIONS_NAME);
 		IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId), requestDetails);
 		List<IPrimitiveType> evaluate = myContext.newFhirPath().evaluate(group, "member.entity.reference", IPrimitiveType.class);
 		return evaluate.stream().map(IPrimitiveType::getValueAsString).collect(Collectors.toList());
@@ -197,7 +200,7 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
 	private Set<String> expandAllPatientPidsFromGroup() {
 		Set<String> expandedIds = new HashSet<>();
 		SystemRequestDetails requestDetails = new SystemRequestDetails();
-		requestDetails.setTenantId(JpaConstants.ALL_PARTITIONS_NAME);
+		requestDetails.setTenantId(ALL_PARTITIONS_NAME);
 		IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId), new SystemRequestDetails());
 		Long pidOrNull = myIdHelperService.getPidOrNull(group);
 
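All three changed sites in this file build the same thing: a system-level request whose tenant is the all-partitions sentinel, so the Group read is not silently scoped to a single partition. A minimal sketch of that pattern as a reusable helper, using only types this file already imports; the helper class and method names are hypothetical, not part of this commit:

import ca.uhn.fhir.jpa.partition.SystemRequestDetails;

import static ca.uhn.fhir.jpa.model.util.JpaConstants.ALL_PARTITIONS_NAME;

// Hypothetical helper (not in this commit): one place to build the
// "may see every partition" system request that the hunks above
// construct inline three times.
final class AllPartitionsRequests {

	private AllPartitionsRequests() {
		// static utility; no instances
	}

	static SystemRequestDetails allPartitions() {
		SystemRequestDetails details = new SystemRequestDetails();
		// ALL_PARTITIONS_NAME is a sentinel tenant id, not the name of a
		// real tenant: it marks the request as spanning every partition.
		details.setTenantId(ALL_PARTITIONS_NAME);
		return details;
	}
}

With such a helper, each read site above would reduce to myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId), AllPartitionsRequests.allPartitions()).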
BulkDataExportSvcImpl.java

@@ -42,6 +42,7 @@ import ca.uhn.fhir.jpa.model.sched.HapiJob;
 import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
 import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
 import ca.uhn.fhir.jpa.model.util.JpaConstants;
+import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
 import ca.uhn.fhir.rest.api.Constants;
 import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
 import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
@@ -203,8 +204,8 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
 		for (BulkExportCollectionFileEntity nextFile : nextCollection.getFiles()) {
 			ourLog.info("Purging bulk data file: {}", nextFile.getResourceId());
-			getBinaryDao().delete(toId(nextFile.getResourceId()));
-			getBinaryDao().forceExpungeInExistingTransaction(toId(nextFile.getResourceId()), new ExpungeOptions().setExpungeDeletedResources(true).setExpungeOldVersions(true), null);
+			getBinaryDao().delete(toId(nextFile.getResourceId()), new SystemRequestDetails());
+			getBinaryDao().forceExpungeInExistingTransaction(toId(nextFile.getResourceId()), new ExpungeOptions().setExpungeDeletedResources(true).setExpungeOldVersions(true), new SystemRequestDetails());
 			myBulkExportCollectionFileDao.deleteByPid(nextFile.getId());
 		}
 
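Both replacement calls in the purge loop construct their own SystemRequestDetails. A hedged sketch of the same loop body sharing one instance, assuming neither DAO call mutates the request object; every other name is an existing member of BulkDataExportSvcImpl shown in the hunk above:

// Sketch only: the same purge calls as the diff above, with one shared
// system request instead of two inline constructions.
SystemRequestDetails systemRequest = new SystemRequestDetails();
for (BulkExportCollectionFileEntity nextFile : nextCollection.getFiles()) {
	ourLog.info("Purging bulk data file: {}", nextFile.getResourceId());
	// Delete the Binary, then expunge its deleted versions in the same
	// transaction, both as system-level operations.
	getBinaryDao().delete(toId(nextFile.getResourceId()), systemRequest);
	getBinaryDao().forceExpungeInExistingTransaction(
		toId(nextFile.getResourceId()),
		new ExpungeOptions().setExpungeDeletedResources(true).setExpungeOldVersions(true),
		systemRequest);
	myBulkExportCollectionFileDao.deleteByPid(nextFile.getId());
}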
BulkDataExportSvcImplR4Test.java

@@ -117,7 +117,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
 		Binary b = new Binary();
 		b.setContent(new byte[]{0, 1, 2, 3});
-		String binaryId = myBinaryDao.create(b).getId().toUnqualifiedVersionless().getValue();
+		String binaryId = myBinaryDao.create(b, new SystemRequestDetails()).getId().toUnqualifiedVersionless().getValue();
 
 		BulkExportJobEntity job = new BulkExportJobEntity();
 		job.setStatus(BulkExportJobStatusEnum.COMPLETE);
@@ -524,7 +524,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
 		// Iterate over the files
 		for (IBulkDataExportSvc.FileEntry next : status.getFiles()) {
-			Binary nextBinary = myBinaryDao.read(next.getResourceId());
+			Binary nextBinary = myBinaryDao.read(next.getResourceId(), new SystemRequestDetails());
 			assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
 			String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
 			ourLog.info("Next contents for type {}:\n{}", next.getResourceType(), nextContents);
@@ -1030,7 +1030,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
 	}
 
 	@Test
-	public void testCacheSettingIsRespectedWhenCreatingNewJobs() {
+	public void testCacheSettingIsRespectedWhenCreatingNewJobs() throws InterruptedException {
 		BulkDataExportOptions options = new BulkDataExportOptions();
 		options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
 		options.setResourceTypes(Sets.newHashSet("Procedure"));
@@ -1049,6 +1049,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
 		IBulkDataExportSvc.JobInfo jobInfo6 = myBulkDataExportSvc.submitJob(options, false);
 		IBulkDataExportSvc.JobInfo jobInfo7 = myBulkDataExportSvc.submitJob(options, false);
 		IBulkDataExportSvc.JobInfo jobInfo8 = myBulkDataExportSvc.submitJob(options, false);
+		Thread.sleep(100L); //stupid commit timings.
 		IBulkDataExportSvc.JobInfo jobInfo9 = myBulkDataExportSvc.submitJob(options, false);
 		//First non-cached should retrieve new ID.
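The added Thread.sleep(100L) spaces the submissions apart so consecutive jobs do not land in the same commit window. Per the trailing comment, the test presumably goes on to verify that a cache-bypassing submission produces a fresh job; a hedged sketch of that check, assuming JobInfo exposes a getJobId() accessor (not shown in this diff):

// Hypothetical next assertion: submitJob(options, false) bypasses the
// request cache, so consecutive submissions should yield distinct job ids
// even for identical export options.
assertNotEquals(jobInfo8.getJobId(), jobInfo9.getJobId());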