Merge pull request #2463 from hapifhir/issue-2445-support-patient-level-export

Issue 2445 support patient level export
This commit is contained in:
Tadgh 2021-03-11 13:20:44 -05:00 committed by GitHub
commit 950e65dc48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1134 additions and 355 deletions

View File

@ -50,6 +50,7 @@ public class SearchParameterUtil {
return retVal;
}
@Nullable
public static String getCode(FhirContext theContext, IBaseResource theResource) {
return getStringChild(theContext, theResource, "code");

View File

@ -0,0 +1,5 @@
---
type: add
issue: 2433
title: "Support has been added for MDM expansion during Group bulk export. Calling a group export via `/Group/123/$export?_mdm=true`
will cause Bulk Export to not only match group members, but also any MDM-matched patients, and their related golden record patients"

View File

@ -0,0 +1,4 @@
---
type: add
issue: 2445
title: "Support has been added for patient level Bulk export. This can be done via the `/Patient/$export` endpoint. Also, support has been added for setting Cache-Control header to no-cache for Bulk Export requests."

View File

@ -33,4 +33,5 @@ import org.springframework.context.annotation.Import;
public class BatchJobsConfig {
public static final String BULK_EXPORT_JOB_NAME = "bulkExportJob";
public static final String GROUP_BULK_EXPORT_JOB_NAME = "groupBulkExportJob";
public static final String PATIENT_BULK_EXPORT_JOB_NAME = "patientBulkExportJob";
}

View File

@ -20,22 +20,55 @@ package ca.uhn.fhir.jpa.bulk.api;
* #L%
*/
import org.hl7.fhir.instance.model.api.IIdType;
import java.util.Date;
import java.util.Set;
public class BulkDataExportOptions {
private final String myOutputFormat;
private final Set<String> myResourceTypes;
private final Date mySince;
private final Set<String> myFilters;
public BulkDataExportOptions() {
public BulkDataExportOptions(String theOutputFormat, Set<String> theResourceTypes, Date theSince, Set<String> theFilters) {
}
public enum ExportStyle {
PATIENT,
GROUP,
SYSTEM
}
private String myOutputFormat;
private Set<String> myResourceTypes;
private Date mySince;
private Set<String> myFilters;
private ExportStyle myExportStyle;
private boolean myExpandMdm;
private IIdType myGroupId;
public void setOutputFormat(String theOutputFormat) {
myOutputFormat = theOutputFormat;
}
public void setResourceTypes(Set<String> theResourceTypes) {
myResourceTypes = theResourceTypes;
}
public void setSince(Date theSince) {
mySince = theSince;
}
public void setFilters(Set<String> theFilters) {
myFilters = theFilters;
}
public ExportStyle getExportStyle() {
return myExportStyle;
}
public void setExportStyle(ExportStyle theExportStyle) {
myExportStyle = theExportStyle;
}
public String getOutputFormat() {
return myOutputFormat;
}
@ -51,4 +84,20 @@ public class BulkDataExportOptions {
public Set<String> getFilters() {
return myFilters;
}
public boolean isExpandMdm() {
return myExpandMdm;
}
public void setExpandMdm(boolean theExpandMdm) {
myExpandMdm = theExpandMdm;
}
public IIdType getGroupId() {
return myGroupId;
}
public void setGroupId(IIdType theGroupId) {
myGroupId = theGroupId;
}
}

View File

@ -1,45 +0,0 @@
package ca.uhn.fhir.jpa.bulk.api;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.hl7.fhir.instance.model.api.IIdType;
import java.util.Date;
import java.util.Set;
public class GroupBulkDataExportOptions extends BulkDataExportOptions {
private final IIdType myGroupId;
private final boolean myMdm;
public GroupBulkDataExportOptions(String theOutputFormat, Set<String> theResourceTypes, Date theSince, Set<String> theFilters, IIdType theGroupId, boolean theMdm) {
super(theOutputFormat, theResourceTypes, theSince, theFilters);
myGroupId = theGroupId;
myMdm = theMdm;
}
public IIdType getGroupId() {
return myGroupId;
}
public boolean isMdm() {
return myMdm;
}
}

View File

@ -27,6 +27,7 @@ import javax.transaction.Transactional;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
public interface IBulkDataExportSvc {
void buildExportFiles();
@ -36,8 +37,15 @@ public interface IBulkDataExportSvc {
JobInfo submitJob(BulkDataExportOptions theBulkDataExportOptions);
JobInfo submitJob(BulkDataExportOptions theBulkDataExportOptions, Boolean useCache);
JobInfo getJobInfoOrThrowResourceNotFound(String theJobId);
/**
* Return a set of all resource types which contain search parameters which have Patient as a target.
*/
Set<String> getPatientCompartmentResources();
void cancelAndPurgeAllJobs();
class JobInfo {

View File

@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.bulk.job;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.batch.log.Logs;
@ -29,6 +30,7 @@ import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
@ -43,10 +45,12 @@ import org.springframework.beans.factory.annotation.Value;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
public abstract class BaseBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@ -73,6 +77,7 @@ public abstract class BaseBulkItemReader implements ItemReader<List<ResourcePers
private RuntimeResourceDefinition myResourceDefinition;
private Iterator<ResourcePersistentId> myPidIterator;
private RuntimeSearchParam myPatientSearchParam;
/**
* Get and cache an ISearchBuilder for the given resource type this partition is responsible for.
@ -98,24 +103,40 @@ public abstract class BaseBulkItemReader implements ItemReader<List<ResourcePers
abstract Iterator<ResourcePersistentId> getResourcePidIterator();
protected SearchParameterMap createSearchParameterMapForJob() {
protected List<SearchParameterMap> createSearchParameterMapsForResourceType() {
BulkExportJobEntity jobEntity = getJobEntity();
RuntimeResourceDefinition theDef = getResourceDefinition();
SearchParameterMap map = new SearchParameterMap();
Map<String, String[]> requestUrl = UrlUtil.parseQueryStrings(jobEntity.getRequest());
String[] typeFilters = requestUrl.get(JpaConstants.PARAM_EXPORT_TYPE_FILTER);
List<SearchParameterMap> spMaps = null;
if (typeFilters != null) {
Optional<String> filter = Arrays.stream(typeFilters).filter(t -> t.startsWith(myResourceType + "?")).findFirst();
if (filter.isPresent()) {
String matchUrl = filter.get();
map = myMatchUrlService.translateMatchUrl(matchUrl, theDef);
}
spMaps = Arrays.stream(typeFilters)
.filter(typeFilter -> typeFilter.startsWith(myResourceType + "?"))
.map(filter -> buildSearchParameterMapForTypeFilter(filter, theDef))
.collect(Collectors.toList());
}
if (jobEntity.getSince() != null) {
map.setLastUpdated(new DateRangeParam(jobEntity.getSince(), null));
//None of the _typeFilters applied to the current resource type, so just make a simple one.
if (spMaps == null || spMaps.isEmpty()) {
SearchParameterMap defaultMap = new SearchParameterMap();
enhanceSearchParameterMapWithCommonParameters(defaultMap);
spMaps = Collections.singletonList(defaultMap);
}
return spMaps;
}
private void enhanceSearchParameterMapWithCommonParameters(SearchParameterMap map) {
map.setLoadSynchronous(true);
return map;
if (getJobEntity().getSince() != null) {
map.setLastUpdated(new DateRangeParam(getJobEntity().getSince(), null));
}
}
public SearchParameterMap buildSearchParameterMapForTypeFilter(String theFilter, RuntimeResourceDefinition theDef) {
SearchParameterMap searchParameterMap = myMatchUrlService.translateMatchUrl(theFilter, theDef);
enhanceSearchParameterMapWithCommonParameters(searchParameterMap);
return searchParameterMap;
}
protected RuntimeResourceDefinition getResourceDefinition() {
@ -157,4 +178,48 @@ public abstract class BaseBulkItemReader implements ItemReader<List<ResourcePers
return outgoing.size() == 0 ? null : outgoing;
}
/**
* Given the resource type, fetch its patient-based search parameter name
* 1. Attempt to find one called 'patient'
* 2. If that fails, find one called 'subject'
* 3. If that fails, find find by Patient Compartment.
* 3.1 If that returns >1 result, throw an error
* 3.2 If that returns 1 result, return it
*/
protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType() {
if (myPatientSearchParam == null) {
RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(myResourceType);
myPatientSearchParam = runtimeResourceDefinition.getSearchParam("patient");
if (myPatientSearchParam == null) {
myPatientSearchParam = runtimeResourceDefinition.getSearchParam("subject");
if (myPatientSearchParam == null) {
myPatientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition);
if (myPatientSearchParam == null) {
String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", myResourceType);
throw new IllegalArgumentException(errorMessage);
}
}
}
}
return myPatientSearchParam;
}
/**
* Search the resource definition for a compartment named 'patient' and return its related Search Parameter.
*/
protected RuntimeSearchParam getRuntimeSearchParamByCompartment(RuntimeResourceDefinition runtimeResourceDefinition) {
RuntimeSearchParam patientSearchParam;
List<RuntimeSearchParam> searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient");
if (searchParams == null || searchParams.size() == 0) {
String errorMessage = String.format("Resource type [%s] is not eligible for this type of export, as it contains no Patient compartment, and no `patient` or `subject` search parameter", myResourceType);
throw new IllegalArgumentException(errorMessage);
} else if (searchParams.size() == 1) {
patientSearchParam = searchParams.get(0);
} else {
String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as we are unable to disambiguate which patient search parameter we should be searching by.", myResourceType);
throw new IllegalArgumentException(errorMessage);
}
return patientSearchParam;
}
}

View File

@ -92,6 +92,17 @@ public class BulkExportJobConfig {
.build();
}
@Bean
@Lazy
public Job patientBulkExportJob() {
return myJobBuilderFactory.get(BatchJobsConfig.PATIENT_BULK_EXPORT_JOB_NAME)
.validator(bulkJobParameterValidator())
.start(createBulkExportEntityStep())
.next(patientPartitionStep())
.next(closeJobStep())
.build();
}
@Bean
public GroupIdPresentValidator groupBulkJobParameterValidator() {
return new GroupIdPresentValidator();
@ -115,6 +126,7 @@ public class BulkExportJobConfig {
return new BulkExportJobParameterValidator();
}
//Writers
@Bean
public Step groupBulkExportGenerateResourceFilesStep() {
return myStepBuilderFactory.get("groupBulkExportGenerateResourceFilesStep")
@ -122,17 +134,10 @@ public class BulkExportJobConfig {
.reader(groupBulkItemReader())
.processor(myPidToIBaseResourceProcessor)
.writer(resourceToFileWriter())
.listener(bulkExportGenrateResourceFilesStepListener())
.listener(bulkExportGenerateResourceFilesStepListener())
.build();
}
@Bean
@StepScope
public GroupBulkItemReader groupBulkItemReader(){
return new GroupBulkItemReader();
}
@Bean
public Step bulkExportGenerateResourceFilesStep() {
return myStepBuilderFactory.get("bulkExportGenerateResourceFilesStep")
@ -140,7 +145,17 @@ public class BulkExportJobConfig {
.reader(bulkItemReader())
.processor(myPidToIBaseResourceProcessor)
.writer(resourceToFileWriter())
.listener(bulkExportGenrateResourceFilesStepListener())
.listener(bulkExportGenerateResourceFilesStepListener())
.build();
}
@Bean
public Step patientBulkExportGenerateResourceFilesStep() {
return myStepBuilderFactory.get("patientBulkExportGenerateResourceFilesStep")
.<List<ResourcePersistentId>, List<IBaseResource>> chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time.
.reader(patientBulkItemReader())
.processor(myPidToIBaseResourceProcessor)
.writer(resourceToFileWriter())
.listener(bulkExportGenerateResourceFilesStepListener())
.build();
}
@ -165,10 +180,17 @@ public class BulkExportJobConfig {
@Bean
@JobScope
public BulkExportGenerateResourceFilesStepListener bulkExportGenrateResourceFilesStepListener() {
public BulkExportGenerateResourceFilesStepListener bulkExportGenerateResourceFilesStepListener() {
return new BulkExportGenerateResourceFilesStepListener();
}
@Bean
public Step partitionStep() {
return myStepBuilderFactory.get("partitionStep")
.partitioner("bulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner())
.step(bulkExportGenerateResourceFilesStep())
.build();
}
@Bean
public Step groupPartitionStep() {
@ -177,14 +199,28 @@ public class BulkExportJobConfig {
.step(groupBulkExportGenerateResourceFilesStep())
.build();
}
@Bean
public Step partitionStep() {
public Step patientPartitionStep() {
return myStepBuilderFactory.get("partitionStep")
.partitioner("bulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner())
.step(bulkExportGenerateResourceFilesStep())
.partitioner("patientBulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner())
.step(patientBulkExportGenerateResourceFilesStep())
.build();
}
@Bean
@StepScope
public GroupBulkItemReader groupBulkItemReader(){
return new GroupBulkItemReader();
}
@Bean
@StepScope
public PatientBulkItemReader patientBulkItemReader() {
return new PatientBulkItemReader();
}
@Bean
@StepScope
public BulkItemReader bulkItemReader(){
@ -199,7 +235,7 @@ public class BulkExportJobConfig {
@Bean
@StepScope
public ItemWriter<List<IBaseResource>> resourceToFileWriter() {
public ResourceToFileWriter resourceToFileWriter() {
return new ResourceToFileWriter();
}

View File

@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.bulk.job;
* #L%
*/
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.rest.api.Constants;
import org.springframework.batch.core.JobParametersBuilder;
@ -65,4 +66,8 @@ public class BulkExportJobParametersBuilder extends JobParametersBuilder {
this.addLong("readChunkSize", theReadChunkSize);
return this;
}
public BulkExportJobParametersBuilder setExportStyle(BulkDataExportOptions.ExportStyle theExportStyle) {
this.addString("exportStyle", theExportStyle.name());
return this;
}
}

View File

@ -20,30 +20,20 @@ package ca.uhn.fhir.jpa.bulk.job;
* #L%
*/
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.util.UrlUtil;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* Basic Bulk Export implementation which simply reads all type filters and applies them, along with the _since param
@ -52,19 +42,21 @@ import java.util.Optional;
public class BulkItemReader extends BaseBulkItemReader {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@Override
Iterator<ResourcePersistentId> getResourcePidIterator() {
ourLog.info("Bulk export assembling export of type {} for job {}", myResourceType, myJobUUID);
Set<ResourcePersistentId> myReadPids = new HashSet<>();
SearchParameterMap map = createSearchParameterMapForJob();
List<SearchParameterMap> map = createSearchParameterMapsForResourceType();
ISearchBuilder sb = getSearchBuilderForLocalResourceType();
IResultIterator myResultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
List<ResourcePersistentId> myReadPids = new ArrayList<>();
while (myResultIterator.hasNext()) {
myReadPids.add(myResultIterator.next());
for (SearchParameterMap spMap: map) {
ourLog.debug("About to evaluate query {}", spMap.toNormalizedQueryString(myContext));
IResultIterator myResultIterator = sb.createQuery(spMap, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
while (myResultIterator.hasNext()) {
myReadPids.add(myResultIterator.next());
}
}
return myReadPids.iterator();
}

View File

@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.bulk.job;
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.Constants;
import org.apache.commons.lang3.StringUtils;
import org.springframework.batch.core.StepContribution;
@ -66,7 +67,20 @@ public class CreateBulkExportEntityTasklet implements Tasklet {
outputFormat = Constants.CT_FHIR_NDJSON;
}
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(outputFormat, resourceTypeSet, since, filterSet));
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setOutputFormat(outputFormat);
bulkDataExportOptions.setResourceTypes(resourceTypeSet);
bulkDataExportOptions.setSince(since);
bulkDataExportOptions.setFilters(filterSet);
//Set export style
String exportStyle = (String)jobParameters.get("exportStyle");
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.valueOf(exportStyle));
//Set group id if present
String groupId = (String)jobParameters.get("groupId");
bulkDataExportOptions.setGroupId(new IdDt(groupId));
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
addUUIDToJobContext(theChunkContext, jobInfo.getJobId());
return RepeatStatus.FINISHED;

View File

@ -20,7 +20,6 @@ package ca.uhn.fhir.jpa.bulk.job;
* #L%
*/
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.batch.log.Logs;
@ -28,6 +27,7 @@ import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.QueryChunker;
@ -39,7 +39,6 @@ import ca.uhn.fhir.rest.param.ReferenceParam;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
@ -77,11 +76,9 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
@Autowired
private IMdmLinkDao myMdmLinkDao;
private RuntimeSearchParam myPatientSearchParam;
@Override
Iterator<ResourcePersistentId> getResourcePidIterator() {
List<ResourcePersistentId> myReadPids = new ArrayList<>();
Set<ResourcePersistentId> myReadPids = new HashSet<>();
//Short circuit out if we detect we are attempting to extract patients
if (myResourceType.equalsIgnoreCase("Patient")) {
@ -113,7 +110,6 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
*/
private Iterator<ResourcePersistentId> getExpandedPatientIterator() {
Set<Long> patientPidsToExport = new HashSet<>();
//This gets all member pids
List<String> members = getMembers();
List<IIdType> ids = members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList());
List<Long> pidsOrThrowException = myIdHelperService.getPidsOrThrowException(ids);
@ -162,8 +158,15 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
//Now lets translate these pids into resource IDs
Set<Long> uniquePids = new HashSet<>();
goldenPidTargetPidTuple.forEach(uniquePids::addAll);
Map<Long, Optional<String>> longOptionalMap = myIdHelperService.translatePidsToForcedIds(uniquePids);
expandedIds = longOptionalMap.values().stream().map(Optional::get).collect(Collectors.toSet());
Map<Long, Optional<String>> pidToForcedIdMap = myIdHelperService.translatePidsToForcedIds(uniquePids);
//If the result of the translation is an empty optional, it means there is no forced id, and we can use the PID as the resource ID.
Set<String> resolvedResourceIds = pidToForcedIdMap.entrySet().stream()
.map(entry -> entry.getValue().isPresent() ? entry.getValue().get() : entry.getKey().toString())
.collect(Collectors.toSet());
expandedIds.addAll(resolvedResourceIds);
}
//Now manually add the members of the group (its possible even with mdm expansion that some members dont have MDM matches,
@ -173,83 +176,40 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade
return expandedIds;
}
private void queryResourceTypeWithReferencesToPatients(List<ResourcePersistentId> myReadPids, List<String> idChunk) {
private void queryResourceTypeWithReferencesToPatients(Set<ResourcePersistentId> myReadPids, List<String> idChunk) {
//Build SP map
//First, inject the _typeFilters and _since from the export job
SearchParameterMap expandedSpMap = createSearchParameterMapForJob();
List<SearchParameterMap> expandedSpMaps = createSearchParameterMapsForResourceType();
for (SearchParameterMap expandedSpMap: expandedSpMaps) {
//Since we are in a bulk job, we have to ensure the user didn't jam in a patient search param, since we need to manually set that.
validateSearchParameters(expandedSpMap);
//Since we are in a bulk job, we have to ensure the user didn't jam in a patient search param, since we need to manually set that.
validateSearchParameters(expandedSpMap);
// Now, further filter the query with patient references defined by the chunk of IDs we have.
filterSearchByResourceIds(idChunk, expandedSpMap);
// Now, further filter the query with patient references defined by the chunk of IDs we have.
filterSearchByResourceIds(idChunk, expandedSpMap);
// Fetch and cache a search builder for this resource type
ISearchBuilder searchBuilder = getSearchBuilderForLocalResourceType();
// Fetch and cache a search builder for this resource type
ISearchBuilder searchBuilder = getSearchBuilderForLocalResourceType();
//Execute query and all found pids to our local iterator.
IResultIterator resultIterator = searchBuilder.createQuery(expandedSpMap, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
while (resultIterator.hasNext()) {
myReadPids.add(resultIterator.next());
//Execute query and all found pids to our local iterator.
IResultIterator resultIterator = searchBuilder.createQuery(expandedSpMap, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
while (resultIterator.hasNext()) {
myReadPids.add(resultIterator.next());
}
}
}
private void filterSearchByResourceIds(List<String> idChunk, SearchParameterMap expandedSpMap) {
ReferenceOrListParam orList = new ReferenceOrListParam();
idChunk.forEach(id -> orList.add(new ReferenceParam(id)));
expandedSpMap.add(getPatientSearchParam().getName(), orList);
expandedSpMap.add(getPatientSearchParamForCurrentResourceType().getName(), orList);
}
private RuntimeSearchParam validateSearchParameters(SearchParameterMap expandedSpMap) {
RuntimeSearchParam runtimeSearchParam = getPatientSearchParam();
RuntimeSearchParam runtimeSearchParam = getPatientSearchParamForCurrentResourceType();
if (expandedSpMap.get(runtimeSearchParam.getName()) != null) {
throw new IllegalArgumentException(String.format("Group Bulk Export manually modifies the Search Parameter called [%s], so you may not include this search parameter in your _typeFilter!", runtimeSearchParam.getName()));
}
return runtimeSearchParam;
}
/**
* Given the resource type, fetch its patient-based search parameter name
* 1. Attempt to find one called 'patient'
* 2. If that fails, find one called 'subject'
* 3. If that fails, find find by Patient Compartment.
* 3.1 If that returns >1 result, throw an error
* 3.2 If that returns 1 result, return it
*/
private RuntimeSearchParam getPatientSearchParam() {
if (myPatientSearchParam == null) {
RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(myResourceType);
myPatientSearchParam = runtimeResourceDefinition.getSearchParam("patient");
if (myPatientSearchParam == null) {
myPatientSearchParam = runtimeResourceDefinition.getSearchParam("subject");
if (myPatientSearchParam == null) {
myPatientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition);
if (myPatientSearchParam == null) {
String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", myResourceType);
throw new IllegalArgumentException(errorMessage);
}
}
}
}
return myPatientSearchParam;
}
/**
* Search the resource definition for a compartment named 'patient' and return its related Search Parameter.
*/
private RuntimeSearchParam getRuntimeSearchParamByCompartment(RuntimeResourceDefinition runtimeResourceDefinition) {
RuntimeSearchParam patientSearchParam;
List<RuntimeSearchParam> searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient");
if (searchParams == null || searchParams.size() == 0) {
String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as it contains no Patient compartment, and no `patient` or `subject` search parameter", myResourceType);
throw new IllegalArgumentException(errorMessage);
} else if (searchParams.size() == 1) {
patientSearchParam = searchParams.get(0);
} else {
String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as we are unable to disambiguate which patient search parameter we should be searching by.", myResourceType);
throw new IllegalArgumentException(errorMessage);
}
return patientSearchParam;
}
}

View File

@ -0,0 +1,98 @@
package ca.uhn.fhir.jpa.bulk.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.ReferenceParam;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* Bulk Item reader for the Patient Bulk Export job.
* Instead of performing a normal query on the resource type using type filters, we instead
*
* 1. Determine the resourcetype
* 2. Search for anything that has `patient-compartment-search-param:missing=false`
*/
public class PatientBulkItemReader extends BaseBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
@Autowired
private DaoConfig myDaoConfig;
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
private RuntimeSearchParam validateSearchParameters(SearchParameterMap expandedSpMap) {
RuntimeSearchParam runtimeSearchParam = getPatientSearchParamForCurrentResourceType();
if (expandedSpMap.get(runtimeSearchParam.getName()) != null) {
throw new IllegalArgumentException(String.format("Patient Bulk Export manually modifies the Search Parameter called [%s], so you may not include this search parameter in your _typeFilter!", runtimeSearchParam.getName()));
}
return runtimeSearchParam;
}
@Override
Iterator<ResourcePersistentId> getResourcePidIterator() {
if (myDaoConfig.getIndexMissingFields() == DaoConfig.IndexEnabledEnum.DISABLED) {
String errorMessage = "You attempted to start a Patient Bulk Export, but the system has `Index Missing Fields` disabled. It must be enabled for Patient Bulk Export";
ourLog.error(errorMessage);
throw new IllegalStateException(errorMessage);
}
List<ResourcePersistentId> myReadPids = new ArrayList<>();
//use _typeFilter and _since and all those fancy bits and bobs to generate our basic SP map.
List<SearchParameterMap> maps = createSearchParameterMapsForResourceType();
String patientSearchParam = getPatientSearchParamForCurrentResourceType().getName();
for (SearchParameterMap map: maps) {
//Ensure users did not monkey with the patient compartment search parameter.
validateSearchParameters(map);
//Skip adding the parameter querying for patient= if we are in fact querying the patient resource type.
if (!myResourceType.equalsIgnoreCase("Patient")) {
map.add(patientSearchParam, new ReferenceParam().setMissing(false));
}
ourLog.debug("About to execute query {}", map.toNormalizedQueryString(myContext));
ISearchBuilder sb = getSearchBuilderForLocalResourceType();
IResultIterator myResultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
while (myResultIterator.hasNext()) {
myReadPids.add(myResultIterator.next());
}
}
return myReadPids.iterator();
}
}

View File

@ -22,13 +22,13 @@ package ca.uhn.fhir.jpa.bulk.provider;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.model.BulkExportResponseJson;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.rest.annotation.IdParam;
import ca.uhn.fhir.rest.annotation.Operation;
import ca.uhn.fhir.rest.annotation.OperationParam;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.PreferHeader;
import ca.uhn.fhir.rest.server.RestfulServerUtils;
@ -43,14 +43,24 @@ import org.hl7.fhir.instance.model.api.IBaseOperationOutcome;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.InstantType;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.slf4j.LoggerFactory.getLogger;
public class BulkDataExportProvider {
public static final String FARM_TO_TABLE_TYPE_FILTER_REGEX = "(?:,)(?=[A-Z][a-z]+\\?)";
private static final Logger ourLog = getLogger(BulkDataExportProvider.class);
@Autowired
private IBulkDataExportSvc myBulkDataExportSvc;
@ -78,43 +88,75 @@ public class BulkDataExportProvider {
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theTypeFilter,
ServletRequestDetails theRequestDetails
) {
validatePreferAsyncHeader(theRequestDetails);
BulkDataExportOptions bulkDataExportOptions = buildSystemBulkExportOptions(theOutputFormat, theType, theSince, theTypeFilter);
Boolean useCache = shouldUseCache(theRequestDetails);
IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(bulkDataExportOptions, useCache);
writePollingLocationToResponseHeaders(theRequestDetails, outcome);
}
String preferHeader = theRequestDetails.getHeader(Constants.HEADER_PREFER);
PreferHeader prefer = RestfulServerUtils.parsePreferHeader(null, preferHeader);
if (prefer.getRespondAsync() == false) {
throw new InvalidRequestException("Must request async processing for $export");
private boolean shouldUseCache(ServletRequestDetails theRequestDetails) {
CacheControlDirective cacheControlDirective = new CacheControlDirective().parse(theRequestDetails.getHeaders(Constants.HEADER_CACHE_CONTROL));
return !cacheControlDirective.isNoCache();
}
private String getServerBase(ServletRequestDetails theRequestDetails) {
return StringUtils.removeEnd(theRequestDetails.getServerBaseForRequest(), "/");
}
/**
* Group/Id/$export
*/
@Operation(name = JpaConstants.OPERATION_EXPORT, manualResponse = true, idempotent = true, typeName = "Group")
public void groupExport(
@IdParam IIdType theIdParam,
@OperationParam(name = JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theOutputFormat,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theType,
@OperationParam(name = JpaConstants.PARAM_EXPORT_SINCE, min = 0, max = 1, typeName = "instant") IPrimitiveType<Date> theSince,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theTypeFilter,
@OperationParam(name = JpaConstants.PARAM_EXPORT_MDM, min = 0, max = 1, typeName = "boolean") IPrimitiveType<Boolean> theMdm,
ServletRequestDetails theRequestDetails
) {
ourLog.debug("Received Group Bulk Export Request for Group {}", theIdParam);
ourLog.debug("_type={}", theIdParam);
ourLog.debug("_since={}", theSince);
ourLog.debug("_typeFilter={}", theTypeFilter);
ourLog.debug("_mdm=", theMdm);
validatePreferAsyncHeader(theRequestDetails);
BulkDataExportOptions bulkDataExportOptions = buildGroupBulkExportOptions(theOutputFormat, theType, theSince, theTypeFilter, theIdParam, theMdm);
validateResourceTypesAllContainPatientSearchParams(bulkDataExportOptions.getResourceTypes());
IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(bulkDataExportOptions, shouldUseCache(theRequestDetails));
writePollingLocationToResponseHeaders(theRequestDetails, outcome);
}
private void validateResourceTypesAllContainPatientSearchParams(Set<String> theResourceTypes) {
List<String> badResourceTypes = theResourceTypes.stream()
.filter(resourceType -> !myBulkDataExportSvc.getPatientCompartmentResources().contains(resourceType))
.collect(Collectors.toList());
if (!badResourceTypes.isEmpty()) {
throw new InvalidRequestException(String.format("Resource types [%s] are invalid for this type of export, as they do not contain search parameters that refer to patients.", String.join(",", badResourceTypes)));
}
}
String outputFormat = theOutputFormat != null ? theOutputFormat.getValueAsString() : null;
Set<String> resourceTypes = null;
if (theType != null) {
resourceTypes = ArrayUtil.commaSeparatedListToCleanSet(theType.getValueAsString());
}
Date since = null;
if (theSince != null) {
since = theSince.getValue();
}
Set<String> filters = null;
if (theTypeFilter != null) {
filters = ArrayUtil.commaSeparatedListToCleanSet(theTypeFilter.getValueAsString());
}
IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(outputFormat, resourceTypes, since, filters));
String serverBase = getServerBase(theRequestDetails);
String pollLocation = serverBase + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" + JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + outcome.getJobId();
HttpServletResponse response = theRequestDetails.getServletResponse();
// Add standard headers
theRequestDetails.getServer().addHeadersToResponse(response);
// Successful 202 Accepted
response.addHeader(Constants.HEADER_CONTENT_LOCATION, pollLocation);
response.setStatus(Constants.STATUS_HTTP_202_ACCEPTED);
/**
* Patient/$export
*/
@Operation(name = JpaConstants.OPERATION_EXPORT, manualResponse = true, idempotent = true, typeName = "Patient")
public void patientExport(
@OperationParam(name = JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theOutputFormat,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theType,
@OperationParam(name = JpaConstants.PARAM_EXPORT_SINCE, min = 0, max = 1, typeName = "instant") IPrimitiveType<Date> theSince,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theTypeFilter,
ServletRequestDetails theRequestDetails
) {
validatePreferAsyncHeader(theRequestDetails);
BulkDataExportOptions bulkDataExportOptions = buildPatientBulkExportOptions(theOutputFormat, theType, theSince, theTypeFilter);
validateResourceTypesAllContainPatientSearchParams(bulkDataExportOptions.getResourceTypes());
IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(bulkDataExportOptions, shouldUseCache(theRequestDetails));
writePollingLocationToResponseHeaders(theRequestDetails, outcome);
}
/**
@ -171,37 +213,31 @@ public class BulkDataExportProvider {
OperationOutcomeUtil.addIssue(myFhirContext, oo, "error", status.getStatusMessage(), null, null);
myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToWriter(oo, response.getWriter());
response.getWriter().close();
}
}
private String getServerBase(ServletRequestDetails theRequestDetails) {
return StringUtils.removeEnd(theRequestDetails.getServerBaseForRequest(), "/");
private BulkDataExportOptions buildSystemBulkExportOptions(IPrimitiveType<String> theOutputFormat, IPrimitiveType<String> theType, IPrimitiveType<Date> theSince, IPrimitiveType<String> theTypeFilter) {
return buildBulkDataExportOptions(theOutputFormat, theType, theSince, theTypeFilter, BulkDataExportOptions.ExportStyle.SYSTEM);
}
/**
* Group/Id/$export
*/
@Operation(name = JpaConstants.OPERATION_EXPORT, manualResponse = true, idempotent = true, typeName = "Group")
public void groupExport(
@IdParam IIdType theIdParam,
@OperationParam(name = JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theOutputFormat,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theType,
@OperationParam(name = JpaConstants.PARAM_EXPORT_SINCE, min = 0, max = 1, typeName = "instant") IPrimitiveType<Date> theSince,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theTypeFilter,
@OperationParam(name = JpaConstants.PARAM_EXPORT_MDM, min = 0, max = 1, typeName = "boolean") IPrimitiveType<Boolean> theMdm,
private BulkDataExportOptions buildGroupBulkExportOptions(IPrimitiveType<String> theOutputFormat, IPrimitiveType<String> theType, IPrimitiveType<Date> theSince, IPrimitiveType<String> theTypeFilter, IIdType theGroupId, IPrimitiveType<Boolean> theExpandMdm) {
BulkDataExportOptions bulkDataExportOptions = buildBulkDataExportOptions(theOutputFormat, theType, theSince, theTypeFilter, BulkDataExportOptions.ExportStyle.GROUP);
bulkDataExportOptions.setGroupId(theGroupId);
ServletRequestDetails theRequestDetails
) {
String preferHeader = theRequestDetails.getHeader(Constants.HEADER_PREFER);
PreferHeader prefer = RestfulServerUtils.parsePreferHeader(null, preferHeader);
if (prefer.getRespondAsync() == false) {
throw new InvalidRequestException("Must request async processing for $export");
boolean mdm = false;
if (theExpandMdm != null) {
mdm = theExpandMdm.getValue();
}
bulkDataExportOptions.setExpandMdm(mdm);
return bulkDataExportOptions;
}
private BulkDataExportOptions buildPatientBulkExportOptions(IPrimitiveType<String> theOutputFormat, IPrimitiveType<String> theType, IPrimitiveType<Date> theSince, IPrimitiveType<String> theTypeFilter) {
return buildBulkDataExportOptions(theOutputFormat, theType, theSince, theTypeFilter, BulkDataExportOptions.ExportStyle.PATIENT);
}
private BulkDataExportOptions buildBulkDataExportOptions(IPrimitiveType<String> theOutputFormat, IPrimitiveType<String> theType, IPrimitiveType<Date> theSince, IPrimitiveType<String> theTypeFilter, BulkDataExportOptions.ExportStyle theExportStyle) {
String outputFormat = theOutputFormat != null ? theOutputFormat.getValueAsString() : null;
Set<String> resourceTypes = null;
@ -209,25 +245,25 @@ public class BulkDataExportProvider {
resourceTypes = ArrayUtil.commaSeparatedListToCleanSet(theType.getValueAsString());
}
//TODO GGG eventually, we will support these things.
Set<String> filters = null;
Date since = null;
if (theSince != null) {
since = theSince.getValue();
}
boolean mdm = false;
if (theMdm != null) {
mdm = theMdm.getValue();
}
if (theTypeFilter != null) {
filters = ArrayUtil.commaSeparatedListToCleanSet(theTypeFilter.getValueAsString());
}
IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(outputFormat, resourceTypes, since, filters, theIdParam, mdm));
Set<String> typeFilters = splitTypeFilters(theTypeFilter);
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setFilters(typeFilters);
bulkDataExportOptions.setExportStyle(theExportStyle);
bulkDataExportOptions.setSince(since);
bulkDataExportOptions.setResourceTypes(resourceTypes);
bulkDataExportOptions.setOutputFormat(outputFormat);
return bulkDataExportOptions;
}
public void writePollingLocationToResponseHeaders(ServletRequestDetails theRequestDetails, IBulkDataExportSvc.JobInfo theOutcome) {
String serverBase = getServerBase(theRequestDetails);
String pollLocation = serverBase + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" + JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + outcome.getJobId();
String pollLocation = serverBase + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" + JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + theOutcome.getJobId();
HttpServletResponse response = theRequestDetails.getServletResponse();
@ -238,4 +274,26 @@ public class BulkDataExportProvider {
response.addHeader(Constants.HEADER_CONTENT_LOCATION, pollLocation);
response.setStatus(Constants.STATUS_HTTP_202_ACCEPTED);
}
private void validatePreferAsyncHeader(ServletRequestDetails theRequestDetails) {
String preferHeader = theRequestDetails.getHeader(Constants.HEADER_PREFER);
PreferHeader prefer = RestfulServerUtils.parsePreferHeader(null, preferHeader);
if (prefer.getRespondAsync() == false) {
throw new InvalidRequestException("Must request async processing for $export");
}
}
private Set<String> splitTypeFilters(IPrimitiveType<String> theTypeFilter) {
if (theTypeFilter== null) {
return null;
}
String typeFilterSring = theTypeFilter.getValueAsString();
String[] typeFilters = typeFilterSring.split(FARM_TO_TABLE_TYPE_FILTER_REGEX);
if (typeFilters == null || typeFilters.length == 0) {
return null;
}
return new HashSet<>(Arrays.asList(typeFilters));
}
}

View File

@ -21,13 +21,15 @@ package ca.uhn.fhir.jpa.bulk.svc;
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.fhirpath.IFhirPath;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.job.BulkExportJobConfig;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
@ -41,19 +43,21 @@ import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.util.UrlUtil;
import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBase;
import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.InstantType;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.beans.factory.annotation.Autowired;
@ -66,18 +70,19 @@ import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.PostConstruct;
import javax.transaction.Transactional;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import static ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions.ExportStyle.GROUP;
import static ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions.ExportStyle.PATIENT;
import static ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions.ExportStyle.SYSTEM;
import static ca.uhn.fhir.util.UrlUtil.escapeUrlParam;
import static ca.uhn.fhir.util.UrlUtil.escapeUrlParams;
import static org.apache.commons.lang3.StringUtils.contains;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
@ -113,6 +118,12 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
@Qualifier(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME)
private org.springframework.batch.core.Job myGroupBulkExportJob;
@Autowired
@Qualifier(BatchJobsConfig.PATIENT_BULK_EXPORT_JOB_NAME)
private org.springframework.batch.core.Job myPatientBulkExportJob;
private Set<String> myCompartmentResources;
private final int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR);
/**
@ -229,6 +240,8 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
if (isGroupBulkJob(theBulkExportJobEntity)) {
enhanceBulkParametersWithGroupParameters(theBulkExportJobEntity, parameters);
myJobSubmitter.runJob(myGroupBulkExportJob, parameters.toJobParameters());
} else if (isPatientBulkJob(theBulkExportJobEntity)) {
myJobSubmitter.runJob(myPatientBulkExportJob, parameters.toJobParameters());
} else {
myJobSubmitter.runJob(myBulkExportJob, parameters.toJobParameters());
}
@ -237,6 +250,14 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
}
}
private boolean isPatientBulkJob(BulkExportJobEntity theBulkExportJobEntity) {
return theBulkExportJobEntity.getRequest().startsWith("/Patient/");
}
private boolean isGroupBulkJob(BulkExportJobEntity theBulkExportJobEntity) {
return theBulkExportJobEntity.getRequest().startsWith("/Group/");
}
private void enhanceBulkParametersWithGroupParameters(BulkExportJobEntity theBulkExportJobEntity, JobParametersBuilder theParameters) {
String theGroupId = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_GROUP_ID);
String expandMdm = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_MDM);
@ -244,9 +265,6 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
theParameters.addString(BulkExportJobConfig.EXPAND_MDM_PARAMETER, expandMdm);
}
private boolean isGroupBulkJob(BulkExportJobEntity theBulkExportJobEntity) {
return getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_GROUP_ID) != null;
}
@SuppressWarnings("unchecked")
private IFhirResourceDao<IBaseBinary> getBinaryDao() {
@ -271,6 +289,12 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
@Transactional
@Override
public JobInfo submitJob(BulkDataExportOptions theBulkDataExportOptions) {
return submitJob(theBulkDataExportOptions, true);
}
@Transactional
@Override
public JobInfo submitJob(BulkDataExportOptions theBulkDataExportOptions, Boolean useCache) {
String outputFormat = Constants.CT_FHIR_NDJSON;
if (isNotBlank(theBulkDataExportOptions.getOutputFormat())) {
outputFormat = theBulkDataExportOptions.getOutputFormat();
@ -282,7 +306,16 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
// TODO GGG KS can we encode BulkDataExportOptions as a JSON string as opposed to this request string. Feels like it would be a more extensible encoding...
//Probably yes, but this will all need to be rebuilt when we remove this bridge entity
StringBuilder requestBuilder = new StringBuilder();
requestBuilder.append("/").append(JpaConstants.OPERATION_EXPORT);
requestBuilder.append("/");
//Prefix the export url with Group/[id]/ or /Patient/ depending on what type of request it is.
if (theBulkDataExportOptions.getExportStyle().equals(GROUP)) {
requestBuilder.append(theBulkDataExportOptions.getGroupId().toVersionless()).append("/");
} else if (theBulkDataExportOptions.getExportStyle().equals(PATIENT)) {
requestBuilder.append("Patient/");
}
requestBuilder.append(JpaConstants.OPERATION_EXPORT);
requestBuilder.append("?").append(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT).append("=").append(escapeUrlParam(outputFormat));
Set<String> resourceTypes = theBulkDataExportOptions.getResourceTypes();
if (resourceTypes != null) {
@ -293,20 +326,28 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_SINCE).append("=").append(new InstantType(since).setTimeZoneZulu(true).getValueAsString());
}
if (theBulkDataExportOptions.getFilters() != null && theBulkDataExportOptions.getFilters().size() > 0) {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_TYPE_FILTER).append("=").append(String.join(",", escapeUrlParams(theBulkDataExportOptions.getFilters())));
theBulkDataExportOptions.getFilters().stream()
.forEach(filter -> requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_TYPE_FILTER).append("=").append(escapeUrlParam(filter)));
}
if (theBulkDataExportOptions instanceof GroupBulkDataExportOptions) {
GroupBulkDataExportOptions groupOptions = (GroupBulkDataExportOptions) theBulkDataExportOptions;
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_GROUP_ID).append("=").append(groupOptions.getGroupId().getValue());
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_MDM).append("=").append(groupOptions.isMdm());
if (theBulkDataExportOptions.getExportStyle().equals(GROUP)) {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_GROUP_ID).append("=").append(theBulkDataExportOptions.getGroupId().getValue());
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_MDM).append("=").append(theBulkDataExportOptions.isExpandMdm());
}
String request = requestBuilder.toString();
Date cutoff = DateUtils.addMilliseconds(new Date(), -myReuseBulkExportForMillis);
Pageable page = PageRequest.of(0, 10);
Slice<BulkExportJobEntity> existing = myBulkExportJobDao.findExistingJob(page, request, cutoff, BulkJobStatusEnum.ERROR);
if (!existing.isEmpty()) {
return toSubmittedJobInfo(existing.iterator().next());
//If we are using the cache, then attempt to retrieve a matching job based on the Request String, otherwise just make a new one.
if (useCache) {
Date cutoff = DateUtils.addMilliseconds(new Date(), -myReuseBulkExportForMillis);
Pageable page = PageRequest.of(0, 10);
Slice<BulkExportJobEntity> existing = myBulkExportJobDao.findExistingJob(page, request, cutoff, BulkJobStatusEnum.ERROR);
if (!existing.isEmpty()) {
return toSubmittedJobInfo(existing.iterator().next());
}
}
if (resourceTypes != null && resourceTypes.contains("Binary")) {
@ -318,7 +359,7 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
// This is probably not a useful default, but having the default be "download the whole
// server" seems like a risky default too. We'll deal with that by having the default involve
// only returning a small time span
resourceTypes = myContext.getResourceTypes();
resourceTypes = getAllowedResourceTypesForBulkExportStyle(theBulkDataExportOptions.getExportStyle());
if (since == null) {
since = DateUtils.addDays(new Date(), -1);
}
@ -369,17 +410,13 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
public void validateTypeFilters(Set<String> theTheFilters, Set<String> theResourceTypes) {
if (theTheFilters != null) {
Set<String> types = new HashSet<>();
for (String next : theTheFilters) {
if (!next.contains("?")) {
throw new InvalidRequestException("Invalid " + JpaConstants.PARAM_EXPORT_TYPE_FILTER + " value \"" + next + "\". Must be in the form [ResourceType]?[params]");
}
String resourceType = next.substring(0, next.indexOf("?"));
if (!theResourceTypes.contains(resourceType)) {
throw new InvalidRequestException("Invalid " + JpaConstants.PARAM_EXPORT_TYPE_FILTER + " value \"" + next + "\". Resource type does not appear in " + JpaConstants.PARAM_EXPORT_TYPE + " list");
}
if (!types.add(resourceType)) {
throw new InvalidRequestException("Invalid " + JpaConstants.PARAM_EXPORT_TYPE_FILTER + " value \"" + next + "\". Multiple filters found for type " + resourceType);
throw new InvalidRequestException("Invalid " + JpaConstants.PARAM_EXPORT_TYPE_FILTER + " value \"" + next + "\". Resource type does not appear in " + JpaConstants.PARAM_EXPORT_TYPE+ " list");
}
}
}
@ -421,6 +458,35 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
return retVal;
}
@Override
public Set<String> getPatientCompartmentResources() {
if (myCompartmentResources == null) {
myCompartmentResources = myContext.getResourceTypes().stream()
.filter(this::resourceTypeIsInPatientCompartment)
.collect(Collectors.toSet());
}
return myCompartmentResources;
}
/**
* Return true if any search parameter in the resource can point at a patient, false otherwise
*/
private boolean resourceTypeIsInPatientCompartment(String theResourceType) {
RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(theResourceType);
List<RuntimeSearchParam> searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient");
return searchParams != null && searchParams.size() >= 1;
}
public Set<String> getAllowedResourceTypesForBulkExportStyle(BulkDataExportOptions.ExportStyle theExportStyle) {
if (theExportStyle.equals(SYSTEM)) {
return myContext.getResourceTypes();
} else if (theExportStyle.equals(GROUP) || theExportStyle.equals(PATIENT)) {
return getPatientCompartmentResources();
} else {
throw new IllegalArgumentException(String.format("HAPI FHIR does not recognize a Bulk Export request of type: %s", theExportStyle));
}
}
private IIdType toId(String theResourceId) {
IIdType retVal = myContext.getVersion().newIdType();
retVal.setValue(theResourceId);

View File

@ -43,7 +43,7 @@ public interface IBulkExportJobDao extends JpaRepository<BulkExportJobEntity, Lo
@Query("SELECT j FROM BulkExportJobEntity j WHERE j.myExpiry < :cutoff")
Slice<BulkExportJobEntity> findByExpiry(Pageable thePage, @Param("cutoff") Date theCutoff);
@Query("SELECT j FROM BulkExportJobEntity j WHERE j.myRequest = :request AND j.myCreated > :createdAfter AND j.myStatus <> :status")
@Query("SELECT j FROM BulkExportJobEntity j WHERE j.myRequest = :request AND j.myCreated > :createdAfter AND j.myStatus <> :status ORDER BY j.myCreated DESC")
Slice<BulkExportJobEntity> findExistingJob(Pageable thePage, @Param("request") String theRequest, @Param("createdAfter") Date theCreatedAfter, @Param("status") BulkJobStatusEnum theNotStatus);
@Modifying

View File

@ -3,7 +3,6 @@ package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.model.BulkExportResponseJson;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
@ -17,6 +16,7 @@ import ca.uhn.fhir.test.utilities.JettyUtil;
import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.fhir.util.UrlUtil;
import com.google.common.base.Charsets;
import com.google.common.collect.Sets;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
@ -27,7 +27,6 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.InstantType;
import org.hl7.fhir.r4.model.Parameters;
@ -57,6 +56,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.times;
@ -79,7 +79,7 @@ public class BulkDataExportProviderTest {
@Captor
private ArgumentCaptor<BulkDataExportOptions> myBulkDataExportOptionsCaptor;
@Captor
private ArgumentCaptor<GroupBulkDataExportOptions> myGroupBulkDataExportOptionsCaptor;
private ArgumentCaptor<Boolean> myBooleanArgumentCaptor;
@AfterEach
public void after() throws Exception {
@ -116,7 +116,7 @@ public class BulkDataExportProviderTest {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo()
.setJobId(A_JOB_ID);
when(myBulkDataExportSvc.submitJob(any())).thenReturn(jobInfo);
when(myBulkDataExportSvc.submitJob(any(), any())).thenReturn(jobInfo);
InstantType now = InstantType.now();
@ -140,7 +140,7 @@ public class BulkDataExportProviderTest {
assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
}
verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture());
verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), any());
BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue();
assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat());
assertThat(options.getResourceTypes(), containsInAnyOrder("Patient", "Practitioner"));
@ -153,7 +153,7 @@ public class BulkDataExportProviderTest {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo()
.setJobId(A_JOB_ID);
when(myBulkDataExportSvc.submitJob(any())).thenReturn(jobInfo);
when(myBulkDataExportSvc.submitJob(any(),any())).thenReturn(jobInfo);
InstantType now = InstantType.now();
@ -174,7 +174,7 @@ public class BulkDataExportProviderTest {
assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
}
verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture());
verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), any());
BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue();
assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat());
assertThat(options.getResourceTypes(), containsInAnyOrder("Patient", "Practitioner"));
@ -267,7 +267,6 @@ public class BulkDataExportProviderTest {
assertEquals("Patient", responseJson.getOutput().get(2).getType());
assertEquals("http://localhost:" + myPort + "/Binary/333", responseJson.getOutput().get(2).getUrl());
}
}
@Test
@ -304,7 +303,8 @@ public class BulkDataExportProviderTest {
public void testSuccessfulInitiateGroupBulkRequest_Post() throws IOException {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo().setJobId(G_JOB_ID);
when(myBulkDataExportSvc.submitJob(any())).thenReturn(jobInfo);
when(myBulkDataExportSvc.submitJob(any(),any())).thenReturn(jobInfo);
when(myBulkDataExportSvc.getPatientCompartmentResources()).thenReturn(Sets.newHashSet("Observation", "DiagnosticReport"));
InstantType now = InstantType.now();
@ -330,13 +330,207 @@ public class BulkDataExportProviderTest {
assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + G_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
}
verify(myBulkDataExportSvc, times(1)).submitJob(myGroupBulkDataExportOptionsCaptor.capture());
GroupBulkDataExportOptions options = myGroupBulkDataExportOptionsCaptor.getValue();
verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), any());
BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue();
assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat());
assertThat(options.getResourceTypes(), containsInAnyOrder("Observation", "DiagnosticReport"));
assertThat(options.getSince(), notNullValue());
assertThat(options.getFilters(), notNullValue());
assertEquals(GROUP_ID, options.getGroupId().getValue());
assertThat(options.isMdm(), is(equalTo(true)));
assertThat(options.isExpandMdm(), is(equalTo(true)));
}
@Test
public void testSuccessfulInitiateGroupBulkRequest_Get() throws IOException {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo().setJobId(G_JOB_ID);
when(myBulkDataExportSvc.submitJob(any(), any())).thenReturn(jobInfo);
when(myBulkDataExportSvc.getPatientCompartmentResources()).thenReturn(Sets.newHashSet("Patient", "Practitioner"));
InstantType now = InstantType.now();
String url = "http://localhost:" + myPort + "/" + GROUP_ID + "/" + JpaConstants.OPERATION_EXPORT
+ "?" + JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT + "=" + UrlUtil.escapeUrlParam(Constants.CT_FHIR_NDJSON)
+ "&" + JpaConstants.PARAM_EXPORT_TYPE + "=" + UrlUtil.escapeUrlParam("Patient, Practitioner")
+ "&" + JpaConstants.PARAM_EXPORT_SINCE + "=" + UrlUtil.escapeUrlParam(now.getValueAsString())
+ "&" + JpaConstants.PARAM_EXPORT_TYPE_FILTER + "=" + UrlUtil.escapeUrlParam("Patient?identifier=foo|bar")
+ "&" + JpaConstants.PARAM_EXPORT_MDM+ "=true";
HttpGet get = new HttpGet(url);
get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
ourLog.info("Request: {}", url);
try (CloseableHttpResponse response = myClient.execute(get)) {
ourLog.info("Response: {}", response.toString());
assertEquals(202, response.getStatusLine().getStatusCode());
assertEquals("Accepted", response.getStatusLine().getReasonPhrase());
assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + G_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
}
verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), any());
BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue();
assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat());
assertThat(options.getResourceTypes(), containsInAnyOrder("Patient", "Practitioner"));
assertThat(options.getSince(), notNullValue());
assertThat(options.getFilters(), notNullValue());
assertEquals(GROUP_ID, options.getGroupId().getValue());
assertThat(options.isExpandMdm(), is(equalTo(true)));
}
@Test
public void testInitiateWithGetAndMultipleTypeFilters() throws IOException {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo()
.setJobId(A_JOB_ID);
when(myBulkDataExportSvc.submitJob(any())).thenReturn(jobInfo);
InstantType now = InstantType.now();
String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT
+ "?" + JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT + "=" + UrlUtil.escapeUrlParam(Constants.CT_FHIR_NDJSON)
+ "&" + JpaConstants.PARAM_EXPORT_TYPE + "=" + UrlUtil.escapeUrlParam("Immunization, Observation")
+ "&" + JpaConstants.PARAM_EXPORT_SINCE + "=" + UrlUtil.escapeUrlParam(now.getValueAsString());
String immunizationTypeFilter1 = "Immunization?patient.identifier=SC378274-MRN|009999997,SC378274-MRN|009999998,SC378274-MRN|009999999&date=2020-01-02";
String immunizationTypeFilter2 = "Immunization?patient=Patient/123";
String observationFilter1 = "Observation?subject=Patient/123&created=ge2020-01-01";
StringBuilder multiValuedTypeFilterBuilder = new StringBuilder()
.append("&")
.append(JpaConstants.PARAM_EXPORT_TYPE_FILTER)
.append("=")
.append(UrlUtil.escapeUrlParam(immunizationTypeFilter1))
.append(",")
.append(UrlUtil.escapeUrlParam(immunizationTypeFilter2))
.append(",")
.append(UrlUtil.escapeUrlParam(observationFilter1));
url += multiValuedTypeFilterBuilder.toString();
HttpGet get = new HttpGet(url);
get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
myClient.execute(get);
verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), anyBoolean());
BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue();
assertThat(options.getFilters(), containsInAnyOrder(immunizationTypeFilter1, immunizationTypeFilter2, observationFilter1));
}
@Test
public void testInitiateGroupExportWithInvalidResourceTypesFails() throws IOException {
when (myBulkDataExportSvc.getPatientCompartmentResources()).thenReturn(Sets.newHashSet("Observation"));
String url = "http://localhost:" + myPort + "/" + "Group/123/" +JpaConstants.OPERATION_EXPORT
+ "?" + JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT + "=" + UrlUtil.escapeUrlParam(Constants.CT_FHIR_NDJSON)
+ "&" + JpaConstants.PARAM_EXPORT_TYPE + "=" + UrlUtil.escapeUrlParam("StructureDefinition,Observation");
HttpGet get = new HttpGet(url);
get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
CloseableHttpResponse execute = myClient.execute(get);
String responseBody = IOUtils.toString(execute.getEntity().getContent());
assertThat(execute.getStatusLine().getStatusCode(), is(equalTo(400)));
assertThat(responseBody, is(containsString("Resource types [StructureDefinition] are invalid for this type of export, as they do not contain search parameters that refer to patients.")));
}
@Test
public void testInitiateWithPostAndMultipleTypeFilters() throws IOException {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo()
.setJobId(A_JOB_ID);
when(myBulkDataExportSvc.submitJob(any(), any())).thenReturn(jobInfo);
InstantType now = InstantType.now();
Parameters input = new Parameters();
input.addParameter(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, new StringType(Constants.CT_FHIR_NDJSON));
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE, new StringType("Patient"));
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE_FILTER, new StringType("Patient?gender=male,Patient?gender=female"));
ourLog.info(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(input));
HttpPost post = new HttpPost("http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT);
post.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
post.setEntity(new ResourceEntity(myCtx, input));
ourLog.info("Request: {}", post);
try (CloseableHttpResponse response = myClient.execute(post)) {
ourLog.info("Response: {}", response.toString());
assertEquals(202, response.getStatusLine().getStatusCode());
assertEquals("Accepted", response.getStatusLine().getReasonPhrase());
assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
}
verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), myBooleanArgumentCaptor.capture());
BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue();
assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat());
assertThat(options.getResourceTypes(), containsInAnyOrder("Patient"));
assertThat(options.getFilters(), containsInAnyOrder("Patient?gender=male", "Patient?gender=female"));
}
@Test
public void testInitiatePatientExportRequest() throws IOException {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo()
.setJobId(A_JOB_ID);
when(myBulkDataExportSvc.submitJob(any(), any())).thenReturn(jobInfo);
when(myBulkDataExportSvc.getPatientCompartmentResources()).thenReturn(Sets.newHashSet("Immunization", "Observation"));
InstantType now = InstantType.now();
Parameters input = new Parameters();
input.addParameter(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, new StringType(Constants.CT_FHIR_NDJSON));
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE, new StringType("Immunization, Observation"));
input.addParameter(JpaConstants.PARAM_EXPORT_SINCE, now);
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE_FILTER, new StringType("Immunization?vaccine-code=foo"));
ourLog.info(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(input));
HttpPost post = new HttpPost("http://localhost:" + myPort + "/Patient/" + JpaConstants.OPERATION_EXPORT);
post.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
post.setEntity(new ResourceEntity(myCtx, input));
ourLog.info("Request: {}", post);
try (CloseableHttpResponse response = myClient.execute(post)) {
ourLog.info("Response: {}", response.toString());
assertEquals(202, response.getStatusLine().getStatusCode());
assertEquals("Accepted", response.getStatusLine().getReasonPhrase());
assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
}
verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), myBooleanArgumentCaptor.capture());
BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue();
assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat());
assertThat(options.getResourceTypes(), containsInAnyOrder("Immunization", "Observation"));
assertThat(options.getSince(), notNullValue());
assertThat(options.getFilters(), containsInAnyOrder("Immunization?vaccine-code=foo"));
}
@Test
public void testProviderProcessesNoCacheHeader() throws IOException {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo()
.setJobId(A_JOB_ID);
when(myBulkDataExportSvc.submitJob(any(), anyBoolean())).thenReturn(jobInfo);
Parameters input = new Parameters();
input.addParameter(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, new StringType(Constants.CT_FHIR_NDJSON));
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE, new StringType("Patient, Practitioner"));
HttpPost post = new HttpPost("http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT);
post.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
post.addHeader(Constants.HEADER_CACHE_CONTROL, Constants.CACHE_CONTROL_NO_CACHE);
post.setEntity(new ResourceEntity(myCtx, input));
ourLog.info("Request: {}", post);
try (CloseableHttpResponse response = myClient.execute(post)) {
ourLog.info("Response: {}", response.toString());
assertEquals(202, response.getStatusLine().getStatusCode());
assertEquals("Accepted", response.getStatusLine().getReasonPhrase());
assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
}
verify(myBulkDataExportSvc).submitJob(myBulkDataExportOptionsCaptor.capture(), myBooleanArgumentCaptor.capture());
Boolean usedCache = myBooleanArgumentCaptor.getValue();
assertThat(usedCache, is(equalTo(false)));
}
}

View File

@ -2,11 +2,11 @@ package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.job.BulkExportJobParametersBuilder;
import ca.uhn.fhir.jpa.bulk.job.GroupBulkExportJobParametersBuilder;
@ -61,6 +61,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.containsString;
@ -70,6 +71,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@ -97,8 +99,20 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Qualifier(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME)
private Job myGroupBulkJob;
@Autowired
@Qualifier(BatchJobsConfig.PATIENT_BULK_EXPORT_JOB_NAME)
private Job myPatientBulkJob;
private IIdType myPatientGroupId;
@Override
public void beforeFlushFT() {
super.beforeFlushFT();
//This is needed for patient level export.
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED);
}
@Test
public void testPurgeExpiredJobs() {
@ -155,7 +169,11 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testSubmit_InvalidOutputFormat() {
try {
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_JSON_NEW, Sets.newHashSet("Patient", "Observation"), null, null));
BulkDataExportOptions options = new BulkDataExportOptions();
options.setOutputFormat(Constants.CT_FHIR_JSON_NEW);
options.setResourceTypes(Sets.newHashSet("Patient", "Observation"));
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
myBulkDataExportSvc.submitJob(options);
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid output format: application/fhir+json", e.getMessage());
@ -165,37 +183,34 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testSubmit_OnlyBinarySelected() {
try {
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_JSON_NEW, Sets.newHashSet("Binary"), null, null));
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Binary"));
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
myBulkDataExportSvc.submitJob(options);
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid output format: application/fhir+json", e.getMessage());
assertEquals("Binary resources may not be exported with bulk export", e.getMessage());
}
}
@Test
public void testSubmit_InvalidResourceTypes() {
try {
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient", "FOO"), null, null));
myBulkDataExportSvc.submitJob(buildBulkDataForResourceTypes(Sets.newHashSet("Patient", "FOO")));
fail();
} catch (InvalidRequestException e) {
assertEquals("Unknown or unsupported resource type: FOO", e.getMessage());
}
}
@Test
public void testSubmit_MultipleTypeFiltersForSameType() {
try {
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Patient?name=a", "Patient?active=true")));
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid _typeFilter value \"Patient?name=a\". Multiple filters found for type Patient", e.getMessage());
}
}
@Test
public void testSubmit_TypeFilterForNonSelectedType() {
try {
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Observation?code=123")));
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Patient"));
options.setFilters(Sets.newHashSet("Observation?code=123"));
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
myBulkDataExportSvc.submitJob(options);
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid _typeFilter value \"Observation?code=123\". Resource type does not appear in _type list", e.getMessage());
@ -205,22 +220,32 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testSubmit_TypeFilterInvalid() {
try {
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Hello")));
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Patient"));
options.setFilters(Sets.newHashSet("Hello"));
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
myBulkDataExportSvc.submitJob(options);
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid _typeFilter value \"Hello\". Must be in the form [ResourceType]?[params]", e.getMessage());
}
}
private BulkDataExportOptions buildBulkDataForResourceTypes(Set<String> resourceTypes) {
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(resourceTypes);
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
return options;
}
@Test
public void testSubmit_ReusesExisting() {
// Submit
IBulkDataExportSvc.JobInfo jobDetails1 = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null));
IBulkDataExportSvc.JobInfo jobDetails1 = myBulkDataExportSvc.submitJob(buildBulkDataForResourceTypes(Sets.newHashSet("Patient", "Observation")));
assertNotNull(jobDetails1.getJobId());
// Submit again
IBulkDataExportSvc.JobInfo jobDetails2 = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null));
IBulkDataExportSvc.JobInfo jobDetails2 = myBulkDataExportSvc.submitJob(buildBulkDataForResourceTypes(Sets.newHashSet("Patient", "Observation")));
assertNotNull(jobDetails2.getJobId());
assertEquals(jobDetails1.getJobId(), jobDetails2.getJobId());
@ -241,7 +266,10 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient"), null, null));
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Patient"));
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
assertNotNull(jobDetails.getJobId());
// Check the status
@ -271,7 +299,12 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, Sets.newHashSet(TEST_FILTER)));
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Patient", "Observation"));
options.setFilters(Sets.newHashSet(TEST_FILTER));
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options);
assertNotNull(jobDetails.getJobId());
// Check the status
@ -300,7 +333,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertEquals(7, nextContents.split("\n").length); // Only female patients
} else if ("Observation".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"subject\":{\"reference\":\"Patient/PAT0\"}}\n"));
assertEquals(16, nextContents.split("\n").length);
assertEquals(26, nextContents.split("\n").length);
} else {
fail(next.getResourceType());
}
@ -324,7 +357,9 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
myBinaryDao.create(b);
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, null, null, null));
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
assertNotNull(jobDetails.getJobId());
// Check the status
@ -353,10 +388,10 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertEquals(17, nextContents.split("\n").length);
} else if ("Observation".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"subject\":{\"reference\":\"Patient/PAT0\"}}\n"));
assertEquals(16, nextContents.split("\n").length);
assertEquals(26, nextContents.split("\n").length);
}else if ("Immunization".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"patient\":{\"reference\":\"Patient/PAT0\"}}\n"));
assertEquals(16, nextContents.split("\n").length);
assertEquals(26, nextContents.split("\n").length);
} else if ("CareTeam".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"id\":\"CT0\""));
assertEquals(16, nextContents.split("\n").length);
@ -378,7 +413,11 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
// Create a bulk job
HashSet<String> types = Sets.newHashSet("Patient");
Set<String> typeFilters = Sets.newHashSet("Patient?_has:Observation:patient:identifier=SYS|VAL3");
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, types, null, typeFilters));
BulkDataExportOptions options = new BulkDataExportOptions();
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
options.setResourceTypes(types);
options.setFilters(typeFilters);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options);
assertNotNull(jobDetails.getJobId());
// Check the status
@ -430,7 +469,12 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), cutoff.getValue(), null));
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Patient", "Observation"));
options.setSince(cutoff.getValue());
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options);
assertNotNull(jobDetails.getJobId());
// Check the status
@ -478,6 +522,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
paramBuilder
.setReadChunkSize(100L)
.setOutputFormat(Constants.CT_FHIR_NDJSON)
.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM)
.setResourceTypes(Arrays.asList("Patient", "Observation"));
JobExecution jobExecution = myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
@ -492,6 +537,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
public void awaitAllBulkJobCompletions() {
List<JobInstance> bulkExport = myJobExplorer.findJobInstancesByJobName(BatchJobsConfig.BULK_EXPORT_JOB_NAME, 0, 100);
bulkExport.addAll(myJobExplorer.findJobInstancesByJobName(BatchJobsConfig.PATIENT_BULK_EXPORT_JOB_NAME, 0, 100));
bulkExport.addAll(myJobExplorer.findJobInstancesByJobName(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME, 0, 100));
if (bulkExport.isEmpty()) {
fail("There are no bulk export jobs running!");
@ -509,7 +555,10 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null));
BulkDataExportOptions options = new BulkDataExportOptions();
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
options.setResourceTypes(Sets.newHashSet("Patient", "Observation"));
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options);
//Add the UUID to the job
BulkExportJobParametersBuilder paramBuilder = new BulkExportJobParametersBuilder()
@ -531,17 +580,20 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(null, Sets.newHashSet("Immunization"), null, null, myPatientGroupId, true));
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setOutputFormat(null);
bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization"));
bulkDataExportOptions.setSince(null);
bulkDataExportOptions.setFilters(null);
bulkDataExportOptions.setGroupId(myPatientGroupId);
bulkDataExportOptions.setExpandMdm(true);
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
GroupBulkExportJobParametersBuilder paramBuilder = new GroupBulkExportJobParametersBuilder();
paramBuilder.setGroupId(myPatientGroupId.getIdPart());
paramBuilder.setJobUUID(jobDetails.getJobId());
paramBuilder.setReadChunkSize(10L);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
JobExecution jobExecution = myBatchJobSubmitter.runJob(myGroupBulkJob, paramBuilder.toJobParameters());
awaitJobCompletion(jobExecution);
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
@ -549,10 +601,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Iterate over the files
Binary nextBinary = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", nextBinary.getResourceType(), nextContents);
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
assertThat(nextContents, is(containsString("IMM0")));
@ -562,33 +611,83 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(nextContents, is(containsString("IMM8")));
}
// CareTeam has two patient references: participant and patient. This test checks if we find the patient if participant is null but patient is not null
@Test
public void testGroupBatchJobCareTeam() throws Exception {
public void testPatientLevelExportWorks() throws JobParametersInvalidException {
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED);
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(null, Sets.newHashSet("CareTeam"), null, null, myPatientGroupId, true));
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Immunization", "Observation"));
options.setExportStyle(BulkDataExportOptions.ExportStyle.PATIENT);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options);
GroupBulkExportJobParametersBuilder paramBuilder = new GroupBulkExportJobParametersBuilder();
paramBuilder.setGroupId(myPatientGroupId.getIdPart());
paramBuilder.setJobUUID(jobDetails.getJobId());
paramBuilder.setReadChunkSize(10L);
JobExecution jobExecution = myBatchJobSubmitter.runJob(myGroupBulkJob, paramBuilder.toJobParameters());
JobExecution jobExecution = myBatchJobSubmitter.runJob(myPatientBulkJob, paramBuilder.toJobParameters());
awaitJobCompletion(jobExecution);
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(2));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Iterate over the files
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
assertThat(nextContents, is(containsString("IMM0")));
assertThat(nextContents, is(containsString("IMM1")));
assertThat(nextContents, is(containsString("IMM2")));
assertThat(nextContents, is(containsString("IMM3")));
assertThat(nextContents, is(containsString("IMM4")));
assertThat(nextContents, is(containsString("IMM5")));
assertThat(nextContents, is(containsString("IMM6")));
assertThat(nextContents, is(containsString("IMM7")));
assertThat(nextContents, is(containsString("IMM8")));
assertThat(nextContents, is(containsString("IMM9")));
assertThat(nextContents, is(containsString("IMM999")));
assertThat(nextContents, is(not(containsString("IMM2000"))));
assertThat(nextContents, is(not(containsString("IMM2001"))));
assertThat(nextContents, is(not(containsString("IMM2002"))));
assertThat(nextContents, is(not(containsString("IMM2003"))));
assertThat(nextContents, is(not(containsString("IMM2004"))));
assertThat(nextContents, is(not(containsString("IMM2005"))));
}
// CareTeam has two patient references: participant and patient. This test checks if we find the patient if participant is null but patient is not null
@Test
public void testGroupBatchJobCareTeam() throws Exception {
createResources();
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setOutputFormat(null);
bulkDataExportOptions.setResourceTypes(Sets.newHashSet("CareTeam"));
bulkDataExportOptions.setSince(null);
bulkDataExportOptions.setFilters(null);
bulkDataExportOptions.setGroupId(myPatientGroupId);
bulkDataExportOptions.setExpandMdm(true);
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(1));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("CareTeam")));
// Iterate over the files
Binary nextBinary = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", nextBinary.getResourceType(), nextContents);
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("CareTeam")));
assertThat(nextContents, is(containsString("CT0")));
@ -604,19 +703,102 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
JobParametersBuilder paramBuilder = new JobParametersBuilder().addString("jobUUID", "I'm not real!");
try {
myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
fail("Should have had invalid parameter execption!");
fail("Should have had invalid parameter exception!");
} catch (JobParametersInvalidException e) {
// good
}
}
@Test
public void testSystemExportWithMultipleTypeFilters() {
createResources();
// Create a bulk job
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Immunization"));
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
options.setFilters(Sets.newHashSet("Immunization?vaccine-code=Flu", "Immunization?patient=Patient/PAT1"));
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(1));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Iterate over the files
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
//These are the COVID-19 entries
assertThat(nextContents, is(containsString("IMM0")));
assertThat(nextContents, is(containsString("IMM2")));
assertThat(nextContents, is(containsString("IMM4")));
assertThat(nextContents, is(containsString("IMM6")));
assertThat(nextContents, is(containsString("IMM8")));
//This is the entry for the one referencing patient/1
assertThat(nextContents, is(containsString("IMM1")));
}
@Test
public void testGroupExportWithMultipleTypeFilters() {
createResources();
// Create a bulk job
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Observation"));
options.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
options.setGroupId(myPatientGroupId);
options.setExpandMdm(false);
options.setFilters(Sets.newHashSet("Observation?identifier=VAL0,VAL2", "Observation?identifier=VAL4"));
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(1));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Observation")));
String nextContents = getBinaryContents(jobInfo, 0);
//These are the Observation entries
assertThat(nextContents, is(containsString("OBS0")));
assertThat(nextContents, is(containsString("OBS2")));
assertThat(nextContents, is(containsString("OBS4")));
assertEquals(3, nextContents.split("\n").length);
}
public String getBinaryContents(IBulkDataExportSvc.JobInfo theJobInfo, int theIndex) {
// Iterate over the files
Binary nextBinary = myBinaryDao.read(theJobInfo.getFiles().get(theIndex).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", nextBinary.getResourceType(), nextContents);
return nextContents;
}
@Test
public void testMdmExpansionSuccessfullyExtractsPatients() throws JobParametersInvalidException {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(null, Sets.newHashSet("Patient"), null, null, myPatientGroupId, true));
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setOutputFormat(null);
bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Patient"));
bulkDataExportOptions.setSince(null);
bulkDataExportOptions.setFilters(null);
bulkDataExportOptions.setGroupId(myPatientGroupId);
bulkDataExportOptions.setExpandMdm(true);
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
@ -626,10 +808,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(jobInfo.getFiles().size(), equalTo(1));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Patient")));
Binary patientExportContent = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, patientExportContent.getContentType());
String nextContents = new String(patientExportContent.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", patientExportContent.getResourceType(), nextContents);
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Patient")));
//Output contains The entire group, plus the Mdm expansion, plus the golden resource
@ -641,23 +820,28 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(null, Sets.newHashSet("Immunization", "Observation"), null, null, myPatientGroupId, true));
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setOutputFormat(null);
bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization", "Observation"));
bulkDataExportOptions.setSince(null);
bulkDataExportOptions.setFilters(null);
bulkDataExportOptions.setGroupId(myPatientGroupId);
bulkDataExportOptions.setExpandMdm(true);
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Observation,Immunization&_groupId=" + myPatientGroupId +"&_mdm=true", jobInfo.getRequest());
assertEquals("/Group/G0/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Observation,Immunization&_groupId=" + myPatientGroupId +"&_mdm=true", jobInfo.getRequest());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(2));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Check immunization Content
Binary immunizationExportContent = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, immunizationExportContent.getContentType());
String nextContents = new String(immunizationExportContent.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", immunizationExportContent.getResourceType(), nextContents);
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
assertThat(nextContents, is(containsString("IMM0")));
assertThat(nextContents, is(containsString("IMM2")));
@ -697,23 +881,24 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testGroupBulkExportSupportsTypeFilters() throws JobParametersInvalidException {
createResources();
Set<String> filters = new HashSet<>();
//Only get COVID-19 vaccinations
Set<String> filters = new HashSet<>();
filters.add("Immunization?vaccine-code=vaccines|COVID-19");
GroupBulkDataExportOptions groupBulkDataExportOptions = new GroupBulkDataExportOptions(null, Sets.newHashSet("Immunization"), null, filters, myPatientGroupId, true );
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(groupBulkDataExportOptions);
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setOutputFormat(null);
bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization"));
bulkDataExportOptions.setSince(null);
bulkDataExportOptions.setFilters(filters);
bulkDataExportOptions.setGroupId(myPatientGroupId);
bulkDataExportOptions.setExpandMdm(true);
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
GroupBulkExportJobParametersBuilder paramBuilder = new GroupBulkExportJobParametersBuilder();
paramBuilder.setGroupId(myPatientGroupId.getIdPart());
paramBuilder.setMdm(true);
paramBuilder.setJobUUID(jobDetails.getJobId());
paramBuilder.setReadChunkSize(10L);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
JobExecution jobExecution = myBatchJobSubmitter.runJob(myGroupBulkJob, paramBuilder.toJobParameters());
awaitJobCompletion(jobExecution);
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
@ -721,12 +906,8 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Check immunization Content
Binary immunizationExportContent = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, immunizationExportContent.getContentType());
String nextContents = new String(immunizationExportContent.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", immunizationExportContent.getResourceType(), nextContents);
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
assertThat(nextContents, is(containsString("IMM1")));
assertThat(nextContents, is(containsString("IMM3")));
assertThat(nextContents, is(containsString("IMM5")));
@ -737,6 +918,82 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(nextContents, is(not(containsString("Flu"))));
}
@Test
public void testAllExportStylesWorkWithNullResourceTypes() {
createResources();
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED);
// Create a bulk job
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setOutputFormat(null);
bulkDataExportOptions.setResourceTypes(null);
bulkDataExportOptions.setSince(null);
bulkDataExportOptions.setFilters(null);
bulkDataExportOptions.setGroupId(myPatientGroupId);
bulkDataExportOptions.setExpandMdm(true);
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.PATIENT);
//Patient-style
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), is(equalTo(BulkJobStatusEnum.COMPLETE)));
//Group-style
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
bulkDataExportOptions.setGroupId(myPatientGroupId);
jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), is(equalTo(BulkJobStatusEnum.COMPLETE)));
//System-style
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), is(equalTo(BulkJobStatusEnum.COMPLETE)));
}
@Test
public void testCacheSettingIsRespectedWhenCreatingNewJobs() {
BulkDataExportOptions options = new BulkDataExportOptions();
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
options.setResourceTypes(Sets.newHashSet("Procedure"));
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.submitJob(options, true);
IBulkDataExportSvc.JobInfo jobInfo1 = myBulkDataExportSvc.submitJob(options, true);
IBulkDataExportSvc.JobInfo jobInfo2 = myBulkDataExportSvc.submitJob(options, true);
IBulkDataExportSvc.JobInfo jobInfo3 = myBulkDataExportSvc.submitJob(options, true);
IBulkDataExportSvc.JobInfo jobInfo4 = myBulkDataExportSvc.submitJob(options, true);
//Cached should have all identical Job IDs.
String initialJobId = jobInfo.getJobId();
boolean allMatch = Stream.of(jobInfo, jobInfo1, jobInfo2, jobInfo3, jobInfo4).allMatch(job -> job.getJobId().equals(initialJobId));
assertTrue(allMatch);
IBulkDataExportSvc.JobInfo jobInfo5 = myBulkDataExportSvc.submitJob(options, false);
IBulkDataExportSvc.JobInfo jobInfo6 = myBulkDataExportSvc.submitJob(options, false);
IBulkDataExportSvc.JobInfo jobInfo7 = myBulkDataExportSvc.submitJob(options, false);
IBulkDataExportSvc.JobInfo jobInfo8 = myBulkDataExportSvc.submitJob(options, false);
IBulkDataExportSvc.JobInfo jobInfo9 = myBulkDataExportSvc.submitJob(options, false);
//First non-cached should retrieve new ID.
assertThat(initialJobId, is(not(equalTo(jobInfo5.getJobId()))));
//Non-cached should all have unique IDs
List<String> jobIds = Stream.of(jobInfo5, jobInfo6, jobInfo7, jobInfo8, jobInfo9).map(IBulkDataExportSvc.JobInfo::getJobId).collect(Collectors.toList());
ourLog.info("ZOOP {}", String.join(", ", jobIds));
Set<String> uniqueJobIds = new HashSet<>(jobIds);
assertEquals(uniqueJobIds.size(), jobIds.size());
//Now if we create another one and ask for the cache, we should get the most-recently-insert entry.
IBulkDataExportSvc.JobInfo jobInfo10 = myBulkDataExportSvc.submitJob(options, true);
assertThat(jobInfo10.getJobId(), is(equalTo(jobInfo9.getJobId())));
}
private void awaitJobCompletion(JobExecution theJobExecution) {
await().atMost(120, TimeUnit.SECONDS).until(() -> {
JobExecution jobExecution = myJobExplorer.getJobExecution(theJobExecution.getId());
@ -760,8 +1017,6 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createImmunizationWithIndex(999, g1Outcome.getId());
createCareTeamWithIndex(999, g1Outcome.getId());
//Lets create an observation and an immunization for our golden patient.
for (int i = 0; i < 10; i++) {
DaoMethodOutcome patientOutcome = createPatientWithIndex(i);
IIdType patId = patientOutcome.getId().toUnqualifiedVersionless();
@ -780,6 +1035,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createImmunizationWithIndex(i, patId);
createCareTeamWithIndex(i, patId);
}
myPatientGroupId = myGroupDao.update(group).getId();
//Manually create another golden record
@ -789,14 +1045,22 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
Long goldenPid2 = myIdHelperService.getPidOrNull(g2Outcome.getResource());
//Create some nongroup patients MDM linked to a different golden resource. They shouldnt be included in the query.
for (int i = 1000; i < 1005; i++) {
DaoMethodOutcome patientOutcome = createPatientWithIndex(i);
for (int i = 0; i < 5; i++) {
int index = 1000 + i;
DaoMethodOutcome patientOutcome = createPatientWithIndex(index);
IIdType patId = patientOutcome.getId().toUnqualifiedVersionless();
Long sourcePid = myIdHelperService.getPidOrNull(patientOutcome.getResource());
linkToGoldenResource(goldenPid2, sourcePid);
createObservationWithIndex(i, patId);
createImmunizationWithIndex(i, patId);
createCareTeamWithIndex(i, patId);
createObservationWithIndex(index, patId);
createImmunizationWithIndex(index, patId);
createCareTeamWithIndex(index, patId);
}
//Create some Observations and immunizations which have _no subjects!_ These will be exlucded from the Patient level export.
for (int i = 0; i < 10; i++) {
int index = 2000 + i;
createObservationWithIndex(index, null);
createImmunizationWithIndex(index, null);
}
}
@ -820,7 +1084,9 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
private void createImmunizationWithIndex(int i, IIdType patId) {
Immunization immunization = new Immunization();
immunization.setId("IMM" + i);
immunization.setPatient(new Reference(patId));
if (patId != null ) {
immunization.setPatient(new Reference(patId));
}
if (i % 2 == 0) {
CodeableConcept cc = new CodeableConcept();
cc.addCoding().setSystem("vaccines").setCode("Flu");
@ -838,7 +1104,9 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
obs.setId("OBS" + i);
obs.addIdentifier().setSystem("SYS").setValue("VAL" + i);
obs.setStatus(Observation.ObservationStatus.FINAL);
obs.getSubject().setReference(patId.getValue());
if (patId != null) {
obs.getSubject().setReference(patId.getValue());
}
myObservationDao.update(obs);
}

View File

@ -162,6 +162,7 @@ import org.hl7.fhir.r4.model.ValueSet;
import org.hl7.fhir.r5.utils.IResourceValidator;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
@ -539,7 +540,6 @@ public abstract class BaseJpaR4Test extends BaseJpaTest implements ITestDataBuil
runInTransaction(() -> {
SearchSession searchSession = Search.session(myEntityManager);
searchSession.workspace(ResourceTable.class).purge();
// searchSession.workspace(ResourceIndexedSearchParamString.class).purge();
searchSession.indexingPlan().execute();
});