diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_4_0/2556-prevent-bulk-failure-while-partitioned.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_4_0/2556-prevent-bulk-failure-while-partitioned.yaml
new file mode 100644
index 00000000000..facbd901d31
--- /dev/null
+++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_4_0/2556-prevent-bulk-failure-while-partitioned.yaml
@@ -0,0 +1,4 @@
+---
+type: fix
+issue: 2556
+title: "Fixed a bug which would cause Bulk Export to fail when run in a partitioned environment."
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/GroupBulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/GroupBulkItemReader.java
index 3a10fec2aae..6b49a2134b7 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/GroupBulkItemReader.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/GroupBulkItemReader.java
@@ -29,6 +29,7 @@ import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
 import ca.uhn.fhir.jpa.dao.index.IdHelperService;
 import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc;
 import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
+import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
 import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
 import ca.uhn.fhir.jpa.util.QueryChunker;
 import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
@@ -118,7 +119,8 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
 		Set patientPidsToExport = new HashSet<>(pidsOrThrowException);
 
 		if (myMdmEnabled) {
-			IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
+			SystemRequestDetails srd = SystemRequestDetails.newSystemRequestAllPartitions();
+			IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId), srd);
 			Long pidOrNull = myIdHelperService.getPidOrNull(group);
 			List goldenPidSourcePidTuple = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
 			goldenPidSourcePidTuple.forEach(tuple -> {
@@ -178,13 +180,12 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
 	 * @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"]
 	 */
 	private List getMembers() {
-		IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
+		SystemRequestDetails requestDetails = SystemRequestDetails.newSystemRequestAllPartitions();
+		IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId), requestDetails);
 		List evaluate = myContext.newFhirPath().evaluate(group, "member.entity.reference", IPrimitiveType.class);
 		return evaluate.stream().map(IPrimitiveType::getValueAsString).collect(Collectors.toList());
 	}
-
-
 	/**
 	 * Given the local myGroupId, perform an expansion to retrieve all resource IDs of member patients.
 	 * if myMdmEnabled is set to true, we also reach out to the IMdmLinkDao to attempt to also expand it into matched
@@ -194,7 +195,8 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
 	 */
 	private Set expandAllPatientPidsFromGroup() {
 		Set expandedIds = new HashSet<>();
-		IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
+		SystemRequestDetails requestDetails = SystemRequestDetails.newSystemRequestAllPartitions();
+		IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId), requestDetails);
 		Long pidOrNull = myIdHelperService.getPidOrNull(group);
 
 		//Attempt to perform MDM Expansion of membership
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/ResourceToFileWriter.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/ResourceToFileWriter.java
index 8b4ebe7e86a..df362a79f19 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/ResourceToFileWriter.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/ResourceToFileWriter.java
@@ -27,6 +27,7 @@ import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
 import ca.uhn.fhir.jpa.batch.log.Logs;
 import ca.uhn.fhir.jpa.bulk.export.svc.BulkExportDaoSvc;
 import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
+import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
 import ca.uhn.fhir.parser.IParser;
 import ca.uhn.fhir.rest.api.Constants;
 import ca.uhn.fhir.util.BinaryUtil;
@@ -100,7 +101,7 @@ public class ResourceToFileWriter implements ItemWriter> {
 		IBaseBinary binary = BinaryUtil.newBinary(myFhirContext);
 		binary.setContentType(Constants.CT_FHIR_NDJSON);
 		binary.setContent(myOutputStream.toByteArray());
-		DaoMethodOutcome outcome = myBinaryDao.create(binary);
+		DaoMethodOutcome outcome = myBinaryDao.create(binary, new SystemRequestDetails());
 		return outcome.getResource().getIdElement();
 	}
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/BulkDataExportSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/BulkDataExportSvcImpl.java
index 872c036f63a..1dc8d672af7 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/BulkDataExportSvcImpl.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/BulkDataExportSvcImpl.java
@@ -42,6 +42,7 @@ import ca.uhn.fhir.jpa.model.sched.HapiJob;
 import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
 import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
 import ca.uhn.fhir.jpa.model.util.JpaConstants;
+import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
 import ca.uhn.fhir.rest.api.Constants;
 import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
 import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
@@ -203,8 +204,8 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
 			for (BulkExportCollectionFileEntity nextFile : nextCollection.getFiles()) {
 				ourLog.info("Purging bulk data file: {}", nextFile.getResourceId());
-				getBinaryDao().delete(toId(nextFile.getResourceId()));
-				getBinaryDao().forceExpungeInExistingTransaction(toId(nextFile.getResourceId()), new ExpungeOptions().setExpungeDeletedResources(true).setExpungeOldVersions(true), null);
+				getBinaryDao().delete(toId(nextFile.getResourceId()), new SystemRequestDetails());
+				getBinaryDao().forceExpungeInExistingTransaction(toId(nextFile.getResourceId()), new ExpungeOptions().setExpungeDeletedResources(true).setExpungeOldVersions(true), new SystemRequestDetails());
 				myBulkExportCollectionFileDao.deleteByPid(nextFile.getId());
 			}
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/packages/JpaPackageCache.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/packages/JpaPackageCache.java
index 6c64b331a17..76f621729cd 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/packages/JpaPackageCache.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/packages/JpaPackageCache.java
@@ -65,7 +65,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.domain.PageRequest;
-import org.springframework.data.domain.Pageable;
 import org.springframework.data.domain.Slice;
 import org.springframework.transaction.PlatformTransactionManager;
 import org.springframework.transaction.support.TransactionTemplate;
@@ -655,16 +654,8 @@ public class JpaPackageCache extends BasePackageCacheManager implements IHapiPac
 	}
 
 	private void deleteAndExpungeResourceBinary(IIdType theResourceBinaryId, ExpungeOptions theOptions) {
-
-		if (myPartitionSettings.isPartitioningEnabled()) {
-			SystemRequestDetails requestDetails = new SystemRequestDetails();
-			requestDetails.setTenantId(JpaConstants.DEFAULT_PARTITION_NAME);
-			getBinaryDao().delete(theResourceBinaryId, requestDetails).getEntity();
-			getBinaryDao().forceExpungeInExistingTransaction(theResourceBinaryId, theOptions, requestDetails);
-		} else {
-			getBinaryDao().delete(theResourceBinaryId).getEntity();
-			getBinaryDao().forceExpungeInExistingTransaction(theResourceBinaryId, theOptions, null);
-		}
+		getBinaryDao().delete(theResourceBinaryId, new SystemRequestDetails()).getEntity();
+		getBinaryDao().forceExpungeInExistingTransaction(theResourceBinaryId, theOptions, new SystemRequestDetails());
 	}
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/packages/PackageInstallerSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/packages/PackageInstallerSvcImpl.java
index beb787dc32e..7c044f667db 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/packages/PackageInstallerSvcImpl.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/packages/PackageInstallerSvcImpl.java
@@ -347,7 +347,7 @@ public class PackageInstallerSvcImpl implements IPackageInstallerSvc {
 	private IBundleProvider searchResource(IFhirResourceDao theDao, SearchParameterMap theMap) {
 		if (myPartitionSettings.isPartitioningEnabled()) {
 			SystemRequestDetails requestDetails = new SystemRequestDetails();
-			requestDetails.setTenantId(JpaConstants.DEFAULT_PARTITION_NAME);
+//			requestDetails.setTenantId(JpaConstants.DEFAULT_PARTITION_NAME);
 			return theDao.search(theMap, requestDetails);
 		} else {
 			return theDao.search(theMap);
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/partition/RequestPartitionHelperSvc.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/partition/RequestPartitionHelperSvc.java
index 37da051117a..ce725f71515 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/partition/RequestPartitionHelperSvc.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/partition/RequestPartitionHelperSvc.java
@@ -35,6 +35,8 @@ import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
 import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
 import org.apache.commons.lang3.Validate;
 import org.hl7.fhir.instance.model.api.IBaseResource;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import javax.annotation.Nonnull;
@@ -44,11 +46,15 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 
+import static ca.uhn.fhir.jpa.model.util.JpaConstants.ALL_PARTITIONS_NAME;
 import static ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster.doCallHooks;
 import static ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster.doCallHooksAndReturnObject;
 import static ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster.hasHooks;
+import static org.slf4j.LoggerFactory.getLogger;
 
 public class RequestPartitionHelperSvc implements IRequestPartitionHelperSvc {
 
+	private static final Logger ourLog = getLogger(RequestPartitionHelperSvc.class);
+
 	private final HashSet myNonPartitionableResourceNames;
@@ -95,14 +101,18 @@ public class RequestPartitionHelperSvc implements IRequestPartitionHelperSvc {
 	public RequestPartitionId determineReadPartitionForRequest(@Nullable RequestDetails theRequest, String theResourceType) {
 		RequestPartitionId requestPartitionId;
 
+		boolean nonPartitionableResource = myNonPartitionableResourceNames.contains(theResourceType);
 		if (myPartitionSettings.isPartitioningEnabled()) {
 			// Handle system requests
-			if ((theRequest == null && myNonPartitionableResourceNames.contains(theResourceType))) {
+			//TODO GGG eventually, theRequest will not be allowed to be null here, and we will pass through SystemRequestDetails instead.
+			if (theRequest == null && nonPartitionableResource) {
 				return RequestPartitionId.defaultPartition();
 			}
 
-			// Interceptor call: STORAGE_PARTITION_IDENTIFY_READ
-			if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_READ, myInterceptorBroadcaster, theRequest)) {
+			if (theRequest instanceof SystemRequestDetails) {
+				requestPartitionId = getSystemRequestPartitionId(theRequest, nonPartitionableResource);
+				// Interceptor call: STORAGE_PARTITION_IDENTIFY_READ
+			} else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_READ, myInterceptorBroadcaster, theRequest)) {
 				HookParams params = new HookParams()
 					.add(RequestDetails.class, theRequest)
 					.addIfMatchesType(ServletRequestDetails.class, theRequest);
@@ -119,6 +129,48 @@ public class RequestPartitionHelperSvc implements IRequestPartitionHelperSvc {
 		return RequestPartitionId.allPartitions();
 	}
 
+	/**
+	 * Determine the partition for a system request: the partition is read from the tenant ID when one is present,
+	 * and otherwise falls back to DEFAULT. If the resource in question is non-partitionable and the resolved
+	 * partition is not the default partition, an InternalErrorException is thrown.
+	 *
+	 * @param theRequest the {@link SystemRequestDetails} describing the system call
+	 * @param theNonPartitionableResource whether the resource type being accessed is non-partitionable
+	 * @return the {@link RequestPartitionId} to be used for this request
+	 */
+	@NotNull
+	private RequestPartitionId getSystemRequestPartitionId(@NotNull RequestDetails theRequest, boolean theNonPartitionableResource) {
+		RequestPartitionId requestPartitionId;
+		requestPartitionId = getSystemRequestPartitionId(theRequest);
+		if (theNonPartitionableResource && !requestPartitionId.isDefaultPartition()) {
+			throw new InternalErrorException("System call is attempting to write a non-partitionable resource to a partition! This is a bug!");
+		}
+		return requestPartitionId;
+	}
+
+	/**
+	 * Determine the partition for a System Call (defined by the fact that the request is of type SystemRequestDetails)
+	 *
+	 * 1. If the tenant ID is set to the constant for all partitions, return all partitions.
+	 * 2. If there is a tenant ID set in the request, use it.
+	 * 3. Otherwise, return the Default Partition.
+	 *
+	 * @param theRequest The {@link SystemRequestDetails}
+	 * @return the {@link RequestPartitionId} to be used for this request.
+	 */
+	@NotNull
+	private RequestPartitionId getSystemRequestPartitionId(@NotNull RequestDetails theRequest) {
+		if (theRequest.getTenantId() != null) {
+			if (theRequest.getTenantId().equals(ALL_PARTITIONS_NAME)) {
+				return RequestPartitionId.allPartitions();
+			} else {
+				return RequestPartitionId.fromPartitionName(theRequest.getTenantId());
+			}
+		} else {
+			return RequestPartitionId.defaultPartition();
+		}
+	}
+
 	/**
 	 * Invoke the {@link Pointcut#STORAGE_PARTITION_IDENTIFY_CREATE} interceptor pointcut to determine the tenant for a create request.
 	 */
@@ -128,18 +180,22 @@ public class RequestPartitionHelperSvc implements IRequestPartitionHelperSvc {
 		RequestPartitionId requestPartitionId;
 
 		if (myPartitionSettings.isPartitioningEnabled()) {
-
-			// Interceptor call: STORAGE_PARTITION_IDENTIFY_CREATE
-			HookParams params = new HookParams()
-				.add(IBaseResource.class, theResource)
-				.add(RequestDetails.class, theRequest)
-				.addIfMatchesType(ServletRequestDetails.class, theRequest);
-			requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(myInterceptorBroadcaster, theRequest, Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE, params);
-
-			// Handle system requests
 			boolean nonPartitionableResource = myNonPartitionableResourceNames.contains(theResourceType);
-			if (nonPartitionableResource && requestPartitionId == null) {
-				requestPartitionId = RequestPartitionId.defaultPartition();
+
+			if (theRequest instanceof SystemRequestDetails) {
+				requestPartitionId = getSystemRequestPartitionId(theRequest, nonPartitionableResource);
+			} else {
+				//This is an external Request (e.g. ServletRequestDetails) so we want to figure out the partition via interceptor.
+				HookParams params = new HookParams() // Interceptor call: STORAGE_PARTITION_IDENTIFY_CREATE
+					.add(IBaseResource.class, theResource)
+					.add(RequestDetails.class, theRequest)
+					.addIfMatchesType(ServletRequestDetails.class, theRequest);
+				requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(myInterceptorBroadcaster, theRequest, Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE, params);
+
+				//If the interceptors haven't selected a partition, and it's a non-partitionable resource anyhow, send to DEFAULT
+				if (nonPartitionableResource && requestPartitionId == null) {
+					requestPartitionId = RequestPartitionId.defaultPartition();
+				}
 			}
 
 		String resourceName = myFhirContext.getResourceType(theResource);
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/partition/SystemRequestDetails.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/partition/SystemRequestDetails.java
index c2b3361eb0c..f194a1d8f73 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/partition/SystemRequestDetails.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/partition/SystemRequestDetails.java
@@ -35,17 +35,15 @@ import ca.uhn.fhir.rest.server.IRestfulServerDefaults;
 import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableListMultimap;
-import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
 import java.nio.charset.Charset;
 import java.util.List;
-import java.util.Optional;
+
+import static ca.uhn.fhir.jpa.model.util.JpaConstants.ALL_PARTITIONS_NAME;
 
 /**
  * A default RequestDetails implementation that can be used for system calls to
@@ -104,6 +102,11 @@ public class SystemRequestDetails extends RequestDetails {
 		}
 		myHeaders.put(theName, theValue);
 	}
+
+	public static SystemRequestDetails newSystemRequestAllPartitions() {
+		SystemRequestDetails systemRequestDetails = new SystemRequestDetails();
+		systemRequestDetails.setTenantId(ALL_PARTITIONS_NAME);
+		return systemRequestDetails;
+	}
 
 	@Override
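Taken together, the getSystemRequestPartitionId() overloads above mean that a system call's partition is driven entirely by the tenant ID carried on its SystemRequestDetails. A minimal sketch of that mapping, assuming only the classes touched in the hunks above (the partition name "PART-A" is hypothetical and the unused locals are for illustration only):

// Sketch only: how a SystemRequestDetails tenant ID is expected to resolve under the rules above.
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;

public class SystemRequestTenantSketch {

	public void tenantIdExamples() {
		// Tenant ID set to the all-partitions constant: resolves to RequestPartitionId.allPartitions()
		SystemRequestDetails allPartitions = SystemRequestDetails.newSystemRequestAllPartitions();

		// Any other tenant ID: resolves to RequestPartitionId.fromPartitionName("PART-A")
		// ("PART-A" is a hypothetical partition name used only for illustration)
		SystemRequestDetails namedPartition = new SystemRequestDetails();
		namedPartition.setTenantId("PART-A");

		// No tenant ID at all: resolves to RequestPartitionId.defaultPartition()
		SystemRequestDetails defaultPartition = new SystemRequestDetails();
	}
}
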
diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java
index fdc92d090e9..701de70bb04 100644
--- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java
+++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java
@@ -18,6 +18,7 @@ import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
 import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
 import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
 import ca.uhn.fhir.jpa.entity.MdmLink;
+import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
 import ca.uhn.fhir.mdm.api.MdmLinkSourceEnum;
 import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
 import ca.uhn.fhir.parser.IParser;
@@ -116,7 +117,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
 		Binary b = new Binary();
 		b.setContent(new byte[]{0, 1, 2, 3});
-		String binaryId = myBinaryDao.create(b).getId().toUnqualifiedVersionless().getValue();
+		String binaryId = myBinaryDao.create(b, new SystemRequestDetails()).getId().toUnqualifiedVersionless().getValue();
 
 		BulkExportJobEntity job = new BulkExportJobEntity();
 		job.setStatus(BulkExportJobStatusEnum.COMPLETE);
@@ -494,7 +495,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
 			Patient patient = new Patient();
 			patient.setId("PAT" + i);
 			patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i);
-			myPatientDao.update(patient).getId().toUnqualifiedVersionless();
+			myPatientDao.update(patient, new SystemRequestDetails()).getId().toUnqualifiedVersionless();
 		}
 
 		// Create a bulk job
@@ -523,7 +524,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
 
 		// Iterate over the files
 		for (IBulkDataExportSvc.FileEntry next : status.getFiles()) {
-			Binary nextBinary = myBinaryDao.read(next.getResourceId());
+			Binary nextBinary = myBinaryDao.read(next.getResourceId(), new SystemRequestDetails());
 			assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
 			String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
 			ourLog.info("Next contents for type {}:\n{}", next.getResourceType(), nextContents);
@@ -848,7 +849,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
 
 	public String getBinaryContents(IBulkDataExportSvc.JobInfo theJobInfo, int theIndex) {
 		// Iterate over the files
-		Binary nextBinary = myBinaryDao.read(theJobInfo.getFiles().get(theIndex).getResourceId());
+		Binary nextBinary = myBinaryDao.read(theJobInfo.getFiles().get(theIndex).getResourceId(), new SystemRequestDetails());
 		assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
 		String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
 		ourLog.info("Next contents for type {}:\n{}", nextBinary.getResourceType(), nextContents);
@@ -928,7 +929,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
 
 		//Check Observation Content
-		Binary observationExportContent = myBinaryDao.read(jobInfo.getFiles().get(1).getResourceId());
+		Binary observationExportContent = myBinaryDao.read(jobInfo.getFiles().get(1).getResourceId(), new SystemRequestDetails());
 		assertEquals(Constants.CT_FHIR_NDJSON, observationExportContent.getContentType());
 		nextContents = new String(observationExportContent.getContent(), Constants.CHARSET_UTF8);
 		ourLog.info("Next contents for type {}:\n{}", observationExportContent.getResourceType(), nextContents);
@@ -1029,7 +1030,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
 	}
 
 	@Test
-	public void testCacheSettingIsRespectedWhenCreatingNewJobs() {
+	public void testCacheSettingIsRespectedWhenCreatingNewJobs() throws InterruptedException {
 		BulkDataExportOptions options = new BulkDataExportOptions();
 		options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
 		options.setResourceTypes(Sets.newHashSet("Procedure"));
@@ -1048,6 +1049,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
 		IBulkDataExportSvc.JobInfo jobInfo6 = myBulkDataExportSvc.submitJob(options, false);
 		IBulkDataExportSvc.JobInfo jobInfo7 = myBulkDataExportSvc.submitJob(options, false);
 		IBulkDataExportSvc.JobInfo jobInfo8 = myBulkDataExportSvc.submitJob(options, false);
+		Thread.sleep(100L); //stupid commit timings.
 		IBulkDataExportSvc.JobInfo jobInfo9 = myBulkDataExportSvc.submitJob(options, false);
 
 		//First non-cached should retrieve new ID.
@@ -1061,7 +1063,48 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
 
 		//Now if we create another one and ask for the cache, we should get the most-recently-insert entry.
 		IBulkDataExportSvc.JobInfo jobInfo10 = myBulkDataExportSvc.submitJob(options, true);
 		assertThat(jobInfo10.getJobId(), is(equalTo(jobInfo9.getJobId())));
+	}
+
+	@Test
+	public void testBulkExportWritesToDEFAULTPartitionWhenPartitioningIsEnabled() {
+		myPartitionSettings.setPartitioningEnabled(true);
+		createResources();
+
+		//Only get COVID-19 vaccinations
+		Set filters = new HashSet<>();
+		filters.add("Immunization?vaccine-code=vaccines|COVID-19");
+
+		BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
+		bulkDataExportOptions.setOutputFormat(null);
+		bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization"));
+		bulkDataExportOptions.setSince(null);
+		bulkDataExportOptions.setFilters(filters);
+		bulkDataExportOptions.setGroupId(myPatientGroupId);
+		bulkDataExportOptions.setExpandMdm(true);
+		bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
+		IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
+
+		myBulkDataExportSvc.buildExportFiles();
+		awaitAllBulkJobCompletions();
+
+		IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
+
+		assertThat(jobInfo.getStatus(), equalTo(BulkExportJobStatusEnum.COMPLETE));
+		assertThat(jobInfo.getFiles().size(), equalTo(1));
+		assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
+
+		// Check immunization Content
+		String nextContents = getBinaryContents(jobInfo, 0);
+
+		assertThat(nextContents, is(containsString("IMM1")));
+		assertThat(nextContents, is(containsString("IMM3")));
+		assertThat(nextContents, is(containsString("IMM5")));
+		assertThat(nextContents, is(containsString("IMM7")));
+		assertThat(nextContents, is(containsString("IMM9")));
+		assertThat(nextContents, is(containsString("IMM999")));
+
+		assertThat(nextContents, is(not(containsString("Flu"))));
+		myPartitionSettings.setPartitioningEnabled(false);
 	}
 
 	private void createResources() {
@@ -1071,7 +1114,8 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
 		//Manually create a golden record
 		Patient goldenPatient = new Patient();
 		goldenPatient.setId("PAT999");
-		DaoMethodOutcome g1Outcome = myPatientDao.update(goldenPatient);
+		SystemRequestDetails srd = SystemRequestDetails.newSystemRequestAllPartitions();
+		DaoMethodOutcome g1Outcome = myPatientDao.update(goldenPatient, srd);
 		Long goldenPid = myIdHelperService.getPidOrNull(g1Outcome.getResource());
 
 		//Create our golden records' data.
@@ -1098,12 +1142,12 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
 			createCareTeamWithIndex(i, patId);
 		}
 
-		myPatientGroupId = myGroupDao.update(group).getId();
+		myPatientGroupId = myGroupDao.update(group, new SystemRequestDetails()).getId();
 
 		//Manually create another golden record
 		Patient goldenPatient2 = new Patient();
 		goldenPatient2.setId("PAT888");
-		DaoMethodOutcome g2Outcome = myPatientDao.update(goldenPatient2);
+		DaoMethodOutcome g2Outcome = myPatientDao.update(goldenPatient2, new SystemRequestDetails());
 		Long goldenPid2 = myIdHelperService.getPidOrNull(g2Outcome.getResource());
 
 		//Create some nongroup patients MDM linked to a different golden resource. They shouldnt be included in the query.
@@ -1132,14 +1176,14 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
 		patient.setGender(i % 2 == 0 ? Enumerations.AdministrativeGender.MALE : Enumerations.AdministrativeGender.FEMALE);
 		patient.addName().setFamily("FAM" + i);
 		patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i);
-		return myPatientDao.update(patient);
+		return myPatientDao.update(patient, new SystemRequestDetails());
 	}
 
 	private void createCareTeamWithIndex(int i, IIdType patId) {
 		CareTeam careTeam = new CareTeam();
 		careTeam.setId("CT" + i);
 		careTeam.setSubject(new Reference(patId)); // This maps to the "patient" search parameter on CareTeam
-		myCareTeamDao.update(careTeam);
+		myCareTeamDao.update(careTeam, new SystemRequestDetails());
 	}
 
 	private void createImmunizationWithIndex(int i, IIdType patId) {
@@ -1157,7 +1201,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
 			cc.addCoding().setSystem("vaccines").setCode("COVID-19");
 			immunization.setVaccineCode(cc);
 		}
-		myImmunizationDao.update(immunization);
+		myImmunizationDao.update(immunization, new SystemRequestDetails());
 	}
 
 	private void createObservationWithIndex(int i, IIdType patId) {
@@ -1168,7 +1212,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
 		if (patId != null) {
 			obs.getSubject().setReference(patId.getValue());
 		}
-		myObservationDao.update(obs);
+		myObservationDao.update(obs, new SystemRequestDetails());
 	}
 
 	public void linkToGoldenResource(Long theGoldenPid, Long theSourcePid) {
diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java
index cb5691a51be..c8814ae0e4a 100644
--- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java
+++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java
@@ -642,7 +642,6 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test {
 			assertEquals(myPartitionId, resourceTable.getPartitionId().getPartitionId().intValue());
 			assertEquals(myPartitionDate, resourceTable.getPartitionId().getPartitionDate());
 		});
-
 	}
 
 	@Test
diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java
index feb1fbf5bd2..2d7c10fa796 100644
--- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java
+++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java
@@ -218,6 +218,11 @@ public class JpaConstants {
 	 */
 	public static final String DEFAULT_PARTITION_NAME = "DEFAULT";
 
+	/**
+	 * The name of the collection of all partitions
+	 */
+	public static final String ALL_PARTITIONS_NAME = "ALL_PARTITIONS";
+
 	/**
 	 * Parameter for the $expand operation
 	 */
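The pattern applied across these files is consistent: system-level reads are issued against all partitions, so resources are found regardless of where they were written, while system-level writes use a plain SystemRequestDetails with no tenant ID and therefore resolve to the DEFAULT partition. A minimal sketch of that pattern, assuming an injected DaoRegistry and a placeholder resource ID ("Group/123"); this is illustrative only and not part of the patch:

// Sketch only: cross-partition read / default-partition write, mirroring the DAO calls used above.
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.model.primitive.IdDt;
import org.hl7.fhir.instance.model.api.IBaseResource;

public class SystemDaoCallSketch {

	private DaoRegistry myDaoRegistry; // assumed to be injected, as in the readers and writers above

	public IBaseResource readGroupAcrossAllPartitions() {
		// Reads carry the ALL_PARTITIONS tenant so the resource is found no matter which partition holds it
		SystemRequestDetails srd = SystemRequestDetails.newSystemRequestAllPartitions();
		return myDaoRegistry.getResourceDao("Group").read(new IdDt("Group/123"), srd);
	}

	public void writeToDefaultPartition(IBaseResource theResource) {
		// Writes use a plain SystemRequestDetails (no tenant ID), which resolves to the DEFAULT partition
		myDaoRegistry.getResourceDao("Binary").create(theResource, new SystemRequestDetails());
	}
}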