diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/rest/param/BaseParamWithPrefix.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/rest/param/BaseParamWithPrefix.java index d962d6aa88d..257f5702be7 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/rest/param/BaseParamWithPrefix.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/rest/param/BaseParamWithPrefix.java @@ -56,6 +56,10 @@ public abstract class BaseParamWithPrefix extends BaseParam offset++; } + if (offset > 0 && theString.length() == offset) { + throw new DataFormatException("Invalid date/time format: \"" + theString + "\""); + } + String prefix = theString.substring(0, offset); if (!isBlank(prefix)) { diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java index f066d30fac6..75d1c7c4b2a 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SearchParameterUtil.java @@ -50,6 +50,7 @@ public class SearchParameterUtil { return retVal; } + @Nullable public static String getCode(FhirContext theContext, IBaseResource theResource) { return getStringChild(theContext, theResource, "code"); diff --git a/hapi-fhir-base/src/test/java/ca/uhn/fhir/rest/param/DateRangeParamTest.java b/hapi-fhir-base/src/test/java/ca/uhn/fhir/rest/param/DateRangeParamTest.java index 2c590d415db..3da72a6a4b8 100644 --- a/hapi-fhir-base/src/test/java/ca/uhn/fhir/rest/param/DateRangeParamTest.java +++ b/hapi-fhir-base/src/test/java/ca/uhn/fhir/rest/param/DateRangeParamTest.java @@ -1,6 +1,7 @@ package ca.uhn.fhir.rest.param; import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.parser.DataFormatException; import ca.uhn.fhir.rest.api.QualifiedParamList; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -10,6 +11,7 @@ import java.util.ArrayList; import java.util.List; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; public class DateRangeParamTest { private FhirContext fhirContext; @@ -19,7 +21,9 @@ public class DateRangeParamTest { fhirContext = Mockito.mock(FhirContext.class); } - /** Can happen e.g. when the query parameter for {@code _lastUpdated} is left empty. */ + /** + * Can happen e.g. when the query parameter for {@code _lastUpdated} is left empty. + */ @Test public void testParamWithoutPrefixAndWithoutValue() { QualifiedParamList qualifiedParamList = new QualifiedParamList(1); @@ -33,7 +37,9 @@ public class DateRangeParamTest { assertTrue(dateRangeParam.isEmpty()); } - /** Can happen e.g. when the query parameter for {@code _lastUpdated} is given as {@code lt} without any value. */ + /** + * Can happen e.g. when the query parameter for {@code _lastUpdated} is given as {@code lt} without any value. + */ @Test public void testUpperBoundWithPrefixWithoutValue() { QualifiedParamList qualifiedParamList = new QualifiedParamList(1); @@ -42,12 +48,17 @@ public class DateRangeParamTest { List params = new ArrayList<>(1); params.add(qualifiedParamList); DateRangeParam dateRangeParam = new DateRangeParam(); - dateRangeParam.setValuesAsQueryTokens(fhirContext, "_lastUpdated", params); - - assertTrue(dateRangeParam.isEmpty()); + try { + dateRangeParam.setValuesAsQueryTokens(fhirContext, "_lastUpdated", params); + fail(); + } catch (DataFormatException e) { + // good + } } - /** Can happen e.g. when the query parameter for {@code _lastUpdated} is given as {@code gt} without any value. */ + /** + * Can happen e.g. when the query parameter for {@code _lastUpdated} is given as {@code gt} without any value. + */ @Test public void testLowerBoundWithPrefixWithoutValue() { QualifiedParamList qualifiedParamList = new QualifiedParamList(1); @@ -56,8 +67,11 @@ public class DateRangeParamTest { List params = new ArrayList<>(1); params.add(qualifiedParamList); DateRangeParam dateRangeParam = new DateRangeParam(); - dateRangeParam.setValuesAsQueryTokens(fhirContext, "_lastUpdated", params); - - assertTrue(dateRangeParam.isEmpty()); + try { + dateRangeParam.setValuesAsQueryTokens(fhirContext, "_lastUpdated", params); + fail(); + } catch (DataFormatException e) { + // good + } } } diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_4_0/2433-group-bulk-export-support.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_4_0/2433-group-bulk-export-support.yaml new file mode 100644 index 00000000000..4e040e2b258 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_4_0/2433-group-bulk-export-support.yaml @@ -0,0 +1,5 @@ +--- +type: add +issue: 2433 +title: "Support has been added for MDM expansion during Group bulk export. Calling a group export via `/Group/123/$export?_mdm=true` + will cause Bulk Export to not only match group members, but also any MDM-matched patients, and their related golden record patients" diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_4_0/2445-support-patient-level-export.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_4_0/2445-support-patient-level-export.yaml new file mode 100644 index 00000000000..1350b9b452f --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_4_0/2445-support-patient-level-export.yaml @@ -0,0 +1,4 @@ +--- +type: add +issue: 2445 +title: "Support has been added for patient level Bulk export. This can be done via the `/Patient/$export` endpoint. Also, support has been added for setting Cache-Control header to no-cache for Bulk Export requests." diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_4_0/2466-improve-error-message-for-invalid-dateparam.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_4_0/2466-improve-error-message-for-invalid-dateparam.yaml new file mode 100644 index 00000000000..a02b92226b4 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_4_0/2466-improve-error-message-for-invalid-dateparam.yaml @@ -0,0 +1,5 @@ +--- +type: fix +issue: 2466 +title: "When performainfg a search using a date search parameter, invalid values (e.g. `date=foo`) resulted + in a confusing error message. This has been improved." diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/BatchJobsConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/BatchJobsConfig.java index 043d9775df1..178ee7358d5 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/BatchJobsConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/BatchJobsConfig.java @@ -33,4 +33,5 @@ import org.springframework.context.annotation.Import; public class BatchJobsConfig { public static final String BULK_EXPORT_JOB_NAME = "bulkExportJob"; public static final String GROUP_BULK_EXPORT_JOB_NAME = "groupBulkExportJob"; + public static final String PATIENT_BULK_EXPORT_JOB_NAME = "patientBulkExportJob"; } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/api/BulkDataExportOptions.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/api/BulkDataExportOptions.java index ef5e3445eae..c63a0df0546 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/api/BulkDataExportOptions.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/api/BulkDataExportOptions.java @@ -20,22 +20,55 @@ package ca.uhn.fhir.jpa.bulk.api; * #L% */ +import org.hl7.fhir.instance.model.api.IIdType; + import java.util.Date; import java.util.Set; public class BulkDataExportOptions { - private final String myOutputFormat; - private final Set myResourceTypes; - private final Date mySince; - private final Set myFilters; + public BulkDataExportOptions() { - public BulkDataExportOptions(String theOutputFormat, Set theResourceTypes, Date theSince, Set theFilters) { + } + + public enum ExportStyle { + PATIENT, + GROUP, + SYSTEM + } + private String myOutputFormat; + private Set myResourceTypes; + private Date mySince; + private Set myFilters; + private ExportStyle myExportStyle; + private boolean myExpandMdm; + private IIdType myGroupId; + + + + public void setOutputFormat(String theOutputFormat) { myOutputFormat = theOutputFormat; + } + + public void setResourceTypes(Set theResourceTypes) { myResourceTypes = theResourceTypes; + } + + public void setSince(Date theSince) { mySince = theSince; + } + + public void setFilters(Set theFilters) { myFilters = theFilters; } + public ExportStyle getExportStyle() { + return myExportStyle; + } + + public void setExportStyle(ExportStyle theExportStyle) { + myExportStyle = theExportStyle; + } + public String getOutputFormat() { return myOutputFormat; } @@ -51,4 +84,20 @@ public class BulkDataExportOptions { public Set getFilters() { return myFilters; } + + public boolean isExpandMdm() { + return myExpandMdm; + } + + public void setExpandMdm(boolean theExpandMdm) { + myExpandMdm = theExpandMdm; + } + + public IIdType getGroupId() { + return myGroupId; + } + + public void setGroupId(IIdType theGroupId) { + myGroupId = theGroupId; + } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/api/GroupBulkDataExportOptions.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/api/GroupBulkDataExportOptions.java deleted file mode 100644 index 0daee677175..00000000000 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/api/GroupBulkDataExportOptions.java +++ /dev/null @@ -1,45 +0,0 @@ -package ca.uhn.fhir.jpa.bulk.api; - -/*- - * #%L - * HAPI FHIR JPA Server - * %% - * Copyright (C) 2014 - 2021 Smile CDR, Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.hl7.fhir.instance.model.api.IIdType; - -import java.util.Date; -import java.util.Set; - -public class GroupBulkDataExportOptions extends BulkDataExportOptions { - private final IIdType myGroupId; - private final boolean myMdm; - - public GroupBulkDataExportOptions(String theOutputFormat, Set theResourceTypes, Date theSince, Set theFilters, IIdType theGroupId, boolean theMdm) { - super(theOutputFormat, theResourceTypes, theSince, theFilters); - myGroupId = theGroupId; - myMdm = theMdm; - } - - public IIdType getGroupId() { - return myGroupId; - } - - public boolean isMdm() { - return myMdm; - } -} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/api/IBulkDataExportSvc.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/api/IBulkDataExportSvc.java index 3784fc8a18d..af39667b19b 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/api/IBulkDataExportSvc.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/api/IBulkDataExportSvc.java @@ -27,6 +27,7 @@ import javax.transaction.Transactional; import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.Set; public interface IBulkDataExportSvc { void buildExportFiles(); @@ -36,8 +37,15 @@ public interface IBulkDataExportSvc { JobInfo submitJob(BulkDataExportOptions theBulkDataExportOptions); + JobInfo submitJob(BulkDataExportOptions theBulkDataExportOptions, Boolean useCache); + JobInfo getJobInfoOrThrowResourceNotFound(String theJobId); + /** + * Return a set of all resource types which contain search parameters which have Patient as a target. + */ + Set getPatientCompartmentResources(); + void cancelAndPurgeAllJobs(); class JobInfo { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BaseBulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BaseBulkItemReader.java index 84df3471331..c47506ce768 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BaseBulkItemReader.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BaseBulkItemReader.java @@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.bulk.job; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.RuntimeResourceDefinition; +import ca.uhn.fhir.context.RuntimeSearchParam; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.batch.log.Logs; @@ -29,6 +30,7 @@ import ca.uhn.fhir.jpa.dao.ISearchBuilder; import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao; import ca.uhn.fhir.jpa.entity.BulkExportJobEntity; +import ca.uhn.fhir.jpa.entity.Search; import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.jpa.searchparam.MatchUrlService; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; @@ -43,10 +45,12 @@ import org.springframework.beans.factory.annotation.Value; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; public abstract class BaseBulkItemReader implements ItemReader> { private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); @@ -73,6 +77,7 @@ public abstract class BaseBulkItemReader implements ItemReader myPidIterator; + private RuntimeSearchParam myPatientSearchParam; /** * Get and cache an ISearchBuilder for the given resource type this partition is responsible for. @@ -98,24 +103,40 @@ public abstract class BaseBulkItemReader implements ItemReader getResourcePidIterator(); - protected SearchParameterMap createSearchParameterMapForJob() { + protected List createSearchParameterMapsForResourceType() { BulkExportJobEntity jobEntity = getJobEntity(); RuntimeResourceDefinition theDef = getResourceDefinition(); - SearchParameterMap map = new SearchParameterMap(); Map requestUrl = UrlUtil.parseQueryStrings(jobEntity.getRequest()); String[] typeFilters = requestUrl.get(JpaConstants.PARAM_EXPORT_TYPE_FILTER); + List spMaps = null; if (typeFilters != null) { - Optional filter = Arrays.stream(typeFilters).filter(t -> t.startsWith(myResourceType + "?")).findFirst(); - if (filter.isPresent()) { - String matchUrl = filter.get(); - map = myMatchUrlService.translateMatchUrl(matchUrl, theDef); - } + spMaps = Arrays.stream(typeFilters) + .filter(typeFilter -> typeFilter.startsWith(myResourceType + "?")) + .map(filter -> buildSearchParameterMapForTypeFilter(filter, theDef)) + .collect(Collectors.toList()); } - if (jobEntity.getSince() != null) { - map.setLastUpdated(new DateRangeParam(jobEntity.getSince(), null)); + + //None of the _typeFilters applied to the current resource type, so just make a simple one. + if (spMaps == null || spMaps.isEmpty()) { + SearchParameterMap defaultMap = new SearchParameterMap(); + enhanceSearchParameterMapWithCommonParameters(defaultMap); + spMaps = Collections.singletonList(defaultMap); } + + return spMaps; + } + + private void enhanceSearchParameterMapWithCommonParameters(SearchParameterMap map) { map.setLoadSynchronous(true); - return map; + if (getJobEntity().getSince() != null) { + map.setLastUpdated(new DateRangeParam(getJobEntity().getSince(), null)); + } + } + + public SearchParameterMap buildSearchParameterMapForTypeFilter(String theFilter, RuntimeResourceDefinition theDef) { + SearchParameterMap searchParameterMap = myMatchUrlService.translateMatchUrl(theFilter, theDef); + enhanceSearchParameterMapWithCommonParameters(searchParameterMap); + return searchParameterMap; } protected RuntimeResourceDefinition getResourceDefinition() { @@ -157,4 +178,48 @@ public abstract class BaseBulkItemReader implements ItemReader1 result, throw an error + * 3.2 If that returns 1 result, return it + */ + protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType() { + if (myPatientSearchParam == null) { + RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(myResourceType); + myPatientSearchParam = runtimeResourceDefinition.getSearchParam("patient"); + if (myPatientSearchParam == null) { + myPatientSearchParam = runtimeResourceDefinition.getSearchParam("subject"); + if (myPatientSearchParam == null) { + myPatientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition); + if (myPatientSearchParam == null) { + String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", myResourceType); + throw new IllegalArgumentException(errorMessage); + } + } + } + } + return myPatientSearchParam; + } + + /** + * Search the resource definition for a compartment named 'patient' and return its related Search Parameter. + */ + protected RuntimeSearchParam getRuntimeSearchParamByCompartment(RuntimeResourceDefinition runtimeResourceDefinition) { + RuntimeSearchParam patientSearchParam; + List searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient"); + if (searchParams == null || searchParams.size() == 0) { + String errorMessage = String.format("Resource type [%s] is not eligible for this type of export, as it contains no Patient compartment, and no `patient` or `subject` search parameter", myResourceType); + throw new IllegalArgumentException(errorMessage); + } else if (searchParams.size() == 1) { + patientSearchParam = searchParams.get(0); + } else { + String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as we are unable to disambiguate which patient search parameter we should be searching by.", myResourceType); + throw new IllegalArgumentException(errorMessage); + } + return patientSearchParam; + } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java index 4db16392b82..a2b8d804e3b 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java @@ -92,6 +92,17 @@ public class BulkExportJobConfig { .build(); } + @Bean + @Lazy + public Job patientBulkExportJob() { + return myJobBuilderFactory.get(BatchJobsConfig.PATIENT_BULK_EXPORT_JOB_NAME) + .validator(bulkJobParameterValidator()) + .start(createBulkExportEntityStep()) + .next(patientPartitionStep()) + .next(closeJobStep()) + .build(); + } + @Bean public GroupIdPresentValidator groupBulkJobParameterValidator() { return new GroupIdPresentValidator(); @@ -115,6 +126,7 @@ public class BulkExportJobConfig { return new BulkExportJobParameterValidator(); } + //Writers @Bean public Step groupBulkExportGenerateResourceFilesStep() { return myStepBuilderFactory.get("groupBulkExportGenerateResourceFilesStep") @@ -122,17 +134,10 @@ public class BulkExportJobConfig { .reader(groupBulkItemReader()) .processor(myPidToIBaseResourceProcessor) .writer(resourceToFileWriter()) - .listener(bulkExportGenrateResourceFilesStepListener()) + .listener(bulkExportGenerateResourceFilesStepListener()) .build(); } - @Bean - @StepScope - public GroupBulkItemReader groupBulkItemReader(){ - return new GroupBulkItemReader(); - } - - @Bean public Step bulkExportGenerateResourceFilesStep() { return myStepBuilderFactory.get("bulkExportGenerateResourceFilesStep") @@ -140,7 +145,17 @@ public class BulkExportJobConfig { .reader(bulkItemReader()) .processor(myPidToIBaseResourceProcessor) .writer(resourceToFileWriter()) - .listener(bulkExportGenrateResourceFilesStepListener()) + .listener(bulkExportGenerateResourceFilesStepListener()) + .build(); + } + @Bean + public Step patientBulkExportGenerateResourceFilesStep() { + return myStepBuilderFactory.get("patientBulkExportGenerateResourceFilesStep") + ., List> chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time. + .reader(patientBulkItemReader()) + .processor(myPidToIBaseResourceProcessor) + .writer(resourceToFileWriter()) + .listener(bulkExportGenerateResourceFilesStepListener()) .build(); } @@ -165,10 +180,17 @@ public class BulkExportJobConfig { @Bean @JobScope - public BulkExportGenerateResourceFilesStepListener bulkExportGenrateResourceFilesStepListener() { + public BulkExportGenerateResourceFilesStepListener bulkExportGenerateResourceFilesStepListener() { return new BulkExportGenerateResourceFilesStepListener(); } + @Bean + public Step partitionStep() { + return myStepBuilderFactory.get("partitionStep") + .partitioner("bulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner()) + .step(bulkExportGenerateResourceFilesStep()) + .build(); + } @Bean public Step groupPartitionStep() { @@ -177,14 +199,28 @@ public class BulkExportJobConfig { .step(groupBulkExportGenerateResourceFilesStep()) .build(); } + @Bean - public Step partitionStep() { + public Step patientPartitionStep() { return myStepBuilderFactory.get("partitionStep") - .partitioner("bulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner()) - .step(bulkExportGenerateResourceFilesStep()) + .partitioner("patientBulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner()) + .step(patientBulkExportGenerateResourceFilesStep()) .build(); } + + @Bean + @StepScope + public GroupBulkItemReader groupBulkItemReader(){ + return new GroupBulkItemReader(); + } + + @Bean + @StepScope + public PatientBulkItemReader patientBulkItemReader() { + return new PatientBulkItemReader(); + } + @Bean @StepScope public BulkItemReader bulkItemReader(){ @@ -199,7 +235,7 @@ public class BulkExportJobConfig { @Bean @StepScope - public ItemWriter> resourceToFileWriter() { + public ResourceToFileWriter resourceToFileWriter() { return new ResourceToFileWriter(); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobParametersBuilder.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobParametersBuilder.java index d9bc48b2b2a..7219c900c87 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobParametersBuilder.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobParametersBuilder.java @@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.bulk.job; * #L% */ +import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions; import ca.uhn.fhir.rest.api.Constants; import org.springframework.batch.core.JobParametersBuilder; @@ -65,4 +66,8 @@ public class BulkExportJobParametersBuilder extends JobParametersBuilder { this.addLong("readChunkSize", theReadChunkSize); return this; } + public BulkExportJobParametersBuilder setExportStyle(BulkDataExportOptions.ExportStyle theExportStyle) { + this.addString("exportStyle", theExportStyle.name()); + return this; + } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkItemReader.java index a88d1a2f648..0c9bf4fae2b 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkItemReader.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkItemReader.java @@ -20,30 +20,20 @@ package ca.uhn.fhir.jpa.bulk.job; * #L% */ -import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.interceptor.model.RequestPartitionId; -import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.batch.log.Logs; import ca.uhn.fhir.jpa.dao.IResultIterator; import ca.uhn.fhir.jpa.dao.ISearchBuilder; -import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao; -import ca.uhn.fhir.jpa.entity.BulkExportJobEntity; import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; -import ca.uhn.fhir.jpa.model.util.JpaConstants; -import ca.uhn.fhir.jpa.searchparam.MatchUrlService; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; -import ca.uhn.fhir.rest.param.DateRangeParam; -import ca.uhn.fhir.util.UrlUtil; import org.slf4j.Logger; -import org.springframework.beans.factory.annotation.Autowired; import java.util.ArrayList; -import java.util.Arrays; +import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.Set; /** * Basic Bulk Export implementation which simply reads all type filters and applies them, along with the _since param @@ -52,19 +42,21 @@ import java.util.Optional; public class BulkItemReader extends BaseBulkItemReader { private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); - @Override Iterator getResourcePidIterator() { ourLog.info("Bulk export assembling export of type {} for job {}", myResourceType, myJobUUID); + Set myReadPids = new HashSet<>(); - SearchParameterMap map = createSearchParameterMapForJob(); + List map = createSearchParameterMapsForResourceType(); ISearchBuilder sb = getSearchBuilderForLocalResourceType(); - IResultIterator myResultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions()); - List myReadPids = new ArrayList<>(); - while (myResultIterator.hasNext()) { - myReadPids.add(myResultIterator.next()); + for (SearchParameterMap spMap: map) { + ourLog.debug("About to evaluate query {}", spMap.toNormalizedQueryString(myContext)); + IResultIterator myResultIterator = sb.createQuery(spMap, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions()); + while (myResultIterator.hasNext()) { + myReadPids.add(myResultIterator.next()); + } } return myReadPids.iterator(); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/CreateBulkExportEntityTasklet.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/CreateBulkExportEntityTasklet.java index 91df88ef7cf..74e85d2188a 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/CreateBulkExportEntityTasklet.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/CreateBulkExportEntityTasklet.java @@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.bulk.job; import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions; import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; +import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.rest.api.Constants; import org.apache.commons.lang3.StringUtils; import org.springframework.batch.core.StepContribution; @@ -66,7 +67,20 @@ public class CreateBulkExportEntityTasklet implements Tasklet { outputFormat = Constants.CT_FHIR_NDJSON; } - IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(outputFormat, resourceTypeSet, since, filterSet)); + BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions(); + bulkDataExportOptions.setOutputFormat(outputFormat); + bulkDataExportOptions.setResourceTypes(resourceTypeSet); + bulkDataExportOptions.setSince(since); + bulkDataExportOptions.setFilters(filterSet); + //Set export style + String exportStyle = (String)jobParameters.get("exportStyle"); + bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.valueOf(exportStyle)); + + //Set group id if present + String groupId = (String)jobParameters.get("groupId"); + bulkDataExportOptions.setGroupId(new IdDt(groupId)); + + IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.submitJob(bulkDataExportOptions); addUUIDToJobContext(theChunkContext, jobInfo.getJobId()); return RepeatStatus.FINISHED; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java index f57729cdff7..dd03381fa83 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java @@ -20,7 +20,6 @@ package ca.uhn.fhir.jpa.bulk.job; * #L% */ -import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.context.RuntimeSearchParam; import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.batch.log.Logs; @@ -28,6 +27,7 @@ import ca.uhn.fhir.jpa.dao.IResultIterator; import ca.uhn.fhir.jpa.dao.ISearchBuilder; import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao; import ca.uhn.fhir.jpa.dao.index.IdHelperService; +import ca.uhn.fhir.jpa.entity.Search; import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.util.QueryChunker; @@ -39,7 +39,6 @@ import ca.uhn.fhir.rest.param.ReferenceParam; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IPrimitiveType; -import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.springframework.batch.item.ItemReader; import org.springframework.beans.factory.annotation.Autowired; @@ -77,11 +76,9 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade @Autowired private IMdmLinkDao myMdmLinkDao; - private RuntimeSearchParam myPatientSearchParam; - @Override Iterator getResourcePidIterator() { - List myReadPids = new ArrayList<>(); + Set myReadPids = new HashSet<>(); //Short circuit out if we detect we are attempting to extract patients if (myResourceType.equalsIgnoreCase("Patient")) { @@ -113,7 +110,6 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade */ private Iterator getExpandedPatientIterator() { Set patientPidsToExport = new HashSet<>(); - //This gets all member pids List members = getMembers(); List ids = members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList()); List pidsOrThrowException = myIdHelperService.getPidsOrThrowException(ids); @@ -162,8 +158,15 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade //Now lets translate these pids into resource IDs Set uniquePids = new HashSet<>(); goldenPidTargetPidTuple.forEach(uniquePids::addAll); - Map> longOptionalMap = myIdHelperService.translatePidsToForcedIds(uniquePids); - expandedIds = longOptionalMap.values().stream().map(Optional::get).collect(Collectors.toSet()); + + Map> pidToForcedIdMap = myIdHelperService.translatePidsToForcedIds(uniquePids); + + //If the result of the translation is an empty optional, it means there is no forced id, and we can use the PID as the resource ID. + Set resolvedResourceIds = pidToForcedIdMap.entrySet().stream() + .map(entry -> entry.getValue().isPresent() ? entry.getValue().get() : entry.getKey().toString()) + .collect(Collectors.toSet()); + + expandedIds.addAll(resolvedResourceIds); } //Now manually add the members of the group (its possible even with mdm expansion that some members dont have MDM matches, @@ -173,83 +176,40 @@ public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReade return expandedIds; } - - private void queryResourceTypeWithReferencesToPatients(List myReadPids, List idChunk) { + private void queryResourceTypeWithReferencesToPatients(Set myReadPids, List idChunk) { //Build SP map //First, inject the _typeFilters and _since from the export job - SearchParameterMap expandedSpMap = createSearchParameterMapForJob(); + List expandedSpMaps = createSearchParameterMapsForResourceType(); + for (SearchParameterMap expandedSpMap: expandedSpMaps) { - //Since we are in a bulk job, we have to ensure the user didn't jam in a patient search param, since we need to manually set that. - validateSearchParameters(expandedSpMap); + //Since we are in a bulk job, we have to ensure the user didn't jam in a patient search param, since we need to manually set that. + validateSearchParameters(expandedSpMap); - // Now, further filter the query with patient references defined by the chunk of IDs we have. - filterSearchByResourceIds(idChunk, expandedSpMap); + // Now, further filter the query with patient references defined by the chunk of IDs we have. + filterSearchByResourceIds(idChunk, expandedSpMap); - // Fetch and cache a search builder for this resource type - ISearchBuilder searchBuilder = getSearchBuilderForLocalResourceType(); + // Fetch and cache a search builder for this resource type + ISearchBuilder searchBuilder = getSearchBuilderForLocalResourceType(); - //Execute query and all found pids to our local iterator. - IResultIterator resultIterator = searchBuilder.createQuery(expandedSpMap, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions()); - while (resultIterator.hasNext()) { - myReadPids.add(resultIterator.next()); + //Execute query and all found pids to our local iterator. + IResultIterator resultIterator = searchBuilder.createQuery(expandedSpMap, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions()); + while (resultIterator.hasNext()) { + myReadPids.add(resultIterator.next()); + } } } private void filterSearchByResourceIds(List idChunk, SearchParameterMap expandedSpMap) { ReferenceOrListParam orList = new ReferenceOrListParam(); idChunk.forEach(id -> orList.add(new ReferenceParam(id))); - expandedSpMap.add(getPatientSearchParam().getName(), orList); + expandedSpMap.add(getPatientSearchParamForCurrentResourceType().getName(), orList); } private RuntimeSearchParam validateSearchParameters(SearchParameterMap expandedSpMap) { - RuntimeSearchParam runtimeSearchParam = getPatientSearchParam(); + RuntimeSearchParam runtimeSearchParam = getPatientSearchParamForCurrentResourceType(); if (expandedSpMap.get(runtimeSearchParam.getName()) != null) { throw new IllegalArgumentException(String.format("Group Bulk Export manually modifies the Search Parameter called [%s], so you may not include this search parameter in your _typeFilter!", runtimeSearchParam.getName())); } return runtimeSearchParam; } - - /** - * Given the resource type, fetch its patient-based search parameter name - * 1. Attempt to find one called 'patient' - * 2. If that fails, find one called 'subject' - * 3. If that fails, find find by Patient Compartment. - * 3.1 If that returns >1 result, throw an error - * 3.2 If that returns 1 result, return it - */ - private RuntimeSearchParam getPatientSearchParam() { - if (myPatientSearchParam == null) { - RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(myResourceType); - myPatientSearchParam = runtimeResourceDefinition.getSearchParam("patient"); - if (myPatientSearchParam == null) { - myPatientSearchParam = runtimeResourceDefinition.getSearchParam("subject"); - if (myPatientSearchParam == null) { - myPatientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition); - if (myPatientSearchParam == null) { - String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", myResourceType); - throw new IllegalArgumentException(errorMessage); - } - } - } - } - return myPatientSearchParam; - } - - /** - * Search the resource definition for a compartment named 'patient' and return its related Search Parameter. - */ - private RuntimeSearchParam getRuntimeSearchParamByCompartment(RuntimeResourceDefinition runtimeResourceDefinition) { - RuntimeSearchParam patientSearchParam; - List searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient"); - if (searchParams == null || searchParams.size() == 0) { - String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as it contains no Patient compartment, and no `patient` or `subject` search parameter", myResourceType); - throw new IllegalArgumentException(errorMessage); - } else if (searchParams.size() == 1) { - patientSearchParam = searchParams.get(0); - } else { - String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as we are unable to disambiguate which patient search parameter we should be searching by.", myResourceType); - throw new IllegalArgumentException(errorMessage); - } - return patientSearchParam; - } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/PatientBulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/PatientBulkItemReader.java new file mode 100644 index 00000000000..c206404ac95 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/PatientBulkItemReader.java @@ -0,0 +1,98 @@ +package ca.uhn.fhir.jpa.bulk.job; + +/*- + * #%L + * HAPI FHIR JPA Server + * %% + * Copyright (C) 2014 - 2021 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import ca.uhn.fhir.context.RuntimeSearchParam; +import ca.uhn.fhir.interceptor.model.RequestPartitionId; +import ca.uhn.fhir.jpa.api.config.DaoConfig; +import ca.uhn.fhir.jpa.batch.log.Logs; +import ca.uhn.fhir.jpa.dao.IResultIterator; +import ca.uhn.fhir.jpa.dao.ISearchBuilder; +import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; +import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; +import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; +import ca.uhn.fhir.rest.param.ReferenceParam; +import org.slf4j.Logger; +import org.springframework.batch.item.ItemReader; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Bulk Item reader for the Patient Bulk Export job. + * Instead of performing a normal query on the resource type using type filters, we instead + * + * 1. Determine the resourcetype + * 2. Search for anything that has `patient-compartment-search-param:missing=false` + */ +public class PatientBulkItemReader extends BaseBulkItemReader implements ItemReader> { + @Autowired + private DaoConfig myDaoConfig; + + private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); + + + private RuntimeSearchParam validateSearchParameters(SearchParameterMap expandedSpMap) { + RuntimeSearchParam runtimeSearchParam = getPatientSearchParamForCurrentResourceType(); + if (expandedSpMap.get(runtimeSearchParam.getName()) != null) { + throw new IllegalArgumentException(String.format("Patient Bulk Export manually modifies the Search Parameter called [%s], so you may not include this search parameter in your _typeFilter!", runtimeSearchParam.getName())); + } + return runtimeSearchParam; + } + + @Override + Iterator getResourcePidIterator() { + if (myDaoConfig.getIndexMissingFields() == DaoConfig.IndexEnabledEnum.DISABLED) { + String errorMessage = "You attempted to start a Patient Bulk Export, but the system has `Index Missing Fields` disabled. It must be enabled for Patient Bulk Export"; + ourLog.error(errorMessage); + throw new IllegalStateException(errorMessage); + } + + List myReadPids = new ArrayList<>(); + + //use _typeFilter and _since and all those fancy bits and bobs to generate our basic SP map. + List maps = createSearchParameterMapsForResourceType(); + + String patientSearchParam = getPatientSearchParamForCurrentResourceType().getName(); + for (SearchParameterMap map: maps) { + //Ensure users did not monkey with the patient compartment search parameter. + validateSearchParameters(map); + + //Skip adding the parameter querying for patient= if we are in fact querying the patient resource type. + if (!myResourceType.equalsIgnoreCase("Patient")) { + map.add(patientSearchParam, new ReferenceParam().setMissing(false)); + } + + ourLog.debug("About to execute query {}", map.toNormalizedQueryString(myContext)); + ISearchBuilder sb = getSearchBuilderForLocalResourceType(); + IResultIterator myResultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions()); + + while (myResultIterator.hasNext()) { + myReadPids.add(myResultIterator.next()); + } + } + return myReadPids.iterator(); + } + + +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/provider/BulkDataExportProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/provider/BulkDataExportProvider.java index e251bcd31d1..dbdb57e3f82 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/provider/BulkDataExportProvider.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/provider/BulkDataExportProvider.java @@ -22,13 +22,13 @@ package ca.uhn.fhir.jpa.bulk.provider; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions; -import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions; import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.bulk.model.BulkExportResponseJson; import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.rest.annotation.IdParam; import ca.uhn.fhir.rest.annotation.Operation; import ca.uhn.fhir.rest.annotation.OperationParam; +import ca.uhn.fhir.rest.api.CacheControlDirective; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.PreferHeader; import ca.uhn.fhir.rest.server.RestfulServerUtils; @@ -43,14 +43,24 @@ import org.hl7.fhir.instance.model.api.IBaseOperationOutcome; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.r4.model.InstantType; +import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import javax.servlet.http.HttpServletResponse; import java.io.IOException; +import java.util.Arrays; import java.util.Date; +import java.util.HashSet; +import java.util.List; import java.util.Set; +import java.util.stream.Collectors; + +import static org.slf4j.LoggerFactory.getLogger; + public class BulkDataExportProvider { + public static final String FARM_TO_TABLE_TYPE_FILTER_REGEX = "(?:,)(?=[A-Z][a-z]+\\?)"; + private static final Logger ourLog = getLogger(BulkDataExportProvider.class); @Autowired private IBulkDataExportSvc myBulkDataExportSvc; @@ -78,43 +88,75 @@ public class BulkDataExportProvider { @OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = 1, typeName = "string") IPrimitiveType theTypeFilter, ServletRequestDetails theRequestDetails ) { + validatePreferAsyncHeader(theRequestDetails); + BulkDataExportOptions bulkDataExportOptions = buildSystemBulkExportOptions(theOutputFormat, theType, theSince, theTypeFilter); + Boolean useCache = shouldUseCache(theRequestDetails); + IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(bulkDataExportOptions, useCache); + writePollingLocationToResponseHeaders(theRequestDetails, outcome); + } - String preferHeader = theRequestDetails.getHeader(Constants.HEADER_PREFER); - PreferHeader prefer = RestfulServerUtils.parsePreferHeader(null, preferHeader); - if (prefer.getRespondAsync() == false) { - throw new InvalidRequestException("Must request async processing for $export"); + private boolean shouldUseCache(ServletRequestDetails theRequestDetails) { + CacheControlDirective cacheControlDirective = new CacheControlDirective().parse(theRequestDetails.getHeaders(Constants.HEADER_CACHE_CONTROL)); + return !cacheControlDirective.isNoCache(); + } + + private String getServerBase(ServletRequestDetails theRequestDetails) { + return StringUtils.removeEnd(theRequestDetails.getServerBaseForRequest(), "/"); + } + + /** + * Group/Id/$export + */ + @Operation(name = JpaConstants.OPERATION_EXPORT, manualResponse = true, idempotent = true, typeName = "Group") + public void groupExport( + @IdParam IIdType theIdParam, + @OperationParam(name = JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, min = 0, max = 1, typeName = "string") IPrimitiveType theOutputFormat, + @OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType theType, + @OperationParam(name = JpaConstants.PARAM_EXPORT_SINCE, min = 0, max = 1, typeName = "instant") IPrimitiveType theSince, + @OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = 1, typeName = "string") IPrimitiveType theTypeFilter, + @OperationParam(name = JpaConstants.PARAM_EXPORT_MDM, min = 0, max = 1, typeName = "boolean") IPrimitiveType theMdm, + ServletRequestDetails theRequestDetails + ) { + ourLog.debug("Received Group Bulk Export Request for Group {}", theIdParam); + ourLog.debug("_type={}", theIdParam); + ourLog.debug("_since={}", theSince); + ourLog.debug("_typeFilter={}", theTypeFilter); + ourLog.debug("_mdm=", theMdm); + + + validatePreferAsyncHeader(theRequestDetails); + BulkDataExportOptions bulkDataExportOptions = buildGroupBulkExportOptions(theOutputFormat, theType, theSince, theTypeFilter, theIdParam, theMdm); + validateResourceTypesAllContainPatientSearchParams(bulkDataExportOptions.getResourceTypes()); + IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(bulkDataExportOptions, shouldUseCache(theRequestDetails)); + writePollingLocationToResponseHeaders(theRequestDetails, outcome); + } + + private void validateResourceTypesAllContainPatientSearchParams(Set theResourceTypes) { + List badResourceTypes = theResourceTypes.stream() + .filter(resourceType -> !myBulkDataExportSvc.getPatientCompartmentResources().contains(resourceType)) + .collect(Collectors.toList()); + + if (!badResourceTypes.isEmpty()) { + throw new InvalidRequestException(String.format("Resource types [%s] are invalid for this type of export, as they do not contain search parameters that refer to patients.", String.join(",", badResourceTypes))); } + } - String outputFormat = theOutputFormat != null ? theOutputFormat.getValueAsString() : null; - - Set resourceTypes = null; - if (theType != null) { - resourceTypes = ArrayUtil.commaSeparatedListToCleanSet(theType.getValueAsString()); - } - - Date since = null; - if (theSince != null) { - since = theSince.getValue(); - } - - Set filters = null; - if (theTypeFilter != null) { - filters = ArrayUtil.commaSeparatedListToCleanSet(theTypeFilter.getValueAsString()); - } - - IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(outputFormat, resourceTypes, since, filters)); - - String serverBase = getServerBase(theRequestDetails); - String pollLocation = serverBase + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" + JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + outcome.getJobId(); - - HttpServletResponse response = theRequestDetails.getServletResponse(); - - // Add standard headers - theRequestDetails.getServer().addHeadersToResponse(response); - - // Successful 202 Accepted - response.addHeader(Constants.HEADER_CONTENT_LOCATION, pollLocation); - response.setStatus(Constants.STATUS_HTTP_202_ACCEPTED); + /** + * Patient/$export + */ + @Operation(name = JpaConstants.OPERATION_EXPORT, manualResponse = true, idempotent = true, typeName = "Patient") + public void patientExport( + @OperationParam(name = JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, min = 0, max = 1, typeName = "string") IPrimitiveType theOutputFormat, + @OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType theType, + @OperationParam(name = JpaConstants.PARAM_EXPORT_SINCE, min = 0, max = 1, typeName = "instant") IPrimitiveType theSince, + @OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = 1, typeName = "string") IPrimitiveType theTypeFilter, + ServletRequestDetails theRequestDetails + ) { + validatePreferAsyncHeader(theRequestDetails); + BulkDataExportOptions bulkDataExportOptions = buildPatientBulkExportOptions(theOutputFormat, theType, theSince, theTypeFilter); + validateResourceTypesAllContainPatientSearchParams(bulkDataExportOptions.getResourceTypes()); + IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(bulkDataExportOptions, shouldUseCache(theRequestDetails)); + writePollingLocationToResponseHeaders(theRequestDetails, outcome); } /** @@ -171,37 +213,31 @@ public class BulkDataExportProvider { OperationOutcomeUtil.addIssue(myFhirContext, oo, "error", status.getStatusMessage(), null, null); myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToWriter(oo, response.getWriter()); response.getWriter().close(); - } - - } - private String getServerBase(ServletRequestDetails theRequestDetails) { - return StringUtils.removeEnd(theRequestDetails.getServerBaseForRequest(), "/"); + private BulkDataExportOptions buildSystemBulkExportOptions(IPrimitiveType theOutputFormat, IPrimitiveType theType, IPrimitiveType theSince, IPrimitiveType theTypeFilter) { + return buildBulkDataExportOptions(theOutputFormat, theType, theSince, theTypeFilter, BulkDataExportOptions.ExportStyle.SYSTEM); } - /** - * Group/Id/$export - */ - @Operation(name = JpaConstants.OPERATION_EXPORT, manualResponse = true, idempotent = true, typeName = "Group") - public void groupExport( - @IdParam IIdType theIdParam, - @OperationParam(name = JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, min = 0, max = 1, typeName = "string") IPrimitiveType theOutputFormat, - @OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType theType, - @OperationParam(name = JpaConstants.PARAM_EXPORT_SINCE, min = 0, max = 1, typeName = "instant") IPrimitiveType theSince, - @OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = 1, typeName = "string") IPrimitiveType theTypeFilter, - @OperationParam(name = JpaConstants.PARAM_EXPORT_MDM, min = 0, max = 1, typeName = "boolean") IPrimitiveType theMdm, + private BulkDataExportOptions buildGroupBulkExportOptions(IPrimitiveType theOutputFormat, IPrimitiveType theType, IPrimitiveType theSince, IPrimitiveType theTypeFilter, IIdType theGroupId, IPrimitiveType theExpandMdm) { + BulkDataExportOptions bulkDataExportOptions = buildBulkDataExportOptions(theOutputFormat, theType, theSince, theTypeFilter, BulkDataExportOptions.ExportStyle.GROUP); + bulkDataExportOptions.setGroupId(theGroupId); - ServletRequestDetails theRequestDetails - ) { - - String preferHeader = theRequestDetails.getHeader(Constants.HEADER_PREFER); - PreferHeader prefer = RestfulServerUtils.parsePreferHeader(null, preferHeader); - if (prefer.getRespondAsync() == false) { - throw new InvalidRequestException("Must request async processing for $export"); + boolean mdm = false; + if (theExpandMdm != null) { + mdm = theExpandMdm.getValue(); } + bulkDataExportOptions.setExpandMdm(mdm); + return bulkDataExportOptions; + } + + private BulkDataExportOptions buildPatientBulkExportOptions(IPrimitiveType theOutputFormat, IPrimitiveType theType, IPrimitiveType theSince, IPrimitiveType theTypeFilter) { + return buildBulkDataExportOptions(theOutputFormat, theType, theSince, theTypeFilter, BulkDataExportOptions.ExportStyle.PATIENT); + } + + private BulkDataExportOptions buildBulkDataExportOptions(IPrimitiveType theOutputFormat, IPrimitiveType theType, IPrimitiveType theSince, IPrimitiveType theTypeFilter, BulkDataExportOptions.ExportStyle theExportStyle) { String outputFormat = theOutputFormat != null ? theOutputFormat.getValueAsString() : null; Set resourceTypes = null; @@ -209,25 +245,25 @@ public class BulkDataExportProvider { resourceTypes = ArrayUtil.commaSeparatedListToCleanSet(theType.getValueAsString()); } - //TODO GGG eventually, we will support these things. - Set filters = null; - Date since = null; if (theSince != null) { since = theSince.getValue(); } - boolean mdm = false; - if (theMdm != null) { - mdm = theMdm.getValue(); - } - if (theTypeFilter != null) { - filters = ArrayUtil.commaSeparatedListToCleanSet(theTypeFilter.getValueAsString()); - } - IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(outputFormat, resourceTypes, since, filters, theIdParam, mdm)); + Set typeFilters = splitTypeFilters(theTypeFilter); + BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions(); + bulkDataExportOptions.setFilters(typeFilters); + bulkDataExportOptions.setExportStyle(theExportStyle); + bulkDataExportOptions.setSince(since); + bulkDataExportOptions.setResourceTypes(resourceTypes); + bulkDataExportOptions.setOutputFormat(outputFormat); + return bulkDataExportOptions; + } + + public void writePollingLocationToResponseHeaders(ServletRequestDetails theRequestDetails, IBulkDataExportSvc.JobInfo theOutcome) { String serverBase = getServerBase(theRequestDetails); - String pollLocation = serverBase + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" + JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + outcome.getJobId(); + String pollLocation = serverBase + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" + JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + theOutcome.getJobId(); HttpServletResponse response = theRequestDetails.getServletResponse(); @@ -238,4 +274,26 @@ public class BulkDataExportProvider { response.addHeader(Constants.HEADER_CONTENT_LOCATION, pollLocation); response.setStatus(Constants.STATUS_HTTP_202_ACCEPTED); } + + private void validatePreferAsyncHeader(ServletRequestDetails theRequestDetails) { + String preferHeader = theRequestDetails.getHeader(Constants.HEADER_PREFER); + PreferHeader prefer = RestfulServerUtils.parsePreferHeader(null, preferHeader); + if (prefer.getRespondAsync() == false) { + throw new InvalidRequestException("Must request async processing for $export"); + } + } + + private Set splitTypeFilters(IPrimitiveType theTypeFilter) { + if (theTypeFilter== null) { + return null; + } + String typeFilterSring = theTypeFilter.getValueAsString(); + String[] typeFilters = typeFilterSring.split(FARM_TO_TABLE_TYPE_FILTER_REGEX); + if (typeFilters == null || typeFilters.length == 0) { + return null; + } + + return new HashSet<>(Arrays.asList(typeFilters)); + } + } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java index 14be984a072..7bec3d61a89 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java @@ -21,13 +21,15 @@ package ca.uhn.fhir.jpa.bulk.svc; */ import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.context.RuntimeResourceDefinition; +import ca.uhn.fhir.context.RuntimeSearchParam; +import ca.uhn.fhir.fhirpath.IFhirPath; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.model.ExpungeOptions; import ca.uhn.fhir.jpa.batch.BatchJobsConfig; import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions; -import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions; import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.bulk.job.BulkExportJobConfig; import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum; @@ -41,19 +43,21 @@ import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.model.util.JpaConstants; +import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; import ca.uhn.fhir.util.UrlUtil; -import org.apache.commons.lang3.StringUtils; +import com.google.common.collect.Sets; import org.apache.commons.lang3.time.DateUtils; +import org.hl7.fhir.instance.model.api.IBase; import org.hl7.fhir.instance.model.api.IBaseBinary; +import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.r4.model.InstantType; import org.quartz.JobExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.beans.factory.annotation.Autowired; @@ -66,18 +70,19 @@ import org.springframework.transaction.support.TransactionTemplate; import javax.annotation.PostConstruct; import javax.transaction.Transactional; -import java.util.Arrays; import java.util.Date; -import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import static ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions.ExportStyle.GROUP; +import static ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions.ExportStyle.PATIENT; +import static ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions.ExportStyle.SYSTEM; import static ca.uhn.fhir.util.UrlUtil.escapeUrlParam; import static ca.uhn.fhir.util.UrlUtil.escapeUrlParams; -import static org.apache.commons.lang3.StringUtils.contains; import static org.apache.commons.lang3.StringUtils.isNotBlank; public class BulkDataExportSvcImpl implements IBulkDataExportSvc { @@ -113,6 +118,12 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { @Qualifier(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME) private org.springframework.batch.core.Job myGroupBulkExportJob; + @Autowired + @Qualifier(BatchJobsConfig.PATIENT_BULK_EXPORT_JOB_NAME) + private org.springframework.batch.core.Job myPatientBulkExportJob; + + private Set myCompartmentResources; + private final int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR); /** @@ -229,6 +240,8 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { if (isGroupBulkJob(theBulkExportJobEntity)) { enhanceBulkParametersWithGroupParameters(theBulkExportJobEntity, parameters); myJobSubmitter.runJob(myGroupBulkExportJob, parameters.toJobParameters()); + } else if (isPatientBulkJob(theBulkExportJobEntity)) { + myJobSubmitter.runJob(myPatientBulkExportJob, parameters.toJobParameters()); } else { myJobSubmitter.runJob(myBulkExportJob, parameters.toJobParameters()); } @@ -237,6 +250,14 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { } } + private boolean isPatientBulkJob(BulkExportJobEntity theBulkExportJobEntity) { + return theBulkExportJobEntity.getRequest().startsWith("/Patient/"); + } + + private boolean isGroupBulkJob(BulkExportJobEntity theBulkExportJobEntity) { + return theBulkExportJobEntity.getRequest().startsWith("/Group/"); + } + private void enhanceBulkParametersWithGroupParameters(BulkExportJobEntity theBulkExportJobEntity, JobParametersBuilder theParameters) { String theGroupId = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_GROUP_ID); String expandMdm = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_MDM); @@ -244,9 +265,6 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { theParameters.addString(BulkExportJobConfig.EXPAND_MDM_PARAMETER, expandMdm); } - private boolean isGroupBulkJob(BulkExportJobEntity theBulkExportJobEntity) { - return getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_GROUP_ID) != null; - } @SuppressWarnings("unchecked") private IFhirResourceDao getBinaryDao() { @@ -271,6 +289,12 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { @Transactional @Override public JobInfo submitJob(BulkDataExportOptions theBulkDataExportOptions) { + return submitJob(theBulkDataExportOptions, true); + } + + @Transactional + @Override + public JobInfo submitJob(BulkDataExportOptions theBulkDataExportOptions, Boolean useCache) { String outputFormat = Constants.CT_FHIR_NDJSON; if (isNotBlank(theBulkDataExportOptions.getOutputFormat())) { outputFormat = theBulkDataExportOptions.getOutputFormat(); @@ -282,7 +306,16 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { // TODO GGG KS can we encode BulkDataExportOptions as a JSON string as opposed to this request string. Feels like it would be a more extensible encoding... //Probably yes, but this will all need to be rebuilt when we remove this bridge entity StringBuilder requestBuilder = new StringBuilder(); - requestBuilder.append("/").append(JpaConstants.OPERATION_EXPORT); + requestBuilder.append("/"); + + //Prefix the export url with Group/[id]/ or /Patient/ depending on what type of request it is. + if (theBulkDataExportOptions.getExportStyle().equals(GROUP)) { + requestBuilder.append(theBulkDataExportOptions.getGroupId().toVersionless()).append("/"); + } else if (theBulkDataExportOptions.getExportStyle().equals(PATIENT)) { + requestBuilder.append("Patient/"); + } + + requestBuilder.append(JpaConstants.OPERATION_EXPORT); requestBuilder.append("?").append(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT).append("=").append(escapeUrlParam(outputFormat)); Set resourceTypes = theBulkDataExportOptions.getResourceTypes(); if (resourceTypes != null) { @@ -293,20 +326,28 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_SINCE).append("=").append(new InstantType(since).setTimeZoneZulu(true).getValueAsString()); } if (theBulkDataExportOptions.getFilters() != null && theBulkDataExportOptions.getFilters().size() > 0) { - requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_TYPE_FILTER).append("=").append(String.join(",", escapeUrlParams(theBulkDataExportOptions.getFilters()))); + theBulkDataExportOptions.getFilters().stream() + .forEach(filter -> requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_TYPE_FILTER).append("=").append(escapeUrlParam(filter))); } - if (theBulkDataExportOptions instanceof GroupBulkDataExportOptions) { - GroupBulkDataExportOptions groupOptions = (GroupBulkDataExportOptions) theBulkDataExportOptions; - requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_GROUP_ID).append("=").append(groupOptions.getGroupId().getValue()); - requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_MDM).append("=").append(groupOptions.isMdm()); + + if (theBulkDataExportOptions.getExportStyle().equals(GROUP)) { + requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_GROUP_ID).append("=").append(theBulkDataExportOptions.getGroupId().getValue()); + requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_MDM).append("=").append(theBulkDataExportOptions.isExpandMdm()); } + + + String request = requestBuilder.toString(); - Date cutoff = DateUtils.addMilliseconds(new Date(), -myReuseBulkExportForMillis); - Pageable page = PageRequest.of(0, 10); - Slice existing = myBulkExportJobDao.findExistingJob(page, request, cutoff, BulkJobStatusEnum.ERROR); - if (!existing.isEmpty()) { - return toSubmittedJobInfo(existing.iterator().next()); + + //If we are using the cache, then attempt to retrieve a matching job based on the Request String, otherwise just make a new one. + if (useCache) { + Date cutoff = DateUtils.addMilliseconds(new Date(), -myReuseBulkExportForMillis); + Pageable page = PageRequest.of(0, 10); + Slice existing = myBulkExportJobDao.findExistingJob(page, request, cutoff, BulkJobStatusEnum.ERROR); + if (!existing.isEmpty()) { + return toSubmittedJobInfo(existing.iterator().next()); + } } if (resourceTypes != null && resourceTypes.contains("Binary")) { @@ -318,7 +359,7 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { // This is probably not a useful default, but having the default be "download the whole // server" seems like a risky default too. We'll deal with that by having the default involve // only returning a small time span - resourceTypes = myContext.getResourceTypes(); + resourceTypes = getAllowedResourceTypesForBulkExportStyle(theBulkDataExportOptions.getExportStyle()); if (since == null) { since = DateUtils.addDays(new Date(), -1); } @@ -369,17 +410,13 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { public void validateTypeFilters(Set theTheFilters, Set theResourceTypes) { if (theTheFilters != null) { - Set types = new HashSet<>(); for (String next : theTheFilters) { if (!next.contains("?")) { throw new InvalidRequestException("Invalid " + JpaConstants.PARAM_EXPORT_TYPE_FILTER + " value \"" + next + "\". Must be in the form [ResourceType]?[params]"); } String resourceType = next.substring(0, next.indexOf("?")); if (!theResourceTypes.contains(resourceType)) { - throw new InvalidRequestException("Invalid " + JpaConstants.PARAM_EXPORT_TYPE_FILTER + " value \"" + next + "\". Resource type does not appear in " + JpaConstants.PARAM_EXPORT_TYPE + " list"); - } - if (!types.add(resourceType)) { - throw new InvalidRequestException("Invalid " + JpaConstants.PARAM_EXPORT_TYPE_FILTER + " value \"" + next + "\". Multiple filters found for type " + resourceType); + throw new InvalidRequestException("Invalid " + JpaConstants.PARAM_EXPORT_TYPE_FILTER + " value \"" + next + "\". Resource type does not appear in " + JpaConstants.PARAM_EXPORT_TYPE+ " list"); } } } @@ -421,6 +458,35 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { return retVal; } + @Override + public Set getPatientCompartmentResources() { + if (myCompartmentResources == null) { + myCompartmentResources = myContext.getResourceTypes().stream() + .filter(this::resourceTypeIsInPatientCompartment) + .collect(Collectors.toSet()); + } + return myCompartmentResources; + } + + /** + * Return true if any search parameter in the resource can point at a patient, false otherwise + */ + private boolean resourceTypeIsInPatientCompartment(String theResourceType) { + RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(theResourceType); + List searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient"); + return searchParams != null && searchParams.size() >= 1; + } + + public Set getAllowedResourceTypesForBulkExportStyle(BulkDataExportOptions.ExportStyle theExportStyle) { + if (theExportStyle.equals(SYSTEM)) { + return myContext.getResourceTypes(); + } else if (theExportStyle.equals(GROUP) || theExportStyle.equals(PATIENT)) { + return getPatientCompartmentResources(); + } else { + throw new IllegalArgumentException(String.format("HAPI FHIR does not recognize a Bulk Export request of type: %s", theExportStyle)); + } + } + private IIdType toId(String theResourceId) { IIdType retVal = myContext.getVersion().newIdType(); retVal.setValue(theResourceId); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBulkExportJobDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBulkExportJobDao.java index a99dae53aac..708425d5fdb 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBulkExportJobDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBulkExportJobDao.java @@ -43,7 +43,7 @@ public interface IBulkExportJobDao extends JpaRepository findByExpiry(Pageable thePage, @Param("cutoff") Date theCutoff); - @Query("SELECT j FROM BulkExportJobEntity j WHERE j.myRequest = :request AND j.myCreated > :createdAfter AND j.myStatus <> :status") + @Query("SELECT j FROM BulkExportJobEntity j WHERE j.myRequest = :request AND j.myCreated > :createdAfter AND j.myStatus <> :status ORDER BY j.myCreated DESC") Slice findExistingJob(Pageable thePage, @Param("request") String theRequest, @Param("createdAfter") Date theCreatedAfter, @Param("status") BulkJobStatusEnum theNotStatus); @Modifying diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportProviderTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportProviderTest.java index 45ea95c7afb..f33e57d7b06 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportProviderTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportProviderTest.java @@ -3,7 +3,6 @@ package ca.uhn.fhir.jpa.bulk; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirVersionEnum; import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions; -import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions; import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.bulk.model.BulkExportResponseJson; import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum; @@ -17,6 +16,7 @@ import ca.uhn.fhir.test.utilities.JettyUtil; import ca.uhn.fhir.util.JsonUtil; import ca.uhn.fhir.util.UrlUtil; import com.google.common.base.Charsets; +import com.google.common.collect.Sets; import org.apache.commons.io.IOUtils; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; @@ -27,7 +27,6 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.ServletHandler; import org.eclipse.jetty.servlet.ServletHolder; -import org.hl7.fhir.r4.model.BooleanType; import org.hl7.fhir.r4.model.IdType; import org.hl7.fhir.r4.model.InstantType; import org.hl7.fhir.r4.model.Parameters; @@ -57,6 +56,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.times; @@ -79,7 +79,7 @@ public class BulkDataExportProviderTest { @Captor private ArgumentCaptor myBulkDataExportOptionsCaptor; @Captor - private ArgumentCaptor myGroupBulkDataExportOptionsCaptor; + private ArgumentCaptor myBooleanArgumentCaptor; @AfterEach public void after() throws Exception { @@ -116,7 +116,7 @@ public class BulkDataExportProviderTest { IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo() .setJobId(A_JOB_ID); - when(myBulkDataExportSvc.submitJob(any())).thenReturn(jobInfo); + when(myBulkDataExportSvc.submitJob(any(), any())).thenReturn(jobInfo); InstantType now = InstantType.now(); @@ -140,7 +140,7 @@ public class BulkDataExportProviderTest { assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue()); } - verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture()); + verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), any()); BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue(); assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat()); assertThat(options.getResourceTypes(), containsInAnyOrder("Patient", "Practitioner")); @@ -153,7 +153,7 @@ public class BulkDataExportProviderTest { IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo() .setJobId(A_JOB_ID); - when(myBulkDataExportSvc.submitJob(any())).thenReturn(jobInfo); + when(myBulkDataExportSvc.submitJob(any(),any())).thenReturn(jobInfo); InstantType now = InstantType.now(); @@ -174,7 +174,7 @@ public class BulkDataExportProviderTest { assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue()); } - verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture()); + verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), any()); BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue(); assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat()); assertThat(options.getResourceTypes(), containsInAnyOrder("Patient", "Practitioner")); @@ -267,7 +267,6 @@ public class BulkDataExportProviderTest { assertEquals("Patient", responseJson.getOutput().get(2).getType()); assertEquals("http://localhost:" + myPort + "/Binary/333", responseJson.getOutput().get(2).getUrl()); } - } @Test @@ -304,7 +303,8 @@ public class BulkDataExportProviderTest { public void testSuccessfulInitiateGroupBulkRequest_Post() throws IOException { IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo().setJobId(G_JOB_ID); - when(myBulkDataExportSvc.submitJob(any())).thenReturn(jobInfo); + when(myBulkDataExportSvc.submitJob(any(),any())).thenReturn(jobInfo); + when(myBulkDataExportSvc.getPatientCompartmentResources()).thenReturn(Sets.newHashSet("Observation", "DiagnosticReport")); InstantType now = InstantType.now(); @@ -330,13 +330,207 @@ public class BulkDataExportProviderTest { assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + G_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue()); } - verify(myBulkDataExportSvc, times(1)).submitJob(myGroupBulkDataExportOptionsCaptor.capture()); - GroupBulkDataExportOptions options = myGroupBulkDataExportOptionsCaptor.getValue(); + verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), any()); + BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue(); assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat()); assertThat(options.getResourceTypes(), containsInAnyOrder("Observation", "DiagnosticReport")); assertThat(options.getSince(), notNullValue()); assertThat(options.getFilters(), notNullValue()); assertEquals(GROUP_ID, options.getGroupId().getValue()); - assertThat(options.isMdm(), is(equalTo(true))); + assertThat(options.isExpandMdm(), is(equalTo(true))); } + + @Test + public void testSuccessfulInitiateGroupBulkRequest_Get() throws IOException { + + IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo().setJobId(G_JOB_ID); + when(myBulkDataExportSvc.submitJob(any(), any())).thenReturn(jobInfo); + when(myBulkDataExportSvc.getPatientCompartmentResources()).thenReturn(Sets.newHashSet("Patient", "Practitioner")); + + InstantType now = InstantType.now(); + + String url = "http://localhost:" + myPort + "/" + GROUP_ID + "/" + JpaConstants.OPERATION_EXPORT + + "?" + JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT + "=" + UrlUtil.escapeUrlParam(Constants.CT_FHIR_NDJSON) + + "&" + JpaConstants.PARAM_EXPORT_TYPE + "=" + UrlUtil.escapeUrlParam("Patient, Practitioner") + + "&" + JpaConstants.PARAM_EXPORT_SINCE + "=" + UrlUtil.escapeUrlParam(now.getValueAsString()) + + "&" + JpaConstants.PARAM_EXPORT_TYPE_FILTER + "=" + UrlUtil.escapeUrlParam("Patient?identifier=foo|bar") + + "&" + JpaConstants.PARAM_EXPORT_MDM+ "=true"; + + HttpGet get = new HttpGet(url); + get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC); + ourLog.info("Request: {}", url); + try (CloseableHttpResponse response = myClient.execute(get)) { + ourLog.info("Response: {}", response.toString()); + + assertEquals(202, response.getStatusLine().getStatusCode()); + assertEquals("Accepted", response.getStatusLine().getReasonPhrase()); + assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + G_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue()); + } + + verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), any()); + BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue(); + assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat()); + assertThat(options.getResourceTypes(), containsInAnyOrder("Patient", "Practitioner")); + assertThat(options.getSince(), notNullValue()); + assertThat(options.getFilters(), notNullValue()); + assertEquals(GROUP_ID, options.getGroupId().getValue()); + assertThat(options.isExpandMdm(), is(equalTo(true))); + } + + @Test + public void testInitiateWithGetAndMultipleTypeFilters() throws IOException { + IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo() + .setJobId(A_JOB_ID); + when(myBulkDataExportSvc.submitJob(any())).thenReturn(jobInfo); + + InstantType now = InstantType.now(); + + String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT + + "?" + JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT + "=" + UrlUtil.escapeUrlParam(Constants.CT_FHIR_NDJSON) + + "&" + JpaConstants.PARAM_EXPORT_TYPE + "=" + UrlUtil.escapeUrlParam("Immunization, Observation") + + "&" + JpaConstants.PARAM_EXPORT_SINCE + "=" + UrlUtil.escapeUrlParam(now.getValueAsString()); + + String immunizationTypeFilter1 = "Immunization?patient.identifier=SC378274-MRN|009999997,SC378274-MRN|009999998,SC378274-MRN|009999999&date=2020-01-02"; + String immunizationTypeFilter2 = "Immunization?patient=Patient/123"; + String observationFilter1 = "Observation?subject=Patient/123&created=ge2020-01-01"; + StringBuilder multiValuedTypeFilterBuilder = new StringBuilder() + .append("&") + .append(JpaConstants.PARAM_EXPORT_TYPE_FILTER) + .append("=") + .append(UrlUtil.escapeUrlParam(immunizationTypeFilter1)) + .append(",") + .append(UrlUtil.escapeUrlParam(immunizationTypeFilter2)) + .append(",") + .append(UrlUtil.escapeUrlParam(observationFilter1)); + + url += multiValuedTypeFilterBuilder.toString(); + + HttpGet get = new HttpGet(url); + get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC); + myClient.execute(get); + + verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), anyBoolean()); + BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue(); + + assertThat(options.getFilters(), containsInAnyOrder(immunizationTypeFilter1, immunizationTypeFilter2, observationFilter1)); + } + + + @Test + public void testInitiateGroupExportWithInvalidResourceTypesFails() throws IOException { + when (myBulkDataExportSvc.getPatientCompartmentResources()).thenReturn(Sets.newHashSet("Observation")); + String url = "http://localhost:" + myPort + "/" + "Group/123/" +JpaConstants.OPERATION_EXPORT + + "?" + JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT + "=" + UrlUtil.escapeUrlParam(Constants.CT_FHIR_NDJSON) + + "&" + JpaConstants.PARAM_EXPORT_TYPE + "=" + UrlUtil.escapeUrlParam("StructureDefinition,Observation"); + + HttpGet get = new HttpGet(url); + get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC); + CloseableHttpResponse execute = myClient.execute(get); + String responseBody = IOUtils.toString(execute.getEntity().getContent()); + + assertThat(execute.getStatusLine().getStatusCode(), is(equalTo(400))); + assertThat(responseBody, is(containsString("Resource types [StructureDefinition] are invalid for this type of export, as they do not contain search parameters that refer to patients."))); + } + + @Test + public void testInitiateWithPostAndMultipleTypeFilters() throws IOException { + + IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo() + .setJobId(A_JOB_ID); + when(myBulkDataExportSvc.submitJob(any(), any())).thenReturn(jobInfo); + + InstantType now = InstantType.now(); + + Parameters input = new Parameters(); + input.addParameter(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, new StringType(Constants.CT_FHIR_NDJSON)); + input.addParameter(JpaConstants.PARAM_EXPORT_TYPE, new StringType("Patient")); + input.addParameter(JpaConstants.PARAM_EXPORT_TYPE_FILTER, new StringType("Patient?gender=male,Patient?gender=female")); + + ourLog.info(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(input)); + + HttpPost post = new HttpPost("http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT); + post.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC); + post.setEntity(new ResourceEntity(myCtx, input)); + ourLog.info("Request: {}", post); + try (CloseableHttpResponse response = myClient.execute(post)) { + ourLog.info("Response: {}", response.toString()); + + assertEquals(202, response.getStatusLine().getStatusCode()); + assertEquals("Accepted", response.getStatusLine().getReasonPhrase()); + assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue()); + } + + verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), myBooleanArgumentCaptor.capture()); + BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue(); + assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat()); + assertThat(options.getResourceTypes(), containsInAnyOrder("Patient")); + assertThat(options.getFilters(), containsInAnyOrder("Patient?gender=male", "Patient?gender=female")); + } + + @Test + public void testInitiatePatientExportRequest() throws IOException { + IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo() + .setJobId(A_JOB_ID); + when(myBulkDataExportSvc.submitJob(any(), any())).thenReturn(jobInfo); + when(myBulkDataExportSvc.getPatientCompartmentResources()).thenReturn(Sets.newHashSet("Immunization", "Observation")); + + InstantType now = InstantType.now(); + + Parameters input = new Parameters(); + input.addParameter(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, new StringType(Constants.CT_FHIR_NDJSON)); + input.addParameter(JpaConstants.PARAM_EXPORT_TYPE, new StringType("Immunization, Observation")); + input.addParameter(JpaConstants.PARAM_EXPORT_SINCE, now); + input.addParameter(JpaConstants.PARAM_EXPORT_TYPE_FILTER, new StringType("Immunization?vaccine-code=foo")); + + ourLog.info(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(input)); + + HttpPost post = new HttpPost("http://localhost:" + myPort + "/Patient/" + JpaConstants.OPERATION_EXPORT); + post.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC); + post.setEntity(new ResourceEntity(myCtx, input)); + ourLog.info("Request: {}", post); + try (CloseableHttpResponse response = myClient.execute(post)) { + ourLog.info("Response: {}", response.toString()); + + assertEquals(202, response.getStatusLine().getStatusCode()); + assertEquals("Accepted", response.getStatusLine().getReasonPhrase()); + assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue()); + } + + verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture(), myBooleanArgumentCaptor.capture()); + BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue(); + assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat()); + assertThat(options.getResourceTypes(), containsInAnyOrder("Immunization", "Observation")); + assertThat(options.getSince(), notNullValue()); + assertThat(options.getFilters(), containsInAnyOrder("Immunization?vaccine-code=foo")); + } + + @Test + public void testProviderProcessesNoCacheHeader() throws IOException { + IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo() + .setJobId(A_JOB_ID); + when(myBulkDataExportSvc.submitJob(any(), anyBoolean())).thenReturn(jobInfo); + + + Parameters input = new Parameters(); + input.addParameter(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, new StringType(Constants.CT_FHIR_NDJSON)); + input.addParameter(JpaConstants.PARAM_EXPORT_TYPE, new StringType("Patient, Practitioner")); + + HttpPost post = new HttpPost("http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT); + post.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC); + post.addHeader(Constants.HEADER_CACHE_CONTROL, Constants.CACHE_CONTROL_NO_CACHE); + post.setEntity(new ResourceEntity(myCtx, input)); + ourLog.info("Request: {}", post); + try (CloseableHttpResponse response = myClient.execute(post)) { + ourLog.info("Response: {}", response.toString()); + assertEquals(202, response.getStatusLine().getStatusCode()); + assertEquals("Accepted", response.getStatusLine().getReasonPhrase()); + assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue()); + } + + + verify(myBulkDataExportSvc).submitJob(myBulkDataExportOptionsCaptor.capture(), myBooleanArgumentCaptor.capture()); + Boolean usedCache = myBooleanArgumentCaptor.getValue(); + assertThat(usedCache, is(equalTo(false))); + } + } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java index 94ba91f5fd6..b06c03fa049 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java @@ -2,11 +2,11 @@ package ca.uhn.fhir.jpa.bulk; import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor; import ca.uhn.fhir.interceptor.api.Pointcut; +import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome; import ca.uhn.fhir.jpa.batch.BatchJobsConfig; import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions; -import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions; import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.bulk.job.BulkExportJobParametersBuilder; import ca.uhn.fhir.jpa.bulk.job.GroupBulkExportJobParametersBuilder; @@ -61,6 +61,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.containsString; @@ -70,6 +71,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { @@ -97,8 +99,20 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { @Qualifier(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME) private Job myGroupBulkJob; + @Autowired + @Qualifier(BatchJobsConfig.PATIENT_BULK_EXPORT_JOB_NAME) + private Job myPatientBulkJob; + private IIdType myPatientGroupId; + + @Override + public void beforeFlushFT() { + super.beforeFlushFT(); + //This is needed for patient level export. + myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED); + } + @Test public void testPurgeExpiredJobs() { @@ -155,7 +169,11 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { @Test public void testSubmit_InvalidOutputFormat() { try { - myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_JSON_NEW, Sets.newHashSet("Patient", "Observation"), null, null)); + BulkDataExportOptions options = new BulkDataExportOptions(); + options.setOutputFormat(Constants.CT_FHIR_JSON_NEW); + options.setResourceTypes(Sets.newHashSet("Patient", "Observation")); + options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM); + myBulkDataExportSvc.submitJob(options); fail(); } catch (InvalidRequestException e) { assertEquals("Invalid output format: application/fhir+json", e.getMessage()); @@ -165,37 +183,34 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { @Test public void testSubmit_OnlyBinarySelected() { try { - myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_JSON_NEW, Sets.newHashSet("Binary"), null, null)); + BulkDataExportOptions options = new BulkDataExportOptions(); + options.setResourceTypes(Sets.newHashSet("Binary")); + options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM); + myBulkDataExportSvc.submitJob(options); fail(); } catch (InvalidRequestException e) { - assertEquals("Invalid output format: application/fhir+json", e.getMessage()); + assertEquals("Binary resources may not be exported with bulk export", e.getMessage()); } } @Test public void testSubmit_InvalidResourceTypes() { try { - myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient", "FOO"), null, null)); + myBulkDataExportSvc.submitJob(buildBulkDataForResourceTypes(Sets.newHashSet("Patient", "FOO"))); fail(); } catch (InvalidRequestException e) { assertEquals("Unknown or unsupported resource type: FOO", e.getMessage()); } } - @Test - public void testSubmit_MultipleTypeFiltersForSameType() { - try { - myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Patient?name=a", "Patient?active=true"))); - fail(); - } catch (InvalidRequestException e) { - assertEquals("Invalid _typeFilter value \"Patient?name=a\". Multiple filters found for type Patient", e.getMessage()); - } - } - @Test public void testSubmit_TypeFilterForNonSelectedType() { try { - myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Observation?code=123"))); + BulkDataExportOptions options = new BulkDataExportOptions(); + options.setResourceTypes(Sets.newHashSet("Patient")); + options.setFilters(Sets.newHashSet("Observation?code=123")); + options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM); + myBulkDataExportSvc.submitJob(options); fail(); } catch (InvalidRequestException e) { assertEquals("Invalid _typeFilter value \"Observation?code=123\". Resource type does not appear in _type list", e.getMessage()); @@ -205,22 +220,32 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { @Test public void testSubmit_TypeFilterInvalid() { try { - myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Hello"))); + BulkDataExportOptions options = new BulkDataExportOptions(); + options.setResourceTypes(Sets.newHashSet("Patient")); + options.setFilters(Sets.newHashSet("Hello")); + options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM); + myBulkDataExportSvc.submitJob(options); fail(); } catch (InvalidRequestException e) { assertEquals("Invalid _typeFilter value \"Hello\". Must be in the form [ResourceType]?[params]", e.getMessage()); } } + private BulkDataExportOptions buildBulkDataForResourceTypes(Set resourceTypes) { + BulkDataExportOptions options = new BulkDataExportOptions(); + options.setResourceTypes(resourceTypes); + options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM); + return options; + } @Test public void testSubmit_ReusesExisting() { // Submit - IBulkDataExportSvc.JobInfo jobDetails1 = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null)); + IBulkDataExportSvc.JobInfo jobDetails1 = myBulkDataExportSvc.submitJob(buildBulkDataForResourceTypes(Sets.newHashSet("Patient", "Observation"))); assertNotNull(jobDetails1.getJobId()); // Submit again - IBulkDataExportSvc.JobInfo jobDetails2 = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null)); + IBulkDataExportSvc.JobInfo jobDetails2 = myBulkDataExportSvc.submitJob(buildBulkDataForResourceTypes(Sets.newHashSet("Patient", "Observation"))); assertNotNull(jobDetails2.getJobId()); assertEquals(jobDetails1.getJobId(), jobDetails2.getJobId()); @@ -241,7 +266,10 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { createResources(); // Create a bulk job - IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient"), null, null)); + BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions(); + bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM); + bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Patient")); + IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); assertNotNull(jobDetails.getJobId()); // Check the status @@ -271,7 +299,12 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { createResources(); // Create a bulk job - IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, Sets.newHashSet(TEST_FILTER))); + BulkDataExportOptions options = new BulkDataExportOptions(); + options.setResourceTypes(Sets.newHashSet("Patient", "Observation")); + options.setFilters(Sets.newHashSet(TEST_FILTER)); + options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM); + + IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options); assertNotNull(jobDetails.getJobId()); // Check the status @@ -300,7 +333,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { assertEquals(7, nextContents.split("\n").length); // Only female patients } else if ("Observation".equals(next.getResourceType())) { assertThat(nextContents, containsString("\"subject\":{\"reference\":\"Patient/PAT0\"}}\n")); - assertEquals(16, nextContents.split("\n").length); + assertEquals(26, nextContents.split("\n").length); } else { fail(next.getResourceType()); } @@ -324,7 +357,9 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { myBinaryDao.create(b); // Create a bulk job - IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, null, null, null)); + BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions(); + bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM); + IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); assertNotNull(jobDetails.getJobId()); // Check the status @@ -353,10 +388,10 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { assertEquals(17, nextContents.split("\n").length); } else if ("Observation".equals(next.getResourceType())) { assertThat(nextContents, containsString("\"subject\":{\"reference\":\"Patient/PAT0\"}}\n")); - assertEquals(16, nextContents.split("\n").length); + assertEquals(26, nextContents.split("\n").length); }else if ("Immunization".equals(next.getResourceType())) { assertThat(nextContents, containsString("\"patient\":{\"reference\":\"Patient/PAT0\"}}\n")); - assertEquals(16, nextContents.split("\n").length); + assertEquals(26, nextContents.split("\n").length); } else if ("CareTeam".equals(next.getResourceType())) { assertThat(nextContents, containsString("\"id\":\"CT0\"")); assertEquals(16, nextContents.split("\n").length); @@ -378,7 +413,11 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { // Create a bulk job HashSet types = Sets.newHashSet("Patient"); Set typeFilters = Sets.newHashSet("Patient?_has:Observation:patient:identifier=SYS|VAL3"); - IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, types, null, typeFilters)); + BulkDataExportOptions options = new BulkDataExportOptions(); + options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM); + options.setResourceTypes(types); + options.setFilters(typeFilters); + IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options); assertNotNull(jobDetails.getJobId()); // Check the status @@ -430,7 +469,12 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { } // Create a bulk job - IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), cutoff.getValue(), null)); + BulkDataExportOptions options = new BulkDataExportOptions(); + options.setResourceTypes(Sets.newHashSet("Patient", "Observation")); + options.setSince(cutoff.getValue()); + options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM); + + IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options); assertNotNull(jobDetails.getJobId()); // Check the status @@ -478,6 +522,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { paramBuilder .setReadChunkSize(100L) .setOutputFormat(Constants.CT_FHIR_NDJSON) + .setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM) .setResourceTypes(Arrays.asList("Patient", "Observation")); JobExecution jobExecution = myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters()); @@ -492,6 +537,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { public void awaitAllBulkJobCompletions() { List bulkExport = myJobExplorer.findJobInstancesByJobName(BatchJobsConfig.BULK_EXPORT_JOB_NAME, 0, 100); + bulkExport.addAll(myJobExplorer.findJobInstancesByJobName(BatchJobsConfig.PATIENT_BULK_EXPORT_JOB_NAME, 0, 100)); bulkExport.addAll(myJobExplorer.findJobInstancesByJobName(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME, 0, 100)); if (bulkExport.isEmpty()) { fail("There are no bulk export jobs running!"); @@ -509,7 +555,10 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { createResources(); // Create a bulk job - IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null)); + BulkDataExportOptions options = new BulkDataExportOptions(); + options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM); + options.setResourceTypes(Sets.newHashSet("Patient", "Observation")); + IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options); //Add the UUID to the job BulkExportJobParametersBuilder paramBuilder = new BulkExportJobParametersBuilder() @@ -531,17 +580,20 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { createResources(); // Create a bulk job - IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(null, Sets.newHashSet("Immunization"), null, null, myPatientGroupId, true)); + BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions(); + bulkDataExportOptions.setOutputFormat(null); + bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization")); + bulkDataExportOptions.setSince(null); + bulkDataExportOptions.setFilters(null); + bulkDataExportOptions.setGroupId(myPatientGroupId); + bulkDataExportOptions.setExpandMdm(true); + bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP); + IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); - GroupBulkExportJobParametersBuilder paramBuilder = new GroupBulkExportJobParametersBuilder(); - paramBuilder.setGroupId(myPatientGroupId.getIdPart()); - paramBuilder.setJobUUID(jobDetails.getJobId()); - paramBuilder.setReadChunkSize(10L); + myBulkDataExportSvc.buildExportFiles(); + awaitAllBulkJobCompletions(); - JobExecution jobExecution = myBatchJobSubmitter.runJob(myGroupBulkJob, paramBuilder.toJobParameters()); - - awaitJobCompletion(jobExecution); IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE)); @@ -549,10 +601,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); // Iterate over the files - Binary nextBinary = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId()); - assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType()); - String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8); - ourLog.info("Next contents for type {}:\n{}", nextBinary.getResourceType(), nextContents); + String nextContents = getBinaryContents(jobInfo, 0); assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); assertThat(nextContents, is(containsString("IMM0"))); @@ -562,33 +611,83 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { assertThat(nextContents, is(containsString("IMM8"))); } - // CareTeam has two patient references: participant and patient. This test checks if we find the patient if participant is null but patient is not null @Test - public void testGroupBatchJobCareTeam() throws Exception { + public void testPatientLevelExportWorks() throws JobParametersInvalidException { + myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED); createResources(); // Create a bulk job - IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(null, Sets.newHashSet("CareTeam"), null, null, myPatientGroupId, true)); + BulkDataExportOptions options = new BulkDataExportOptions(); + options.setResourceTypes(Sets.newHashSet("Immunization", "Observation")); + options.setExportStyle(BulkDataExportOptions.ExportStyle.PATIENT); + IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options); GroupBulkExportJobParametersBuilder paramBuilder = new GroupBulkExportJobParametersBuilder(); paramBuilder.setGroupId(myPatientGroupId.getIdPart()); paramBuilder.setJobUUID(jobDetails.getJobId()); paramBuilder.setReadChunkSize(10L); - JobExecution jobExecution = myBatchJobSubmitter.runJob(myGroupBulkJob, paramBuilder.toJobParameters()); + JobExecution jobExecution = myBatchJobSubmitter.runJob(myPatientBulkJob, paramBuilder.toJobParameters()); awaitJobCompletion(jobExecution); IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); + assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE)); + assertThat(jobInfo.getFiles().size(), equalTo(2)); + assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); + + // Iterate over the files + String nextContents = getBinaryContents(jobInfo, 0); + + assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); + assertThat(nextContents, is(containsString("IMM0"))); + assertThat(nextContents, is(containsString("IMM1"))); + assertThat(nextContents, is(containsString("IMM2"))); + assertThat(nextContents, is(containsString("IMM3"))); + assertThat(nextContents, is(containsString("IMM4"))); + assertThat(nextContents, is(containsString("IMM5"))); + assertThat(nextContents, is(containsString("IMM6"))); + assertThat(nextContents, is(containsString("IMM7"))); + assertThat(nextContents, is(containsString("IMM8"))); + assertThat(nextContents, is(containsString("IMM9"))); + assertThat(nextContents, is(containsString("IMM999"))); + + assertThat(nextContents, is(not(containsString("IMM2000")))); + assertThat(nextContents, is(not(containsString("IMM2001")))); + assertThat(nextContents, is(not(containsString("IMM2002")))); + assertThat(nextContents, is(not(containsString("IMM2003")))); + assertThat(nextContents, is(not(containsString("IMM2004")))); + assertThat(nextContents, is(not(containsString("IMM2005")))); + + } + + // CareTeam has two patient references: participant and patient. This test checks if we find the patient if participant is null but patient is not null + @Test + public void testGroupBatchJobCareTeam() throws Exception { + createResources(); + + BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions(); + bulkDataExportOptions.setOutputFormat(null); + bulkDataExportOptions.setResourceTypes(Sets.newHashSet("CareTeam")); + bulkDataExportOptions.setSince(null); + bulkDataExportOptions.setFilters(null); + bulkDataExportOptions.setGroupId(myPatientGroupId); + bulkDataExportOptions.setExpandMdm(true); + bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP); + // Create a bulk job + IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); + + myBulkDataExportSvc.buildExportFiles(); + awaitAllBulkJobCompletions(); + + IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); + assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE)); assertThat(jobInfo.getFiles().size(), equalTo(1)); assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("CareTeam"))); // Iterate over the files - Binary nextBinary = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId()); - assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType()); - String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8); - ourLog.info("Next contents for type {}:\n{}", nextBinary.getResourceType(), nextContents); + String nextContents = getBinaryContents(jobInfo, 0); assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("CareTeam"))); assertThat(nextContents, is(containsString("CT0"))); @@ -604,19 +703,102 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { JobParametersBuilder paramBuilder = new JobParametersBuilder().addString("jobUUID", "I'm not real!"); try { myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters()); - fail("Should have had invalid parameter execption!"); + fail("Should have had invalid parameter exception!"); } catch (JobParametersInvalidException e) { // good } } + @Test + public void testSystemExportWithMultipleTypeFilters() { + createResources(); + + // Create a bulk job + BulkDataExportOptions options = new BulkDataExportOptions(); + options.setResourceTypes(Sets.newHashSet("Immunization")); + options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM); + options.setFilters(Sets.newHashSet("Immunization?vaccine-code=Flu", "Immunization?patient=Patient/PAT1")); + + IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options); + myBulkDataExportSvc.buildExportFiles(); + awaitAllBulkJobCompletions(); + + + IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); + + assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE)); + assertThat(jobInfo.getFiles().size(), equalTo(1)); + assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); + + // Iterate over the files + String nextContents = getBinaryContents(jobInfo, 0); + + assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); + //These are the COVID-19 entries + assertThat(nextContents, is(containsString("IMM0"))); + assertThat(nextContents, is(containsString("IMM2"))); + assertThat(nextContents, is(containsString("IMM4"))); + assertThat(nextContents, is(containsString("IMM6"))); + assertThat(nextContents, is(containsString("IMM8"))); + + //This is the entry for the one referencing patient/1 + assertThat(nextContents, is(containsString("IMM1"))); + } + + @Test + public void testGroupExportWithMultipleTypeFilters() { + createResources(); + + // Create a bulk job + BulkDataExportOptions options = new BulkDataExportOptions(); + options.setResourceTypes(Sets.newHashSet("Observation")); + options.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP); + options.setGroupId(myPatientGroupId); + options.setExpandMdm(false); + options.setFilters(Sets.newHashSet("Observation?identifier=VAL0,VAL2", "Observation?identifier=VAL4")); + + IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(options); + myBulkDataExportSvc.buildExportFiles(); + awaitAllBulkJobCompletions(); + + IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); + + assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE)); + assertThat(jobInfo.getFiles().size(), equalTo(1)); + assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Observation"))); + String nextContents = getBinaryContents(jobInfo, 0); + + //These are the Observation entries + assertThat(nextContents, is(containsString("OBS0"))); + assertThat(nextContents, is(containsString("OBS2"))); + assertThat(nextContents, is(containsString("OBS4"))); + assertEquals(3, nextContents.split("\n").length); + } + + public String getBinaryContents(IBulkDataExportSvc.JobInfo theJobInfo, int theIndex) { + // Iterate over the files + Binary nextBinary = myBinaryDao.read(theJobInfo.getFiles().get(theIndex).getResourceId()); + assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType()); + String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8); + ourLog.info("Next contents for type {}:\n{}", nextBinary.getResourceType(), nextContents); + return nextContents; + } + @Test public void testMdmExpansionSuccessfullyExtractsPatients() throws JobParametersInvalidException { createResources(); // Create a bulk job - IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(null, Sets.newHashSet("Patient"), null, null, myPatientGroupId, true)); + BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions(); + bulkDataExportOptions.setOutputFormat(null); + bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Patient")); + bulkDataExportOptions.setSince(null); + bulkDataExportOptions.setFilters(null); + bulkDataExportOptions.setGroupId(myPatientGroupId); + bulkDataExportOptions.setExpandMdm(true); + bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP); + IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); myBulkDataExportSvc.buildExportFiles(); awaitAllBulkJobCompletions(); @@ -626,10 +808,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { assertThat(jobInfo.getFiles().size(), equalTo(1)); assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Patient"))); - Binary patientExportContent = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId()); - assertEquals(Constants.CT_FHIR_NDJSON, patientExportContent.getContentType()); - String nextContents = new String(patientExportContent.getContent(), Constants.CHARSET_UTF8); - ourLog.info("Next contents for type {}:\n{}", patientExportContent.getResourceType(), nextContents); + String nextContents = getBinaryContents(jobInfo, 0); assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Patient"))); //Output contains The entire group, plus the Mdm expansion, plus the golden resource @@ -641,23 +820,28 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { createResources(); // Create a bulk job - IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(null, Sets.newHashSet("Immunization", "Observation"), null, null, myPatientGroupId, true)); + BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions(); + bulkDataExportOptions.setOutputFormat(null); + bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization", "Observation")); + bulkDataExportOptions.setSince(null); + bulkDataExportOptions.setFilters(null); + bulkDataExportOptions.setGroupId(myPatientGroupId); + bulkDataExportOptions.setExpandMdm(true); + bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP); + IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); myBulkDataExportSvc.buildExportFiles(); awaitAllBulkJobCompletions(); IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); - assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Observation,Immunization&_groupId=" + myPatientGroupId +"&_mdm=true", jobInfo.getRequest()); + assertEquals("/Group/G0/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Observation,Immunization&_groupId=" + myPatientGroupId +"&_mdm=true", jobInfo.getRequest()); assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE)); assertThat(jobInfo.getFiles().size(), equalTo(2)); assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); // Check immunization Content - Binary immunizationExportContent = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId()); - assertEquals(Constants.CT_FHIR_NDJSON, immunizationExportContent.getContentType()); - String nextContents = new String(immunizationExportContent.getContent(), Constants.CHARSET_UTF8); - ourLog.info("Next contents for type {}:\n{}", immunizationExportContent.getResourceType(), nextContents); + String nextContents = getBinaryContents(jobInfo, 0); assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); assertThat(nextContents, is(containsString("IMM0"))); assertThat(nextContents, is(containsString("IMM2"))); @@ -697,23 +881,24 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { @Test public void testGroupBulkExportSupportsTypeFilters() throws JobParametersInvalidException { createResources(); - Set filters = new HashSet<>(); //Only get COVID-19 vaccinations + Set filters = new HashSet<>(); filters.add("Immunization?vaccine-code=vaccines|COVID-19"); - GroupBulkDataExportOptions groupBulkDataExportOptions = new GroupBulkDataExportOptions(null, Sets.newHashSet("Immunization"), null, filters, myPatientGroupId, true ); - IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(groupBulkDataExportOptions); + BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions(); + bulkDataExportOptions.setOutputFormat(null); + bulkDataExportOptions.setResourceTypes(Sets.newHashSet("Immunization")); + bulkDataExportOptions.setSince(null); + bulkDataExportOptions.setFilters(filters); + bulkDataExportOptions.setGroupId(myPatientGroupId); + bulkDataExportOptions.setExpandMdm(true); + bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP); + IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); - GroupBulkExportJobParametersBuilder paramBuilder = new GroupBulkExportJobParametersBuilder(); - paramBuilder.setGroupId(myPatientGroupId.getIdPart()); - paramBuilder.setMdm(true); - paramBuilder.setJobUUID(jobDetails.getJobId()); - paramBuilder.setReadChunkSize(10L); + myBulkDataExportSvc.buildExportFiles(); + awaitAllBulkJobCompletions(); - JobExecution jobExecution = myBatchJobSubmitter.runJob(myGroupBulkJob, paramBuilder.toJobParameters()); - - awaitJobCompletion(jobExecution); IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE)); @@ -721,12 +906,8 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); // Check immunization Content - Binary immunizationExportContent = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId()); - assertEquals(Constants.CT_FHIR_NDJSON, immunizationExportContent.getContentType()); - String nextContents = new String(immunizationExportContent.getContent(), Constants.CHARSET_UTF8); - ourLog.info("Next contents for type {}:\n{}", immunizationExportContent.getResourceType(), nextContents); + String nextContents = getBinaryContents(jobInfo, 0); - assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); assertThat(nextContents, is(containsString("IMM1"))); assertThat(nextContents, is(containsString("IMM3"))); assertThat(nextContents, is(containsString("IMM5"))); @@ -737,6 +918,82 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { assertThat(nextContents, is(not(containsString("Flu")))); } + @Test + public void testAllExportStylesWorkWithNullResourceTypes() { + createResources(); + myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED); + // Create a bulk job + BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions(); + bulkDataExportOptions.setOutputFormat(null); + bulkDataExportOptions.setResourceTypes(null); + bulkDataExportOptions.setSince(null); + bulkDataExportOptions.setFilters(null); + bulkDataExportOptions.setGroupId(myPatientGroupId); + bulkDataExportOptions.setExpandMdm(true); + bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.PATIENT); + + //Patient-style + IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); + myBulkDataExportSvc.buildExportFiles(); + awaitAllBulkJobCompletions(); + IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); + assertThat(jobInfo.getStatus(), is(equalTo(BulkJobStatusEnum.COMPLETE))); + + //Group-style + bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP); + bulkDataExportOptions.setGroupId(myPatientGroupId); + jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); + myBulkDataExportSvc.buildExportFiles(); + awaitAllBulkJobCompletions(); + jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); + assertThat(jobInfo.getStatus(), is(equalTo(BulkJobStatusEnum.COMPLETE))); + + //System-style + bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM); + jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions); + myBulkDataExportSvc.buildExportFiles(); + awaitAllBulkJobCompletions(); + jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); + assertThat(jobInfo.getStatus(), is(equalTo(BulkJobStatusEnum.COMPLETE))); + } + + @Test + public void testCacheSettingIsRespectedWhenCreatingNewJobs() { + BulkDataExportOptions options = new BulkDataExportOptions(); + options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM); + options.setResourceTypes(Sets.newHashSet("Procedure")); + IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.submitJob(options, true); + IBulkDataExportSvc.JobInfo jobInfo1 = myBulkDataExportSvc.submitJob(options, true); + IBulkDataExportSvc.JobInfo jobInfo2 = myBulkDataExportSvc.submitJob(options, true); + IBulkDataExportSvc.JobInfo jobInfo3 = myBulkDataExportSvc.submitJob(options, true); + IBulkDataExportSvc.JobInfo jobInfo4 = myBulkDataExportSvc.submitJob(options, true); + + //Cached should have all identical Job IDs. + String initialJobId = jobInfo.getJobId(); + boolean allMatch = Stream.of(jobInfo, jobInfo1, jobInfo2, jobInfo3, jobInfo4).allMatch(job -> job.getJobId().equals(initialJobId)); + assertTrue(allMatch); + + IBulkDataExportSvc.JobInfo jobInfo5 = myBulkDataExportSvc.submitJob(options, false); + IBulkDataExportSvc.JobInfo jobInfo6 = myBulkDataExportSvc.submitJob(options, false); + IBulkDataExportSvc.JobInfo jobInfo7 = myBulkDataExportSvc.submitJob(options, false); + IBulkDataExportSvc.JobInfo jobInfo8 = myBulkDataExportSvc.submitJob(options, false); + IBulkDataExportSvc.JobInfo jobInfo9 = myBulkDataExportSvc.submitJob(options, false); + + //First non-cached should retrieve new ID. + assertThat(initialJobId, is(not(equalTo(jobInfo5.getJobId())))); + + //Non-cached should all have unique IDs + List jobIds = Stream.of(jobInfo5, jobInfo6, jobInfo7, jobInfo8, jobInfo9).map(IBulkDataExportSvc.JobInfo::getJobId).collect(Collectors.toList()); + ourLog.info("ZOOP {}", String.join(", ", jobIds)); + Set uniqueJobIds = new HashSet<>(jobIds); + assertEquals(uniqueJobIds.size(), jobIds.size()); + + //Now if we create another one and ask for the cache, we should get the most-recently-insert entry. + IBulkDataExportSvc.JobInfo jobInfo10 = myBulkDataExportSvc.submitJob(options, true); + assertThat(jobInfo10.getJobId(), is(equalTo(jobInfo9.getJobId()))); + + } + private void awaitJobCompletion(JobExecution theJobExecution) { await().atMost(120, TimeUnit.SECONDS).until(() -> { JobExecution jobExecution = myJobExplorer.getJobExecution(theJobExecution.getId()); @@ -760,8 +1017,6 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { createImmunizationWithIndex(999, g1Outcome.getId()); createCareTeamWithIndex(999, g1Outcome.getId()); - //Lets create an observation and an immunization for our golden patient. - for (int i = 0; i < 10; i++) { DaoMethodOutcome patientOutcome = createPatientWithIndex(i); IIdType patId = patientOutcome.getId().toUnqualifiedVersionless(); @@ -780,6 +1035,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { createImmunizationWithIndex(i, patId); createCareTeamWithIndex(i, patId); } + myPatientGroupId = myGroupDao.update(group).getId(); //Manually create another golden record @@ -789,14 +1045,22 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { Long goldenPid2 = myIdHelperService.getPidOrNull(g2Outcome.getResource()); //Create some nongroup patients MDM linked to a different golden resource. They shouldnt be included in the query. - for (int i = 1000; i < 1005; i++) { - DaoMethodOutcome patientOutcome = createPatientWithIndex(i); + for (int i = 0; i < 5; i++) { + int index = 1000 + i; + DaoMethodOutcome patientOutcome = createPatientWithIndex(index); IIdType patId = patientOutcome.getId().toUnqualifiedVersionless(); Long sourcePid = myIdHelperService.getPidOrNull(patientOutcome.getResource()); linkToGoldenResource(goldenPid2, sourcePid); - createObservationWithIndex(i, patId); - createImmunizationWithIndex(i, patId); - createCareTeamWithIndex(i, patId); + createObservationWithIndex(index, patId); + createImmunizationWithIndex(index, patId); + createCareTeamWithIndex(index, patId); + } + + //Create some Observations and immunizations which have _no subjects!_ These will be exlucded from the Patient level export. + for (int i = 0; i < 10; i++) { + int index = 2000 + i; + createObservationWithIndex(index, null); + createImmunizationWithIndex(index, null); } } @@ -820,7 +1084,9 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { private void createImmunizationWithIndex(int i, IIdType patId) { Immunization immunization = new Immunization(); immunization.setId("IMM" + i); - immunization.setPatient(new Reference(patId)); + if (patId != null ) { + immunization.setPatient(new Reference(patId)); + } if (i % 2 == 0) { CodeableConcept cc = new CodeableConcept(); cc.addCoding().setSystem("vaccines").setCode("Flu"); @@ -838,7 +1104,9 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { obs.setId("OBS" + i); obs.addIdentifier().setSystem("SYS").setValue("VAL" + i); obs.setStatus(Observation.ObservationStatus.FINAL); - obs.getSubject().setReference(patId.getValue()); + if (patId != null) { + obs.getSubject().setReference(patId.getValue()); + } myObservationDao.update(obs); } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/BaseJpaR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/BaseJpaR4Test.java index bfbf2dfbe8f..4d36ee8c648 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/BaseJpaR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/BaseJpaR4Test.java @@ -162,6 +162,7 @@ import org.hl7.fhir.r4.model.ValueSet; import org.hl7.fhir.r5.utils.IResourceValidator; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Autowired; @@ -539,7 +540,6 @@ public abstract class BaseJpaR4Test extends BaseJpaTest implements ITestDataBuil runInTransaction(() -> { SearchSession searchSession = Search.session(myEntityManager); searchSession.workspace(ResourceTable.class).purge(); -// searchSession.workspace(ResourceIndexedSearchParamString.class).purge(); searchSession.indexingPlan().execute(); }); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4Test.java index d61f84b905e..f7675d5566f 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4Test.java @@ -333,6 +333,32 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test { } + @Test + public void testSearchWithDateInvalid() throws IOException { + HttpGet get = new HttpGet(ourServerBase + "/Condition?onset-date=junk"); + try (CloseableHttpResponse resp = ourHttpClient.execute(get)) { + String output = IOUtils.toString(resp.getEntity().getContent(), Charsets.UTF_8); + assertThat(output, containsString("Invalid date/time format: "junk"")); + assertEquals(400, resp.getStatusLine().getStatusCode()); + } + + get = new HttpGet(ourServerBase + "/Condition?onset-date=ge"); + try (CloseableHttpResponse resp = ourHttpClient.execute(get)) { + String output = IOUtils.toString(resp.getEntity().getContent(), Charsets.UTF_8); + assertThat(output, containsString("Invalid date/time format: "ge"")); + assertEquals(400, resp.getStatusLine().getStatusCode()); + } + + get = new HttpGet(ourServerBase + "/Condition?onset-date=" + UrlUtil.escapeUrlParam(">")); + try (CloseableHttpResponse resp = ourHttpClient.execute(get)) { + String output = IOUtils.toString(resp.getEntity().getContent(), Charsets.UTF_8); + assertThat(output, containsString("Invalid date/time format: ">"")); + assertEquals(400, resp.getStatusLine().getStatusCode()); + } + + } + + @Test public void testSearchWithSlashes() { myDaoConfig.setSearchPreFetchThresholds(Lists.newArrayList(10, 50, 10000));