Support typeFilter parameter for bulk export (#2147)

* Cleanup

* Fix tests

* Work on bulk export

* Work on export

* Test fixes

* Add changelog

* Address review comments

* Address review comments
James Agnew 2020-10-26 12:28:07 -04:00 committed by GitHub
parent 137e215003
commit 72294fc0a0
13 changed files with 405 additions and 265 deletions

View File

@ -12,16 +12,19 @@ import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URLEncodedUtils;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import javax.annotation.Nonnull;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.StringTokenizer;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
import static org.apache.commons.lang3.StringUtils.defaultString;
@ -162,6 +165,18 @@ public class UrlUtil {
return PARAMETER_ESCAPER.escape(theUnescaped);
}
/**
* Applies the same encoding as {@link #escapeUrlParam(String)} but against all
* values in a collection
*/
public static List<String> escapeUrlParams(@Nonnull Collection<String> theUnescaped) {
return theUnescaped
.stream()
.map(t -> PARAMETER_ESCAPER.escape(t))
.collect(Collectors.toList());
}
public static boolean isAbsolute(String theValue) {
String value = theValue.toLowerCase();
return value.startsWith("http://") || value.startsWith("https://");
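As a quick usage sketch (illustrative only, not part of this commit): the new escapeUrlParams helper lets a caller escape each _typeFilter value individually before joining them into a request string, so reserved characters such as "?" and "=" survive the round trip.
// Hypothetical caller; variable names are illustrative
List<String> filters = Arrays.asList("Patient?gender=female", "Observation?status=final");
String typeFilterValue = String.join(",", UrlUtil.escapeUrlParams(filters));
// roughly: Patient%3Fgender%3Dfemale,Observation%3Fstatus%3Dfinal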

View File

@ -0,0 +1,4 @@
---
type: fix
issue: 2147
title: "The `_typeFilter` parameter did not correctly work for JPA Bulk Export jobs. This has been corrected."

View File

@ -57,15 +57,17 @@ public class PidToIBaseResourceProcessor implements ItemProcessor<List<ResourceP
private FhirContext myContext;
@Override
public List<IBaseResource> process(List<ResourcePersistentId> theResourcePersistentId) throws Exception {
public List<IBaseResource> process(List<ResourcePersistentId> theResourcePersistentId) {
IFhirResourceDao dao = myDaoRegistry.getResourceDao(myResourceType);
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(myResourceType);
Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, resourceTypeClass);
List<IBaseResource> outgoing = new ArrayList<>();
sb.loadResourcesByPid(theResourcePersistentId, Collections.emptyList(), outgoing, false, null);
ourLog.trace("Loaded resources: {}", outgoing.stream().map(Object::toString).collect(Collectors.joining(", ")));
ourLog.trace("Loaded resources: {}", outgoing.stream().map(t->t.getIdElement().getValue()).collect(Collectors.joining(", ")));
return outgoing;
}

View File

@ -29,26 +29,26 @@ import org.springframework.batch.core.StepExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
/**
* Will run before and after a job to set the status to whatever is appropriate.
*/
public class BulkExportJobStartedListener implements StepExecutionListener {
@Value("#{jobExecutionContext['jobUUID']}")
private String myJobUUID;
public class BulkExportCreateEntityStepListener implements StepExecutionListener {
@Autowired
private BulkExportDaoSvc myBulkExportDaoSvc;
@Override
public void beforeStep(StepExecution theStepExecution) {
String jobUuid = theStepExecution.getJobExecution().getJobParameters().getString("jobUUID");
if (jobUuid != null) {
myBulkExportDaoSvc.setJobToStatus(jobUuid, BulkJobStatusEnum.BUILDING);
}
}
@Override
public ExitStatus afterStep(StepExecution theStepExecution) {
if (theStepExecution.getStatus() == BatchStatus.STARTING) {
myBulkExportDaoSvc.setJobToStatus(myJobUUID, BulkJobStatusEnum.BUILDING);
}
return ExitStatus.EXECUTING;
}
}

View File

@ -0,0 +1,38 @@
package ca.uhn.fhir.jpa.bulk.job;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
/**
* This class sets the job status to ERROR if any failures occur while actually
* generating the export files.
*/
public class BulkExportGenerateResourceFilesStepListener implements StepExecutionListener {
@Autowired
private BulkExportDaoSvc myBulkExportDaoSvc;
@Override
public void beforeStep(@Nonnull StepExecution stepExecution) {
// nothing
}
@Override
public ExitStatus afterStep(StepExecution theStepExecution) {
if (theStepExecution.getExitStatus().getExitCode().equals(ExitStatus.FAILED.getExitCode())) {
String jobUuid = theStepExecution.getJobExecution().getJobParameters().getString("jobUUID");
assert isNotBlank(jobUuid);
String exitDescription = theStepExecution.getExitStatus().getExitDescription();
myBulkExportDaoSvc.setJobToStatus(jobUuid, BulkJobStatusEnum.ERROR, exitDescription);
}
return theStepExecution.getExitStatus();
}
}

View File

@ -42,7 +42,7 @@ public class BulkExportJobCloser implements Tasklet {
private BulkExportDaoSvc myBulkExportDaoSvc;
@Override
public RepeatStatus execute(StepContribution theStepContribution, ChunkContext theChunkContext) throws Exception {
public RepeatStatus execute(StepContribution theStepContribution, ChunkContext theChunkContext) {
if (theChunkContext.getStepContext().getStepExecution().getJobExecution().getStatus() == BatchStatus.STARTED) {
myBulkExportDaoSvc.setJobToStatus(myJobUUID, BulkJobStatusEnum.COMPLETE);
} else {

View File

@ -69,7 +69,7 @@ public class BulkExportJobConfig {
public Step createBulkExportEntityStep() {
return myStepBuilderFactory.get("createBulkExportEntityStep")
.tasklet(createBulkExportEntityTasklet())
.listener(bulkExportJobStartedListener())
.listener(bulkExportCreateEntityStepListener())
.build();
}
@ -90,6 +90,7 @@ public class BulkExportJobConfig {
.reader(bulkItemReader())
.processor(myPidToIBaseResourceProcessor)
.writer(resourceToFileWriter())
.listener(bulkExportGenerateResourceFilesStepListener())
.build();
}
@ -108,8 +109,14 @@ public class BulkExportJobConfig {
@Bean
@JobScope
public BulkExportJobStartedListener bulkExportJobStartedListener() {
return new BulkExportJobStartedListener();
public BulkExportCreateEntityStepListener bulkExportCreateEntityStepListener() {
return new BulkExportCreateEntityStepListener();
}
@Bean
@JobScope
public BulkExportGenerateResourceFilesStepListener bulkExportGenerateResourceFilesStepListener() {
return new BulkExportGenerateResourceFilesStepListener();
}
@Bean

View File

@ -21,6 +21,7 @@ package ca.uhn.fhir.jpa.bulk.job;
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
@ -31,50 +32,44 @@ import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.util.UrlUtil;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class BulkItemReader implements ItemReader<List<ResourcePersistentId>> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
Iterator<ResourcePersistentId> myPidIterator;
@Value("#{jobParameters['readChunkSize']}")
private Long READ_CHUNK_SIZE;
@Autowired
private IBulkExportJobDao myBulkExportJobDao;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private FhirContext myContext;
@Autowired
private SearchBuilderFactory mySearchBuilderFactory;
private BulkExportJobEntity myJobEntity;
@Value("#{jobExecutionContext['jobUUID']}")
private String myJobUUID;
@Value("#{stepExecutionContext['resourceType']}")
private String myResourceType;
Iterator<ResourcePersistentId> myPidIterator;
@Autowired
private MatchUrlService myMatchUrlService;
private void loadResourcePids() {
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
@ -82,22 +77,24 @@ public class BulkItemReader implements ItemReader<List<ResourcePersistentId>> {
ourLog.warn("Job appears to be deleted");
return;
}
myJobEntity = jobOpt.get();
ourLog.info("Bulk export starting generation for batch export job: {}", myJobEntity);
BulkExportJobEntity jobEntity = jobOpt.get();
ourLog.info("Bulk export starting generation for batch export job: {}", jobEntity);
IFhirResourceDao dao = myDaoRegistry.getResourceDao(myResourceType);
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(myResourceType);
ourLog.info("Bulk export assembling export of type {} for job {}", myResourceType, myJobUUID);
Class<? extends IBaseResource> nextTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
RuntimeResourceDefinition def = myContext.getResourceDefinition(myResourceType);
Class<? extends IBaseResource> nextTypeClass = def.getImplementingClass();
ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, nextTypeClass);
SearchParameterMap map = new SearchParameterMap();
map.setLoadSynchronous(true);
if (myJobEntity.getSince() != null) {
map.setLastUpdated(new DateRangeParam(myJobEntity.getSince(), null));
SearchParameterMap map = createSearchParameterMapFromTypeFilter(jobEntity, def);
if (jobEntity.getSince() != null) {
map.setLastUpdated(new DateRangeParam(jobEntity.getSince(), null));
}
map.setLoadSynchronous(true);
IResultIterator myResultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
List<ResourcePersistentId> myReadPids = new ArrayList<>();
while (myResultIterator.hasNext()) {
@ -106,8 +103,22 @@ public class BulkItemReader implements ItemReader<List<ResourcePersistentId>> {
myPidIterator = myReadPids.iterator();
}
private SearchParameterMap createSearchParameterMapFromTypeFilter(BulkExportJobEntity theJobEntity, RuntimeResourceDefinition theDef) {
SearchParameterMap map = new SearchParameterMap();
Map<String, String[]> requestUrl = UrlUtil.parseQueryStrings(theJobEntity.getRequest());
String[] typeFilters = requestUrl.get(JpaConstants.PARAM_EXPORT_TYPE_FILTER);
if (typeFilters != null) {
Optional<String> filter = Arrays.stream(typeFilters).filter(t -> t.startsWith(myResourceType + "?")).findFirst();
if (filter.isPresent()) {
String matchUrl = filter.get();
map = myMatchUrlService.translateMatchUrl(matchUrl, theDef);
}
}
return map;
}
@Override
public List<ResourcePersistentId> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
public List<ResourcePersistentId> read() {
if (myPidIterator == null) {
loadResourcePids();
}
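A minimal walkthrough of the new type-filter translation (illustrative only; the patientDef variable is an assumption standing in for the RuntimeResourceDefinition of Patient): for the Patient step of a job whose stored request contained _typeFilter=Patient%3Fgender%3Dfemale, the reader parses the filter back out of the request string and hands it to the MatchUrlService, while _since is still applied separately as _lastUpdated.
// Hypothetical walkthrough of createSearchParameterMapFromTypeFilter for myResourceType = "Patient"
Map<String, String[]> params = UrlUtil.parseQueryStrings("/$export?_type=Patient&_typeFilter=Patient%3Fgender%3Dfemale");
String filter = params.get(JpaConstants.PARAM_EXPORT_TYPE_FILTER)[0]; // "Patient?gender=female"
SearchParameterMap map = myMatchUrlService.translateMatchUrl(filter, patientDef); // equivalent to searching Patient?gender=female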

View File

@ -61,12 +61,14 @@ import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.PostConstruct;
import javax.transaction.Transactional;
import java.util.Date;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import static ca.uhn.fhir.util.UrlUtil.escapeUrlParam;
import static ca.uhn.fhir.util.UrlUtil.escapeUrlParams;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
@ -195,6 +197,8 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
.addLong("readChunkSize", READ_CHUNK_SIZE)
.toJobParameters();
ourLog.info("Submitting bulk export job {} to job scheduler", theJobUuid);
try {
myJobSubmitter.runJob(myBulkExportJob, parameters);
} catch (JobParametersInvalidException theE) {
@ -213,19 +217,14 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
myTxTemplate = new TransactionTemplate(myTxManager);
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(getClass().getName());
jobDetail.setId(Job.class.getName());
jobDetail.setJobClass(Job.class);
mySchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
}
public static class Job implements HapiJob {
@Autowired
private IBulkDataExportSvc myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.buildExportFiles();
}
jobDetail = new ScheduledJobDefinition();
jobDetail.setId(PurgeExpiredFilesJob.class.getName());
jobDetail.setJobClass(PurgeExpiredFilesJob.class);
mySchedulerService.scheduleClusteredJob(DateUtils.MILLIS_PER_HOUR, jobDetail);
}
@Transactional
@ -244,14 +243,14 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
requestBuilder.append("?").append(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT).append("=").append(escapeUrlParam(outputFormat));
Set<String> resourceTypes = theResourceTypes;
if (resourceTypes != null) {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_TYPE).append("=").append(String.join(",", resourceTypes));
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_TYPE).append("=").append(String.join(",", escapeUrlParams(resourceTypes)));
}
Date since = theSince;
if (since != null) {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_SINCE).append("=").append(new InstantType(since).setTimeZoneZulu(true).getValueAsString());
}
if (theFilters != null && theFilters.size() > 0) {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_TYPE_FILTER).append("=").append(String.join(",", theFilters));
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_TYPE_FILTER).append("=").append(String.join(",", escapeUrlParams(theFilters)));
}
String request = requestBuilder.toString();
@ -290,14 +289,14 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
job.setCreated(new Date());
job.setRequest(request);
// Validate types
validateTypes(resourceTypes);
validateTypeFilters(theFilters, resourceTypes);
updateExpiry(job);
myBulkExportJobDao.save(job);
for (String nextType : resourceTypes) {
if (!myDaoRegistry.isResourceTypeSupported(nextType)) {
String msg = myContext.getLocalizer().getMessage(BulkDataExportSvcImpl.class, "unknownResourceType", nextType);
throw new InvalidRequestException(msg);
}
BulkExportCollectionEntity collection = new BulkExportCollectionEntity();
collection.setJob(job);
@ -311,11 +310,37 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
return toSubmittedJobInfo(job);
}
public void validateTypes(Set<String> theResourceTypes) {
for (String nextType : theResourceTypes) {
if (!myDaoRegistry.isResourceTypeSupported(nextType)) {
String msg = myContext.getLocalizer().getMessage(BulkDataExportSvcImpl.class, "unknownResourceType", nextType);
throw new InvalidRequestException(msg);
}
}
}
public void validateTypeFilters(Set<String> theFilters, Set<String> theResourceTypes) {
if (theFilters != null) {
Set<String> types = new HashSet<>();
for (String next : theFilters) {
if (!next.contains("?")) {
throw new InvalidRequestException("Invalid " + JpaConstants.PARAM_EXPORT_TYPE_FILTER + " value \"" + next + "\". Must be in the form [ResourceType]?[params]");
}
String resourceType = next.substring(0, next.indexOf("?"));
if (!theResourceTypes.contains(resourceType)) {
throw new InvalidRequestException("Invalid " + JpaConstants.PARAM_EXPORT_TYPE_FILTER + " value \"" + next + "\". Resource type does not appear in " + JpaConstants.PARAM_EXPORT_TYPE + " list");
}
if (!types.add(resourceType)) {
throw new InvalidRequestException("Invalid " + JpaConstants.PARAM_EXPORT_TYPE_FILTER + " value \"" + next + "\". Multiple filters found for type " + resourceType);
}
}
}
}
private JobInfo toSubmittedJobInfo(BulkExportJobEntity theJob) {
return new JobInfo().setJobId(theJob.getJobId());
}
private void updateExpiry(BulkExportJobEntity theJob) {
theJob.setExpiry(DateUtils.addMilliseconds(new Date(), myRetentionPeriod));
}
@ -374,4 +399,25 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
return null;
});
}
public static class Job implements HapiJob {
@Autowired
private IBulkDataExportSvc myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.buildExportFiles();
}
}
public static class PurgeExpiredFilesJob implements HapiJob {
@Autowired
private IBulkDataExportSvc myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.purgeExpiredFiles();
}
}
}

View File

@ -22,7 +22,6 @@ package ca.uhn.fhir.jpa.bulk.svc;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
@ -30,51 +29,31 @@ import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.r4.model.InstantType;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.stereotype.Service;
import javax.transaction.Transactional;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import static ca.uhn.fhir.util.UrlUtil.escapeUrlParam;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.slf4j.LoggerFactory.getLogger;
@Service
public class BulkExportDaoSvc {
private static final Logger ourLog = getLogger(BulkExportDaoSvc.class);
private int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR);
@Autowired
private FhirContext myFhirContext;
@Autowired
IBulkExportJobDao myBulkExportJobDao;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
IBulkExportCollectionDao myBulkExportCollectionDao;
@Autowired
IBulkExportCollectionFileDao myBulkExportCollectionFileDao;
@Autowired
private FhirContext myFhirContext;
@Autowired
private DaoRegistry myDaoRegistry;
@Transactional
public void addFileToCollectionWithId(Long theCollectionEntityId, BulkExportCollectionFileEntity theFile) {
@ -111,6 +90,11 @@ public class BulkExportDaoSvc {
@Transactional
public void setJobToStatus(String theJobUUID, BulkJobStatusEnum theStatus) {
setJobToStatus(theJobUUID, theStatus, null);
}
@Transactional
public void setJobToStatus(String theJobUUID, BulkJobStatusEnum theStatus, String theStatusMessage) {
Optional<BulkExportJobEntity> oJob = myBulkExportJobDao.findByJobId(theJobUUID);
if (!oJob.isPresent()) {
ourLog.error("Job with UUID {} doesn't exist!", theJobUUID);
@ -120,96 +104,9 @@ public class BulkExportDaoSvc {
ourLog.info("Setting job with UUID {} to {}", theJobUUID, theStatus);
BulkExportJobEntity bulkExportJobEntity = oJob.get();
bulkExportJobEntity.setStatus(theStatus);
bulkExportJobEntity.setStatusMessage(theStatusMessage);
myBulkExportJobDao.save(bulkExportJobEntity);
}
public IBulkDataExportSvc.JobInfo submitJob(String theOutputFormat, Set<String> theResourceTypes, Date theSince, Set<String> theFilters, int theReuseMillis) {
String outputFormat = Constants.CT_FHIR_NDJSON;
if (isNotBlank(theOutputFormat)) {
outputFormat = theOutputFormat;
}
if (!Constants.CTS_NDJSON.contains(outputFormat)) {
throw new InvalidRequestException("Invalid output format: " + theOutputFormat);
}
StringBuilder requestBuilder = new StringBuilder();
requestBuilder.append("/").append(JpaConstants.OPERATION_EXPORT);
requestBuilder.append("?").append(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT).append("=").append(escapeUrlParam(outputFormat));
Set<String> resourceTypes = theResourceTypes;
if (resourceTypes != null) {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_TYPE).append("=").append(String.join(",", resourceTypes));
}
Date since = theSince;
if (since != null) {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_SINCE).append("=").append(new InstantType(since).setTimeZoneZulu(true).getValueAsString());
}
if (theFilters != null && theFilters.size() > 0) {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_TYPE_FILTER).append("=").append(String.join(",", theFilters));
}
String request = requestBuilder.toString();
Date cutoff = DateUtils.addMilliseconds(new Date(), -theReuseMillis);
Pageable page = PageRequest.of(0, 10);
Slice<BulkExportJobEntity> existing = myBulkExportJobDao.findExistingJob(page, request, cutoff, BulkJobStatusEnum.ERROR);
if (!existing.isEmpty()) {
return toSubmittedJobInfo(existing.iterator().next());
}
if (resourceTypes != null && resourceTypes.contains("Binary")) {
String msg = myFhirContext.getLocalizer().getMessage(BulkDataExportSvcImpl.class, "onlyBinarySelected");
throw new InvalidRequestException(msg);
}
if (resourceTypes == null || resourceTypes.isEmpty()) {
// This is probably not a useful default, but having the default be "download the whole
// server" seems like a risky default too. We'll deal with that by having the default involve
// only returning a small time span
resourceTypes = myFhirContext.getResourceTypes();
if (since == null) {
since = DateUtils.addDays(new Date(), -1);
}
}
resourceTypes =
resourceTypes
.stream()
.filter(t -> !"Binary".equals(t))
.collect(Collectors.toSet());
BulkExportJobEntity job = new BulkExportJobEntity();
job.setJobId(UUID.randomUUID().toString());
job.setStatus(BulkJobStatusEnum.SUBMITTED);
job.setSince(since);
job.setCreated(new Date());
job.setRequest(request);
updateExpiry(job);
myBulkExportJobDao.save(job);
for (String nextType : resourceTypes) {
if (!myDaoRegistry.isResourceTypeSupported(nextType)) {
String msg = myFhirContext.getLocalizer().getMessage(BulkDataExportSvcImpl.class, "unknownResourceType", nextType);
throw new InvalidRequestException(msg);
}
BulkExportCollectionEntity collection = new BulkExportCollectionEntity();
collection.setJob(job);
collection.setResourceType(nextType);
job.getCollections().add(collection);
myBulkExportCollectionDao.save(collection);
}
ourLog.info("Bulk export job submitted: {}", job.toString());
return toSubmittedJobInfo(job);
}
private void updateExpiry(BulkExportJobEntity theJob) {
theJob.setExpiry(DateUtils.addMilliseconds(new Date(), myRetentionPeriod));
}
private IBulkDataExportSvc.JobInfo toSubmittedJobInfo(BulkExportJobEntity theJob) {
return new IBulkDataExportSvc.JobInfo().setJobId(theJob.getJobId());
}
}

View File

@ -47,6 +47,7 @@ import java.util.Collection;
import java.util.Date;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.left;
@Entity
@Table(name = "HFJ_BLK_EXPORT_JOB", uniqueConstraints = {
@ -105,7 +106,7 @@ public class BulkExportJobEntity implements Serializable {
}
public void setStatusMessage(String theStatusMessage) {
myStatusMessage = theStatusMessage;
myStatusMessage = left(theStatusMessage, STATUS_MESSAGE_LEN);
}
public String getRequest() {

View File

@ -1,5 +1,7 @@
package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.job.BulkExportJobParametersBuilder;
@ -13,12 +15,14 @@ import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.util.UrlUtil;
import com.google.common.base.Charsets;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.time.DateUtils;
import org.hamcrest.Matchers;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Binary;
import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.InstantType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
@ -38,7 +42,9 @@ import org.springframework.beans.factory.annotation.Qualifier;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -126,7 +132,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
@Test
public void testCreateBulkLoad_InvalidOutputFormat() {
public void testSubmit_InvalidOutputFormat() {
try {
myBulkDataExportSvc.submitJob(Constants.CT_FHIR_JSON_NEW, Sets.newHashSet("Patient", "Observation"), null, null);
fail();
@ -136,7 +142,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
@Test
public void testCreateBulkLoad_OnlyBinarySelected() {
public void testSubmit_OnlyBinarySelected() {
try {
myBulkDataExportSvc.submitJob(Constants.CT_FHIR_JSON_NEW, Sets.newHashSet("Binary"), null, null);
fail();
@ -156,7 +162,89 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
@Test
public void testSubmitForSpecificResources() {
public void testSubmit_MultipleTypeFiltersForSameType() {
try {
myBulkDataExportSvc.submitJob(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Patient?name=a", "Patient?active=true"));
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid _typeFilter value \"Patient?name=a\". Multiple filters found for type Patient", e.getMessage());
}
}
@Test
public void testSubmit_TypeFilterForNonSelectedType() {
try {
myBulkDataExportSvc.submitJob(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Observation?code=123"));
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid _typeFilter value \"Observation?code=123\". Resource type does not appear in _type list", e.getMessage());
}
}
@Test
public void testSubmit_TypeFilterInvalid() {
try {
myBulkDataExportSvc.submitJob(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient"), null, Sets.newHashSet("Hello"));
fail();
} catch (InvalidRequestException e) {
assertEquals("Invalid _typeFilter value \"Hello\". Must be in the form [ResourceType]?[params]", e.getMessage());
}
}
@Test
public void testSubmit_ReusesExisting() {
// Submit
IBulkDataExportSvc.JobInfo jobDetails1 = myBulkDataExportSvc.submitJob(null, Sets.newHashSet("Patient", "Observation"), null, null);
assertNotNull(jobDetails1.getJobId());
// Submit again
IBulkDataExportSvc.JobInfo jobDetails2 = myBulkDataExportSvc.submitJob(null, Sets.newHashSet("Patient", "Observation"), null, null);
assertNotNull(jobDetails2.getJobId());
assertEquals(jobDetails1.getJobId(), jobDetails2.getJobId());
}
@Test
public void testGenerateBulkExport_FailureDuringGeneration() {
// Register an interceptor that will force the resource search to fail unexpectedly
IAnonymousInterceptor interceptor = (pointcut, args) -> {
throw new NullPointerException("help i'm a bug");
};
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.JPA_PERFTRACE_SEARCH_SELECT_COMPLETE, interceptor);
try {
// Create some resources to load
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(null, Sets.newHashSet("Patient"), null, null);
assertNotNull(jobDetails.getJobId());
// Check the status
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.SUBMITTED, status.getStatus());
// Run a scheduled pass to build the export
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
// Fetch the job again
status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.ERROR, status.getStatus());
assertThat(status.getStatusMessage(), containsString("help i'm a bug"));
} finally {
myInterceptorRegistry.unregisterInterceptor(interceptor);
}
}
@Test
public void testGenerateBulkExport_SpecificResources() {
// Create some resources to load
createResources();
@ -168,7 +256,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
// Check the status
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.SUBMITTED, status.getStatus());
assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Observation,Patient&_typeFilter=" + TEST_FILTER, status.getRequest());
assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Observation,Patient&_typeFilter=" + UrlUtil.escapeUrlParam(TEST_FILTER), status.getRequest());
// Run a scheduled pass to build the export
myBulkDataExportSvc.buildExportFiles();
@ -178,7 +266,6 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
// Fetch the job again
status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.COMPLETE, status.getStatus());
assertEquals(2, status.getFiles().size());
// Iterate over the files
for (IBulkDataExportSvc.FileEntry next : status.getFiles()) {
@ -188,8 +275,8 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
ourLog.info("Next contents for type {}:\n{}", next.getResourceType(), nextContents);
if ("Patient".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"value\":\"PAT0\"}]}\n"));
assertEquals(10, nextContents.split("\n").length);
assertThat(nextContents, containsString("\"value\":\"PAT1\"}"));
assertEquals(5, nextContents.split("\n").length); // Only female patients
} else if ("Observation".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"subject\":{\"reference\":\"Patient/PAT0\"}}\n"));
assertEquals(10, nextContents.split("\n").length);
@ -198,30 +285,12 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
}
assertEquals(2, status.getFiles().size());
}
@Test
public void testBatchJobIsCapableOfCreatingAnExportEntityIfNoJobIsProvided() throws Exception {
createResources();
//Add the UUID to the job
BulkExportJobParametersBuilder paramBuilder = new BulkExportJobParametersBuilder();
paramBuilder.setReadChunkSize(100L)
.setOutputFormat(Constants.CT_FHIR_NDJSON)
.setResourceTypes(Arrays.asList("Patient", "Observation"));
JobExecution jobExecution = myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
awaitJobCompletion(jobExecution);
String jobUUID = (String) jobExecution.getExecutionContext().get("jobUUID");
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobUUID);
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(2));
}
@Test
public void testSubmitWithoutSpecificResources() {
public void testGenerateBulkExport_WithoutSpecificResources() {
// Create some resources to load
createResources();
@ -260,7 +329,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
ourLog.info("Next contents for type {}:\n{}", next.getResourceType(), nextContents);
if ("Patient".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"value\":\"PAT0\"}]}\n"));
assertThat(nextContents, containsString("\"value\":\"PAT0\""));
assertEquals(10, nextContents.split("\n").length);
} else if ("Observation".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"subject\":{\"reference\":\"Patient/PAT0\"}}\n"));
@ -272,83 +341,51 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
}
public void awaitAllBulkJobCompletions() {
List<JobInstance> bulkExport = myJobExplorer.findJobInstancesByJobName("bulkExportJob", 0, 100);
if (bulkExport.isEmpty()) {
fail("There are no bulk export jobs running!");
}
List<JobExecution> bulkExportExecutions = bulkExport.stream().flatMap(jobInstance -> myJobExplorer.getJobExecutions(jobInstance).stream()).collect(Collectors.toList());
awaitJobCompletions(bulkExportExecutions);
}
public void awaitJobCompletions(Collection<JobExecution> theJobs) {
theJobs.stream().forEach(jobExecution -> {
try {
awaitJobCompletion(jobExecution);
} catch (InterruptedException theE) {
fail();
}
});
}
@Test
public void testSubmitReusesExisting() {
public void testGenerateBulkExport_WithHas() {
// Submit
IBulkDataExportSvc.JobInfo jobDetails1 = myBulkDataExportSvc.submitJob(null, Sets.newHashSet("Patient", "Observation"), null, null);
assertNotNull(jobDetails1.getJobId());
// Submit again
IBulkDataExportSvc.JobInfo jobDetails2 = myBulkDataExportSvc.submitJob(null, Sets.newHashSet("Patient", "Observation"), null, null);
assertNotNull(jobDetails2.getJobId());
assertEquals(jobDetails1.getJobId(), jobDetails2.getJobId());
}
@Test
public void testBatchJobSubmitsAndRuns() throws Exception {
// Create some resources to load
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(null, Sets.newHashSet("Patient", "Observation"), null, null);
HashSet<String> types = Sets.newHashSet("Patient");
Set<String> typeFilters = Sets.newHashSet("Patient?_has:Observation:patient:identifier=SYS|VAL3");
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(null, types, null, typeFilters);
assertNotNull(jobDetails.getJobId());
//Add the UUID to the job
BulkExportJobParametersBuilder paramBuilder = new BulkExportJobParametersBuilder()
.setJobUUID(jobDetails.getJobId())
.setReadChunkSize(10L);
// Check the status
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.SUBMITTED, status.getStatus());
assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Patient&_typeFilter=Patient%3F_has%3AObservation%3Apatient%3Aidentifier%3DSYS%7CVAL3", status.getRequest());
JobExecution jobExecution = myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
// Run a scheduled pass to build the export
myBulkDataExportSvc.buildExportFiles();
awaitJobCompletion(jobExecution);
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
awaitAllBulkJobCompletions();
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(2));
}
// Fetch the job again
status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.COMPLETE, status.getStatus());
assertEquals(1, status.getFiles().size());
@Test
public void testJobParametersValidatorRejectsInvalidParameters() {
JobParametersBuilder paramBuilder = new JobParametersBuilder().addString("jobUUID", "I'm not real!");
try {
myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
fail("Should have had invalid parameter execption!");
} catch (JobParametersInvalidException e) {
// Iterate over the files
for (IBulkDataExportSvc.FileEntry next : status.getFiles()) {
Binary nextBinary = myBinaryDao.read(next.getResourceId());
assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {}:\n{}", next.getResourceType(), nextContents);
if ("Patient".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"id\":\"PAT3\""));
assertEquals(1, nextContents.split("\n").length);
} else {
fail(next.getResourceType());
}
}
}
//Note that if the job is generated, and doesn't rely on an existing persisted BulkExportJobEntity, it will need to
//create one itself, which means that its jobUUID isn't known until it starts. To get around this, we move
public void awaitJobCompletion(JobExecution theJobExecution) throws InterruptedException {
await().atMost(120, TimeUnit.SECONDS).until(() -> {
JobExecution jobExecution = myJobExplorer.getJobExecution(theJobExecution.getId());
ourLog.info("JobExecution {} currently has status: {}", theJobExecution.getId(), jobExecution.getStatus());
return jobExecution.getStatus() == BatchStatus.COMPLETED;
});
}
@Test
public void testSubmit_WithSince() throws InterruptedException {
public void testGenerateBulkExport_WithSince() throws InterruptedException {
// Create some resources to load
createResources();
@ -400,15 +437,97 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
}
/**
* Note that if the job is generated, and doesn't rely on an existing persisted BulkExportJobEntity, it will need to
* create one itself, which means that its jobUUID isn't known until it starts.
*/
@Test
public void testBatchJobIsCapableOfCreatingAnExportEntityIfNoJobIsProvided() throws Exception {
createResources();
//Add the UUID to the job
BulkExportJobParametersBuilder paramBuilder = new BulkExportJobParametersBuilder();
paramBuilder
.setReadChunkSize(100L)
.setOutputFormat(Constants.CT_FHIR_NDJSON)
.setResourceTypes(Arrays.asList("Patient", "Observation"));
JobExecution jobExecution = myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
awaitJobCompletion(jobExecution);
String jobUUID = (String) jobExecution.getExecutionContext().get("jobUUID");
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobUUID);
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(2));
}
public void awaitAllBulkJobCompletions() {
List<JobInstance> bulkExport = myJobExplorer.findJobInstancesByJobName("bulkExportJob", 0, 100);
if (bulkExport.isEmpty()) {
fail("There are no bulk export jobs running!");
}
List<JobExecution> bulkExportExecutions = bulkExport.stream().flatMap(jobInstance -> myJobExplorer.getJobExecutions(jobInstance).stream()).collect(Collectors.toList());
awaitJobCompletions(bulkExportExecutions);
}
public void awaitJobCompletions(Collection<JobExecution> theJobs) {
theJobs.forEach(jobExecution -> awaitJobCompletion(jobExecution));
}
@Test
public void testBatchJobSubmitsAndRuns() throws Exception {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(null, Sets.newHashSet("Patient", "Observation"), null, null);
//Add the UUID to the job
BulkExportJobParametersBuilder paramBuilder = new BulkExportJobParametersBuilder()
.setJobUUID(jobDetails.getJobId())
.setReadChunkSize(10L);
JobExecution jobExecution = myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
awaitJobCompletion(jobExecution);
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(2));
}
@Test
public void testJobParametersValidatorRejectsInvalidParameters() {
JobParametersBuilder paramBuilder = new JobParametersBuilder().addString("jobUUID", "I'm not real!");
try {
myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
fail("Should have had invalid parameter execption!");
} catch (JobParametersInvalidException e) {
// good
}
}
private void awaitJobCompletion(JobExecution theJobExecution) {
await().atMost(120, TimeUnit.SECONDS).until(() -> {
JobExecution jobExecution = myJobExplorer.getJobExecution(theJobExecution.getId());
ourLog.info("JobExecution {} currently has status: {}", theJobExecution.getId(), jobExecution.getStatus());
return jobExecution.getStatus() == BatchStatus.COMPLETED || jobExecution.getStatus() == BatchStatus.FAILED;
});
}
private void createResources() {
for (int i = 0; i < 10; i++) {
Patient patient = new Patient();
patient.setId("PAT" + i);
patient.setGender(i % 2 == 0 ? Enumerations.AdministrativeGender.MALE : Enumerations.AdministrativeGender.FEMALE);
patient.addName().setFamily("FAM" + i);
patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i);
IIdType patId = myPatientDao.update(patient).getId().toUnqualifiedVersionless();
Observation obs = new Observation();
obs.setId("OBS" + i);
obs.addIdentifier().setSystem("SYS").setValue("VAL" + i);
obs.setStatus(Observation.ObservationStatus.FINAL);
obs.getSubject().setReference(patId.getValue());
myObservationDao.update(obs);

View File

@ -49,12 +49,12 @@ public class BatchJobSubmitterImpl implements IBatchJobSubmitter {
public JobExecution runJob(Job theJob, JobParameters theJobParameters) throws JobParametersInvalidException {
try {
return myJobLauncher.run(theJob, theJobParameters);
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException theE) {
ourLog.warn("Job {} was already running, ignoring the call to start.", theJob.getName());
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException e) {
ourLog.warn("Job {} was already running, ignoring the call to start: {}", theJob.getName(), e.toString());
return myJobRepository.getLastJobExecution(theJob.getName(), theJobParameters);
} catch (JobParametersInvalidException theE) {
ourLog.error("Job Parameters passed to this job were invalid: {}", theE.getMessage());
throw theE;
} catch (JobParametersInvalidException e) {
ourLog.error("Job Parameters passed to this job were invalid: {}", e.getMessage());
throw e;
}
}
}