Merge pull request #2558 from hapifhir/issue-2556-bulk-export-bug
Issue 2556 bulk export bug
Commit: 48f93f5555
@@ -0,0 +1,4 @@
+---
+type: fix
+issue: 2556
+title: "Fixed a bug which would cause Bulk Export to fail when run in a partitioned environment."
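The changes below all follow one pattern: every internal (non-HTTP) DAO call on the bulk-export path is given an explicit SystemRequestDetails, so partition resolution no longer depends on a servlet request that does not exist inside a batch job. The following is a minimal sketch of the read side of that pattern, not code from this PR; the package locations for DaoRegistry and IdDt are assumptions based on the import hunks shown further down.

// Sketch only: read a Group across all partitions, the way GroupBulkItemReader does after this change.
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.model.primitive.IdDt;
import org.hl7.fhir.instance.model.api.IBaseResource;

class GroupReadSketch {
   private final DaoRegistry myDaoRegistry;

   GroupReadSketch(DaoRegistry theDaoRegistry) {
      myDaoRegistry = theDaoRegistry;
   }

   IBaseResource readGroupAcrossPartitions(String theGroupId) {
      // The factory added by this PR produces a system request whose tenant ID is the
      // all-partitions marker, so the read is not limited to any single partition.
      SystemRequestDetails srd = SystemRequestDetails.newSystemRequestAllPartitions();
      return myDaoRegistry.getResourceDao("Group").read(new IdDt(theGroupId), srd);
   }
}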
@@ -29,6 +29,7 @@ import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
 import ca.uhn.fhir.jpa.dao.index.IdHelperService;
 import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc;
 import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
+import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
 import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
 import ca.uhn.fhir.jpa.util.QueryChunker;
 import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
@@ -118,7 +119,8 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
    Set<Long> patientPidsToExport = new HashSet<>(pidsOrThrowException);

    if (myMdmEnabled) {
-      IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
+      SystemRequestDetails srd = SystemRequestDetails.newSystemRequestAllPartitions();
+      IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId), srd);
       Long pidOrNull = myIdHelperService.getPidOrNull(group);
       List<IMdmLinkDao.MdmPidTuple> goldenPidSourcePidTuple = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
       goldenPidSourcePidTuple.forEach(tuple -> {
@@ -178,13 +180,12 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
    * @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"]
    */
   private List<String> getMembers() {
-     IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
+     SystemRequestDetails requestDetails = SystemRequestDetails.newSystemRequestAllPartitions();
+     IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId), requestDetails);
      List<IPrimitiveType> evaluate = myContext.newFhirPath().evaluate(group, "member.entity.reference", IPrimitiveType.class);
      return evaluate.stream().map(IPrimitiveType::getValueAsString).collect(Collectors.toList());
   }
-
-

   /**
    * Given the local myGroupId, perform an expansion to retrieve all resource IDs of member patients.
    * if myMdmEnabled is set to true, we also reach out to the IMdmLinkDao to attempt to also expand it into matched
@@ -194,7 +195,8 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
    */
   private Set<String> expandAllPatientPidsFromGroup() {
      Set<String> expandedIds = new HashSet<>();
-     IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
+     SystemRequestDetails requestDetails = SystemRequestDetails.newSystemRequestAllPartitions();
+     IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId), requestDetails);
      Long pidOrNull = myIdHelperService.getPidOrNull(group);

      //Attempt to perform MDM Expansion of membership
@@ -27,6 +27,7 @@ import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
 import ca.uhn.fhir.jpa.batch.log.Logs;
 import ca.uhn.fhir.jpa.bulk.export.svc.BulkExportDaoSvc;
 import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
+import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
 import ca.uhn.fhir.parser.IParser;
 import ca.uhn.fhir.rest.api.Constants;
 import ca.uhn.fhir.util.BinaryUtil;
@@ -100,7 +101,7 @@ public class ResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
    IBaseBinary binary = BinaryUtil.newBinary(myFhirContext);
    binary.setContentType(Constants.CT_FHIR_NDJSON);
    binary.setContent(myOutputStream.toByteArray());
-   DaoMethodOutcome outcome = myBinaryDao.create(binary);
+   DaoMethodOutcome outcome = myBinaryDao.create(binary, new SystemRequestDetails());
    return outcome.getResource().getIdElement();
 }

@@ -42,6 +42,7 @@ import ca.uhn.fhir.jpa.model.sched.HapiJob;
 import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
 import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
 import ca.uhn.fhir.jpa.model.util.JpaConstants;
+import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
 import ca.uhn.fhir.rest.api.Constants;
 import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
 import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
@@ -203,8 +204,8 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
    for (BulkExportCollectionFileEntity nextFile : nextCollection.getFiles()) {

       ourLog.info("Purging bulk data file: {}", nextFile.getResourceId());
-      getBinaryDao().delete(toId(nextFile.getResourceId()));
-      getBinaryDao().forceExpungeInExistingTransaction(toId(nextFile.getResourceId()), new ExpungeOptions().setExpungeDeletedResources(true).setExpungeOldVersions(true), null);
+      getBinaryDao().delete(toId(nextFile.getResourceId()), new SystemRequestDetails());
+      getBinaryDao().forceExpungeInExistingTransaction(toId(nextFile.getResourceId()), new ExpungeOptions().setExpungeDeletedResources(true).setExpungeOldVersions(true), new SystemRequestDetails());
       myBulkExportCollectionFileDao.deleteByPid(nextFile.getId());

    }
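For the purge step above, both the delete and the forced expunge now run under an explicit system request. Below is a self-contained sketch of that call sequence; it is not code from the PR, the package locations are assumptions, and the Binary DAO is passed in directly where the diff obtains it via getBinaryDao().

import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Binary;

class BinaryPurgeSketch {

   void purge(IFhirResourceDao<Binary> theBinaryDao, IIdType theBinaryFileId) {
      // Delete the exported file in a system context rather than relying on an HTTP request.
      theBinaryDao.delete(theBinaryFileId, new SystemRequestDetails());

      // Then physically expunge the deleted resource and its old versions, again as a system request.
      ExpungeOptions options = new ExpungeOptions()
         .setExpungeDeletedResources(true)
         .setExpungeOldVersions(true);
      theBinaryDao.forceExpungeInExistingTransaction(theBinaryFileId, options, new SystemRequestDetails());
   }
}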
@@ -65,7 +65,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.domain.PageRequest;
-import org.springframework.data.domain.Pageable;
 import org.springframework.data.domain.Slice;
 import org.springframework.transaction.PlatformTransactionManager;
 import org.springframework.transaction.support.TransactionTemplate;
@@ -655,16 +654,8 @@ public class JpaPackageCache extends BasePackageCacheManager implements IHapiPac
 }

 private void deleteAndExpungeResourceBinary(IIdType theResourceBinaryId, ExpungeOptions theOptions) {
-   if (myPartitionSettings.isPartitioningEnabled()) {
-      SystemRequestDetails requestDetails = new SystemRequestDetails();
-      requestDetails.setTenantId(JpaConstants.DEFAULT_PARTITION_NAME);
-      getBinaryDao().delete(theResourceBinaryId, requestDetails).getEntity();
-      getBinaryDao().forceExpungeInExistingTransaction(theResourceBinaryId, theOptions, requestDetails);
-   } else {
-      getBinaryDao().delete(theResourceBinaryId).getEntity();
-      getBinaryDao().forceExpungeInExistingTransaction(theResourceBinaryId, theOptions, null);
-   }
+   getBinaryDao().delete(theResourceBinaryId, new SystemRequestDetails()).getEntity();
+   getBinaryDao().forceExpungeInExistingTransaction(theResourceBinaryId, theOptions, new SystemRequestDetails());
 }

@@ -347,7 +347,7 @@ public class PackageInstallerSvcImpl implements IPackageInstallerSvc {
 private IBundleProvider searchResource(IFhirResourceDao theDao, SearchParameterMap theMap) {
    if (myPartitionSettings.isPartitioningEnabled()) {
       SystemRequestDetails requestDetails = new SystemRequestDetails();
-      requestDetails.setTenantId(JpaConstants.DEFAULT_PARTITION_NAME);
+      // requestDetails.setTenantId(JpaConstants.DEFAULT_PARTITION_NAME);
       return theDao.search(theMap, requestDetails);
    } else {
       return theDao.search(theMap);
@@ -35,6 +35,8 @@ import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
 import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
 import org.apache.commons.lang3.Validate;
 import org.hl7.fhir.instance.model.api.IBaseResource;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
 import org.springframework.beans.factory.annotation.Autowired;

 import javax.annotation.Nonnull;
@@ -44,11 +46,15 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;

+import static ca.uhn.fhir.jpa.model.util.JpaConstants.ALL_PARTITIONS_NAME;
 import static ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster.doCallHooks;
 import static ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster.doCallHooksAndReturnObject;
 import static ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster.hasHooks;
+import static org.slf4j.LoggerFactory.getLogger;

 public class RequestPartitionHelperSvc implements IRequestPartitionHelperSvc {
+   private static final Logger ourLog = getLogger(RequestPartitionHelperSvc.class);
+

 private final HashSet<Object> myNonPartitionableResourceNames;

@@ -95,14 +101,18 @@ public class RequestPartitionHelperSvc implements IRequestPartitionHelperSvc {
 public RequestPartitionId determineReadPartitionForRequest(@Nullable RequestDetails theRequest, String theResourceType) {
    RequestPartitionId requestPartitionId;

+   boolean nonPartitionableResource = myNonPartitionableResourceNames.contains(theResourceType);
    if (myPartitionSettings.isPartitioningEnabled()) {
       // Handle system requests
-      if ((theRequest == null && myNonPartitionableResourceNames.contains(theResourceType))) {
+      //TODO GGG eventually, theRequest will not be allowed to be null here, and we will pass through SystemRequestDetails instead.
+      if (theRequest == null && nonPartitionableResource) {
          return RequestPartitionId.defaultPartition();
       }

-      // Interceptor call: STORAGE_PARTITION_IDENTIFY_READ
-      if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_READ, myInterceptorBroadcaster, theRequest)) {
+      if (theRequest instanceof SystemRequestDetails) {
+         requestPartitionId = getSystemRequestPartitionId(theRequest, nonPartitionableResource);
+      // Interceptor call: STORAGE_PARTITION_IDENTIFY_READ
+      } else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_READ, myInterceptorBroadcaster, theRequest)) {
          HookParams params = new HookParams()
            .add(RequestDetails.class, theRequest)
            .addIfMatchesType(ServletRequestDetails.class, theRequest);
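From a caller's point of view, the new branch means a SystemRequestDetails is resolved from its tenant ID rather than from the STORAGE_PARTITION_IDENTIFY_READ interceptor. The sketch below is illustrative only: the package of IRequestPartitionHelperSvc and the "Patient" resource type are assumptions, while the method signature follows the hunk above.

import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;

class ReadPartitionDispatchSketch {

   RequestPartitionId resolveForBulkExport(IRequestPartitionHelperSvc theHelper) {
      // A system request carrying the all-partitions tenant bypasses the interceptor lookup
      // and resolves directly to RequestPartitionId.allPartitions().
      SystemRequestDetails srd = SystemRequestDetails.newSystemRequestAllPartitions();
      return theHelper.determineReadPartitionForRequest(srd, "Patient");
   }
}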
@@ -119,6 +129,48 @@ public class RequestPartitionHelperSvc implements IRequestPartitionHelperSvc {
       return RequestPartitionId.allPartitions();
    }

+ /**
+  * For system requests, the partition is read from the tenant ID when one is present; otherwise the DEFAULT
+  * partition is used. If the resource type is non-partitionable but the tenant resolves to a non-default
+  * partition, that is treated as a bug in the calling code and an {@link InternalErrorException} is thrown.
+  *
+  * @param theRequest the system request
+  * @param theNonPartitionableResource whether the resource type is non-partitionable
+  * @return the {@link RequestPartitionId} to use for this system request
+  */
+ @NotNull
+ private RequestPartitionId getSystemRequestPartitionId(@NotNull RequestDetails theRequest, boolean theNonPartitionableResource) {
+    RequestPartitionId requestPartitionId;
+    requestPartitionId = getSystemRequestPartitionId(theRequest);
+    if (theNonPartitionableResource && !requestPartitionId.isDefaultPartition()) {
+       throw new InternalErrorException("System call is attempting to write a non-partitionable resource to a partition! This is a bug!");
+    }
+    return requestPartitionId;
+ }
+
+ /**
+  * Determine the partition for a system call (defined by the fact that the request is of type SystemRequestDetails).
+  *
+  * 1. If the tenant ID is set to the constant for all partitions, return all partitions.
+  * 2. If there is a tenant ID set in the request, use it.
+  * 3. Otherwise, return the default partition.
+  *
+  * @param theRequest the {@link SystemRequestDetails}
+  * @return the {@link RequestPartitionId} to be used for this request
+  */
+ @NotNull
+ private RequestPartitionId getSystemRequestPartitionId(@NotNull RequestDetails theRequest) {
+    if (theRequest.getTenantId() != null) {
+       if (theRequest.getTenantId().equals(ALL_PARTITIONS_NAME)) {
+          return RequestPartitionId.allPartitions();
+       } else {
+          return RequestPartitionId.fromPartitionName(theRequest.getTenantId());
+       }
+    } else {
+       return RequestPartitionId.defaultPartition();
+    }
+ }
+
 /**
  * Invoke the {@link Pointcut#STORAGE_PARTITION_IDENTIFY_CREATE} interceptor pointcut to determine the tenant for a create request.
  */
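The three-rule tenant mapping documented above can be restated as a standalone sketch. This is not part of the PR; the constant value mirrors the ALL_PARTITIONS_NAME addition to JpaConstants at the end of this diff, and the RequestPartitionId factory methods are the same ones the new code calls.

import ca.uhn.fhir.interceptor.model.RequestPartitionId;

final class TenantToPartitionSketch {
   // Mirrors JpaConstants.ALL_PARTITIONS_NAME, added later in this commit.
   static final String ALL_PARTITIONS_NAME = "ALL_PARTITIONS";

   static RequestPartitionId toReadPartition(String theTenantId) {
      if (theTenantId == null) {
         return RequestPartitionId.defaultPartition();          // rule 3: no tenant ID -> DEFAULT partition
      }
      if (ALL_PARTITIONS_NAME.equals(theTenantId)) {
         return RequestPartitionId.allPartitions();             // rule 1: all-partitions marker -> every partition
      }
      return RequestPartitionId.fromPartitionName(theTenantId); // rule 2: named tenant -> that partition
   }
}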
@@ -128,18 +180,22 @@ public class RequestPartitionHelperSvc implements IRequestPartitionHelperSvc {
    RequestPartitionId requestPartitionId;

    if (myPartitionSettings.isPartitioningEnabled()) {

-      // Interceptor call: STORAGE_PARTITION_IDENTIFY_CREATE
-      HookParams params = new HookParams()
-         .add(IBaseResource.class, theResource)
-         .add(RequestDetails.class, theRequest)
-         .addIfMatchesType(ServletRequestDetails.class, theRequest);
-      requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(myInterceptorBroadcaster, theRequest, Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE, params);
-
-      // Handle system requests
       boolean nonPartitionableResource = myNonPartitionableResourceNames.contains(theResourceType);
-      if (nonPartitionableResource && requestPartitionId == null) {
-         requestPartitionId = RequestPartitionId.defaultPartition();
+      if (theRequest instanceof SystemRequestDetails) {
+         requestPartitionId = getSystemRequestPartitionId(theRequest, nonPartitionableResource);
+      } else {
+         //This is an external Request (e.g. ServletRequestDetails) so we want to figure out the partition via interceptor.
+         HookParams params = new HookParams()// Interceptor call: STORAGE_PARTITION_IDENTIFY_CREATE
+            .add(IBaseResource.class, theResource)
+            .add(RequestDetails.class, theRequest)
+            .addIfMatchesType(ServletRequestDetails.class, theRequest);
+         requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(myInterceptorBroadcaster, theRequest, Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE, params);
+
+         //If the interceptors haven't selected a partition, and its a non-partitionable resource anyhow, send to DEFAULT
+         if (nonPartitionableResource && requestPartitionId == null) {
+            requestPartitionId = RequestPartitionId.defaultPartition();
+         }
       }
    }

    String resourceName = myFhirContext.getResourceType(theResource);
@@ -35,17 +35,15 @@ import ca.uhn.fhir.rest.server.IRestfulServerDefaults;
 import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableListMultimap;
-import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;

 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
 import java.nio.charset.Charset;
 import java.util.List;
-import java.util.Optional;
+
+import static ca.uhn.fhir.jpa.model.util.JpaConstants.ALL_PARTITIONS_NAME;

 /**
  * A default RequestDetails implementation that can be used for system calls to
@@ -104,6 +102,11 @@ public class SystemRequestDetails extends RequestDetails {
    }
    myHeaders.put(theName, theValue);
 }
+public static SystemRequestDetails newSystemRequestAllPartitions() {
+   SystemRequestDetails systemRequestDetails = new SystemRequestDetails();
+   systemRequestDetails.setTenantId(ALL_PARTITIONS_NAME);
+   return systemRequestDetails;
+}


 @Override
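The two flavours of SystemRequestDetails used throughout this PR differ only in their tenant ID, which in turn decides the partition. Below is a sketch (not PR code) contrasting them; Binary is just an example resource type, and the DAO method signatures follow how the diff calls them.

import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import org.hl7.fhir.r4.model.Binary;

class SystemRequestUsageSketch {

   void writeThenRead(IFhirResourceDao<Binary> theBinaryDao, Binary theBinary) {
      // No tenant ID: the partition helper resolves this system request to the DEFAULT partition,
      // which is where bulk-export output Binaries are written.
      theBinaryDao.create(theBinary, new SystemRequestDetails());

      // Tenant ID set to the all-partitions marker: reads are resolved across every partition.
      SystemRequestDetails allPartitions = SystemRequestDetails.newSystemRequestAllPartitions();
      theBinaryDao.read(theBinary.getIdElement(), allPartitions);
   }
}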
@@ -18,6 +18,7 @@ import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
 import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
 import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
 import ca.uhn.fhir.jpa.entity.MdmLink;
+import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
 import ca.uhn.fhir.mdm.api.MdmLinkSourceEnum;
 import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
 import ca.uhn.fhir.parser.IParser;
@@ -116,7 +117,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {

    Binary b = new Binary();
    b.setContent(new byte[]{0, 1, 2, 3});
-   String binaryId = myBinaryDao.create(b).getId().toUnqualifiedVersionless().getValue();
+   String binaryId = myBinaryDao.create(b, new SystemRequestDetails()).getId().toUnqualifiedVersionless().getValue();

    BulkExportJobEntity job = new BulkExportJobEntity();
    job.setStatus(BulkExportJobStatusEnum.COMPLETE);
@@ -494,7 +495,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
       Patient patient = new Patient();
       patient.setId("PAT" + i);
       patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i);
-      myPatientDao.update(patient).getId().toUnqualifiedVersionless();
+      myPatientDao.update(patient, new SystemRequestDetails()).getId().toUnqualifiedVersionless();
    }

    // Create a bulk job
@@ -523,7 +524,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {

    // Iterate over the files
    for (IBulkDataExportSvc.FileEntry next : status.getFiles()) {
-      Binary nextBinary = myBinaryDao.read(next.getResourceId());
+      Binary nextBinary = myBinaryDao.read(next.getResourceId(), new SystemRequestDetails());
       assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
       String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
       ourLog.info("Next contents for type {}:\n{}", next.getResourceType(), nextContents);
@@ -848,7 +849,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {

 public String getBinaryContents(IBulkDataExportSvc.JobInfo theJobInfo, int theIndex) {
    // Iterate over the files
-   Binary nextBinary = myBinaryDao.read(theJobInfo.getFiles().get(theIndex).getResourceId());
+   Binary nextBinary = myBinaryDao.read(theJobInfo.getFiles().get(theIndex).getResourceId(), new SystemRequestDetails());
    assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
    String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
    ourLog.info("Next contents for type {}:\n{}", nextBinary.getResourceType(), nextContents);
@@ -928,7 +929,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {


    //Check Observation Content
-   Binary observationExportContent = myBinaryDao.read(jobInfo.getFiles().get(1).getResourceId());
+   Binary observationExportContent = myBinaryDao.read(jobInfo.getFiles().get(1).getResourceId(), new SystemRequestDetails());
    assertEquals(Constants.CT_FHIR_NDJSON, observationExportContent.getContentType());
    nextContents = new String(observationExportContent.getContent(), Constants.CHARSET_UTF8);
    ourLog.info("Next contents for type {}:\n{}", observationExportContent.getResourceType(), nextContents);
@@ -1029,7 +1030,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
 }

 @Test
-public void testCacheSettingIsRespectedWhenCreatingNewJobs() {
+public void testCacheSettingIsRespectedWhenCreatingNewJobs() throws InterruptedException {
    BulkDataExportOptions options = new BulkDataExportOptions();
    options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
    options.setResourceTypes(Sets.newHashSet("Procedure"));
@@ -1048,6 +1049,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
    IBulkDataExportSvc.JobInfo jobInfo6 = myBulkDataExportSvc.submitJob(options, false);
    IBulkDataExportSvc.JobInfo jobInfo7 = myBulkDataExportSvc.submitJob(options, false);
    IBulkDataExportSvc.JobInfo jobInfo8 = myBulkDataExportSvc.submitJob(options, false);
+   Thread.sleep(100L); //stupid commit timings.
    IBulkDataExportSvc.JobInfo jobInfo9 = myBulkDataExportSvc.submitJob(options, false);

    //First non-cached should retrieve new ID.
@@ -1061,7 +1063,48 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
    //Now if we create another one and ask for the cache, we should get the most-recently-insert entry.
    IBulkDataExportSvc.JobInfo jobInfo10 = myBulkDataExportSvc.submitJob(options, true);
    assertThat(jobInfo10.getJobId(), is(equalTo(jobInfo9.getJobId())));
+}
+
+@Test
+public void testBulkExportWritesToDEFAULTPartitionWhenPartitioningIsEnabled() {
+   myPartitionSettings.setPartitioningEnabled(true);
+
+   createResources();
+
+   //Only get COVID-19 vaccinations
+   Set<String> filters = new HashSet<>();
+   filters.add("Immunization?vaccine-code=vaccines|COVID-19");
+
+   BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
+   bulkDataExportOptions.setOutputFormat(null);
+   bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization"));
+   bulkDataExportOptions.setSince(null);
+   bulkDataExportOptions.setFilters(filters);
+   bulkDataExportOptions.setGroupId(myPatientGroupId);
+   bulkDataExportOptions.setExpandMdm(true);
+   bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
+   IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
+
+   myBulkDataExportSvc.buildExportFiles();
+   awaitAllBulkJobCompletions();
+
+   IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
+
+   assertThat(jobInfo.getStatus(), equalTo(BulkExportJobStatusEnum.COMPLETE));
+   assertThat(jobInfo.getFiles().size(), equalTo(1));
+   assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
+
+   // Check immunization Content
+   String nextContents = getBinaryContents(jobInfo, 0);
+
+   assertThat(nextContents, is(containsString("IMM1")));
+   assertThat(nextContents, is(containsString("IMM3")));
+   assertThat(nextContents, is(containsString("IMM5")));
+   assertThat(nextContents, is(containsString("IMM7")));
+   assertThat(nextContents, is(containsString("IMM9")));
+   assertThat(nextContents, is(containsString("IMM999")));
+
+   assertThat(nextContents, is(not(containsString("Flu"))));
+   myPartitionSettings.setPartitioningEnabled(false);
 }

 private void createResources() {
@@ -1071,7 +1114,8 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
    //Manually create a golden record
    Patient goldenPatient = new Patient();
    goldenPatient.setId("PAT999");
-   DaoMethodOutcome g1Outcome = myPatientDao.update(goldenPatient);
+   SystemRequestDetails srd = SystemRequestDetails.newSystemRequestAllPartitions();
+   DaoMethodOutcome g1Outcome = myPatientDao.update(goldenPatient, srd);
    Long goldenPid = myIdHelperService.getPidOrNull(g1Outcome.getResource());

    //Create our golden records' data.
@@ -1098,12 +1142,12 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
       createCareTeamWithIndex(i, patId);
    }

-   myPatientGroupId = myGroupDao.update(group).getId();
+   myPatientGroupId = myGroupDao.update(group, new SystemRequestDetails()).getId();

    //Manually create another golden record
    Patient goldenPatient2 = new Patient();
    goldenPatient2.setId("PAT888");
-   DaoMethodOutcome g2Outcome = myPatientDao.update(goldenPatient2);
+   DaoMethodOutcome g2Outcome = myPatientDao.update(goldenPatient2, new SystemRequestDetails());
    Long goldenPid2 = myIdHelperService.getPidOrNull(g2Outcome.getResource());

    //Create some nongroup patients MDM linked to a different golden resource. They shouldnt be included in the query.
|
||||||
patient.setGender(i % 2 == 0 ? Enumerations.AdministrativeGender.MALE : Enumerations.AdministrativeGender.FEMALE);
|
patient.setGender(i % 2 == 0 ? Enumerations.AdministrativeGender.MALE : Enumerations.AdministrativeGender.FEMALE);
|
||||||
patient.addName().setFamily("FAM" + i);
|
patient.addName().setFamily("FAM" + i);
|
||||||
patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i);
|
patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i);
|
||||||
return myPatientDao.update(patient);
|
return myPatientDao.update(patient, new SystemRequestDetails());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createCareTeamWithIndex(int i, IIdType patId) {
|
private void createCareTeamWithIndex(int i, IIdType patId) {
|
||||||
CareTeam careTeam = new CareTeam();
|
CareTeam careTeam = new CareTeam();
|
||||||
careTeam.setId("CT" + i);
|
careTeam.setId("CT" + i);
|
||||||
careTeam.setSubject(new Reference(patId)); // This maps to the "patient" search parameter on CareTeam
|
careTeam.setSubject(new Reference(patId)); // This maps to the "patient" search parameter on CareTeam
|
||||||
myCareTeamDao.update(careTeam);
|
myCareTeamDao.update(careTeam, new SystemRequestDetails());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createImmunizationWithIndex(int i, IIdType patId) {
|
private void createImmunizationWithIndex(int i, IIdType patId) {
|
||||||
|
@@ -1157,7 +1201,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
       cc.addCoding().setSystem("vaccines").setCode("COVID-19");
       immunization.setVaccineCode(cc);
    }
-   myImmunizationDao.update(immunization);
+   myImmunizationDao.update(immunization, new SystemRequestDetails());
 }

 private void createObservationWithIndex(int i, IIdType patId) {
@@ -1168,7 +1212,7 @@ public class BulkDataExportSvcImplR4Test extends BaseBatchJobR4Test {
    if (patId != null) {
       obs.getSubject().setReference(patId.getValue());
    }
-   myObservationDao.update(obs);
+   myObservationDao.update(obs, new SystemRequestDetails());
 }

 public void linkToGoldenResource(Long theGoldenPid, Long theSourcePid) {
@@ -642,7 +642,6 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test {
       assertEquals(myPartitionId, resourceTable.getPartitionId().getPartitionId().intValue());
       assertEquals(myPartitionDate, resourceTable.getPartitionId().getPartitionDate());
    });
-
 }

 @Test
@@ -218,6 +218,11 @@ public class JpaConstants {
  */
 public static final String DEFAULT_PARTITION_NAME = "DEFAULT";

+/**
+ * The name of the collection of all partitions
+ */
+public static final String ALL_PARTITIONS_NAME = "ALL_PARTITIONS";
+
 /**
  * Parameter for the $expand operation
  */