Merge branch 'master' into ng_validation_s13n_interceptors

commit f9f0b2705d
Author: Nick Goupinets
Date: 2021-03-12 10:30:36 -05:00

24 changed files with 1192 additions and 364 deletions

View File

@@ -56,6 +56,10 @@ public abstract class BaseParamWithPrefix<T extends BaseParam> extends BaseParam
 		offset++;
 	}
 
+	if (offset > 0 && theString.length() == offset) {
+		throw new DataFormatException("Invalid date/time format: \"" + theString + "\"");
+	}
+
 	String prefix = theString.substring(0, offset);
 	if (!isBlank(prefix)) {

View File

@@ -50,6 +50,7 @@ public class SearchParameterUtil {
 		return retVal;
 	}
 
+
 	@Nullable
 	public static String getCode(FhirContext theContext, IBaseResource theResource) {
 		return getStringChild(theContext, theResource, "code");

View File

@@ -1,6 +1,7 @@
 package ca.uhn.fhir.rest.param;
 
 import ca.uhn.fhir.context.FhirContext;
+import ca.uhn.fhir.parser.DataFormatException;
 import ca.uhn.fhir.rest.api.QualifiedParamList;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -10,6 +11,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class DateRangeParamTest {
 	private FhirContext fhirContext;
@@ -19,7 +21,9 @@ public class DateRangeParamTest {
 		fhirContext = Mockito.mock(FhirContext.class);
 	}
 
-	/** Can happen e.g. when the query parameter for {@code _lastUpdated} is left empty. */
+	/**
+	 * Can happen e.g. when the query parameter for {@code _lastUpdated} is left empty.
+	 */
 	@Test
 	public void testParamWithoutPrefixAndWithoutValue() {
 		QualifiedParamList qualifiedParamList = new QualifiedParamList(1);
@@ -33,7 +37,9 @@ public class DateRangeParamTest {
 		assertTrue(dateRangeParam.isEmpty());
 	}
 
-	/** Can happen e.g. when the query parameter for {@code _lastUpdated} is given as {@code lt} without any value. */
+	/**
+	 * Can happen e.g. when the query parameter for {@code _lastUpdated} is given as {@code lt} without any value.
+	 */
 	@Test
 	public void testUpperBoundWithPrefixWithoutValue() {
 		QualifiedParamList qualifiedParamList = new QualifiedParamList(1);
@@ -42,12 +48,17 @@ public class DateRangeParamTest {
 		List<QualifiedParamList> params = new ArrayList<>(1);
 		params.add(qualifiedParamList);
 		DateRangeParam dateRangeParam = new DateRangeParam();
-		dateRangeParam.setValuesAsQueryTokens(fhirContext, "_lastUpdated", params);
-		assertTrue(dateRangeParam.isEmpty());
+		try {
+			dateRangeParam.setValuesAsQueryTokens(fhirContext, "_lastUpdated", params);
+			fail();
+		} catch (DataFormatException e) {
+			// good
+		}
 	}
 
-	/** Can happen e.g. when the query parameter for {@code _lastUpdated} is given as {@code gt} without any value. */
+	/**
+	 * Can happen e.g. when the query parameter for {@code _lastUpdated} is given as {@code gt} without any value.
+	 */
 	@Test
 	public void testLowerBoundWithPrefixWithoutValue() {
 		QualifiedParamList qualifiedParamList = new QualifiedParamList(1);
@@ -56,8 +67,11 @@ public class DateRangeParamTest {
 		List<QualifiedParamList> params = new ArrayList<>(1);
 		params.add(qualifiedParamList);
 		DateRangeParam dateRangeParam = new DateRangeParam();
-		dateRangeParam.setValuesAsQueryTokens(fhirContext, "_lastUpdated", params);
-		assertTrue(dateRangeParam.isEmpty());
+		try {
+			dateRangeParam.setValuesAsQueryTokens(fhirContext, "_lastUpdated", params);
+			fail();
+		} catch (DataFormatException e) {
+			// good
+		}
 	}
 }

View File

@@ -0,0 +1,5 @@
---
type: add
issue: 2433
title: "Support has been added for MDM expansion during Group bulk export. Calling a group export via `/Group/123/$export?_mdm=true`
will cause Bulk Export to not only match group members, but also any MDM-matched patients, and their related golden record patients"
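A minimal kick-off sketch for this endpoint, assuming a server at http://localhost:8080/fhir and a Group with id 123 (both placeholder values). The Prefer: respond-async header is required by the provider, and a successful kick-off answers 202 Accepted with the polling URL in the Content-Location header, as set up in BulkDataExportProvider further down in this diff:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class GroupExportKickoff {
	public static void main(String[] args) throws Exception {
		HttpRequest request = HttpRequest.newBuilder()
			.uri(URI.create("http://localhost:8080/fhir/Group/123/$export?_mdm=true"))
			.header("Prefer", "respond-async") // $export rejects non-async requests
			.GET()
			.build();
		HttpResponse<String> response = HttpClient.newHttpClient()
			.send(request, HttpResponse.BodyHandlers.ofString());
		System.out.println(response.statusCode()); // expect 202 Accepted
		// Poll this URL for job status and the generated NDJSON files:
		System.out.println(response.headers().firstValue("Content-Location").orElse("(none)"));
	}
}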

View File

@@ -0,0 +1,4 @@
---
type: add
issue: 2445
title: "Support has been added for patient level Bulk export. This can be done via the `/Patient/$export` endpoint. Also, support has been added for setting Cache-Control header to no-cache for Bulk Export requests."

View File

@@ -0,0 +1,5 @@
---
type: fix
issue: 2466
title: "When performainfg a search using a date search parameter, invalid values (e.g. `date=foo`) resulted
in a confusing error message. This has been improved."
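A small sketch of the improved behaviour, assuming the DateParam constructor as an entry point into the shared prefix parser patched in BaseParamWithPrefix above; an unparseable value now fails fast with a DataFormatException carrying the clearer message:

import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.param.DateParam;

public class InvalidDateParamDemo {
	public static void main(String[] args) {
		try {
			new DateParam("foo"); // "foo" is not a valid FHIR date
		} catch (DataFormatException e) {
			System.out.println(e.getMessage()); // e.g. Invalid date/time format: "foo"
		}
	}
}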

View File

@@ -33,4 +33,5 @@ import org.springframework.context.annotation.Import;
 public class BatchJobsConfig {
 	public static final String BULK_EXPORT_JOB_NAME = "bulkExportJob";
 	public static final String GROUP_BULK_EXPORT_JOB_NAME = "groupBulkExportJob";
+	public static final String PATIENT_BULK_EXPORT_JOB_NAME = "patientBulkExportJob";
 }

View File

@@ -20,22 +20,55 @@ package ca.uhn.fhir.jpa.bulk.api;
  * #L%
  */
 
+import org.hl7.fhir.instance.model.api.IIdType;
+
 import java.util.Date;
 import java.util.Set;
 
 public class BulkDataExportOptions {
-	private final String myOutputFormat;
-	private final Set<String> myResourceTypes;
-	private final Date mySince;
-	private final Set<String> myFilters;
-
-	public BulkDataExportOptions(String theOutputFormat, Set<String> theResourceTypes, Date theSince, Set<String> theFilters) {
+	public BulkDataExportOptions() {
+	}
+
+	public enum ExportStyle {
+		PATIENT,
+		GROUP,
+		SYSTEM
+	}
+
+	private String myOutputFormat;
+	private Set<String> myResourceTypes;
+	private Date mySince;
+	private Set<String> myFilters;
+	private ExportStyle myExportStyle;
+	private boolean myExpandMdm;
+	private IIdType myGroupId;
+
+	public void setOutputFormat(String theOutputFormat) {
 		myOutputFormat = theOutputFormat;
+	}
+
+	public void setResourceTypes(Set<String> theResourceTypes) {
 		myResourceTypes = theResourceTypes;
+	}
+
+	public void setSince(Date theSince) {
 		mySince = theSince;
+	}
+
+	public void setFilters(Set<String> theFilters) {
 		myFilters = theFilters;
 	}
 
+	public ExportStyle getExportStyle() {
+		return myExportStyle;
+	}
+
+	public void setExportStyle(ExportStyle theExportStyle) {
+		myExportStyle = theExportStyle;
+	}
+
 	public String getOutputFormat() {
 		return myOutputFormat;
 	}
@@ -51,4 +84,20 @@ public class BulkDataExportOptions {
 	public Set<String> getFilters() {
 		return myFilters;
 	}
+
+	public boolean isExpandMdm() {
+		return myExpandMdm;
+	}
+
+	public void setExpandMdm(boolean theExpandMdm) {
+		myExpandMdm = theExpandMdm;
+	}
+
+	public IIdType getGroupId() {
+		return myGroupId;
+	}
+
+	public void setGroupId(IIdType theGroupId) {
+		myGroupId = theGroupId;
+	}
 }

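A minimal sketch of how callers now assemble the reworked options object, mirroring the setter-based usage in CreateBulkExportEntityTasklet and BulkDataExportProvider later in this diff. The concrete values are illustrative only; Sets.newHashSet is the Guava helper this commit imports elsewhere:

import java.util.Date;

import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.model.primitive.IdDt;
import com.google.common.collect.Sets;

public class OptionsDemo {
	public static BulkDataExportOptions groupOptions() {
		BulkDataExportOptions options = new BulkDataExportOptions();
		options.setOutputFormat("application/fhir+ndjson");
		options.setResourceTypes(Sets.newHashSet("Patient", "Observation"));
		options.setSince(new Date());
		options.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
		options.setGroupId(new IdDt("Group/123")); // placeholder group id
		options.setExpandMdm(true);
		return options;
	}
}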
View File

@ -1,45 +0,0 @@
package ca.uhn.fhir.jpa.bulk.api;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.hl7.fhir.instance.model.api.IIdType;
import java.util.Date;
import java.util.Set;
public class GroupBulkDataExportOptions extends BulkDataExportOptions {
private final IIdType myGroupId;
private final boolean myMdm;
public GroupBulkDataExportOptions(String theOutputFormat, Set<String> theResourceTypes, Date theSince, Set<String> theFilters, IIdType theGroupId, boolean theMdm) {
super(theOutputFormat, theResourceTypes, theSince, theFilters);
myGroupId = theGroupId;
myMdm = theMdm;
}
public IIdType getGroupId() {
return myGroupId;
}
public boolean isMdm() {
return myMdm;
}
}

View File

@@ -27,6 +27,7 @@ import javax.transaction.Transactional;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.Set;
 
 public interface IBulkDataExportSvc {
 	void buildExportFiles();
@@ -36,8 +37,15 @@ public interface IBulkDataExportSvc {
 	JobInfo submitJob(BulkDataExportOptions theBulkDataExportOptions);
 
+	JobInfo submitJob(BulkDataExportOptions theBulkDataExportOptions, Boolean useCache);
+
 	JobInfo getJobInfoOrThrowResourceNotFound(String theJobId);
 
+	/**
+	 * Return a set of all resource types which contain search parameters which have Patient as a target.
+	 */
+	Set<String> getPatientCompartmentResources();
+
 	void cancelAndPurgeAllJobs();
 
 	class JobInfo {

View File

@@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.bulk.job;
 
 import ca.uhn.fhir.context.FhirContext;
 import ca.uhn.fhir.context.RuntimeResourceDefinition;
+import ca.uhn.fhir.context.RuntimeSearchParam;
 import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
 import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
 import ca.uhn.fhir.jpa.batch.log.Logs;
@@ -29,6 +30,7 @@ import ca.uhn.fhir.jpa.dao.ISearchBuilder;
 import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
 import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
 import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
+import ca.uhn.fhir.jpa.entity.Search;
 import ca.uhn.fhir.jpa.model.util.JpaConstants;
 import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
 import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
@@ -43,10 +45,12 @@ import org.springframework.beans.factory.annotation.Value;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 public abstract class BaseBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
 	private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@@ -73,6 +77,7 @@ public abstract class BaseBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
 	private RuntimeResourceDefinition myResourceDefinition;
 
 	private Iterator<ResourcePersistentId> myPidIterator;
+	private RuntimeSearchParam myPatientSearchParam;
 
 	/**
 	 * Get and cache an ISearchBuilder for the given resource type this partition is responsible for.
@@ -98,24 +103,40 @@ public abstract class BaseBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
 	abstract Iterator<ResourcePersistentId> getResourcePidIterator();
 
-	protected SearchParameterMap createSearchParameterMapForJob() {
+	protected List<SearchParameterMap> createSearchParameterMapsForResourceType() {
 		BulkExportJobEntity jobEntity = getJobEntity();
 		RuntimeResourceDefinition theDef = getResourceDefinition();
-		SearchParameterMap map = new SearchParameterMap();
 
 		Map<String, String[]> requestUrl = UrlUtil.parseQueryStrings(jobEntity.getRequest());
 		String[] typeFilters = requestUrl.get(JpaConstants.PARAM_EXPORT_TYPE_FILTER);
+		List<SearchParameterMap> spMaps = null;
 		if (typeFilters != null) {
-			Optional<String> filter = Arrays.stream(typeFilters).filter(t -> t.startsWith(myResourceType + "?")).findFirst();
-			if (filter.isPresent()) {
-				String matchUrl = filter.get();
-				map = myMatchUrlService.translateMatchUrl(matchUrl, theDef);
-			}
+			spMaps = Arrays.stream(typeFilters)
+				.filter(typeFilter -> typeFilter.startsWith(myResourceType + "?"))
+				.map(filter -> buildSearchParameterMapForTypeFilter(filter, theDef))
+				.collect(Collectors.toList());
 		}
-		if (jobEntity.getSince() != null) {
-			map.setLastUpdated(new DateRangeParam(jobEntity.getSince(), null));
+
+		//None of the _typeFilters applied to the current resource type, so just make a simple one.
+		if (spMaps == null || spMaps.isEmpty()) {
+			SearchParameterMap defaultMap = new SearchParameterMap();
+			enhanceSearchParameterMapWithCommonParameters(defaultMap);
+			spMaps = Collections.singletonList(defaultMap);
 		}
+
+		return spMaps;
+	}
+
+	private void enhanceSearchParameterMapWithCommonParameters(SearchParameterMap map) {
 		map.setLoadSynchronous(true);
-		return map;
+		if (getJobEntity().getSince() != null) {
+			map.setLastUpdated(new DateRangeParam(getJobEntity().getSince(), null));
+		}
+	}
+
+	public SearchParameterMap buildSearchParameterMapForTypeFilter(String theFilter, RuntimeResourceDefinition theDef) {
+		SearchParameterMap searchParameterMap = myMatchUrlService.translateMatchUrl(theFilter, theDef);
+		enhanceSearchParameterMapWithCommonParameters(searchParameterMap);
+		return searchParameterMap;
 	}
 
 	protected RuntimeResourceDefinition getResourceDefinition() {
@@ -157,4 +178,48 @@ public abstract class BaseBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
 		return outgoing.size() == 0 ? null : outgoing;
 	}
 
+	/**
+	 * Given the resource type, fetch its patient-based search parameter name
+	 * 1. Attempt to find one called 'patient'
+	 * 2. If that fails, find one called 'subject'
+	 * 3. If that fails, find by Patient Compartment.
+	 * 3.1 If that returns >1 result, throw an error
+	 * 3.2 If that returns 1 result, return it
+	 */
+	protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType() {
+		if (myPatientSearchParam == null) {
+			RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(myResourceType);
+			myPatientSearchParam = runtimeResourceDefinition.getSearchParam("patient");
+			if (myPatientSearchParam == null) {
+				myPatientSearchParam = runtimeResourceDefinition.getSearchParam("subject");
+				if (myPatientSearchParam == null) {
+					myPatientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition);
+					if (myPatientSearchParam == null) {
+						String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", myResourceType);
+						throw new IllegalArgumentException(errorMessage);
+					}
+				}
+			}
+		}
+		return myPatientSearchParam;
+	}
+
+	/**
+	 * Search the resource definition for a compartment named 'patient' and return its related Search Parameter.
+	 */
+	protected RuntimeSearchParam getRuntimeSearchParamByCompartment(RuntimeResourceDefinition runtimeResourceDefinition) {
+		RuntimeSearchParam patientSearchParam;
+		List<RuntimeSearchParam> searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient");
+		if (searchParams == null || searchParams.size() == 0) {
+			String errorMessage = String.format("Resource type [%s] is not eligible for this type of export, as it contains no Patient compartment, and no `patient` or `subject` search parameter", myResourceType);
+			throw new IllegalArgumentException(errorMessage);
+		} else if (searchParams.size() == 1) {
+			patientSearchParam = searchParams.get(0);
+		} else {
+			String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as we are unable to disambiguate which patient search parameter we should be searching by.", myResourceType);
+			throw new IllegalArgumentException(errorMessage);
+		}
+		return patientSearchParam;
+	}
 }

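A self-contained sketch of the filter-partitioning rule introduced above, with plain strings and illustrative filter values: each _typeFilter entry addressed to the reader's own resource type becomes one SearchParameterMap via translateMatchUrl, and entries for other types are ignored by that partition:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TypeFilterPartitionDemo {
	public static void main(String[] args) {
		String myResourceType = "Patient"; // the partition's resource type
		List<String> typeFilters = Arrays.asList("Patient?gender=male", "Observation?status=final");
		// Keep only the filters addressed to this partition's type.
		List<String> applicable = typeFilters.stream()
			.filter(typeFilter -> typeFilter.startsWith(myResourceType + "?"))
			.collect(Collectors.toList());
		System.out.println(applicable); // prints: [Patient?gender=male]
	}
}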
View File

@@ -92,6 +92,17 @@ public class BulkExportJobConfig {
 			.build();
 	}
 
+	@Bean
+	@Lazy
+	public Job patientBulkExportJob() {
+		return myJobBuilderFactory.get(BatchJobsConfig.PATIENT_BULK_EXPORT_JOB_NAME)
+			.validator(bulkJobParameterValidator())
+			.start(createBulkExportEntityStep())
+			.next(patientPartitionStep())
+			.next(closeJobStep())
+			.build();
+	}
+
 	@Bean
 	public GroupIdPresentValidator groupBulkJobParameterValidator() {
 		return new GroupIdPresentValidator();
@@ -115,6 +126,7 @@ public class BulkExportJobConfig {
 		return new BulkExportJobParameterValidator();
 	}
 
+	//Writers
 	@Bean
 	public Step groupBulkExportGenerateResourceFilesStep() {
 		return myStepBuilderFactory.get("groupBulkExportGenerateResourceFilesStep")
@@ -122,17 +134,10 @@ public class BulkExportJobConfig {
 			.reader(groupBulkItemReader())
 			.processor(myPidToIBaseResourceProcessor)
 			.writer(resourceToFileWriter())
-			.listener(bulkExportGenrateResourceFilesStepListener())
+			.listener(bulkExportGenerateResourceFilesStepListener())
 			.build();
 	}
 
-	@Bean
-	@StepScope
-	public GroupBulkItemReader groupBulkItemReader(){
-		return new GroupBulkItemReader();
-	}
-
 	@Bean
 	public Step bulkExportGenerateResourceFilesStep() {
 		return myStepBuilderFactory.get("bulkExportGenerateResourceFilesStep")
@@ -140,7 +145,17 @@ public class BulkExportJobConfig {
 			.reader(bulkItemReader())
 			.processor(myPidToIBaseResourceProcessor)
 			.writer(resourceToFileWriter())
-			.listener(bulkExportGenrateResourceFilesStepListener())
+			.listener(bulkExportGenerateResourceFilesStepListener())
+			.build();
+	}
+
+	@Bean
+	public Step patientBulkExportGenerateResourceFilesStep() {
+		return myStepBuilderFactory.get("patientBulkExportGenerateResourceFilesStep")
+			.<List<ResourcePersistentId>, List<IBaseResource>> chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time.
+			.reader(patientBulkItemReader())
+			.processor(myPidToIBaseResourceProcessor)
+			.writer(resourceToFileWriter())
+			.listener(bulkExportGenerateResourceFilesStepListener())
 			.build();
 	}
 
@@ -165,10 +180,17 @@ public class BulkExportJobConfig {
 	@Bean
 	@JobScope
-	public BulkExportGenerateResourceFilesStepListener bulkExportGenrateResourceFilesStepListener() {
+	public BulkExportGenerateResourceFilesStepListener bulkExportGenerateResourceFilesStepListener() {
 		return new BulkExportGenerateResourceFilesStepListener();
 	}
 
+	@Bean
+	public Step partitionStep() {
+		return myStepBuilderFactory.get("partitionStep")
+			.partitioner("bulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner())
+			.step(bulkExportGenerateResourceFilesStep())
+			.build();
+	}
+
 	@Bean
 	public Step groupPartitionStep() {
@@ -177,14 +199,28 @@ public class BulkExportJobConfig {
 			.step(groupBulkExportGenerateResourceFilesStep())
 			.build();
 	}
 
 	@Bean
-	public Step partitionStep() {
+	public Step patientPartitionStep() {
 		return myStepBuilderFactory.get("partitionStep")
-			.partitioner("bulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner())
-			.step(bulkExportGenerateResourceFilesStep())
+			.partitioner("patientBulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner())
+			.step(patientBulkExportGenerateResourceFilesStep())
 			.build();
 	}
 
+	@Bean
+	@StepScope
+	public GroupBulkItemReader groupBulkItemReader(){
+		return new GroupBulkItemReader();
+	}
+
+	@Bean
+	@StepScope
+	public PatientBulkItemReader patientBulkItemReader() {
+		return new PatientBulkItemReader();
+	}
+
 	@Bean
 	@StepScope
 	public BulkItemReader bulkItemReader(){
@@ -199,7 +235,7 @@ public class BulkExportJobConfig {
 	@Bean
 	@StepScope
-	public ItemWriter<List<IBaseResource>> resourceToFileWriter() {
+	public ResourceToFileWriter resourceToFileWriter() {
 		return new ResourceToFileWriter();
 	}

View File

@@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.bulk.job;
  * #L%
  */
 
+import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
 import ca.uhn.fhir.rest.api.Constants;
 import org.springframework.batch.core.JobParametersBuilder;
@@ -65,4 +66,8 @@ public class BulkExportJobParametersBuilder extends JobParametersBuilder {
 		this.addLong("readChunkSize", theReadChunkSize);
 		return this;
 	}
+
+	public BulkExportJobParametersBuilder setExportStyle(BulkDataExportOptions.ExportStyle theExportStyle) {
+		this.addString("exportStyle", theExportStyle.name());
+		return this;
+	}
 }

View File

@@ -20,30 +20,20 @@ package ca.uhn.fhir.jpa.bulk.job;
  * #L%
  */
 
-import ca.uhn.fhir.context.RuntimeResourceDefinition;
 import ca.uhn.fhir.interceptor.model.RequestPartitionId;
-import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
 import ca.uhn.fhir.jpa.batch.log.Logs;
 import ca.uhn.fhir.jpa.dao.IResultIterator;
 import ca.uhn.fhir.jpa.dao.ISearchBuilder;
-import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
-import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
 import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
-import ca.uhn.fhir.jpa.model.util.JpaConstants;
-import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
 import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
 import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
-import ca.uhn.fhir.rest.param.DateRangeParam;
-import ca.uhn.fhir.util.UrlUtil;
 import org.slf4j.Logger;
-import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.Set;
 
 /**
  * Basic Bulk Export implementation which simply reads all type filters and applies them, along with the _since param
@@ -52,19 +42,21 @@ import java.util.Set;
 public class BulkItemReader extends BaseBulkItemReader {
 	private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
 
 	@Override
 	Iterator<ResourcePersistentId> getResourcePidIterator() {
 		ourLog.info("Bulk export assembling export of type {} for job {}", myResourceType, myJobUUID);
+		Set<ResourcePersistentId> myReadPids = new HashSet<>();
 
-		SearchParameterMap map = createSearchParameterMapForJob();
+		List<SearchParameterMap> map = createSearchParameterMapsForResourceType();
 		ISearchBuilder sb = getSearchBuilderForLocalResourceType();
 
-		IResultIterator myResultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
-		List<ResourcePersistentId> myReadPids = new ArrayList<>();
-		while (myResultIterator.hasNext()) {
-			myReadPids.add(myResultIterator.next());
+		for (SearchParameterMap spMap: map) {
+			ourLog.debug("About to evaluate query {}", spMap.toNormalizedQueryString(myContext));
+			IResultIterator myResultIterator = sb.createQuery(spMap, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
+			while (myResultIterator.hasNext()) {
+				myReadPids.add(myResultIterator.next());
+			}
 		}
 		return myReadPids.iterator();
 	}

View File

@@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.bulk.job;
 
 import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
 import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
+import ca.uhn.fhir.model.primitive.IdDt;
 import ca.uhn.fhir.rest.api.Constants;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.batch.core.StepContribution;
@@ -66,7 +67,20 @@ public class CreateBulkExportEntityTasklet implements Tasklet {
 			outputFormat = Constants.CT_FHIR_NDJSON;
 		}
 
-		IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(outputFormat, resourceTypeSet, since, filterSet));
+		BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
+		bulkDataExportOptions.setOutputFormat(outputFormat);
+		bulkDataExportOptions.setResourceTypes(resourceTypeSet);
+		bulkDataExportOptions.setSince(since);
+		bulkDataExportOptions.setFilters(filterSet);
+
+		//Set export style
+		String exportStyle = (String)jobParameters.get("exportStyle");
+		bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.valueOf(exportStyle));
+
+		//Set group id if present
+		String groupId = (String)jobParameters.get("groupId");
+		bulkDataExportOptions.setGroupId(new IdDt(groupId));
+
+		IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
 		addUUIDToJobContext(theChunkContext, jobInfo.getJobId());
 		return RepeatStatus.FINISHED;

View File

@@ -20,7 +20,6 @@ package ca.uhn.fhir.jpa.bulk.job;
  * #L%
  */
 
-import ca.uhn.fhir.context.RuntimeResourceDefinition;
 import ca.uhn.fhir.context.RuntimeSearchParam;
 import ca.uhn.fhir.interceptor.model.RequestPartitionId;
 import ca.uhn.fhir.jpa.batch.log.Logs;
@@ -28,6 +27,7 @@ import ca.uhn.fhir.jpa.dao.IResultIterator;
 import ca.uhn.fhir.jpa.dao.ISearchBuilder;
 import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
 import ca.uhn.fhir.jpa.dao.index.IdHelperService;
+import ca.uhn.fhir.jpa.entity.Search;
 import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
 import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
 import ca.uhn.fhir.jpa.util.QueryChunker;
@@ -39,7 +39,6 @@ import ca.uhn.fhir.rest.param.ReferenceParam;
 import org.hl7.fhir.instance.model.api.IBaseResource;
 import org.hl7.fhir.instance.model.api.IIdType;
 import org.hl7.fhir.instance.model.api.IPrimitiveType;
-import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.springframework.batch.item.ItemReader;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -77,11 +76,9 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
 	@Autowired
 	private IMdmLinkDao myMdmLinkDao;
 
-	private RuntimeSearchParam myPatientSearchParam;
-
 	@Override
 	Iterator<ResourcePersistentId> getResourcePidIterator() {
-		List<ResourcePersistentId> myReadPids = new ArrayList<>();
+		Set<ResourcePersistentId> myReadPids = new HashSet<>();
 
 		//Short circuit out if we detect we are attempting to extract patients
 		if (myResourceType.equalsIgnoreCase("Patient")) {
@@ -113,7 +110,6 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
 	 */
 	private Iterator<ResourcePersistentId> getExpandedPatientIterator() {
 		Set<Long> patientPidsToExport = new HashSet<>();
-		//This gets all member pids
 		List<String> members = getMembers();
 		List<IIdType> ids = members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList());
 		List<Long> pidsOrThrowException = myIdHelperService.getPidsOrThrowException(ids);
@@ -162,8 +158,15 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
 			//Now lets translate these pids into resource IDs
 			Set<Long> uniquePids = new HashSet<>();
 			goldenPidTargetPidTuple.forEach(uniquePids::addAll);
-			Map<Long, Optional<String>> longOptionalMap = myIdHelperService.translatePidsToForcedIds(uniquePids);
-			expandedIds = longOptionalMap.values().stream().map(Optional::get).collect(Collectors.toSet());
+
+			Map<Long, Optional<String>> pidToForcedIdMap = myIdHelperService.translatePidsToForcedIds(uniquePids);
+
+			//If the result of the translation is an empty optional, it means there is no forced id, and we can use the PID as the resource ID.
+			Set<String> resolvedResourceIds = pidToForcedIdMap.entrySet().stream()
+				.map(entry -> entry.getValue().isPresent() ? entry.getValue().get() : entry.getKey().toString())
+				.collect(Collectors.toSet());
+
+			expandedIds.addAll(resolvedResourceIds);
 		}
 
 		//Now manually add the members of the group (its possible even with mdm expansion that some members dont have MDM matches,
@@ -173,83 +176,40 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
 		return expandedIds;
 	}
 
-	private void queryResourceTypeWithReferencesToPatients(List<ResourcePersistentId> myReadPids, List<String> idChunk) {
+	private void queryResourceTypeWithReferencesToPatients(Set<ResourcePersistentId> myReadPids, List<String> idChunk) {
 		//Build SP map
 		//First, inject the _typeFilters and _since from the export job
-		SearchParameterMap expandedSpMap = createSearchParameterMapForJob();
+		List<SearchParameterMap> expandedSpMaps = createSearchParameterMapsForResourceType();
+		for (SearchParameterMap expandedSpMap: expandedSpMaps) {
 
 			//Since we are in a bulk job, we have to ensure the user didn't jam in a patient search param, since we need to manually set that.
 			validateSearchParameters(expandedSpMap);
 
 			// Now, further filter the query with patient references defined by the chunk of IDs we have.
 			filterSearchByResourceIds(idChunk, expandedSpMap);
 
 			// Fetch and cache a search builder for this resource type
 			ISearchBuilder searchBuilder = getSearchBuilderForLocalResourceType();
 
			//Execute query and add all found pids to our local iterator.
 			IResultIterator resultIterator = searchBuilder.createQuery(expandedSpMap, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
 			while (resultIterator.hasNext()) {
 				myReadPids.add(resultIterator.next());
 			}
+		}
 	}
 
 	private void filterSearchByResourceIds(List<String> idChunk, SearchParameterMap expandedSpMap) {
 		ReferenceOrListParam orList = new ReferenceOrListParam();
 		idChunk.forEach(id -> orList.add(new ReferenceParam(id)));
-		expandedSpMap.add(getPatientSearchParam().getName(), orList);
+		expandedSpMap.add(getPatientSearchParamForCurrentResourceType().getName(), orList);
 	}
 
 	private RuntimeSearchParam validateSearchParameters(SearchParameterMap expandedSpMap) {
-		RuntimeSearchParam runtimeSearchParam = getPatientSearchParam();
+		RuntimeSearchParam runtimeSearchParam = getPatientSearchParamForCurrentResourceType();
 		if (expandedSpMap.get(runtimeSearchParam.getName()) != null) {
 			throw new IllegalArgumentException(String.format("Group Bulk Export manually modifies the Search Parameter called [%s], so you may not include this search parameter in your _typeFilter!", runtimeSearchParam.getName()));
 		}
 		return runtimeSearchParam;
 	}
-
-	/**
-	 * Given the resource type, fetch its patient-based search parameter name
-	 * 1. Attempt to find one called 'patient'
-	 * 2. If that fails, find one called 'subject'
-	 * 3. If that fails, find by Patient Compartment.
-	 * 3.1 If that returns >1 result, throw an error
-	 * 3.2 If that returns 1 result, return it
-	 */
-	private RuntimeSearchParam getPatientSearchParam() {
-		if (myPatientSearchParam == null) {
-			RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(myResourceType);
-			myPatientSearchParam = runtimeResourceDefinition.getSearchParam("patient");
-			if (myPatientSearchParam == null) {
-				myPatientSearchParam = runtimeResourceDefinition.getSearchParam("subject");
-				if (myPatientSearchParam == null) {
-					myPatientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition);
-					if (myPatientSearchParam == null) {
-						String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", myResourceType);
-						throw new IllegalArgumentException(errorMessage);
-					}
-				}
-			}
-		}
-		return myPatientSearchParam;
-	}
-
-	/**
-	 * Search the resource definition for a compartment named 'patient' and return its related Search Parameter.
-	 */
-	private RuntimeSearchParam getRuntimeSearchParamByCompartment(RuntimeResourceDefinition runtimeResourceDefinition) {
-		RuntimeSearchParam patientSearchParam;
-		List<RuntimeSearchParam> searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient");
-		if (searchParams == null || searchParams.size() == 0) {
-			String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as it contains no Patient compartment, and no `patient` or `subject` search parameter", myResourceType);
-			throw new IllegalArgumentException(errorMessage);
-		} else if (searchParams.size() == 1) {
-			patientSearchParam = searchParams.get(0);
-		} else {
-			String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as we are unable to disambiguate which patient search parameter we should be searching by.", myResourceType);
-			throw new IllegalArgumentException(errorMessage);
-		}
-		return patientSearchParam;
-	}
 }

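A self-contained sketch of the forced-id resolution introduced above, with made-up PIDs: an empty Optional from translatePidsToForcedIds means no client-assigned ("forced") id exists, so the numeric PID itself doubles as the resource id:

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class ForcedIdResolutionDemo {
	public static void main(String[] args) {
		Map<Long, Optional<String>> pidToForcedIdMap = new HashMap<>();
		pidToForcedIdMap.put(1L, Optional.of("golden-patient-1")); // has a forced id
		pidToForcedIdMap.put(2L, Optional.empty());                // no forced id -> use "2"
		Set<String> resolvedResourceIds = pidToForcedIdMap.entrySet().stream()
			.map(entry -> entry.getValue().isPresent() ? entry.getValue().get() : entry.getKey().toString())
			.collect(Collectors.toSet());
		System.out.println(resolvedResourceIds); // [golden-patient-1, 2] (order not guaranteed)
	}
}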
View File

@@ -0,0 +1,98 @@
package ca.uhn.fhir.jpa.bulk.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.ReferenceParam;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* Bulk Item reader for the Patient Bulk Export job.
* Instead of performing a normal query on the resource type using type filters, we instead
*
* 1. Determine the resource type
* 2. Search for anything that has `patient-compartment-search-param:missing=false`
*/
public class PatientBulkItemReader extends BaseBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
@Autowired
private DaoConfig myDaoConfig;
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
private RuntimeSearchParam validateSearchParameters(SearchParameterMap expandedSpMap) {
RuntimeSearchParam runtimeSearchParam = getPatientSearchParamForCurrentResourceType();
if (expandedSpMap.get(runtimeSearchParam.getName()) != null) {
throw new IllegalArgumentException(String.format("Patient Bulk Export manually modifies the Search Parameter called [%s], so you may not include this search parameter in your _typeFilter!", runtimeSearchParam.getName()));
}
return runtimeSearchParam;
}
@Override
Iterator<ResourcePersistentId> getResourcePidIterator() {
if (myDaoConfig.getIndexMissingFields() == DaoConfig.IndexEnabledEnum.DISABLED) {
String errorMessage = "You attempted to start a Patient Bulk Export, but the system has `Index Missing Fields` disabled. It must be enabled for Patient Bulk Export";
ourLog.error(errorMessage);
throw new IllegalStateException(errorMessage);
}
List<ResourcePersistentId> myReadPids = new ArrayList<>();
//use _typeFilter and _since and all those fancy bits and bobs to generate our basic SP map.
List<SearchParameterMap> maps = createSearchParameterMapsForResourceType();
String patientSearchParam = getPatientSearchParamForCurrentResourceType().getName();
for (SearchParameterMap map: maps) {
//Ensure users did not monkey with the patient compartment search parameter.
validateSearchParameters(map);
//Skip adding the parameter querying for patient= if we are in fact querying the patient resource type.
if (!myResourceType.equalsIgnoreCase("Patient")) {
map.add(patientSearchParam, new ReferenceParam().setMissing(false));
}
ourLog.debug("About to execute query {}", map.toNormalizedQueryString(myContext));
ISearchBuilder sb = getSearchBuilderForLocalResourceType();
IResultIterator myResultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
while (myResultIterator.hasNext()) {
myReadPids.add(myResultIterator.next());
}
}
return myReadPids.iterator();
}
}

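A sketch of the query shape this reader produces for a non-Patient type, assuming an Observation partition whose patient-based parameter resolves to `patient` (an illustrative case; the actual name comes from getPatientSearchParamForCurrentResourceType()). It is roughly the REST query GET [base]/Observation?patient:missing=false:

import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.param.ReferenceParam;

public class PatientExportQueryDemo {
	public static SearchParameterMap observationQuery() {
		SearchParameterMap map = new SearchParameterMap();
		map.setLoadSynchronous(true);
		// Only resources that actually hold a patient reference are exported,
		// which is why Index Missing Fields must be enabled.
		map.add("patient", new ReferenceParam().setMissing(false));
		return map;
	}
}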
View File

@ -22,13 +22,13 @@ package ca.uhn.fhir.jpa.bulk.provider;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions; import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.model.BulkExportResponseJson; import ca.uhn.fhir.jpa.bulk.model.BulkExportResponseJson;
import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.rest.annotation.IdParam; import ca.uhn.fhir.rest.annotation.IdParam;
import ca.uhn.fhir.rest.annotation.Operation; import ca.uhn.fhir.rest.annotation.Operation;
import ca.uhn.fhir.rest.annotation.OperationParam; import ca.uhn.fhir.rest.annotation.OperationParam;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.PreferHeader; import ca.uhn.fhir.rest.api.PreferHeader;
import ca.uhn.fhir.rest.server.RestfulServerUtils; import ca.uhn.fhir.rest.server.RestfulServerUtils;
@ -43,14 +43,24 @@ import org.hl7.fhir.instance.model.api.IBaseOperationOutcome;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.InstantType; import org.hl7.fhir.r4.model.InstantType;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import static org.slf4j.LoggerFactory.getLogger;
public class BulkDataExportProvider { public class BulkDataExportProvider {
public static final String FARM_TO_TABLE_TYPE_FILTER_REGEX = "(?:,)(?=[A-Z][a-z]+\\?)";
private static final Logger ourLog = getLogger(BulkDataExportProvider.class);
@Autowired @Autowired
private IBulkDataExportSvc myBulkDataExportSvc; private IBulkDataExportSvc myBulkDataExportSvc;
@ -78,43 +88,75 @@ public class BulkDataExportProvider {
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theTypeFilter, @OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theTypeFilter,
ServletRequestDetails theRequestDetails ServletRequestDetails theRequestDetails
) { ) {
validatePreferAsyncHeader(theRequestDetails);
BulkDataExportOptions bulkDataExportOptions = buildSystemBulkExportOptions(theOutputFormat, theType, theSince, theTypeFilter);
Boolean useCache = shouldUseCache(theRequestDetails);
IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(bulkDataExportOptions, useCache);
writePollingLocationToResponseHeaders(theRequestDetails, outcome);
}
String preferHeader = theRequestDetails.getHeader(Constants.HEADER_PREFER); private boolean shouldUseCache(ServletRequestDetails theRequestDetails) {
PreferHeader prefer = RestfulServerUtils.parsePreferHeader(null, preferHeader); CacheControlDirective cacheControlDirective = new CacheControlDirective().parse(theRequestDetails.getHeaders(Constants.HEADER_CACHE_CONTROL));
if (prefer.getRespondAsync() == false) { return !cacheControlDirective.isNoCache();
throw new InvalidRequestException("Must request async processing for $export"); }
private String getServerBase(ServletRequestDetails theRequestDetails) {
return StringUtils.removeEnd(theRequestDetails.getServerBaseForRequest(), "/");
}
/**
* Group/Id/$export
*/
@Operation(name = JpaConstants.OPERATION_EXPORT, manualResponse = true, idempotent = true, typeName = "Group")
public void groupExport(
@IdParam IIdType theIdParam,
@OperationParam(name = JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theOutputFormat,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theType,
@OperationParam(name = JpaConstants.PARAM_EXPORT_SINCE, min = 0, max = 1, typeName = "instant") IPrimitiveType<Date> theSince,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theTypeFilter,
@OperationParam(name = JpaConstants.PARAM_EXPORT_MDM, min = 0, max = 1, typeName = "boolean") IPrimitiveType<Boolean> theMdm,
ServletRequestDetails theRequestDetails
) {
ourLog.debug("Received Group Bulk Export Request for Group {}", theIdParam);
ourLog.debug("_type={}", theIdParam);
ourLog.debug("_since={}", theSince);
ourLog.debug("_typeFilter={}", theTypeFilter);
ourLog.debug("_mdm=", theMdm);
validatePreferAsyncHeader(theRequestDetails);
BulkDataExportOptions bulkDataExportOptions = buildGroupBulkExportOptions(theOutputFormat, theType, theSince, theTypeFilter, theIdParam, theMdm);
validateResourceTypesAllContainPatientSearchParams(bulkDataExportOptions.getResourceTypes());
IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(bulkDataExportOptions, shouldUseCache(theRequestDetails));
writePollingLocationToResponseHeaders(theRequestDetails, outcome);
}
private void validateResourceTypesAllContainPatientSearchParams(Set<String> theResourceTypes) {
List<String> badResourceTypes = theResourceTypes.stream()
.filter(resourceType -> !myBulkDataExportSvc.getPatientCompartmentResources().contains(resourceType))
.collect(Collectors.toList());
if (!badResourceTypes.isEmpty()) {
throw new InvalidRequestException(String.format("Resource types [%s] are invalid for this type of export, as they do not contain search parameters that refer to patients.", String.join(",", badResourceTypes)));
} }
}
String outputFormat = theOutputFormat != null ? theOutputFormat.getValueAsString() : null; /**
* Patient/$export
Set<String> resourceTypes = null; */
if (theType != null) { @Operation(name = JpaConstants.OPERATION_EXPORT, manualResponse = true, idempotent = true, typeName = "Patient")
resourceTypes = ArrayUtil.commaSeparatedListToCleanSet(theType.getValueAsString()); public void patientExport(
} @OperationParam(name = JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theOutputFormat,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theType,
Date since = null; @OperationParam(name = JpaConstants.PARAM_EXPORT_SINCE, min = 0, max = 1, typeName = "instant") IPrimitiveType<Date> theSince,
if (theSince != null) { @OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theTypeFilter,
since = theSince.getValue(); ServletRequestDetails theRequestDetails
} ) {
validatePreferAsyncHeader(theRequestDetails);
Set<String> filters = null; BulkDataExportOptions bulkDataExportOptions = buildPatientBulkExportOptions(theOutputFormat, theType, theSince, theTypeFilter);
if (theTypeFilter != null) { validateResourceTypesAllContainPatientSearchParams(bulkDataExportOptions.getResourceTypes());
filters = ArrayUtil.commaSeparatedListToCleanSet(theTypeFilter.getValueAsString()); IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(bulkDataExportOptions, shouldUseCache(theRequestDetails));
} writePollingLocationToResponseHeaders(theRequestDetails, outcome);
IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(outputFormat, resourceTypes, since, filters));
String serverBase = getServerBase(theRequestDetails);
String pollLocation = serverBase + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" + JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + outcome.getJobId();
HttpServletResponse response = theRequestDetails.getServletResponse();
// Add standard headers
theRequestDetails.getServer().addHeadersToResponse(response);
// Successful 202 Accepted
response.addHeader(Constants.HEADER_CONTENT_LOCATION, pollLocation);
response.setStatus(Constants.STATUS_HTTP_202_ACCEPTED);
} }
/** /**
@ -171,37 +213,31 @@ public class BulkDataExportProvider {
OperationOutcomeUtil.addIssue(myFhirContext, oo, "error", status.getStatusMessage(), null, null); OperationOutcomeUtil.addIssue(myFhirContext, oo, "error", status.getStatusMessage(), null, null);
myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToWriter(oo, response.getWriter()); myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToWriter(oo, response.getWriter());
response.getWriter().close(); response.getWriter().close();
} }
} }
private String getServerBase(ServletRequestDetails theRequestDetails) { private BulkDataExportOptions buildSystemBulkExportOptions(IPrimitiveType<String> theOutputFormat, IPrimitiveType<String> theType, IPrimitiveType<Date> theSince, IPrimitiveType<String> theTypeFilter) {
return StringUtils.removeEnd(theRequestDetails.getServerBaseForRequest(), "/"); return buildBulkDataExportOptions(theOutputFormat, theType, theSince, theTypeFilter, BulkDataExportOptions.ExportStyle.SYSTEM);
} }
/** private BulkDataExportOptions buildGroupBulkExportOptions(IPrimitiveType<String> theOutputFormat, IPrimitiveType<String> theType, IPrimitiveType<Date> theSince, IPrimitiveType<String> theTypeFilter, IIdType theGroupId, IPrimitiveType<Boolean> theExpandMdm) {
* Group/Id/$export BulkDataExportOptions bulkDataExportOptions = buildBulkDataExportOptions(theOutputFormat, theType, theSince, theTypeFilter, BulkDataExportOptions.ExportStyle.GROUP);
*/ bulkDataExportOptions.setGroupId(theGroupId);
@Operation(name = JpaConstants.OPERATION_EXPORT, manualResponse = true, idempotent = true, typeName = "Group")
public void groupExport(
@IdParam IIdType theIdParam,
@OperationParam(name = JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theOutputFormat,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theType,
@OperationParam(name = JpaConstants.PARAM_EXPORT_SINCE, min = 0, max = 1, typeName = "instant") IPrimitiveType<Date> theSince,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theTypeFilter,
@OperationParam(name = JpaConstants.PARAM_EXPORT_MDM, min = 0, max = 1, typeName = "boolean") IPrimitiveType<Boolean> theMdm,
ServletRequestDetails theRequestDetails boolean mdm = false;
) { if (theExpandMdm != null) {
mdm = theExpandMdm.getValue();
String preferHeader = theRequestDetails.getHeader(Constants.HEADER_PREFER);
PreferHeader prefer = RestfulServerUtils.parsePreferHeader(null, preferHeader);
if (prefer.getRespondAsync() == false) {
throw new InvalidRequestException("Must request async processing for $export");
} }
bulkDataExportOptions.setExpandMdm(mdm);
return bulkDataExportOptions;
}
private BulkDataExportOptions buildPatientBulkExportOptions(IPrimitiveType<String> theOutputFormat, IPrimitiveType<String> theType, IPrimitiveType<Date> theSince, IPrimitiveType<String> theTypeFilter) {
return buildBulkDataExportOptions(theOutputFormat, theType, theSince, theTypeFilter, BulkDataExportOptions.ExportStyle.PATIENT);
}
private BulkDataExportOptions buildBulkDataExportOptions(IPrimitiveType<String> theOutputFormat, IPrimitiveType<String> theType, IPrimitiveType<Date> theSince, IPrimitiveType<String> theTypeFilter, BulkDataExportOptions.ExportStyle theExportStyle) {
String outputFormat = theOutputFormat != null ? theOutputFormat.getValueAsString() : null; String outputFormat = theOutputFormat != null ? theOutputFormat.getValueAsString() : null;
Set<String> resourceTypes = null; Set<String> resourceTypes = null;
@ -209,25 +245,25 @@ public class BulkDataExportProvider {
resourceTypes = ArrayUtil.commaSeparatedListToCleanSet(theType.getValueAsString()); resourceTypes = ArrayUtil.commaSeparatedListToCleanSet(theType.getValueAsString());
} }
//TODO GGG eventually, we will support these things.
Set<String> filters = null;
Date since = null; Date since = null;
if (theSince != null) { if (theSince != null) {
since = theSince.getValue(); since = theSince.getValue();
} }
boolean mdm = false; Set<String> typeFilters = splitTypeFilters(theTypeFilter);
if (theMdm != null) {
mdm = theMdm.getValue();
}
if (theTypeFilter != null) {
filters = ArrayUtil.commaSeparatedListToCleanSet(theTypeFilter.getValueAsString());
}
IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(outputFormat, resourceTypes, since, filters, theIdParam, mdm));
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setFilters(typeFilters);
bulkDataExportOptions.setExportStyle(theExportStyle);
bulkDataExportOptions.setSince(since);
bulkDataExportOptions.setResourceTypes(resourceTypes);
bulkDataExportOptions.setOutputFormat(outputFormat);
return bulkDataExportOptions;
}
public void writePollingLocationToResponseHeaders(ServletRequestDetails theRequestDetails, IBulkDataExportSvc.JobInfo theOutcome) {
String serverBase = getServerBase(theRequestDetails);
String pollLocation = serverBase + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" + JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + theOutcome.getJobId();
HttpServletResponse response = theRequestDetails.getServletResponse();
@ -238,4 +274,26 @@ public class BulkDataExportProvider {
response.addHeader(Constants.HEADER_CONTENT_LOCATION, pollLocation);
response.setStatus(Constants.STATUS_HTTP_202_ACCEPTED);
}
private void validatePreferAsyncHeader(ServletRequestDetails theRequestDetails) {
String preferHeader = theRequestDetails.getHeader(Constants.HEADER_PREFER);
PreferHeader prefer = RestfulServerUtils.parsePreferHeader(null, preferHeader);
if (prefer.getRespondAsync() == false) {
throw new InvalidRequestException("Must request async processing for $export");
}
}
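// Editorial note (added commentary, not part of this diff): the Bulk Data kick-off request is
// required to be asynchronous, so a conforming client sends, e.g.:
//   GET [base]/$export
//   Accept: application/fhir+json
//   Prefer: respond-async
// Any kick-off request arriving without "Prefer: respond-async" is rejected by the check above.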
private Set<String> splitTypeFilters(IPrimitiveType<String> theTypeFilter) {
if (theTypeFilter == null) {
return null;
}
String typeFilterString = theTypeFilter.getValueAsString();
String[] typeFilters = typeFilterString.split(FARM_TO_TABLE_TYPE_FILTER_REGEX);
if (typeFilters == null || typeFilters.length == 0) {
return null;
}
return new HashSet<>(Arrays.asList(typeFilters));
}
}
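The splitTypeFilters() helper deserves a quick illustration. A minimal, self-contained sketch follows; the regex value is an assumption for illustration (only the constant name FARM_TO_TABLE_TYPE_FILTER_REGEX appears in this diff), but the intent is to split only on commas that begin a new [ResourceType]? filter, leaving commas inside a filter's own parameter values intact.

import java.util.Arrays;

class TypeFilterSplitDemo {
	// Assumed lookahead: a comma counts as a separator only when it is followed by
	// something that looks like the start of a new "[ResourceType]?" filter.
	static final String FARM_TO_TABLE_TYPE_FILTER_REGEX = "(?:,)(?=[A-Z][a-z]+\\?)";

	public static void main(String[] args) {
		String raw = "Immunization?patient.identifier=SYS|A,SYS|B&date=2020-01-02,Observation?subject=Patient/123";
		// Prints the two filters; the comma inside "SYS|A,SYS|B" does not split.
		System.out.println(Arrays.toString(raw.split(FARM_TO_TABLE_TYPE_FILTER_REGEX)));
	}
}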
@ -21,13 +21,15 @@ package ca.uhn.fhir.jpa.bulk.svc;
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.fhirpath.IFhirPath;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.job.BulkExportJobConfig;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
@ -41,19 +43,21 @@ import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.util.UrlUtil;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBase;
import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.InstantType;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.beans.factory.annotation.Autowired;
@ -66,18 +70,19 @@ import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.PostConstruct;
import javax.transaction.Transactional;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import static ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions.ExportStyle.GROUP;
import static ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions.ExportStyle.PATIENT;
import static ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions.ExportStyle.SYSTEM;
import static ca.uhn.fhir.util.UrlUtil.escapeUrlParam;
import static ca.uhn.fhir.util.UrlUtil.escapeUrlParams;
import static org.apache.commons.lang3.StringUtils.contains;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
@ -113,6 +118,12 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
@Qualifier(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME)
private org.springframework.batch.core.Job myGroupBulkExportJob;
@Autowired
@Qualifier(BatchJobsConfig.PATIENT_BULK_EXPORT_JOB_NAME)
private org.springframework.batch.core.Job myPatientBulkExportJob;
private Set<String> myCompartmentResources;
private final int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR);
/**
@ -229,6 +240,8 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
if (isGroupBulkJob(theBulkExportJobEntity)) {
enhanceBulkParametersWithGroupParameters(theBulkExportJobEntity, parameters);
myJobSubmitter.runJob(myGroupBulkExportJob, parameters.toJobParameters());
} else if (isPatientBulkJob(theBulkExportJobEntity)) {
myJobSubmitter.runJob(myPatientBulkExportJob, parameters.toJobParameters());
} else {
myJobSubmitter.runJob(myBulkExportJob, parameters.toJobParameters());
}
@ -237,6 +250,14 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
}
}
private boolean isPatientBulkJob(BulkExportJobEntity theBulkExportJobEntity) {
return theBulkExportJobEntity.getRequest().startsWith("/Patient/");
}
private boolean isGroupBulkJob(BulkExportJobEntity theBulkExportJobEntity) {
return theBulkExportJobEntity.getRequest().startsWith("/Group/");
}
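A short sketch of the dispatch rule these two predicates implement (hypothetical helper, not part of this commit): the path prefix of the persisted request string selects which Spring Batch job is launched.

class ExportJobRouterSketch {
	// Mirrors isPatientBulkJob()/isGroupBulkJob() above.
	String route(String theRequestString) {
		if (theRequestString.startsWith("/Patient/")) {
			return "patient bulk export job";
		}
		if (theRequestString.startsWith("/Group/")) {
			return "group bulk export job";
		}
		return "system bulk export job"; // default when no prefix matches
	}
}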
private void enhanceBulkParametersWithGroupParameters(BulkExportJobEntity theBulkExportJobEntity, JobParametersBuilder theParameters) {
String theGroupId = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_GROUP_ID);
String expandMdm = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_MDM);
@ -244,9 +265,6 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
theParameters.addString(BulkExportJobConfig.EXPAND_MDM_PARAMETER, expandMdm);
}
private boolean isGroupBulkJob(BulkExportJobEntity theBulkExportJobEntity) {
return getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_GROUP_ID) != null;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private IFhirResourceDao<IBaseBinary> getBinaryDao() { private IFhirResourceDao<IBaseBinary> getBinaryDao() {
@ -271,6 +289,12 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
@Transactional
@Override
public JobInfo submitJob(BulkDataExportOptions theBulkDataExportOptions) {
return submitJob(theBulkDataExportOptions, true);
}
@Transactional
@Override
public JobInfo submitJob(BulkDataExportOptions theBulkDataExportOptions, Boolean useCache) {
String outputFormat = Constants.CT_FHIR_NDJSON;
if (isNotBlank(theBulkDataExportOptions.getOutputFormat())) {
outputFormat = theBulkDataExportOptions.getOutputFormat();
@ -282,7 +306,16 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
// TODO GGG KS can we encode BulkDataExportOptions as a JSON string as opposed to this request string. Feels like it would be a more extensible encoding...
//Probably yes, but this will all need to be rebuilt when we remove this bridge entity
StringBuilder requestBuilder = new StringBuilder();
requestBuilder.append("/");
//Prefix the export url with Group/[id]/ or /Patient/ depending on what type of request it is.
if (theBulkDataExportOptions.getExportStyle().equals(GROUP)) {
requestBuilder.append(theBulkDataExportOptions.getGroupId().toVersionless()).append("/");
} else if (theBulkDataExportOptions.getExportStyle().equals(PATIENT)) {
requestBuilder.append("Patient/");
}
requestBuilder.append(JpaConstants.OPERATION_EXPORT);
requestBuilder.append("?").append(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT).append("=").append(escapeUrlParam(outputFormat)); requestBuilder.append("?").append(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT).append("=").append(escapeUrlParam(outputFormat));
Set<String> resourceTypes = theBulkDataExportOptions.getResourceTypes(); Set<String> resourceTypes = theBulkDataExportOptions.getResourceTypes();
if (resourceTypes != null) { if (resourceTypes != null) {
@ -293,20 +326,28 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_SINCE).append("=").append(new InstantType(since).setTimeZoneZulu(true).getValueAsString()); requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_SINCE).append("=").append(new InstantType(since).setTimeZoneZulu(true).getValueAsString());
} }
if (theBulkDataExportOptions.getFilters() != null && theBulkDataExportOptions.getFilters().size() > 0) { if (theBulkDataExportOptions.getFilters() != null && theBulkDataExportOptions.getFilters().size() > 0) {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_TYPE_FILTER).append("=").append(String.join(",", escapeUrlParams(theBulkDataExportOptions.getFilters()))); theBulkDataExportOptions.getFilters().stream()
.forEach(filter -> requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_TYPE_FILTER).append("=").append(escapeUrlParam(filter)));
}
if (theBulkDataExportOptions.getExportStyle().equals(GROUP)) {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_GROUP_ID).append("=").append(theBulkDataExportOptions.getGroupId().getValue());
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_MDM).append("=").append(theBulkDataExportOptions.isExpandMdm());
}
String request = requestBuilder.toString();
//If we are using the cache, then attempt to retrieve a matching job based on the Request String, otherwise just make a new one.
if (useCache) {
Date cutoff = DateUtils.addMilliseconds(new Date(), -myReuseBulkExportForMillis);
Pageable page = PageRequest.of(0, 10);
Slice<BulkExportJobEntity> existing = myBulkExportJobDao.findExistingJob(page, request, cutoff, BulkJobStatusEnum.ERROR);
if (!existing.isEmpty()) {
return toSubmittedJobInfo(existing.iterator().next());
}
}
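// Editorial note (added commentary, not in the original commit): the serialized request URL acts
// as the cache key here, so a repeat submission only reuses an earlier, non-errored job when its
// options serialize to the identical string, e.g.
// "/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Patient". A client-supplied
// Cache-Control: no-cache header arrives as useCache=false and skips this lookup entirely.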
if (resourceTypes != null && resourceTypes.contains("Binary")) { if (resourceTypes != null && resourceTypes.contains("Binary")) {
@ -318,7 +359,7 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
// This is probably not a useful default, but having the default be "download the whole
// server" seems like a risky default too. We'll deal with that by having the default involve
// only returning a small time span
resourceTypes = getAllowedResourceTypesForBulkExportStyle(theBulkDataExportOptions.getExportStyle());
if (since == null) {
since = DateUtils.addDays(new Date(), -1);
}
@ -369,17 +410,13 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
public void validateTypeFilters(Set<String> theTheFilters, Set<String> theResourceTypes) {
if (theTheFilters != null) {
Set<String> types = new HashSet<>();
for (String next : theTheFilters) {
if (!next.contains("?")) {
throw new InvalidRequestException("Invalid " + JpaConstants.PARAM_EXPORT_TYPE_FILTER + " value \"" + next + "\". Must be in the form [ResourceType]?[params]");
}
String resourceType = next.substring(0, next.indexOf("?"));
if (!theResourceTypes.contains(resourceType)) {
throw new InvalidRequestException("Invalid " + JpaConstants.PARAM_EXPORT_TYPE_FILTER + " value \"" + next + "\". Resource type does not appear in " + JpaConstants.PARAM_EXPORT_TYPE + " list");
}
if (!types.add(resourceType)) {
throw new InvalidRequestException("Invalid " + JpaConstants.PARAM_EXPORT_TYPE_FILTER + " value \"" + next + "\". Multiple filters found for type " + resourceType);
}
}
}
}
@ -421,6 +458,35 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
return retVal;
}
@Override
public Set<String> getPatientCompartmentResources() {
if (myCompartmentResources == null) {
myCompartmentResources = myContext.getResourceTypes().stream()
.filter(this::resourceTypeIsInPatientCompartment)
.collect(Collectors.toSet());
}
return myCompartmentResources;
}
/**
* Return true if any search parameter in the resource can point at a patient, false otherwise
*/
private boolean resourceTypeIsInPatientCompartment(String theResourceType) {
RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(theResourceType);
List<RuntimeSearchParam> searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient");
return searchParams != null && searchParams.size() >= 1;
}
public Set<String> getAllowedResourceTypesForBulkExportStyle(BulkDataExportOptions.ExportStyle theExportStyle) {
if (theExportStyle.equals(SYSTEM)) {
return myContext.getResourceTypes();
} else if (theExportStyle.equals(GROUP) || theExportStyle.equals(PATIENT)) {
return getPatientCompartmentResources();
} else {
throw new IllegalArgumentException(String.format("HAPI FHIR does not recognize a Bulk Export request of type: %s", theExportStyle));
}
}
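As a concrete illustration of the compartment check above, here is a minimal sketch (assuming an R4 context; the resource types are examples):

import java.util.List;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeSearchParam;

class PatientCompartmentCheckDemo {
	public static void main(String[] args) {
		FhirContext ctx = FhirContext.forR4();
		// Observation has Patient-compartment search parameters (subject, performer, ...),
		// so it is eligible for patient/group export; StructureDefinition has none.
		List<RuntimeSearchParam> params = ctx.getResourceDefinition("Observation")
			.getSearchParamsForCompartmentName("Patient");
		System.out.println(params != null && !params.isEmpty()); // true
	}
}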
private IIdType toId(String theResourceId) {
IIdType retVal = myContext.getVersion().newIdType();
retVal.setValue(theResourceId);
@ -43,7 +43,7 @@ public interface IBulkExportJobDao extends JpaRepository<BulkExportJobEntity, Lo
@Query("SELECT j FROM BulkExportJobEntity j WHERE j.myExpiry < :cutoff") @Query("SELECT j FROM BulkExportJobEntity j WHERE j.myExpiry < :cutoff")
Slice<BulkExportJobEntity> findByExpiry(Pageable thePage, @Param("cutoff") Date theCutoff); Slice<BulkExportJobEntity> findByExpiry(Pageable thePage, @Param("cutoff") Date theCutoff);
@Query("SELECT j FROM BulkExportJobEntity j WHERE j.myRequest = :request AND j.myCreated > :createdAfter AND j.myStatus <> :status") @Query("SELECT j FROM BulkExportJobEntity j WHERE j.myRequest = :request AND j.myCreated > :createdAfter AND j.myStatus <> :status ORDER BY j.myCreated DESC")
Slice<BulkExportJobEntity> findExistingJob(Pageable thePage, @Param("request") String theRequest, @Param("createdAfter") Date theCreatedAfter, @Param("status") BulkJobStatusEnum theNotStatus); Slice<BulkExportJobEntity> findExistingJob(Pageable thePage, @Param("request") String theRequest, @Param("createdAfter") Date theCreatedAfter, @Param("status") BulkJobStatusEnum theNotStatus);
@Modifying
@ -3,7 +3,6 @@ package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.model.BulkExportResponseJson;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
@ -17,6 +16,7 @@ import ca.uhn.fhir.test.utilities.JettyUtil;
import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.fhir.util.UrlUtil;
import com.google.common.base.Charsets;
import com.google.common.collect.Sets;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
@ -27,7 +27,6 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.InstantType;
import org.hl7.fhir.r4.model.Parameters;
@ -57,6 +56,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.times;
@ -79,7 +79,7 @@ public class BulkDataExportProviderTest {
@Captor
private ArgumentCaptor<BulkDataExportOptions> myBulkDataExportOptionsCaptor;
@Captor
private ArgumentCaptor<Boolean> myBooleanArgumentCaptor;
@AfterEach
public void after() throws Exception {
@ -116,7 +116,7 @@ public class BulkDataExportProviderTest {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo()
.setJobId(A_JOB_ID);
when(myBulkDataExportSvc.submitJob(any(), any())).thenReturn(jobInfo);
InstantType now = InstantType.now();
@ -140,7 +140,7 @@ public class BulkDataExportProviderTest {
assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue()); assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
} }
verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture()); verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), any());
BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue(); BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue();
assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat()); assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat());
assertThat(options.getResourceTypes(), containsInAnyOrder("Patient", "Practitioner")); assertThat(options.getResourceTypes(), containsInAnyOrder("Patient", "Practitioner"));
@ -153,7 +153,7 @@ public class BulkDataExportProviderTest {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo()
.setJobId(A_JOB_ID);
when(myBulkDataExportSvc.submitJob(any(), any())).thenReturn(jobInfo);
InstantType now = InstantType.now();
@ -174,7 +174,7 @@ public class BulkDataExportProviderTest {
assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue()); assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
} }
verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture()); verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), any());
BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue(); BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue();
assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat()); assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat());
assertThat(options.getResourceTypes(), containsInAnyOrder("Patient", "Practitioner")); assertThat(options.getResourceTypes(), containsInAnyOrder("Patient", "Practitioner"));
@ -267,7 +267,6 @@ public class BulkDataExportProviderTest {
assertEquals("Patient", responseJson.getOutput().get(2).getType()); assertEquals("Patient", responseJson.getOutput().get(2).getType());
assertEquals("http://localhost:" + myPort + "/Binary/333", responseJson.getOutput().get(2).getUrl()); assertEquals("http://localhost:" + myPort + "/Binary/333", responseJson.getOutput().get(2).getUrl());
} }
} }
@Test @Test
@ -304,7 +303,8 @@ public class BulkDataExportProviderTest {
public void testSuccessfulInitiateGroupBulkRequest_Post() throws IOException {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo().setJobId(G_JOB_ID);
when(myBulkDataExportSvc.submitJob(any(), any())).thenReturn(jobInfo);
when(myBulkDataExportSvc.getPatientCompartmentResources()).thenReturn(Sets.newHashSet("Observation", "DiagnosticReport"));
InstantType now = InstantType.now();
@ -330,13 +330,207 @@ public class BulkDataExportProviderTest {
assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + G_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue()); assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + G_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
} }
verify(myBulkDataExportSvc, times(1)).submitJob(myGroupBulkDataExportOptionsCaptor.capture()); verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), any());
GroupBulkDataExportOptions options = myGroupBulkDataExportOptionsCaptor.getValue(); BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue();
assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat()); assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat());
assertThat(options.getResourceTypes(), containsInAnyOrder("Observation", "DiagnosticReport")); assertThat(options.getResourceTypes(), containsInAnyOrder("Observation", "DiagnosticReport"));
assertThat(options.getSince(), notNullValue()); assertThat(options.getSince(), notNullValue());
assertThat(options.getFilters(), notNullValue()); assertThat(options.getFilters(), notNullValue());
assertEquals(GROUP_ID, options.getGroupId().getValue()); assertEquals(GROUP_ID, options.getGroupId().getValue());
assertThat(options.isMdm(), is(equalTo(true))); assertThat(options.isExpandMdm(), is(equalTo(true)));
} }
@Test
public void testSuccessfulInitiateGroupBulkRequest_Get() throws IOException {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo().setJobId(G_JOB_ID);
when(myBulkDataExportSvc.submitJob(any(), any())).thenReturn(jobInfo);
when(myBulkDataExportSvc.getPatientCompartmentResources()).thenReturn(Sets.newHashSet("Patient", "Practitioner"));
InstantType now = InstantType.now();
String url = "http://localhost:" + myPort + "/" + GROUP_ID + "/" + JpaConstants.OPERATION_EXPORT
+ "?" + JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT + "=" + UrlUtil.escapeUrlParam(Constants.CT_FHIR_NDJSON)
+ "&" + JpaConstants.PARAM_EXPORT_TYPE + "=" + UrlUtil.escapeUrlParam("Patient, Practitioner")
+ "&" + JpaConstants.PARAM_EXPORT_SINCE + "=" + UrlUtil.escapeUrlParam(now.getValueAsString())
+ "&" + JpaConstants.PARAM_EXPORT_TYPE_FILTER + "=" + UrlUtil.escapeUrlParam("Patient?identifier=foo|bar")
+ "&" + JpaConstants.PARAM_EXPORT_MDM+ "=true";
HttpGet get = new HttpGet(url);
get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
ourLog.info("Request: {}", url);
try (CloseableHttpResponse response = myClient.execute(get)) {
ourLog.info("Response: {}", response.toString());
assertEquals(202, response.getStatusLine().getStatusCode());
assertEquals("Accepted", response.getStatusLine().getReasonPhrase());
assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + G_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
}
verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), any());
BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue();
assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat());
assertThat(options.getResourceTypes(), containsInAnyOrder("Patient", "Practitioner"));
assertThat(options.getSince(), notNullValue());
assertThat(options.getFilters(), notNullValue());
assertEquals(GROUP_ID, options.getGroupId().getValue());
assertThat(options.isExpandMdm(), is(equalTo(true)));
}
@Test
public void testInitiateWithGetAndMultipleTypeFilters() throws IOException {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo()
.setJobId(A_JOB_ID);
when(myBulkDataExportSvc.submitJob(any(), anyBoolean())).thenReturn(jobInfo);
InstantType now = InstantType.now();
String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT
+ "?" + JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT + "=" + UrlUtil.escapeUrlParam(Constants.CT_FHIR_NDJSON)
+ "&" + JpaConstants.PARAM_EXPORT_TYPE + "=" + UrlUtil.escapeUrlParam("Immunization, Observation")
+ "&" + JpaConstants.PARAM_EXPORT_SINCE + "=" + UrlUtil.escapeUrlParam(now.getValueAsString());
String immunizationTypeFilter1 = "Immunization?patient.identifier=SC378274-MRN|009999997,SC378274-MRN|009999998,SC378274-MRN|009999999&date=2020-01-02";
String immunizationTypeFilter2 = "Immunization?patient=Patient/123";
String observationFilter1 = "Observation?subject=Patient/123&created=ge2020-01-01";
StringBuilder multiValuedTypeFilterBuilder = new StringBuilder()
.append("&")
.append(JpaConstants.PARAM_EXPORT_TYPE_FILTER)
.append("=")
.append(UrlUtil.escapeUrlParam(immunizationTypeFilter1))
.append(",")
.append(UrlUtil.escapeUrlParam(immunizationTypeFilter2))
.append(",")
.append(UrlUtil.escapeUrlParam(observationFilter1));
url += multiValuedTypeFilterBuilder.toString();
HttpGet get = new HttpGet(url);
get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
myClient.execute(get);
verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), anyBoolean());
BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue();
assertThat(options.getFilters(), containsInAnyOrder(immunizationTypeFilter1, immunizationTypeFilter2, observationFilter1));
}
@Test
public void testInitiateGroupExportWithInvalidResourceTypesFails() throws IOException {
when(myBulkDataExportSvc.getPatientCompartmentResources()).thenReturn(Sets.newHashSet("Observation"));
String url = "http://localhost:" + myPort + "/" + "Group/123/" + JpaConstants.OPERATION_EXPORT
+ "?" + JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT + "=" + UrlUtil.escapeUrlParam(Constants.CT_FHIR_NDJSON)
+ "&" + JpaConstants.PARAM_EXPORT_TYPE + "=" + UrlUtil.escapeUrlParam("StructureDefinition,Observation");
HttpGet get = new HttpGet(url);
get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
CloseableHttpResponse execute = myClient.execute(get);
String responseBody = IOUtils.toString(execute.getEntity().getContent());
assertThat(execute.getStatusLine().getStatusCode(), is(equalTo(400)));
assertThat(responseBody, is(containsString("Resource types [StructureDefinition] are invalid for this type of export, as they do not contain search parameters that refer to patients.")));
}
@Test
public void testInitiateWithPostAndMultipleTypeFilters() throws IOException {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo()
.setJobId(A_JOB_ID);
when(myBulkDataExportSvc.submitJob(any(), any())).thenReturn(jobInfo);
InstantType now = InstantType.now();
Parameters input = new Parameters();
input.addParameter(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, new StringType(Constants.CT_FHIR_NDJSON));
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE, new StringType("Patient"));
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE_FILTER, new StringType("Patient?gender=male,Patient?gender=female"));
ourLog.info(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(input));
HttpPost post = new HttpPost("http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT);
post.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
post.setEntity(new ResourceEntity(myCtx, input));
ourLog.info("Request: {}", post);
try (CloseableHttpResponse response = myClient.execute(post)) {
ourLog.info("Response: {}", response.toString());
assertEquals(202, response.getStatusLine().getStatusCode());
assertEquals("Accepted", response.getStatusLine().getReasonPhrase());
assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
}
verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), myBooleanArgumentCaptor.capture());
BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue();
assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat());
assertThat(options.getResourceTypes(), containsInAnyOrder("Patient"));
assertThat(options.getFilters(), containsInAnyOrder("Patient?gender=male", "Patient?gender=female"));
}
@Test
public void testInitiatePatientExportRequest() throws IOException {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo()
.setJobId(A_JOB_ID);
when(myBulkDataExportSvc.submitJob(any(), any())).thenReturn(jobInfo);
when(myBulkDataExportSvc.getPatientCompartmentResources()).thenReturn(Sets.newHashSet("Immunization", "Observation"));
InstantType now = InstantType.now();
Parameters input = new Parameters();
input.addParameter(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, new StringType(Constants.CT_FHIR_NDJSON));
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE, new StringType("Immunization, Observation"));
input.addParameter(JpaConstants.PARAM_EXPORT_SINCE, now);
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE_FILTER, new StringType("Immunization?vaccine-code=foo"));
ourLog.info(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(input));
HttpPost post = new HttpPost("http://localhost:" + myPort + "/Patient/" + JpaConstants.OPERATION_EXPORT);
post.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
post.setEntity(new ResourceEntity(myCtx, input));
ourLog.info("Request: {}", post);
try (CloseableHttpResponse response = myClient.execute(post)) {
ourLog.info("Response: {}", response.toString());
assertEquals(202, response.getStatusLine().getStatusCode());
assertEquals("Accepted", response.getStatusLine().getReasonPhrase());
assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
}
verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), myBooleanArgumentCaptor.capture());
BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue();
assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat());
assertThat(options.getResourceTypes(), containsInAnyOrder("Immunization", "Observation"));
assertThat(options.getSince(), notNullValue());
assertThat(options.getFilters(), containsInAnyOrder("Immunization?vaccine-code=foo"));
}
@Test
public void testProviderProcessesNoCacheHeader() throws IOException {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo()
.setJobId(A_JOB_ID);
when(myBulkDataExportSvc.submitJob(any(), anyBoolean())).thenReturn(jobInfo);
Parameters input = new Parameters();
input.addParameter(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, new StringType(Constants.CT_FHIR_NDJSON));
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE, new StringType("Patient, Practitioner"));
HttpPost post = new HttpPost("http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT);
post.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
post.addHeader(Constants.HEADER_CACHE_CONTROL, Constants.CACHE_CONTROL_NO_CACHE);
post.setEntity(new ResourceEntity(myCtx, input));
ourLog.info("Request: {}", post);
try (CloseableHttpResponse response = myClient.execute(post)) {
ourLog.info("Response: {}", response.toString());
assertEquals(202, response.getStatusLine().getStatusCode());
assertEquals("Accepted", response.getStatusLine().getReasonPhrase());
assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
}
verify(myBulkDataExportSvc).submitJob(myBulkDataExportOptionsCaptor.capture(), myBooleanArgumentCaptor.capture());
Boolean usedCache = myBooleanArgumentCaptor.getValue();
assertThat(usedCache, is(equalTo(false)));
}
}
@ -2,11 +2,11 @@ package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.job.BulkExportJobParametersBuilder;
import ca.uhn.fhir.jpa.bulk.job.GroupBulkExportJobParametersBuilder;
@ -61,6 +61,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.containsString;
@ -70,6 +71,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@ -97,8 +99,20 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Qualifier(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME)
private Job myGroupBulkJob;
@Autowired
@Qualifier(BatchJobsConfig.PATIENT_BULK_EXPORT_JOB_NAME)
private Job myPatientBulkJob;
private IIdType myPatientGroupId;
@Override
public void beforeFlushFT() {
super.beforeFlushFT();
//This is needed for patient level export.
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED);
}
@Test
public void testPurgeExpiredJobs() {
@ -155,7 +169,11 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testSubmit_InvalidOutputFormat() {
try {
BulkDataExportOptions options = new BulkDataExportOptions();
options.setOutputFormat(Constants.CT_FHIR_JSON_NEW);
options.setResourceTypes(Sets.newHashSet("Patient", "Observation"));
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
myBulkDataExportSvc.submitJob(options);
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid output format: application/fhir+json", e.getMessage());
@ -165,37 +183,34 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testSubmit_OnlyBinarySelected() {
try {
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Binary"));
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
myBulkDataExportSvc.submitJob(options);
fail();
} catch (InvalidRequestException e) {
assertEquals("Binary resources may not be exported with bulk export", e.getMessage());
}
}
@Test
public void testSubmit_InvalidResourceTypes() {
try {
myBulkDataExportSvc.submitJob(buildBulkDataForResourceTypes(Sets.newHashSet("Patient", "FOO")));
fail();
} catch (InvalidRequestException e) {
assertEquals("Unknown or unsupported resource type: FOO", e.getMessage());
}
}
@Test
public void testSubmit_MultipleTypeFiltersForSameType() {
try {
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Patient?name=a", "Patient?active=true")));
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid _typeFilter value \"Patient?name=a\". Multiple filters found for type Patient", e.getMessage());
}
}
@Test
public void testSubmit_TypeFilterForNonSelectedType() {
try {
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Patient"));
options.setFilters(Sets.newHashSet("Observation?code=123"));
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
myBulkDataExportSvc.submitJob(options);
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid _typeFilter value \"Observation?code=123\". Resource type does not appear in _type list", e.getMessage());
@ -205,22 +220,32 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testSubmit_TypeFilterInvalid() {
try {
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Patient"));
options.setFilters(Sets.newHashSet("Hello"));
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
myBulkDataExportSvc.submitJob(options);
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid _typeFilter value \"Hello\". Must be in the form [ResourceType]?[params]", e.getMessage());
}
}
private BulkDataExportOptions buildBulkDataForResourceTypes(Set<String> resourceTypes) {
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(resourceTypes);
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
return options;
}
@Test
public void testSubmit_ReusesExisting() {
// Submit
IBulkDataExportSvc.JobInfo jobDetails1 = myBulkDataExportSvc.submitJob(buildBulkDataForResourceTypes(Sets.newHashSet("Patient", "Observation")));
assertNotNull(jobDetails1.getJobId());
// Submit again
IBulkDataExportSvc.JobInfo jobDetails2 = myBulkDataExportSvc.submitJob(buildBulkDataForResourceTypes(Sets.newHashSet("Patient", "Observation")));
assertNotNull(jobDetails2.getJobId());
assertEquals(jobDetails1.getJobId(), jobDetails2.getJobId());
@ -241,7 +266,10 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createResources();
// Create a bulk job
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Patient"));
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
assertNotNull(jobDetails.getJobId());
// Check the status
@ -271,7 +299,12 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createResources();
// Create a bulk job
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Patient", "Observation"));
options.setFilters(Sets.newHashSet(TEST_FILTER));
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options);
assertNotNull(jobDetails.getJobId());
// Check the status
@ -300,7 +333,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertEquals(7, nextContents.split("\n").length); // Only female patients
} else if ("Observation".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"subject\":{\"reference\":\"Patient/PAT0\"}}\n"));
assertEquals(26, nextContents.split("\n").length);
} else {
fail(next.getResourceType());
}
@ -324,7 +357,9 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
myBinaryDao.create(b);
// Create a bulk job
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
assertNotNull(jobDetails.getJobId());
// Check the status
@ -353,10 +388,10 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertEquals(17, nextContents.split("\n").length);
} else if ("Observation".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"subject\":{\"reference\":\"Patient/PAT0\"}}\n"));
assertEquals(26, nextContents.split("\n").length);
} else if ("Immunization".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"patient\":{\"reference\":\"Patient/PAT0\"}}\n"));
assertEquals(26, nextContents.split("\n").length);
} else if ("CareTeam".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"id\":\"CT0\""));
assertEquals(16, nextContents.split("\n").length);
@ -378,7 +413,11 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
// Create a bulk job
HashSet<String> types = Sets.newHashSet("Patient");
Set<String> typeFilters = Sets.newHashSet("Patient?_has:Observation:patient:identifier=SYS|VAL3");
BulkDataExportOptions options = new BulkDataExportOptions();
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
options.setResourceTypes(types);
options.setFilters(typeFilters);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options);
assertNotNull(jobDetails.getJobId());
// Check the status
@ -430,7 +469,12 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
// Create a bulk job
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Patient", "Observation"));
options.setSince(cutoff.getValue());
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options);
assertNotNull(jobDetails.getJobId());
// Check the status
@ -478,6 +522,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
paramBuilder
.setReadChunkSize(100L)
.setOutputFormat(Constants.CT_FHIR_NDJSON)
.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM)
.setResourceTypes(Arrays.asList("Patient", "Observation")); .setResourceTypes(Arrays.asList("Patient", "Observation"));
JobExecution jobExecution = myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters()); JobExecution jobExecution = myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
@ -492,6 +537,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
public void awaitAllBulkJobCompletions() {
List<JobInstance> bulkExport = myJobExplorer.findJobInstancesByJobName(BatchJobsConfig.BULK_EXPORT_JOB_NAME, 0, 100);
bulkExport.addAll(myJobExplorer.findJobInstancesByJobName(BatchJobsConfig.PATIENT_BULK_EXPORT_JOB_NAME, 0, 100));
bulkExport.addAll(myJobExplorer.findJobInstancesByJobName(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME, 0, 100));
if (bulkExport.isEmpty()) {
fail("There are no bulk export jobs running!");
@ -509,7 +555,10 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createResources();
// Create a bulk job
BulkDataExportOptions options = new BulkDataExportOptions();
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
options.setResourceTypes(Sets.newHashSet("Patient", "Observation"));
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options);
//Add the UUID to the job
BulkExportJobParametersBuilder paramBuilder = new BulkExportJobParametersBuilder()
@ -531,17 +580,20 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createResources();
// Create a bulk job
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setOutputFormat(null);
bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization"));
bulkDataExportOptions.setSince(null);
bulkDataExportOptions.setFilters(null);
bulkDataExportOptions.setGroupId(myPatientGroupId);
bulkDataExportOptions.setExpandMdm(true);
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
@ -549,10 +601,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Iterate over the files
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
assertThat(nextContents, is(containsString("IMM0")));
@ -562,33 +611,83 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(nextContents, is(containsString("IMM8"))); assertThat(nextContents, is(containsString("IMM8")));
} }
// CareTeam has two patient references: participant and patient. This test checks if we find the patient if participant is null but patient is not null
@Test @Test
public void testGroupBatchJobCareTeam() throws Exception { public void testPatientLevelExportWorks() throws JobParametersInvalidException {
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED);
createResources();
// Create a bulk job
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Immunization", "Observation"));
options.setExportStyle(BulkDataExportOptions.ExportStyle.PATIENT);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options);
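// The group parameter builder is reused here for the job UUID and chunk size, but the job actually launched below is the patient-level bulk job (myPatientBulkJob).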
GroupBulkExportJobParametersBuilder paramBuilder = new GroupBulkExportJobParametersBuilder();
paramBuilder.setGroupId(myPatientGroupId.getIdPart());
paramBuilder.setJobUUID(jobDetails.getJobId());
paramBuilder.setReadChunkSize(10L);
JobExecution jobExecution = myBatchJobSubmitter.runJob(myPatientBulkJob, paramBuilder.toJobParameters());
awaitJobCompletion(jobExecution);
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(2));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Iterate over the files
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
assertThat(nextContents, is(containsString("IMM0")));
assertThat(nextContents, is(containsString("IMM1")));
assertThat(nextContents, is(containsString("IMM2")));
assertThat(nextContents, is(containsString("IMM3")));
assertThat(nextContents, is(containsString("IMM4")));
assertThat(nextContents, is(containsString("IMM5")));
assertThat(nextContents, is(containsString("IMM6")));
assertThat(nextContents, is(containsString("IMM7")));
assertThat(nextContents, is(containsString("IMM8")));
assertThat(nextContents, is(containsString("IMM9")));
assertThat(nextContents, is(containsString("IMM999")));
assertThat(nextContents, is(not(containsString("IMM2000"))));
assertThat(nextContents, is(not(containsString("IMM2001"))));
assertThat(nextContents, is(not(containsString("IMM2002"))));
assertThat(nextContents, is(not(containsString("IMM2003"))));
assertThat(nextContents, is(not(containsString("IMM2004"))));
assertThat(nextContents, is(not(containsString("IMM2005"))));
}
// CareTeam has two patient references: participant and patient. This test checks if we find the patient if participant is null but patient is not null
@Test
public void testGroupBatchJobCareTeam() throws Exception {
createResources();
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setOutputFormat(null);
bulkDataExportOptions.setResourceTypes(Sets.newHashSet("CareTeam"));
bulkDataExportOptions.setSince(null);
bulkDataExportOptions.setFilters(null);
bulkDataExportOptions.setGroupId(myPatientGroupId);
bulkDataExportOptions.setExpandMdm(true);
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(1));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("CareTeam")));
// Iterate over the files
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("CareTeam")));
assertThat(nextContents, is(containsString("CT0")));
@ -604,19 +703,102 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
JobParametersBuilder paramBuilder = new JobParametersBuilder().addString("jobUUID", "I'm not real!");
try {
myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
fail("Should have had invalid parameter exception!");
} catch (JobParametersInvalidException e) {
// good
}
}
@Test
public void testSystemExportWithMultipleTypeFilters() {
createResources();
// Create a bulk job
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Immunization"));
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
options.setFilters(Sets.newHashSet("Immunization?vaccine-code=Flu", "Immunization?patient=Patient/PAT1"));
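// Multiple type filters for the same resource type are combined as a union, so the export should contain the matches of both searches.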
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(1));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Iterate over the files
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
//These are the Flu entries (even-indexed immunizations are created with vaccine-code Flu)
assertThat(nextContents, is(containsString("IMM0")));
assertThat(nextContents, is(containsString("IMM2")));
assertThat(nextContents, is(containsString("IMM4")));
assertThat(nextContents, is(containsString("IMM6")));
assertThat(nextContents, is(containsString("IMM8")));
//This is the entry for the immunization referencing Patient/PAT1
assertThat(nextContents, is(containsString("IMM1")));
}
@Test
public void testGroupExportWithMultipleTypeFilters() {
createResources();
// Create a bulk job
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Observation"));
options.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
options.setGroupId(myPatientGroupId);
options.setExpandMdm(false);
options.setFilters(Sets.newHashSet("Observation?identifier=VAL0,VAL2", "Observation?identifier=VAL4"));
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(1));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Observation")));
String nextContents = getBinaryContents(jobInfo, 0);
//These are the Observation entries
assertThat(nextContents, is(containsString("OBS0")));
assertThat(nextContents, is(containsString("OBS2")));
assertThat(nextContents, is(containsString("OBS4")));
assertEquals(3, nextContents.split("\n").length);
}
public String getBinaryContents(IBulkDataExportSvc.JobInfo theJobInfo, int theIndex) {
// Iterate over the files
Binary nextBinary = myBinaryDao.read(theJobInfo.getFiles().get(theIndex).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", nextBinary.getResourceType(), nextContents);
return nextContents;
}
@Test
public void testMdmExpansionSuccessfullyExtractsPatients() throws JobParametersInvalidException {
createResources();
// Create a bulk job
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setOutputFormat(null);
bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Patient"));
bulkDataExportOptions.setSince(null);
bulkDataExportOptions.setFilters(null);
bulkDataExportOptions.setGroupId(myPatientGroupId);
bulkDataExportOptions.setExpandMdm(true);
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
@ -626,10 +808,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(jobInfo.getFiles().size(), equalTo(1));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Patient")));
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Patient")));
//Output contains the entire group, plus the MDM expansion, plus the golden resource
@ -641,23 +820,28 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createResources();
// Create a bulk job
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setOutputFormat(null);
bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization", "Observation"));
bulkDataExportOptions.setSince(null);
bulkDataExportOptions.setFilters(null);
bulkDataExportOptions.setGroupId(myPatientGroupId);
bulkDataExportOptions.setExpandMdm(true);
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals("/Group/G0/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Observation,Immunization&_groupId=" + myPatientGroupId + "&_mdm=true", jobInfo.getRequest());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(2));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Check immunization content
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
assertThat(nextContents, is(containsString("IMM0")));
assertThat(nextContents, is(containsString("IMM2")));
@ -697,23 +881,24 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testGroupBulkExportSupportsTypeFilters() throws JobParametersInvalidException {
createResources();
//Only get COVID-19 vaccinations
Set<String> filters = new HashSet<>();
filters.add("Immunization?vaccine-code=vaccines|COVID-19");
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setOutputFormat(null);
bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization"));
bulkDataExportOptions.setSince(null);
bulkDataExportOptions.setFilters(filters);
bulkDataExportOptions.setGroupId(myPatientGroupId);
bulkDataExportOptions.setExpandMdm(true);
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
@ -721,12 +906,8 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Check immunization content
String nextContents = getBinaryContents(jobInfo, 0);
assertThat(nextContents, is(containsString("IMM1"))); assertThat(nextContents, is(containsString("IMM1")));
assertThat(nextContents, is(containsString("IMM3"))); assertThat(nextContents, is(containsString("IMM3")));
assertThat(nextContents, is(containsString("IMM5"))); assertThat(nextContents, is(containsString("IMM5")));
@ -737,6 +918,82 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(nextContents, is(not(containsString("Flu")))); assertThat(nextContents, is(not(containsString("Flu"))));
} }
@Test
public void testAllExportStylesWorkWithNullResourceTypes() {
createResources();
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED);
// Create a bulk job
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setOutputFormat(null);
bulkDataExportOptions.setResourceTypes(null);
bulkDataExportOptions.setSince(null);
bulkDataExportOptions.setFilters(null);
bulkDataExportOptions.setGroupId(myPatientGroupId);
bulkDataExportOptions.setExpandMdm(true);
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.PATIENT);
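// A null resource type list should be expanded by the export service to all supported resource types, regardless of export style.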
//Patient-style
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), is(equalTo(BulkJobStatusEnum.COMPLETE)));
//Group-style
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
bulkDataExportOptions.setGroupId(myPatientGroupId);
jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), is(equalTo(BulkJobStatusEnum.COMPLETE)));
//System-style
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), is(equalTo(BulkJobStatusEnum.COMPLETE)));
}
@Test
public void testCacheSettingIsRespectedWhenCreatingNewJobs() {
BulkDataExportOptions options = new BulkDataExportOptions();
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
options.setResourceTypes(Sets.newHashSet("Procedure"));
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.submitJob(options, true);
IBulkDataExportSvc.JobInfo jobInfo1 = myBulkDataExportSvc.submitJob(options, true);
IBulkDataExportSvc.JobInfo jobInfo2 = myBulkDataExportSvc.submitJob(options, true);
IBulkDataExportSvc.JobInfo jobInfo3 = myBulkDataExportSvc.submitJob(options, true);
IBulkDataExportSvc.JobInfo jobInfo4 = myBulkDataExportSvc.submitJob(options, true);
//Cached should have all identical Job IDs.
String initialJobId = jobInfo.getJobId();
boolean allMatch = Stream.of(jobInfo, jobInfo1, jobInfo2, jobInfo3, jobInfo4).allMatch(job -> job.getJobId().equals(initialJobId));
assertTrue(allMatch);
IBulkDataExportSvc.JobInfo jobInfo5 = myBulkDataExportSvc.submitJob(options, false);
IBulkDataExportSvc.JobInfo jobInfo6 = myBulkDataExportSvc.submitJob(options, false);
IBulkDataExportSvc.JobInfo jobInfo7 = myBulkDataExportSvc.submitJob(options, false);
IBulkDataExportSvc.JobInfo jobInfo8 = myBulkDataExportSvc.submitJob(options, false);
IBulkDataExportSvc.JobInfo jobInfo9 = myBulkDataExportSvc.submitJob(options, false);
//First non-cached should retrieve new ID.
assertThat(initialJobId, is(not(equalTo(jobInfo5.getJobId()))));
//Non-cached should all have unique IDs
List<String> jobIds = Stream.of(jobInfo5, jobInfo6, jobInfo7, jobInfo8, jobInfo9).map(IBulkDataExportSvc.JobInfo::getJobId).collect(Collectors.toList());
ourLog.info("ZOOP {}", String.join(", ", jobIds));
Set<String> uniqueJobIds = new HashSet<>(jobIds);
assertEquals(uniqueJobIds.size(), jobIds.size());
//Now if we create another one and ask for the cache, we should get the most-recently-inserted entry.
IBulkDataExportSvc.JobInfo jobInfo10 = myBulkDataExportSvc.submitJob(options, true);
assertThat(jobInfo10.getJobId(), is(equalTo(jobInfo9.getJobId())));
}
private void awaitJobCompletion(JobExecution theJobExecution) {
await().atMost(120, TimeUnit.SECONDS).until(() -> {
JobExecution jobExecution = myJobExplorer.getJobExecution(theJobExecution.getId());
@ -760,8 +1017,6 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createImmunizationWithIndex(999, g1Outcome.getId());
createCareTeamWithIndex(999, g1Outcome.getId());
for (int i = 0; i < 10; i++) {
DaoMethodOutcome patientOutcome = createPatientWithIndex(i);
IIdType patId = patientOutcome.getId().toUnqualifiedVersionless();
@ -780,6 +1035,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createImmunizationWithIndex(i, patId);
createCareTeamWithIndex(i, patId);
}
myPatientGroupId = myGroupDao.update(group).getId();
//Manually create another golden record
@ -789,14 +1045,22 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
Long goldenPid2 = myIdHelperService.getPidOrNull(g2Outcome.getResource());
//Create some non-group patients MDM-linked to a different golden resource. They shouldn't be included in the query.
for (int i = 0; i < 5; i++) {
int index = 1000 + i;
DaoMethodOutcome patientOutcome = createPatientWithIndex(index);
IIdType patId = patientOutcome.getId().toUnqualifiedVersionless();
Long sourcePid = myIdHelperService.getPidOrNull(patientOutcome.getResource());
linkToGoldenResource(goldenPid2, sourcePid);
createObservationWithIndex(index, patId);
createImmunizationWithIndex(index, patId);
createCareTeamWithIndex(index, patId);
}
//Create some Observations and Immunizations which have _no subjects!_ These will be excluded from the Patient-level export.
for (int i = 0; i < 10; i++) {
int index = 2000 + i;
createObservationWithIndex(index, null);
createImmunizationWithIndex(index, null);
}
}
@ -820,7 +1084,9 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
private void createImmunizationWithIndex(int i, IIdType patId) {
Immunization immunization = new Immunization();
immunization.setId("IMM" + i);
if (patId != null) {
immunization.setPatient(new Reference(patId));
}
if (i % 2 == 0) {
CodeableConcept cc = new CodeableConcept();
cc.addCoding().setSystem("vaccines").setCode("Flu");
@ -838,7 +1104,9 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
obs.setId("OBS" + i); obs.setId("OBS" + i);
obs.addIdentifier().setSystem("SYS").setValue("VAL" + i); obs.addIdentifier().setSystem("SYS").setValue("VAL" + i);
obs.setStatus(Observation.ObservationStatus.FINAL); obs.setStatus(Observation.ObservationStatus.FINAL);
obs.getSubject().setReference(patId.getValue()); if (patId != null) {
obs.getSubject().setReference(patId.getValue());
}
myObservationDao.update(obs);
}
View File
@ -162,6 +162,7 @@ import org.hl7.fhir.r4.model.ValueSet;
import org.hl7.fhir.r5.utils.IResourceValidator;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
@ -539,7 +540,6 @@ public abstract class BaseJpaR4Test extends BaseJpaTest implements ITestDataBuil
runInTransaction(() -> {
SearchSession searchSession = Search.session(myEntityManager);
searchSession.workspace(ResourceTable.class).purge();
searchSession.indexingPlan().execute();
});
View File
@ -333,6 +333,32 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
}
@Test
public void testSearchWithDateInvalid() throws IOException {
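// The assertions below match the HTML-escaped form of the error message (&quot;, &gt;) because the raw response body is inspected.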
HttpGet get = new HttpGet(ourServerBase + "/Condition?onset-date=junk");
try (CloseableHttpResponse resp = ourHttpClient.execute(get)) {
String output = IOUtils.toString(resp.getEntity().getContent(), Charsets.UTF_8);
assertThat(output, containsString("Invalid date/time format: &quot;junk&quot;"));
assertEquals(400, resp.getStatusLine().getStatusCode());
}
get = new HttpGet(ourServerBase + "/Condition?onset-date=ge");
try (CloseableHttpResponse resp = ourHttpClient.execute(get)) {
String output = IOUtils.toString(resp.getEntity().getContent(), Charsets.UTF_8);
assertThat(output, containsString("Invalid date/time format: &quot;ge&quot;"));
assertEquals(400, resp.getStatusLine().getStatusCode());
}
get = new HttpGet(ourServerBase + "/Condition?onset-date=" + UrlUtil.escapeUrlParam(">"));
try (CloseableHttpResponse resp = ourHttpClient.execute(get)) {
String output = IOUtils.toString(resp.getEntity().getContent(), Charsets.UTF_8);
assertThat(output, containsString("Invalid date/time format: &quot;&gt;&quot;"));
assertEquals(400, resp.getStatusLine().getStatusCode());
}
}
@Test
public void testSearchWithSlashes() {
myDaoConfig.setSearchPreFetchThresholds(Lists.newArrayList(10, 50, 10000));