Merge pull request #2420 from hapifhir/issue-2401-add-support-for-group-id-export

Issue 2401 add support for group id export
This commit is contained in:
Tadgh 2021-02-26 10:23:42 -05:00 committed by GitHub
commit 74066a81b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 809 additions and 95 deletions

View File

@ -0,0 +1,6 @@
---
type: add
issue: 2401
title: "Group Bulk exports are now supported. You can export all data for a Group of Patients via the
`/Group/[id]/$export` endpoint for any resource type which contains a patient compartment.
The _typeFilter and _since criteria are currently not supported at this level, but may eventually be"

View File

@ -58,6 +58,8 @@ public class PidToIBaseResourceProcessor implements ItemProcessor<List<ResourceP
@Override
public List<IBaseResource> process(List<ResourcePersistentId> theResourcePersistentId) {
String collect = theResourcePersistentId.stream().map(pid -> pid.getId().toString()).collect(Collectors.joining(","));
ourLog.trace("Processing PIDs: {}" + collect);
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(myResourceType);
Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();

View File

@ -0,0 +1,34 @@
package ca.uhn.fhir.jpa.bulk.api;
import java.util.Date;
import java.util.Set;
public class BulkDataExportOptions {
private final String myOutputFormat;
private final Set<String> myResourceTypes;
private final Date mySince;
private final Set<String> myFilters;
public BulkDataExportOptions(String theOutputFormat, Set<String> theResourceTypes, Date theSince, Set<String> theFilters) {
myOutputFormat = theOutputFormat;
myResourceTypes = theResourceTypes;
mySince = theSince;
myFilters = theFilters;
}
public String getOutputFormat() {
return myOutputFormat;
}
public Set<String> getResourceTypes() {
return myResourceTypes;
}
public Date getSince() {
return mySince;
}
public Set<String> getFilters() {
return myFilters;
}
}

View File

@ -0,0 +1,25 @@
package ca.uhn.fhir.jpa.bulk.api;
import org.hl7.fhir.instance.model.api.IIdType;
import java.util.Date;
import java.util.Set;
public class GroupBulkDataExportOptions extends BulkDataExportOptions {
private final IIdType myGroupId;
private final boolean myMdm;
public GroupBulkDataExportOptions(String theOutputFormat, Set<String> theResourceTypes, Date theSince, Set<String> theFilters, IIdType theGroupId, boolean theMdm) {
super(theOutputFormat, theResourceTypes, theSince, theFilters);
myGroupId = theGroupId;
myMdm = theMdm;
}
public IIdType getGroupId() {
return myGroupId;
}
public boolean isMdm() {
return myMdm;
}
}

View File

@ -27,7 +27,6 @@ import javax.transaction.Transactional;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
public interface IBulkDataExportSvc {
void buildExportFiles();
@ -35,7 +34,7 @@ public interface IBulkDataExportSvc {
@Transactional(value = Transactional.TxType.NEVER)
void purgeExpiredFiles();
JobInfo submitJob(String theOutputFormat, Set<String> theResourceTypes, Date theSince, Set<String> theFilters);
JobInfo submitJob(BulkDataExportOptions theBulkDataExportOptions);
JobInfo getJobInfoOrThrowResourceNotFound(String theJobId);

View File

@ -48,7 +48,11 @@ public class BulkExportGenerateResourceFilesStepListener implements StepExecutio
@Override
public ExitStatus afterStep(StepExecution theStepExecution) {
if (theStepExecution.getExitStatus().getExitCode().equals(ExitStatus.FAILED.getExitCode())) {
String jobUuid = theStepExecution.getJobExecution().getJobParameters().getString("jobUUID");
//Try to fetch it from the parameters first, and if it doesn't exist, fetch it from the context.
String jobUuid = theStepExecution.getJobExecution().getJobParameters().getString(BulkExportJobConfig.JOB_UUID_PARAMETER);
if (jobUuid == null) {
jobUuid = theStepExecution.getJobExecution().getExecutionContext().getString(BulkExportJobConfig.JOB_UUID_PARAMETER);
}
assert isNotBlank(jobUuid);
String exitDescription = theStepExecution.getExitStatus().getExitDescription();
myBulkExportDaoSvc.setJobToStatus(jobUuid, BulkJobStatusEnum.ERROR, exitDescription);

View File

@ -31,6 +31,7 @@ import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
@ -45,6 +46,11 @@ import java.util.List;
*/
@Configuration
public class BulkExportJobConfig {
public static final String JOB_UUID_PARAMETER = "jobUUID";
public static final String READ_CHUNK_PARAMETER = "readChunkSize";
public static final String GROUP_ID_PARAMETER = "groupId";
public static final String RESOURCE_TYPES_PARAMETER = "resourceTypes";
public static final int CHUNK_SIZE = 100;
@Autowired
private StepBuilderFactory myStepBuilderFactory;
@ -71,6 +77,23 @@ public class BulkExportJobConfig {
.build();
}
@Bean
@Lazy
public Job groupBulkExportJob() {
return myJobBuilderFactory.get("groupBulkExportJob")
.validator(groupBulkJobParameterValidator())
.validator(bulkJobParameterValidator())
.start(createBulkExportEntityStep())
.next(groupPartitionStep())
.next(closeJobStep())
.build();
}
@Bean
public GroupIdPresentValidator groupBulkJobParameterValidator() {
return new GroupIdPresentValidator();
}
@Bean
public Step createBulkExportEntityStep() {
return myStepBuilderFactory.get("createBulkExportEntityStep")
@ -89,10 +112,28 @@ public class BulkExportJobConfig {
return new BulkExportJobParameterValidator();
}
@Bean
public Step groupBulkExportGenerateResourceFilesStep() {
return myStepBuilderFactory.get("groupBulkExportGenerateResourceFilesStep")
.<List<ResourcePersistentId>, List<IBaseResource>> chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time.
.reader(groupBulkItemReader())
.processor(myPidToIBaseResourceProcessor)
.writer(resourceToFileWriter())
.listener(bulkExportGenrateResourceFilesStepListener())
.build();
}
@Bean
@StepScope
public GroupBulkItemReader groupBulkItemReader(){
return new GroupBulkItemReader();
}
@Bean
public Step bulkExportGenerateResourceFilesStep() {
return myStepBuilderFactory.get("bulkExportGenerateResourceFilesStep")
.<List<ResourcePersistentId>, List<IBaseResource>> chunk(100) //1000 resources per generated file, as the reader returns 10 resources at a time.
.<List<ResourcePersistentId>, List<IBaseResource>> chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time.
.reader(bulkItemReader())
.processor(myPidToIBaseResourceProcessor)
.writer(resourceToFileWriter())
@ -125,6 +166,14 @@ public class BulkExportJobConfig {
return new BulkExportGenerateResourceFilesStepListener();
}
@Bean
public Step groupPartitionStep() {
return myStepBuilderFactory.get("partitionStep")
.partitioner("groupBulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner())
.step(groupBulkExportGenerateResourceFilesStep())
.build();
}
@Bean
public Step partitionStep() {
return myStepBuilderFactory.get("partitionStep")

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.rest.api.Constants;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.JobParametersValidator;
@ -34,10 +35,13 @@ import org.springframework.transaction.support.TransactionTemplate;
import java.util.Arrays;
import java.util.Optional;
import static org.slf4j.LoggerFactory.getLogger;
/**
* This class will prevent a job from running if the UUID does not exist or is invalid.
*/
public class BulkExportJobParameterValidator implements JobParametersValidator {
@Autowired
private IBulkExportJobDao myBulkExportJobDao;
@Autowired
@ -52,11 +56,11 @@ public class BulkExportJobParameterValidator implements JobParametersValidator {
TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager);
String errorMessage = txTemplate.execute(tx -> {
StringBuilder errorBuilder = new StringBuilder();
Long readChunkSize = theJobParameters.getLong("readChunkSize");
Long readChunkSize = theJobParameters.getLong(BulkExportJobConfig.READ_CHUNK_PARAMETER);
if (readChunkSize == null || readChunkSize < 1) {
errorBuilder.append("There must be a valid number for readChunkSize, which is at least 1. ");
}
String jobUUID = theJobParameters.getString("jobUUID");
String jobUUID = theJobParameters.getString(BulkExportJobConfig.JOB_UUID_PARAMETER);
Optional<BulkExportJobEntity> oJob = myBulkExportJobDao.findByJobId(jobUUID);
if (!StringUtils.isBlank(jobUUID) && !oJob.isPresent()) {
errorBuilder.append("There is no persisted job that exists with UUID: " + jobUUID + ". ");
@ -66,9 +70,9 @@ public class BulkExportJobParameterValidator implements JobParametersValidator {
boolean hasExistingJob = oJob.isPresent();
//Check for to-be-created parameters.
if (!hasExistingJob) {
String resourceTypes = theJobParameters.getString("resourceTypes");
String resourceTypes = theJobParameters.getString(BulkExportJobConfig.RESOURCE_TYPES_PARAMETER);
if (StringUtils.isBlank(resourceTypes)) {
errorBuilder.append("You must include [resourceTypes] as a Job Parameter");
errorBuilder.append("You must include [").append(BulkExportJobConfig.RESOURCE_TYPES_PARAMETER).append("] as a Job Parameter");
} else {
String[] resourceArray = resourceTypes.split(",");
Arrays.stream(resourceArray).filter(resourceType -> resourceType.equalsIgnoreCase("Binary"))
@ -82,9 +86,8 @@ public class BulkExportJobParameterValidator implements JobParametersValidator {
if (!StringUtils.isBlank(outputFormat) && !Constants.CT_FHIR_NDJSON.equals(outputFormat)) {
errorBuilder.append("The only allowed format for Bulk Export is currently " + Constants.CT_FHIR_NDJSON);
}
}
return errorBuilder.toString();
});

View File

@ -54,8 +54,15 @@ import java.util.Optional;
public class BulkItemReader implements ItemReader<List<ResourcePersistentId>> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
Iterator<ResourcePersistentId> myPidIterator;
@Value("#{jobParameters['readChunkSize']}")
private Long READ_CHUNK_SIZE;
@Value("#{jobParameters['" + BulkExportJobConfig.READ_CHUNK_PARAMETER + "']}")
private Long myReadChunkSize;
@Value("#{jobExecutionContext['"+ BulkExportJobConfig.JOB_UUID_PARAMETER+"']}")
private String myJobUUID;
@Value("#{stepExecutionContext['resourceType']}")
private String myResourceType;
@Autowired
private IBulkExportJobDao myBulkExportJobDao;
@Autowired
@ -64,10 +71,7 @@ public class BulkItemReader implements ItemReader<List<ResourcePersistentId>> {
private FhirContext myContext;
@Autowired
private SearchBuilderFactory mySearchBuilderFactory;
@Value("#{jobExecutionContext['jobUUID']}")
private String myJobUUID;
@Value("#{stepExecutionContext['resourceType']}")
private String myResourceType;
@Autowired
private MatchUrlService myMatchUrlService;
@ -124,7 +128,7 @@ public class BulkItemReader implements ItemReader<List<ResourcePersistentId>> {
}
int count = 0;
List<ResourcePersistentId> outgoing = new ArrayList<>();
while (myPidIterator.hasNext() && count < READ_CHUNK_SIZE) {
while (myPidIterator.hasNext() && count < myReadChunkSize) {
outgoing.add(myPidIterator.next());
count += 1;
}

View File

@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.bulk.job;
* #L%
*/
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.rest.api.Constants;
import org.apache.commons.lang3.StringUtils;
@ -45,8 +46,8 @@ public class CreateBulkExportEntityTasklet implements Tasklet {
Map<String, Object> jobParameters = theChunkContext.getStepContext().getJobParameters();
//We can leave early if they provided us with an existing job.
if (jobParameters.containsKey("jobUUID")) {
addUUIDToJobContext(theChunkContext, (String)jobParameters.get("jobUUID"));
if (jobParameters.containsKey(BulkExportJobConfig.JOB_UUID_PARAMETER)) {
addUUIDToJobContext(theChunkContext, (String)jobParameters.get(BulkExportJobConfig.JOB_UUID_PARAMETER));
return RepeatStatus.FINISHED;
} else {
String resourceTypes = (String)jobParameters.get("resourceTypes");
@ -60,12 +61,12 @@ public class CreateBulkExportEntityTasklet implements Tasklet {
}
Set<String> resourceTypeSet = Arrays.stream(resourceTypes.split(",")).collect(Collectors.toSet());
String outputFormat = (String)jobParameters.get("outputFormat");
String outputFormat = (String) jobParameters.get("outputFormat");
if (StringUtils.isBlank(outputFormat)) {
outputFormat = Constants.CT_FHIR_NDJSON;
}
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.submitJob(outputFormat, resourceTypeSet, since, filterSet);
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(outputFormat, resourceTypeSet, since, filterSet));
addUUIDToJobContext(theChunkContext, jobInfo.getJobId());
return RepeatStatus.FINISHED;
@ -78,6 +79,6 @@ public class CreateBulkExportEntityTasklet implements Tasklet {
.getStepExecution()
.getJobExecution()
.getExecutionContext()
.putString("jobUUID", theJobUUID);
.putString(BulkExportJobConfig.JOB_UUID_PARAMETER, theJobUUID);
}
}

View File

@ -0,0 +1,8 @@
package ca.uhn.fhir.jpa.bulk.job;
public class GroupBulkExportJobParametersBuilder extends BulkExportJobParametersBuilder {
public GroupBulkExportJobParametersBuilder setGroupId(String theGroupId) {
this.addString(BulkExportJobConfig.GROUP_ID_PARAMETER, theGroupId);
return this;
}
}

View File

@ -0,0 +1,197 @@
package ca.uhn.fhir.jpa.bulk.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.QueryChunker;
import ca.uhn.fhir.model.api.Include;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.rest.param.HasParam;
import ca.uhn.fhir.util.UrlUtil;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import javax.annotation.Nonnull;
import javax.persistence.EntityManager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
Iterator<ResourcePersistentId> myPidIterator;
@Value("#{stepExecutionContext['resourceType']}")
private String myResourceType;
@Value("#{jobParameters['" + BulkExportJobConfig.GROUP_ID_PARAMETER + "']}")
private String myGroupId;
@Value("#{jobExecutionContext['"+ BulkExportJobConfig.JOB_UUID_PARAMETER+"']}")
private String myJobUUID;
@Value("#{jobParameters['" + BulkExportJobConfig.READ_CHUNK_PARAMETER + "']}")
private Long myReadChunkSize;
@Autowired
private IBulkExportJobDao myBulkExportJobDao;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private FhirContext myContext;
@Autowired
private SearchBuilderFactory mySearchBuilderFactory;
@Autowired
private EntityManager myEntityManager;
private void loadResourcePids() {
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
if (!jobOpt.isPresent()) {
ourLog.warn("Job appears to be deleted");
return;
}
BulkExportJobEntity jobEntity = jobOpt.get();
ourLog.info("Group Bulk export starting generation for batch export job: [{}] with resourceType [{}] and UUID [{}]", jobEntity, myResourceType, myJobUUID);
//Fetch all the pids given the query.
ISearchBuilder searchBuilder = getSearchBuilder();
//Build complex-ish _has query with a revincludes which allows lookup by group membership
SearchParameterMap searchParameterMap = getSearchParameterMap(jobEntity);
IResultIterator resultIterator = searchBuilder.createQuery(
searchParameterMap,
new SearchRuntimeDetails(null, myJobUUID),
null,
RequestPartitionId.allPartitions()
);
List<ResourcePersistentId> myReadPids = new ArrayList<>();
while (resultIterator.hasNext()) {
myReadPids.add(resultIterator.next());
}
//Given that databases explode when you have an IN clause with >1000 resources, we use the QueryChunker to break this into multiple queries.
List<ResourcePersistentId> revIncludePids = new ArrayList<>();
QueryChunker<ResourcePersistentId> chunker = new QueryChunker<>();
chunker.chunk(myReadPids, pidChunk -> {
revIncludePids.addAll(searchBuilder.loadIncludes(myContext, myEntityManager, pidChunk, searchParameterMap.getRevIncludes(), true, searchParameterMap.getLastUpdated(), myJobUUID, null));
});
myPidIterator = revIncludePids.iterator();
}
//For all group revinclude queries, you need to perform the search on the Patient DAO, which is why this is hardcoded here.
private ISearchBuilder getSearchBuilder() {
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao("Patient");
RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient");
Class<? extends IBaseResource> nextTypeClass = def.getImplementingClass();
return mySearchBuilderFactory.newSearchBuilder(dao, "Patient", nextTypeClass);
}
@Nonnull
private SearchParameterMap getSearchParameterMap(BulkExportJobEntity jobEntity) {
SearchParameterMap searchParameterMap = new SearchParameterMap();
searchParameterMap.add("_has", new HasParam("Group", "member", "_id", myGroupId));
String revIncludeString = buildRevIncludeString();
searchParameterMap.addRevInclude(new Include(revIncludeString).toLocked());
if (jobEntity.getSince() != null) {
searchParameterMap.setLastUpdated(new DateRangeParam(jobEntity.getSince(), null));
}
searchParameterMap.setLoadSynchronous(true);
return searchParameterMap;
}
/**
* Given the resource type of the job, fetch its patient compartment name, formatted for usage in an Include.
* e.g. Immunization -> Immunization:patient
*
* @return A string which can be dropped directly into an Include.
*/
private String buildRevIncludeString() {
RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(myResourceType);
RuntimeSearchParam patientSearchParam = runtimeResourceDefinition.getSearchParam("patient");
if (patientSearchParam == null) {
patientSearchParam = runtimeResourceDefinition.getSearchParam("subject");
if (patientSearchParam == null) {
patientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition);
}
}
String includeString = runtimeResourceDefinition.getName() + ":" + patientSearchParam.getName();
return includeString;
}
/**
* Search the resource definition for a compartment named 'patient' and return its related Search Parameter.
*/
private RuntimeSearchParam getRuntimeSearchParamByCompartment(RuntimeResourceDefinition runtimeResourceDefinition) {
RuntimeSearchParam patientSearchParam;
List<RuntimeSearchParam> searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient");
if (searchParams == null || searchParams.size() == 0) {
String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as it contains no Patient compartment, and no `patient` or `subject` search parameter", myResourceType);
throw new IllegalArgumentException(errorMessage);
} else if (searchParams.size() == 1) {
patientSearchParam = searchParams.get(0);
} else {
String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as we are unable to disambiguate which patient search parameter we should be searching by.", myResourceType);
throw new IllegalArgumentException(errorMessage);
}
return patientSearchParam;
}
@Override
public List<ResourcePersistentId> read() {
if (myPidIterator == null) {
loadResourcePids();
}
int count = 0;
List<ResourcePersistentId> outgoing = new ArrayList<>();
while (myPidIterator.hasNext() && count < myReadChunkSize) {
outgoing.add(myPidIterator.next());
count += 1;
}
return outgoing.size() == 0 ? null : outgoing;
}
}

View File

@ -0,0 +1,44 @@
package ca.uhn.fhir.jpa.bulk.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.slf4j.Logger;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.JobParametersValidator;
import static ca.uhn.fhir.jpa.bulk.job.BulkExportJobConfig.*;
import static org.slf4j.LoggerFactory.getLogger;
public class GroupIdPresentValidator implements JobParametersValidator {
private static final Logger ourLog = getLogger(GroupIdPresentValidator.class);
@Override
public void validate(JobParameters theJobParameters) throws JobParametersInvalidException {
if (theJobParameters == null || theJobParameters.getString(GROUP_ID_PARAMETER) == null) {
throw new JobParametersInvalidException("Group Bulk Export jobs must have a " + GROUP_ID_PARAMETER + " attribute");
} else {
ourLog.debug("detected we are running in group mode with group id [{}]", theJobParameters.getString(GROUP_ID_PARAMETER));
}
}
}

View File

@ -64,7 +64,7 @@ public class ResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
private Long myBulkExportCollectionEntityId;
@Value("#{stepExecutionContext['resourceType']}")
private String myReosurceType;
private String myResourceType;
private IFhirResourceDao<IBaseBinary> myBinaryDao;
@ -100,7 +100,6 @@ public class ResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
IBaseBinary binary = BinaryUtil.newBinary(myFhirContext);
binary.setContentType(Constants.CT_FHIR_NDJSON);
binary.setContent(myOutputStream.toByteArray());
DaoMethodOutcome outcome = myBinaryDao.create(binary);
return outcome.getResource().getIdElement();
}
@ -124,7 +123,7 @@ public class ResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
Optional<IIdType> createdId = flushToFiles();
if (createdId.isPresent()) {
ourLog.info("Created {} resources for bulk export file containing {} resources of type {} ", count, createdId.get().toUnqualifiedVersionless().getValue(), myReosurceType);
ourLog.info("Created {} resources for bulk export file containing {} resources of type {} ", count, createdId.get().toUnqualifiedVersionless().getValue(), myResourceType);
}
}
}

View File

@ -60,7 +60,7 @@ public class ResourceTypePartitioner implements Partitioner {
// The worker step needs to know which parent job it is processing for, and which collection entity it will be
// attaching its results to.
context.putString("jobUUID", myJobUUID);
context.putString(BulkExportJobConfig.JOB_UUID_PARAMETER, myJobUUID);
context.putLong("bulkExportCollectionEntityId", collectionEntityId);
// Name the partition based on the resource type

View File

@ -21,9 +21,12 @@ package ca.uhn.fhir.jpa.bulk.provider;
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.model.BulkExportResponseJson;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.rest.annotation.IdParam;
import ca.uhn.fhir.rest.annotation.Operation;
import ca.uhn.fhir.rest.annotation.OperationParam;
import ca.uhn.fhir.rest.api.Constants;
@ -37,6 +40,7 @@ import ca.uhn.fhir.util.OperationOutcomeUtil;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseOperationOutcome;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.InstantType;
import org.springframework.beans.factory.annotation.Autowired;
@ -98,7 +102,7 @@ public class BulkDataExportProvider {
filters = ArrayUtil.commaSeparatedListToCleanSet(theTypeFilter.getValueAsString());
}
IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(outputFormat, resourceTypes, since, filters);
IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(outputFormat, resourceTypes, since, filters));
String serverBase = getServerBase(theRequestDetails);
String pollLocation = serverBase + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" + JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + outcome.getJobId();
@ -177,4 +181,47 @@ public class BulkDataExportProvider {
return StringUtils.removeEnd(theRequestDetails.getServerBaseForRequest(), "/");
}
/**
* Group/Id/$export
*/
@Operation(name = JpaConstants.OPERATION_EXPORT, manualResponse = true, idempotent = true, typeName = "Group")
public void groupExport(
@IdParam IIdType theIdParam,
@OperationParam(name = JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theOutputFormat,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theType,
ServletRequestDetails theRequestDetails
) {
String preferHeader = theRequestDetails.getHeader(Constants.HEADER_PREFER);
PreferHeader prefer = RestfulServerUtils.parsePreferHeader(null, preferHeader);
if (prefer.getRespondAsync() == false) {
throw new InvalidRequestException("Must request async processing for $export");
}
String outputFormat = theOutputFormat != null ? theOutputFormat.getValueAsString() : null;
Set<String> resourceTypes = null;
if (theType != null) {
resourceTypes = ArrayUtil.commaSeparatedListToCleanSet(theType.getValueAsString());
}
//TODO GGG eventually, we will support these things.
Set<String> filters = null;
Date since = null;
boolean theMdm = false;
IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(outputFormat, resourceTypes, since, filters, theIdParam, theMdm));
String serverBase = getServerBase(theRequestDetails);
String pollLocation = serverBase + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" + JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + outcome.getJobId();
HttpServletResponse response = theRequestDetails.getServletResponse();
// Add standard headers
theRequestDetails.getServer().addHeadersToResponse(response);
// Successful 202 Accepted
response.addHeader(Constants.HEADER_CONTENT_LOCATION, pollLocation);
response.setStatus(Constants.STATUS_HTTP_202_ACCEPTED);
}
}

View File

@ -25,7 +25,10 @@ import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.job.BulkExportJobConfig;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
@ -40,6 +43,8 @@ import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.util.UrlUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IIdType;
@ -60,8 +65,10 @@ import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.PostConstruct;
import javax.transaction.Transactional;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@ -69,13 +76,14 @@ import java.util.stream.Collectors;
import static ca.uhn.fhir.util.UrlUtil.escapeUrlParam;
import static ca.uhn.fhir.util.UrlUtil.escapeUrlParams;
import static org.apache.commons.lang3.StringUtils.contains;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
private static final Long READ_CHUNK_SIZE = 10L;
private static final Logger ourLog = LoggerFactory.getLogger(BulkDataExportSvcImpl.class);
private int myReuseBulkExportForMillis = (int) (60 * DateUtils.MILLIS_PER_MINUTE);
private final int myReuseBulkExportForMillis = (int) (60 * DateUtils.MILLIS_PER_MINUTE);
@Autowired
private IBulkExportJobDao myBulkExportJobDao;
@ -100,7 +108,11 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
@Qualifier("bulkExportJob")
private org.springframework.batch.core.Job myBulkExportJob;
private int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR);
@Autowired
@Qualifier("groupBulkExportJob")
private org.springframework.batch.core.Job myGroupBulkExportJob;
private final int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR);
/**
* This method is called by the scheduler to run a pass of the
@ -123,10 +135,12 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
return;
}
String jobUuid = jobToProcessOpt.get().getJobId();
BulkExportJobEntity bulkExportJobEntity = jobToProcessOpt.get();
String jobUuid = bulkExportJobEntity.getJobId();
String theGroupId = getGroupIdIfPresent(bulkExportJobEntity.getRequest());
try {
processJob(jobUuid);
processJob(jobUuid, theGroupId);
} catch (Exception e) {
ourLog.error("Failure while preparing bulk export extract", e);
myTxTemplate.execute(t -> {
@ -142,6 +156,16 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
}
}
private String getGroupIdIfPresent(String theRequestString) {
Map<String, String[]> stringMap = UrlUtil.parseQueryString(theRequestString);
if (stringMap != null) {
String[] strings = stringMap.get(JpaConstants.PARAM_EXPORT_GROUP_ID);
if (strings != null) {
return String.join(",", strings);
}
}
return null;
}
/**
@ -191,22 +215,25 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
}
private void processJob(String theJobUuid) {
JobParameters parameters = new JobParametersBuilder()
.addString("jobUUID", theJobUuid)
.addLong("readChunkSize", READ_CHUNK_SIZE)
.toJobParameters();
private void processJob(String theJobUuid, String theGroupId) {
JobParametersBuilder parameters = new JobParametersBuilder()
.addString(BulkExportJobConfig.JOB_UUID_PARAMETER, theJobUuid)
.addLong(BulkExportJobConfig.READ_CHUNK_PARAMETER, READ_CHUNK_SIZE);
ourLog.info("Submitting bulk export job {} to job scheduler", theJobUuid);
try {
myJobSubmitter.runJob(myBulkExportJob, parameters);
if (!StringUtils.isBlank(theGroupId)) {
parameters.addString(BulkExportJobConfig.GROUP_ID_PARAMETER, theGroupId);
myJobSubmitter.runJob(myGroupBulkExportJob, parameters.toJobParameters());
} else {
myJobSubmitter.runJob(myBulkExportJob, parameters.toJobParameters());
}
} catch (JobParametersInvalidException theE) {
ourLog.error("Unable to start job with UUID: {}, the parameters are invalid. {}", theJobUuid, theE.getMessage());
}
}
@SuppressWarnings("unchecked")
private IFhirResourceDao<IBaseBinary> getBinaryDao() {
return myDaoRegistry.getResourceDao("Binary");
@ -229,28 +256,36 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
@Transactional
@Override
public JobInfo submitJob(String theOutputFormat, Set<String> theResourceTypes, Date theSince, Set<String> theFilters) {
public JobInfo submitJob(BulkDataExportOptions theBulkDataExportOptions) {
String outputFormat = Constants.CT_FHIR_NDJSON;
if (isNotBlank(theOutputFormat)) {
outputFormat = theOutputFormat;
if (isNotBlank(theBulkDataExportOptions.getOutputFormat())) {
outputFormat = theBulkDataExportOptions.getOutputFormat();
}
if (!Constants.CTS_NDJSON.contains(outputFormat)) {
throw new InvalidRequestException("Invalid output format: " + theOutputFormat);
throw new InvalidRequestException("Invalid output format: " + theBulkDataExportOptions.getOutputFormat());
}
// TODO GGG KS can we encode BulkDataExportOptions as a JSON string as opposed to this request string. Feels like it would be a more extensible encoding...
//Probably yes, but this will all need to be rebuilt when we remove this bridge entity
StringBuilder requestBuilder = new StringBuilder();
requestBuilder.append("/").append(JpaConstants.OPERATION_EXPORT);
requestBuilder.append("?").append(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT).append("=").append(escapeUrlParam(outputFormat));
Set<String> resourceTypes = theResourceTypes;
Set<String> resourceTypes = theBulkDataExportOptions.getResourceTypes();
if (resourceTypes != null) {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_TYPE).append("=").append(String.join(",", escapeUrlParams(resourceTypes)));
}
Date since = theSince;
Date since = theBulkDataExportOptions.getSince();
if (since != null) {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_SINCE).append("=").append(new InstantType(since).setTimeZoneZulu(true).getValueAsString());
}
if (theFilters != null && theFilters.size() > 0) {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_TYPE_FILTER).append("=").append(String.join(",", escapeUrlParams(theFilters)));
if (theBulkDataExportOptions.getFilters() != null && theBulkDataExportOptions.getFilters().size() > 0) {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_TYPE_FILTER).append("=").append(String.join(",", escapeUrlParams(theBulkDataExportOptions.getFilters())));
}
if (theBulkDataExportOptions instanceof GroupBulkDataExportOptions) {
GroupBulkDataExportOptions groupOptions = (GroupBulkDataExportOptions) theBulkDataExportOptions;
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_GROUP_ID).append("=").append(groupOptions.getGroupId().getValue());
//TODO GGG eventually we will support this
// requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_MDM).append("=").append(groupOptions.isMdm());
}
String request = requestBuilder.toString();
@ -291,7 +326,7 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
// Validate types
validateTypes(resourceTypes);
validateTypeFilters(theFilters, resourceTypes);
validateTypeFilters(theBulkDataExportOptions.getFilters(), resourceTypes);
updateExpiry(job);
myBulkExportJobDao.save(job);

View File

@ -8,7 +8,24 @@ import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc;
import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc;
import ca.uhn.fhir.jpa.term.api.ITermLoaderSvc;
import ca.uhn.fhir.jpa.term.custom.CustomTerminologySet;
import ca.uhn.fhir.jpa.term.loinc.*;
import ca.uhn.fhir.jpa.term.loinc.LoincAnswerListHandler;
import ca.uhn.fhir.jpa.term.loinc.LoincAnswerListLinkHandler;
import ca.uhn.fhir.jpa.term.loinc.LoincDocumentOntologyHandler;
import ca.uhn.fhir.jpa.term.loinc.LoincGroupFileHandler;
import ca.uhn.fhir.jpa.term.loinc.LoincGroupTermsFileHandler;
import ca.uhn.fhir.jpa.term.loinc.LoincHandler;
import ca.uhn.fhir.jpa.term.loinc.LoincHierarchyHandler;
import ca.uhn.fhir.jpa.term.loinc.LoincIeeeMedicalDeviceCodeHandler;
import ca.uhn.fhir.jpa.term.loinc.LoincImagingDocumentCodeHandler;
import ca.uhn.fhir.jpa.term.loinc.LoincParentGroupFileHandler;
import ca.uhn.fhir.jpa.term.loinc.LoincPartHandler;
import ca.uhn.fhir.jpa.term.loinc.LoincPartLinkHandler;
import ca.uhn.fhir.jpa.term.loinc.LoincPartRelatedCodeMappingHandler;
import ca.uhn.fhir.jpa.term.loinc.LoincRsnaPlaybookHandler;
import ca.uhn.fhir.jpa.term.loinc.LoincTop2000LabResultsSiHandler;
import ca.uhn.fhir.jpa.term.loinc.LoincTop2000LabResultsUsHandler;
import ca.uhn.fhir.jpa.term.loinc.LoincUniversalOrderSetHandler;
import ca.uhn.fhir.jpa.term.loinc.PartTypeAndPartName;
import ca.uhn.fhir.jpa.term.snomedct.SctHandlerConcept;
import ca.uhn.fhir.jpa.term.snomedct.SctHandlerDescription;
import ca.uhn.fhir.jpa.term.snomedct.SctHandlerRelationship;
@ -38,12 +55,66 @@ import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import javax.validation.constraints.NotNull;
import java.io.*;
import java.util.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.*;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_ANSWERLIST_FILE;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_ANSWERLIST_FILE_DEFAULT;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_ANSWERLIST_LINK_FILE;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_ANSWERLIST_LINK_FILE_DEFAULT;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_CODESYSTEM_VERSION;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_DOCUMENT_ONTOLOGY_FILE;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_DOCUMENT_ONTOLOGY_FILE_DEFAULT;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_FILE;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_FILE_DEFAULT;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_GROUP_FILE;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_GROUP_FILE_DEFAULT;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_GROUP_TERMS_FILE;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_GROUP_TERMS_FILE_DEFAULT;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_HIERARCHY_FILE;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_HIERARCHY_FILE_DEFAULT;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_IEEE_MEDICAL_DEVICE_CODE_MAPPING_TABLE_FILE;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_IEEE_MEDICAL_DEVICE_CODE_MAPPING_TABLE_FILE_DEFAULT;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_IMAGING_DOCUMENT_CODES_FILE;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_IMAGING_DOCUMENT_CODES_FILE_DEFAULT;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_PARENT_GROUP_FILE;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_PARENT_GROUP_FILE_DEFAULT;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_PART_FILE;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_PART_FILE_DEFAULT;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_PART_LINK_FILE;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_PART_LINK_FILE_DEFAULT;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_PART_LINK_FILE_PRIMARY;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_PART_LINK_FILE_PRIMARY_DEFAULT;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_PART_LINK_FILE_SUPPLEMENTARY;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_PART_LINK_FILE_SUPPLEMENTARY_DEFAULT;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_PART_RELATED_CODE_MAPPING_FILE;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_PART_RELATED_CODE_MAPPING_FILE_DEFAULT;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_RSNA_PLAYBOOK_FILE;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_RSNA_PLAYBOOK_FILE_DEFAULT;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_TOP2000_COMMON_LAB_RESULTS_SI_FILE;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_TOP2000_COMMON_LAB_RESULTS_SI_FILE_DEFAULT;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_TOP2000_COMMON_LAB_RESULTS_US_FILE;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_TOP2000_COMMON_LAB_RESULTS_US_FILE_DEFAULT;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_UNIVERSAL_LAB_ORDER_VALUESET_FILE;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_UNIVERSAL_LAB_ORDER_VALUESET_FILE_DEFAULT;
import static ca.uhn.fhir.jpa.term.loinc.LoincUploadPropertiesEnum.LOINC_UPLOAD_PROPERTIES_FILE;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
/*

View File

@ -23,9 +23,9 @@ package ca.uhn.fhir.jpa.util;
public class Counter {
private long myCount;
public long getThenAdd() {
return myCount++;
}
}

View File

@ -2,6 +2,8 @@ package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.model.BulkExportResponseJson;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
@ -25,6 +27,7 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.InstantType;
import org.hl7.fhir.r4.model.Parameters;
@ -41,16 +44,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -61,20 +66,18 @@ public class BulkDataExportProviderTest {
private static final String A_JOB_ID = "0000000-AAAAAA";
private static final Logger ourLog = LoggerFactory.getLogger(BulkDataExportProviderTest.class);
private static final String GROUP_ID = "Group/G2401";
private static final String G_JOB_ID = "0000000-GGGGGG";
private Server myServer;
private FhirContext myCtx = FhirContext.forCached(FhirVersionEnum.R4);
private final FhirContext myCtx = FhirContext.forCached(FhirVersionEnum.R4);
private int myPort;
@Mock
private IBulkDataExportSvc myBulkDataExportSvc;
private CloseableHttpClient myClient;
@Captor
private ArgumentCaptor<String> myOutputFormatCaptor;
private ArgumentCaptor<BulkDataExportOptions> myBulkDataExportOptionsCaptor;
@Captor
private ArgumentCaptor<Set<String>> myResourceTypesCaptor;
@Captor
private ArgumentCaptor<Date> mySinceCaptor;
@Captor
private ArgumentCaptor<Set<String>> myFiltersCaptor;
private ArgumentCaptor<GroupBulkDataExportOptions> myGroupBulkDataExportOptionsCaptor;
@AfterEach
public void after() throws Exception {
@ -111,7 +114,7 @@ public class BulkDataExportProviderTest {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo()
.setJobId(A_JOB_ID);
when(myBulkDataExportSvc.submitJob(any(), any(), any(), any())).thenReturn(jobInfo);
when(myBulkDataExportSvc.submitJob(any())).thenReturn(jobInfo);
InstantType now = InstantType.now();
@ -135,12 +138,12 @@ public class BulkDataExportProviderTest {
assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
}
verify(myBulkDataExportSvc, times(1)).submitJob(myOutputFormatCaptor.capture(), myResourceTypesCaptor.capture(), mySinceCaptor.capture(), myFiltersCaptor.capture());
assertEquals(Constants.CT_FHIR_NDJSON, myOutputFormatCaptor.getValue());
assertThat(myResourceTypesCaptor.getValue(), containsInAnyOrder("Patient", "Practitioner"));
assertThat(mySinceCaptor.getValue(), notNullValue());
assertThat(myFiltersCaptor.getValue(), containsInAnyOrder("Patient?identifier=foo"));
verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture());
BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue();
assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat());
assertThat(options.getResourceTypes(), containsInAnyOrder("Patient", "Practitioner"));
assertThat(options.getSince(), notNullValue());
assertThat(options.getFilters(), containsInAnyOrder("Patient?identifier=foo"));
}
@Test
@ -148,7 +151,7 @@ public class BulkDataExportProviderTest {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo()
.setJobId(A_JOB_ID);
when(myBulkDataExportSvc.submitJob(any(), any(), any(), any())).thenReturn(jobInfo);
when(myBulkDataExportSvc.submitJob(any())).thenReturn(jobInfo);
InstantType now = InstantType.now();
@ -169,12 +172,12 @@ public class BulkDataExportProviderTest {
assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
}
verify(myBulkDataExportSvc, times(1)).submitJob(myOutputFormatCaptor.capture(), myResourceTypesCaptor.capture(), mySinceCaptor.capture(), myFiltersCaptor.capture());
assertEquals(Constants.CT_FHIR_NDJSON, myOutputFormatCaptor.getValue());
assertThat(myResourceTypesCaptor.getValue(), containsInAnyOrder("Patient", "Practitioner"));
assertThat(mySinceCaptor.getValue(), notNullValue());
assertThat(myFiltersCaptor.getValue(), containsInAnyOrder("Patient?identifier=foo"));
verify(myBulkDataExportSvc, times(1)).submitJob(myBulkDataExportOptionsCaptor.capture());
BulkDataExportOptions options = myBulkDataExportOptionsCaptor.getValue();
assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat());
assertThat(options.getResourceTypes(), containsInAnyOrder("Patient", "Practitioner"));
assertThat(options.getSince(), notNullValue());
assertThat(options.getFilters(), containsInAnyOrder("Patient?identifier=foo"));
}
@Test
@ -286,5 +289,51 @@ public class BulkDataExportProviderTest {
}
/**
* Group export tests
* See https://build.fhir.org/ig/HL7/us-bulk-data/
* <p>
* GET [fhir base]/Group/[id]/$export
* <p>
* FHIR Operation to obtain data on all patients listed in a single FHIR Group Resource.
*/
@Test
public void testSuccessfulInitiateGroupBulkRequest_Post() throws IOException {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo().setJobId(G_JOB_ID);
when(myBulkDataExportSvc.submitJob(any())).thenReturn(jobInfo);
InstantType now = InstantType.now();
Parameters input = new Parameters();
input.addParameter(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, new StringType(Constants.CT_FHIR_NDJSON));
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE, new StringType("Observation, DiagnosticReport"));
input.addParameter(JpaConstants.PARAM_EXPORT_SINCE, now);
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE_FILTER, new StringType("Observation?code=OBSCODE,DiagnosticReport?code=DRCODE"));
ourLog.info(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(input));
HttpPost post = new HttpPost("http://localhost:" + myPort + "/" + GROUP_ID + "/" + JpaConstants.OPERATION_EXPORT);
post.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
post.setEntity(new ResourceEntity(myCtx, input));
ourLog.info("Request: {}", post);
try (CloseableHttpResponse response = myClient.execute(post)) {
ourLog.info("Response: {}", response.toString());
assertEquals(202, response.getStatusLine().getStatusCode());
assertEquals("Accepted", response.getStatusLine().getReasonPhrase());
assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + G_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
}
verify(myBulkDataExportSvc, times(1)).submitJob(myGroupBulkDataExportOptionsCaptor.capture());
GroupBulkDataExportOptions options = myGroupBulkDataExportOptionsCaptor.getValue();
assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat());
assertThat(options.getResourceTypes(), containsInAnyOrder("Observation", "DiagnosticReport"));
//TODO GGG eventually, we will support since in group exports
assertThat(options.getSince(), nullValue());
//TODO GGG eventually, we will support filters in group exports
assertThat(options.getFilters(), nullValue());
assertEquals(GROUP_ID, options.getGroupId().getValue());
assertFalse(options.isMdm());
}
}

View File

@ -3,8 +3,11 @@ package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.job.BulkExportJobParametersBuilder;
import ca.uhn.fhir.jpa.bulk.job.GroupBulkExportJobParametersBuilder;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
@ -22,10 +25,15 @@ import org.apache.commons.lang3.time.DateUtils;
import org.hamcrest.Matchers;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Binary;
import org.hl7.fhir.r4.model.CareTeam;
import org.hl7.fhir.r4.model.CodeableConcept;
import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.Group;
import org.hl7.fhir.r4.model.Immunization;
import org.hl7.fhir.r4.model.InstantType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Reference;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -53,6 +61,7 @@ import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
@ -78,6 +87,12 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Qualifier("bulkExportJob")
private Job myBulkJob;
@Autowired
@Qualifier("groupBulkExportJob")
private Job myGroupBulkJob;
private IIdType myPatientGroupId;
@Test
public void testPurgeExpiredJobs() {
@ -134,7 +149,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testSubmit_InvalidOutputFormat() {
try {
myBulkDataExportSvc.submitJob(Constants.CT_FHIR_JSON_NEW, Sets.newHashSet("Patient", "Observation"), null, null);
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_JSON_NEW, Sets.newHashSet("Patient", "Observation"), null, null));
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid output format: application/fhir+json", e.getMessage());
@ -144,7 +159,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testSubmit_OnlyBinarySelected() {
try {
myBulkDataExportSvc.submitJob(Constants.CT_FHIR_JSON_NEW, Sets.newHashSet("Binary"), null, null);
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_JSON_NEW, Sets.newHashSet("Binary"), null, null));
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid output format: application/fhir+json", e.getMessage());
@ -154,7 +169,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testSubmit_InvalidResourceTypes() {
try {
myBulkDataExportSvc.submitJob(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient", "FOO"), null, null);
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient", "FOO"), null, null));
fail();
} catch (InvalidRequestException e) {
assertEquals("Unknown or unsupported resource type: FOO", e.getMessage());
@ -164,7 +179,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testSubmit_MultipleTypeFiltersForSameType() {
try {
myBulkDataExportSvc.submitJob(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Patient?name=a", "Patient?active=true"));
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Patient?name=a", "Patient?active=true")));
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid _typeFilter value \"Patient?name=a\". Multiple filters found for type Patient", e.getMessage());
@ -174,7 +189,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testSubmit_TypeFilterForNonSelectedType() {
try {
myBulkDataExportSvc.submitJob(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Observation?code=123"));
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Observation?code=123")));
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid _typeFilter value \"Observation?code=123\". Resource type does not appear in _type list", e.getMessage());
@ -184,7 +199,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testSubmit_TypeFilterInvalid() {
try {
myBulkDataExportSvc.submitJob(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Hello"));
myBulkDataExportSvc.submitJob(new BulkDataExportOptions(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Hello")));
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid _typeFilter value \"Hello\". Must be in the form [ResourceType]?[params]", e.getMessage());
@ -195,11 +210,11 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
public void testSubmit_ReusesExisting() {
// Submit
IBulkDataExportSvc.JobInfo jobDetails1 = myBulkDataExportSvc.submitJob(null, Sets.newHashSet("Patient", "Observation"), null, null);
IBulkDataExportSvc.JobInfo jobDetails1 = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null));
assertNotNull(jobDetails1.getJobId());
// Submit again
IBulkDataExportSvc.JobInfo jobDetails2 = myBulkDataExportSvc.submitJob(null, Sets.newHashSet("Patient", "Observation"), null, null);
IBulkDataExportSvc.JobInfo jobDetails2 = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null));
assertNotNull(jobDetails2.getJobId());
assertEquals(jobDetails1.getJobId(), jobDetails2.getJobId());
@ -220,7 +235,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(null, Sets.newHashSet("Patient"), null, null);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient"), null, null));
assertNotNull(jobDetails.getJobId());
// Check the status
@ -250,7 +265,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(null, Sets.newHashSet("Patient", "Observation"), null, Sets.newHashSet(TEST_FILTER));
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, Sets.newHashSet(TEST_FILTER)));
assertNotNull(jobDetails.getJobId());
// Check the status
@ -303,7 +318,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
myBinaryDao.create(b);
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(null, null, null, null);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, null, null, null));
assertNotNull(jobDetails.getJobId());
// Check the status
@ -319,7 +334,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
// Fetch the job again
status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.COMPLETE, status.getStatus());
assertEquals(2, status.getFiles().size());
assertEquals(5, status.getFiles().size());
// Iterate over the files
for (IBulkDataExportSvc.FileEntry next : status.getFiles()) {
@ -327,17 +342,24 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", next.getResourceType(), nextContents);
if ("Patient".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"value\":\"PAT0\""));
assertEquals(10, nextContents.split("\n").length);
} else if ("Observation".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"subject\":{\"reference\":\"Patient/PAT0\"}}\n"));
assertEquals(10, nextContents.split("\n").length);
}else if ("Immunization".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"patient\":{\"reference\":\"Patient/PAT0\"}}\n"));
assertEquals(10, nextContents.split("\n").length);
} else if ("CareTeam".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"id\":\"CT0\""));
assertEquals(10, nextContents.split("\n").length);
} else if ("Group".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"id\":\"G0\""));
assertEquals(1, nextContents.split("\n").length);
} else {
fail(next.getResourceType());
fail();
}
}
}
@ -350,7 +372,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
// Create a bulk job
HashSet<String> types = Sets.newHashSet("Patient");
Set<String> typeFilters = Sets.newHashSet("Patient?_has:Observation:patient:identifier=SYS|VAL3");
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(null, types, null, typeFilters);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, types, null, typeFilters));
assertNotNull(jobDetails.getJobId());
// Check the status
@ -402,7 +424,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(null, Sets.newHashSet("Patient", "Observation"), cutoff.getValue(), null);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), cutoff.getValue(), null));
assertNotNull(jobDetails.getJobId());
// Check the status
@ -480,7 +502,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(null, Sets.newHashSet("Patient", "Observation"), null, null);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new BulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null));
//Add the UUID to the job
BulkExportJobParametersBuilder paramBuilder = new BulkExportJobParametersBuilder()
@ -496,6 +518,80 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(jobInfo.getFiles().size(), equalTo(2));
}
@Test
public void testGroupBatchJobWorks() throws Exception {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(null, Sets.newHashSet("Immunization"), null, null, myPatientGroupId, true));
GroupBulkExportJobParametersBuilder paramBuilder = new GroupBulkExportJobParametersBuilder();
paramBuilder.setGroupId(myPatientGroupId.getIdPart());
paramBuilder.setJobUUID(jobDetails.getJobId());
paramBuilder.setReadChunkSize(10L);
JobExecution jobExecution = myBatchJobSubmitter.runJob(myGroupBulkJob, paramBuilder.toJobParameters());
awaitJobCompletion(jobExecution);
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(1));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
// Iterate over the files
Binary nextBinary = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", nextBinary.getResourceType(), nextContents);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
assertThat(nextContents, is(containsString("IMM0")));
assertThat(nextContents, is(containsString("IMM2")));
assertThat(nextContents, is(containsString("IMM4")));
assertThat(nextContents, is(containsString("IMM6")));
assertThat(nextContents, is(containsString("IMM8")));
}
// CareTeam has two patient references: participant and patient. This test checks if we find the patient if participant is null but patient is not null
@Test
public void testGroupBatchJobCareTeam() throws Exception {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(null, Sets.newHashSet("CareTeam"), null, null, myPatientGroupId, true));
GroupBulkExportJobParametersBuilder paramBuilder = new GroupBulkExportJobParametersBuilder();
paramBuilder.setGroupId(myPatientGroupId.getIdPart());
paramBuilder.setJobUUID(jobDetails.getJobId());
paramBuilder.setReadChunkSize(10L);
JobExecution jobExecution = myBatchJobSubmitter.runJob(myGroupBulkJob, paramBuilder.toJobParameters());
awaitJobCompletion(jobExecution);
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(1));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("CareTeam")));
// Iterate over the files
Binary nextBinary = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", nextBinary.getResourceType(), nextContents);
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("CareTeam")));
assertThat(nextContents, is(containsString("CT0")));
assertThat(nextContents, is(containsString("CT2")));
assertThat(nextContents, is(containsString("CT4")));
assertThat(nextContents, is(containsString("CT6")));
assertThat(nextContents, is(containsString("CT8")));
}
@Test
public void testJobParametersValidatorRejectsInvalidParameters() {
JobParametersBuilder paramBuilder = new JobParametersBuilder().addString("jobUUID", "I'm not real!");
@ -517,6 +613,8 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
private void createResources() {
Group group = new Group();
group.setId("G0");
for (int i = 0; i < 10; i++) {
Patient patient = new Patient();
patient.setId("PAT" + i);
@ -524,6 +622,10 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
patient.addName().setFamily("FAM" + i);
patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i);
IIdType patId = myPatientDao.update(patient).getId().toUnqualifiedVersionless();
//Only add half the patients to the group.
if (i % 2 == 0 ) {
group.addMember().setEntity(new Reference(patId));
}
Observation obs = new Observation();
obs.setId("OBS" + i);
@ -531,6 +633,28 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
obs.setStatus(Observation.ObservationStatus.FINAL);
obs.getSubject().setReference(patId.getValue());
myObservationDao.update(obs);
Immunization immunization = new Immunization();
immunization.setId("IMM" + i);
immunization.setPatient(new Reference(patId));
if (i % 2 == 0) {
CodeableConcept cc = new CodeableConcept();
cc.addCoding().setSystem("vaccines").setCode("Flu");
immunization.setVaccineCode(cc);
} else {
CodeableConcept cc = new CodeableConcept();
cc.addCoding().setSystem("vaccines").setCode("COVID-19");
immunization.setVaccineCode(cc);
}
myImmunizationDao.update(immunization);
CareTeam careTeam = new CareTeam();
careTeam.setId("CT" + i);
careTeam.setSubject(new Reference(patId)); // This maps to the "patient" search parameter on CareTeam
myCareTeamDao.update(careTeam);
}
myPatientGroupId = myGroupDao.update(group).getId();
}
}

View File

@ -5,6 +5,7 @@ import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
@ -87,6 +88,7 @@ import org.hl7.fhir.r4.model.Enumerations.AdministrativeGender;
import org.hl7.fhir.r4.model.EpisodeOfCare;
import org.hl7.fhir.r4.model.Group;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Immunization;
import org.hl7.fhir.r4.model.InstantType;
import org.hl7.fhir.r4.model.IntegerType;
import org.hl7.fhir.r4.model.Location;

View File

@ -3,6 +3,7 @@ package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.model.api.Include;
import ca.uhn.fhir.rest.param.HasParam;
import ca.uhn.fhir.test.BaseTest;
import org.junit.jupiter.api.Test;
@ -20,5 +21,4 @@ public class SearchParameterMapTest extends BaseTest {
String criteria = params.toNormalizedQueryString(myContext);
assertEquals(criteria, "?_has:Observation:identifier:urn:system|FOO=urn%3Asystem%7CFOO");
}
}

View File

@ -184,6 +184,17 @@ public class JpaConstants {
*/
public static final String PARAM_EXPORT_TYPE_FILTER = "_typeFilter";
/**
* The [id] of the group when $export is called on /Group/[id]/$export
*/
public static final Object PARAM_EXPORT_GROUP_ID = "_groupId";
/**
* TODO GGG eventually we will support this.
* Whether mdm should be performed on group export items to expand the group items to linked items before performing the export
*/
// public static final String PARAM_EXPORT_MDM = "_mdm";
/**
* Parameter for delete to indicate the deleted resources should also be expunged
*/