Rework to allow for job creation inside of batch

Tadgh 2020-06-12 14:20:50 -07:00
parent a9c704c06e
commit 0f469c1a56
14 changed files with 419 additions and 64 deletions

View File

@@ -23,36 +23,31 @@ package ca.uhn.fhir.jpa.bulk.job;
 import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
 import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc;
 import org.springframework.batch.core.BatchStatus;
-import org.springframework.batch.core.JobExecution;
-import org.springframework.batch.core.JobExecutionListener;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 /**
  * Will run before and after a job to set the status to whatever is appropriate.
  */
-public class BulkExportJobStatusChangeListener implements JobExecutionListener {
-   @Value("#{jobParameters['jobUUID']}")
+public class BulkExportJobCloser implements Tasklet {
+   @Value("#{jobExecutionContext['jobUUID']}")
    private String myJobUUID;
    @Autowired
    private BulkExportDaoSvc myBulkExportDaoSvc;
    @Override
-   public void beforeJob(JobExecution theJobExecution) {
-      if (theJobExecution.getStatus() == BatchStatus.STARTING) {
-         myBulkExportDaoSvc.setJobToStatus(myJobUUID, BulkJobStatusEnum.BUILDING);
-      }
-   }
-   @Override
-   public void afterJob(JobExecution theJobExecution) {
-      if (theJobExecution.getStatus() == BatchStatus.COMPLETED) {
+   public RepeatStatus execute(StepContribution theStepContribution, ChunkContext theChunkContext) throws Exception {
+      if (theChunkContext.getStepContext().getStepExecution().getJobExecution().getStatus() == BatchStatus.STARTED) {
          myBulkExportDaoSvc.setJobToStatus(myJobUUID, BulkJobStatusEnum.COMPLETE);
       } else {
          myBulkExportDaoSvc.setJobToStatus(myJobUUID, BulkJobStatusEnum.ERROR);
       }
+      return RepeatStatus.FINISHED;
    }
 }

View File

@@ -32,10 +32,9 @@ import org.springframework.batch.core.configuration.annotation.StepBuilderFactory
 import org.springframework.batch.core.configuration.annotation.StepScope;
 import org.springframework.batch.item.ItemWriter;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
-import org.springframework.core.task.TaskExecutor;
+import org.springframework.context.annotation.Lazy;
 import java.util.List;
@@ -55,21 +54,34 @@ public class BulkExportJobConfig {
    @Autowired
    private PidToIBaseResourceProcessor myPidToIBaseResourceProcessor;
-   @Autowired
-   private TaskExecutor myTaskExecutor;
    @Bean
+   @Lazy
    public Job bulkExportJob() {
       return myJobBuilderFactory.get("bulkExportJob")
         .validator(jobExistsValidator())
-        .start(partitionStep())
-        .listener(bulkExportJobCompletionListener())
+        .start(createBulkExportEntityStep())
+        .next(partitionStep())
+        .next(closeJobStep())
        .build();
    }
+   @Bean
+   public Step createBulkExportEntityStep() {
+      return myStepBuilderFactory.get("createBulkExportEntityStep")
+         .tasklet(createBulkExportEntityTasklet())
+         .listener(bulkExportJobStartedListener())
+         .build();
+   }
+   @Bean
+   public CreateBulkExportEntityTasklet createBulkExportEntityTasklet() {
+      return new CreateBulkExportEntityTasklet();
+   }
    @Bean
    public JobParametersValidator jobExistsValidator() {
-      return new JobExistsParameterValidator();
+      return new BulkExportJobParameterValidator();
    }
@@ -77,39 +89,51 @@ public class BulkExportJobConfig {
    public Step bulkExportGenerateResourceFilesStep() {
       return myStepBuilderFactory.get("bulkExportGenerateResourceFilesStep")
         .<List<ResourcePersistentId>, List<IBaseResource>> chunk(100) //1000 resources per generated file, as the reader returns 10 resources at a time.
-        .reader(bulkItemReader(null))
+        .reader(bulkItemReader())
        .processor(myPidToIBaseResourceProcessor)
        .writer(resourceToFileWriter())
        .build();
    }
    @Bean
    @JobScope
-   public BulkExportJobStatusChangeListener bulkExportJobCompletionListener() {
-      return new BulkExportJobStatusChangeListener();
+   public BulkExportJobCloser bulkExportJobCloser() {
+      return new BulkExportJobCloser();
+   }
+   @Bean
+   public Step closeJobStep() {
+      return myStepBuilderFactory.get("closeJobStep")
+         .tasklet(bulkExportJobCloser())
+         .build();
+   }
+   @Bean
+   @JobScope
+   public BulkExportJobStartedListener bulkExportJobStartedListener() {
+      return new BulkExportJobStartedListener();
    }
    @Bean
    public Step partitionStep() {
       return myStepBuilderFactory.get("partitionStep")
-        .partitioner("bulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner(null))
+        .partitioner("bulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner())
         .step(bulkExportGenerateResourceFilesStep())
-        .taskExecutor(myTaskExecutor)
        .build();
    }
    @Bean
    @StepScope
-   public BulkItemReader bulkItemReader(@Value("#{jobParameters['jobUUID']}") String theJobUUID) {
-      BulkItemReader bulkItemReader = new BulkItemReader();
-      bulkItemReader.setJobUUID(theJobUUID);
-      return bulkItemReader;
+   public BulkItemReader bulkItemReader() {
+      return new BulkItemReader();
    }
    @Bean
    @JobScope
-   public ResourceTypePartitioner bulkExportResourceTypePartitioner(@Value("#{jobParameters['jobUUID']}") String theJobUUID) {
-      return new ResourceTypePartitioner(theJobUUID);
+   public ResourceTypePartitioner bulkExportResourceTypePartitioner() {
+      return new ResourceTypePartitioner();
    }
    @Bean

View File

@@ -22,41 +22,65 @@ package ca.uhn.fhir.jpa.bulk.job;
 import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
 import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
+import ca.uhn.fhir.rest.api.Constants;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.batch.core.JobParameters;
 import org.springframework.batch.core.JobParametersInvalidException;
 import org.springframework.batch.core.JobParametersValidator;
 import org.springframework.beans.factory.annotation.Autowired;
+import java.util.Arrays;
 import java.util.Optional;
 /**
  * This class will prevent a job from running if the UUID does not exist or is invalid.
  */
-public class JobExistsParameterValidator implements JobParametersValidator {
+public class BulkExportJobParameterValidator implements JobParametersValidator {
    @Autowired
    private IBulkExportJobDao myBulkExportJobDao;
    @Override
    public void validate(JobParameters theJobParameters) throws JobParametersInvalidException {
       if (theJobParameters == null) {
-         throw new JobParametersInvalidException("This job requires Parameters: [readChunkSize] and [jobUUID]");
+         throw new JobParametersInvalidException("This job needs Parameters: [readChunkSize], [jobUUID], [filters], [outputFormat], [resourceTypes]");
       }
+      StringBuilder errorBuilder = new StringBuilder();
       Long readChunkSize = theJobParameters.getLong("readChunkSize");
-      String errorMessage = "";
       if (readChunkSize == null || readChunkSize < 1) {
-         errorMessage += "There must be a valid number for readChunkSize, which is at least 1. ";
+         errorBuilder.append("There must be a valid number for readChunkSize, which is at least 1. ");
       }
       String jobUUID = theJobParameters.getString("jobUUID");
-      if (StringUtils.isBlank(jobUUID)) {
-         errorMessage += "Missing jobUUID Job parameter. ";
+      Optional<BulkExportJobEntity> oJob = myBulkExportJobDao.findByJobId(jobUUID);
+      if (!StringUtils.isBlank(jobUUID) && !oJob.isPresent()) {
+         errorBuilder.append("There is no persisted job that exists with UUID: " + jobUUID + ". ");
       }
-      Optional<BulkExportJobEntity> oJob = myBulkExportJobDao.findByJobId(jobUUID);
-      if (!oJob.isPresent()) {
-         errorMessage += "There is no persisted job that exists with UUID: " + jobUUID + ". ";
+      boolean hasExistingJob = oJob.isPresent();
+      //Check for to-be-created parameters.
+      if (!hasExistingJob) {
+         String resourceTypes = theJobParameters.getString("resourceTypes");
+         if (StringUtils.isBlank(resourceTypes)) {
+            errorBuilder.append("You must include [resourceTypes] as a Job Parameter");
+         } else {
+            String[] resourceArray = resourceTypes.split(",");
+            Arrays.stream(resourceArray).filter(resourceType -> resourceType.equalsIgnoreCase("Binary"))
+               .findFirst()
+               .ifPresent(resourceType -> {
+                  errorBuilder.append("Bulk export of Binary resources is forbidden");
+               });
+         }
+         String outputFormat = theJobParameters.getString("outputFormat");
+         if (!StringUtils.isBlank(outputFormat) && !Constants.CT_FHIR_NDJSON.equals(outputFormat)) {
+            errorBuilder.append("The only allowed format for Bulk Export is currently " + Constants.CT_FHIR_NDJSON);
+         }
       }
+      String errorMessage = errorBuilder.toString();
       if (!StringUtils.isEmpty(errorMessage)) {
          throw new JobParametersInvalidException(errorMessage);
       }

View File

@@ -0,0 +1,48 @@
package ca.uhn.fhir.jpa.bulk.job;

import ca.uhn.fhir.rest.api.Constants;
import org.springframework.batch.core.JobParametersBuilder;

import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * This builder is a helper so you don't have to go lookup what job parameters are required for a bulk export job.
 */
public class BulkExportJobParametersBuilder extends JobParametersBuilder {

   public BulkExportJobParametersBuilder setResourceTypes(List<String> resourceTypes) {
      String resourceTypesString = resourceTypes.stream().collect(Collectors.joining(","));
      this.addString("resourceTypes", resourceTypesString);
      return this;
   }

   public BulkExportJobParametersBuilder setSince(Date theSince) {
      this.addDate("since", theSince);
      return this;
   }

   public BulkExportJobParametersBuilder setOutputFormat(String theOutputFormat) {
      //TODO eventually we will support more types.
      theOutputFormat = Constants.CT_FHIR_NDJSON;
      this.addString("outputFormat", theOutputFormat);
      return this;
   }

   public BulkExportJobParametersBuilder setFilters(Set<String> theFilters) {
      this.addString("filters", theFilters.stream().collect(Collectors.joining(",")));
      return this;
   }

   public BulkExportJobParametersBuilder setJobUUID(String theJobUUID) {
      this.addString("jobUUID", theJobUUID);
      return this;
   }

   public BulkExportJobParametersBuilder setReadChunkSize(Long theReadChunkSize) {
      this.addLong("readChunkSize", theReadChunkSize);
      return this;
   }
}
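
A rough usage sketch (not part of the diff itself, mirroring the R4 test added later in this commit, and assuming an injected IBatchJobSubmitter plus the bulkExportJob bean from BulkExportJobConfig): the builder replaces hand-written JobParametersBuilder calls.

// Sketch only: myBatchJobSubmitter and myBulkJob are assumed to be wired by Spring,
// exactly as in BulkDataExportSvcImplR4Test further down in this commit.
BulkExportJobParametersBuilder paramBuilder = new BulkExportJobParametersBuilder();
paramBuilder.setReadChunkSize(100L)
   .setOutputFormat(Constants.CT_FHIR_NDJSON)
   .setResourceTypes(Arrays.asList("Patient", "Observation"));
JobExecution jobExecution = myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());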

View File

@@ -0,0 +1,54 @@
package ca.uhn.fhir.jpa.bulk.job;

/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2020 University Health Network
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

/**
 * Will run before and after a job to set the status to whatever is appropriate.
 */
public class BulkExportJobStartedListener implements StepExecutionListener {

   @Value("#{jobExecutionContext['jobUUID']}")
   private String myJobUUID;

   @Autowired
   private BulkExportDaoSvc myBulkExportDaoSvc;

   @Override
   public void beforeStep(StepExecution theStepExecution) {
   }

   @Override
   public ExitStatus afterStep(StepExecution theStepExecution) {
      if (theStepExecution.getStatus() == BatchStatus.STARTING) {
         myBulkExportDaoSvc.setJobToStatus(myJobUUID, BulkJobStatusEnum.BUILDING);
      }
      return ExitStatus.EXECUTING;
   }
}

View File

@@ -68,6 +68,7 @@ public class BulkItemReader implements ItemReader<List<ResourcePersistentId>> {
    private BulkExportJobEntity myJobEntity;
+   @Value("#{jobExecutionContext['jobUUID']}")
    private String myJobUUID;
    @Value("#{stepExecutionContext['resourceType']}")
@@ -105,10 +106,6 @@ public class BulkItemReader implements ItemReader<List<ResourcePersistentId>> {
       myPidIterator = myReadPids.iterator();
    }
-   public void setJobUUID(String theUUID) {
-      this.myJobUUID = theUUID;
-   }
    @Override
    public List<ResourcePersistentId> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
       if (myPidIterator == null) {

View File

@@ -0,0 +1,63 @@
package ca.uhn.fhir.jpa.bulk.job;

import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.rest.api.Constants;
import org.apache.commons.lang3.StringUtils;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class CreateBulkExportEntityTasklet implements Tasklet {

   @Autowired
   private IBulkDataExportSvc myBulkDataExportSvc;

   @Override
   public RepeatStatus execute(StepContribution theStepContribution, ChunkContext theChunkContext) throws Exception {
      Map<String, Object> jobParameters = theChunkContext.getStepContext().getJobParameters();

      //We can leave early if they provided us with an existing job.
      if (jobParameters.containsKey("jobUUID")) {
         addUUIDToJobContext(theChunkContext, (String)jobParameters.get("jobUUID"));
         return RepeatStatus.FINISHED;
      } else {
         String resourceTypes = (String)jobParameters.get("resourceTypes");
         Date since = (Date)jobParameters.get("since");
         String filters = (String)jobParameters.get("filters");
         Set<String> filterSet;
         if (StringUtils.isBlank(filters)) {
            filterSet = null;
         } else {
            filterSet = Arrays.stream(filters.split(",")).collect(Collectors.toSet());
         }
         Set<String> resourceTypeSet = Arrays.stream(resourceTypes.split(",")).collect(Collectors.toSet());

         String outputFormat = (String)jobParameters.get("outputFormat");
         if (StringUtils.isBlank(outputFormat)) {
            outputFormat = Constants.CT_FHIR_NDJSON;
         }

         IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.submitJob(outputFormat, resourceTypeSet, since, filterSet);
         addUUIDToJobContext(theChunkContext, jobInfo.getJobId());
         return RepeatStatus.FINISHED;
      }
   }

   private void addUUIDToJobContext(ChunkContext theChunkContext, String theJobUUID) {
      theChunkContext
         .getStepContext()
         .getStepExecution()
         .getJobExecution()
         .getExecutionContext()
         .putString("jobUUID", theJobUUID);
   }
}
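
Because the tasklet stores the (possibly newly generated) UUID in the JobExecution's ExecutionContext rather than taking it as a job parameter, downstream beans bind it with #{jobExecutionContext['jobUUID']} and callers can read it back after launching the job. A minimal sketch, assuming a JobExecution returned by the job submitter as in the test changes later in this commit:

// Sketch only: "jobUUID" is the key written by addUUIDToJobContext() above.
JobExecution jobExecution = myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
String jobUUID = (String) jobExecution.getExecutionContext().get("jobUUID");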

View File

@@ -62,6 +62,9 @@ public class ResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
    @Value("#{stepExecutionContext['bulkExportCollectionEntityId']}")
    private Long myBulkExportCollectionEntityId;
+   @Value("#{stepExecutionContext['resourceType']}")
+   private String myReosurceType;
    private IFhirResourceDao<IBaseBinary> myBinaryDao;
@@ -109,16 +112,18 @@ public class ResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
    @Override
    public void write(List<? extends List<IBaseResource>> theList) throws Exception {
+      int count = 0;
       for (List<IBaseResource> resourceList : theList) {
          for (IBaseResource nextFileResource : resourceList) {
             myParser.encodeResourceToWriter(nextFileResource, myWriter);
             myWriter.append("\n");
+            count++;
          }
       }
       Optional<IIdType> createdId = flushToFiles();
       if (createdId.isPresent()) {
-         ourLog.info("Created resources for bulk export file containing {} resources of type ", createdId.get().toUnqualifiedVersionless().getValue());
+         ourLog.info("Created {} resources for bulk export file containing {} resources of type {} ", count, createdId.get().toUnqualifiedVersionless().getValue(), myReosurceType);
       }
    }
 }

View File

@@ -25,6 +25,7 @@ import org.slf4j.Logger;
 import org.springframework.batch.core.partition.support.Partitioner;
 import org.springframework.batch.item.ExecutionContext;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import java.util.HashMap;
 import java.util.Map;
@@ -34,20 +35,18 @@ import static org.slf4j.LoggerFactory.getLogger;
 public class ResourceTypePartitioner implements Partitioner {
    private static final Logger ourLog = getLogger(ResourceTypePartitioner.class);
+   @Value("#{jobExecutionContext['jobUUID']}")
    private String myJobUUID;
    @Autowired
    private BulkExportDaoSvc myBulkExportDaoSvc;
-   public ResourceTypePartitioner(String theJobUUID) {
-      myJobUUID = theJobUUID;
-   }
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
       Map<String, ExecutionContext> partitionContextMap = new HashMap<>();
-      Map<Long, String> idToResourceType = myBulkExportDaoSvc.getBulkJobCollectionIdToResourceTypeMap( myJobUUID);
+      Map<Long, String> idToResourceType = myBulkExportDaoSvc.getBulkJobCollectionIdToResourceTypeMap(myJobUUID);
       idToResourceType.entrySet().stream()
         .forEach(entry -> {

View File

@@ -20,6 +20,9 @@ package ca.uhn.fhir.jpa.bulk.svc;
  * #L%
  */
+import ca.uhn.fhir.context.FhirContext;
+import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
+import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
 import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
 import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
 import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
@@ -27,25 +30,46 @@ import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
 import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
 import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
 import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
+import ca.uhn.fhir.jpa.model.util.JpaConstants;
+import ca.uhn.fhir.rest.api.Constants;
+import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
+import org.apache.commons.lang3.time.DateUtils;
+import org.hl7.fhir.r4.model.InstantType;
 import org.slf4j.Logger;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.data.domain.Pageable;
+import org.springframework.data.domain.Slice;
 import org.springframework.stereotype.Service;
 import javax.transaction.Transactional;
 import java.util.Collection;
+import java.util.Date;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
+import static ca.uhn.fhir.util.UrlUtil.escapeUrlParam;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.slf4j.LoggerFactory.getLogger;
 @Service
 public class BulkExportDaoSvc {
    private static final Logger ourLog = getLogger(BulkExportDaoSvc.class);
+   private int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR);
+   @Autowired
+   private FhirContext myFhirContext;
    @Autowired
    IBulkExportJobDao myBulkExportJobDao;
+   @Autowired
+   private DaoRegistry myDaoRegistry;
    @Autowired
    IBulkExportCollectionDao myBulkExportCollectionDao;
@@ -58,6 +82,7 @@ public class BulkExportDaoSvc {
       if (byId.isPresent()) {
          BulkExportCollectionEntity exportCollectionEntity = byId.get();
          theFile.setCollection(exportCollectionEntity);
+         exportCollectionEntity.getFiles().add(theFile);
          myBulkExportCollectionFileDao.saveAndFlush(theFile);
          myBulkExportCollectionDao.saveAndFlush(exportCollectionEntity);
       }
@@ -99,4 +124,92 @@ public class BulkExportDaoSvc {
    }
+   public IBulkDataExportSvc.JobInfo submitJob(String theOutputFormat, Set<String> theResourceTypes, Date theSince, Set<String> theFilters, int theReuseMillis) {
+      String outputFormat = Constants.CT_FHIR_NDJSON;
+      if (isNotBlank(theOutputFormat)) {
+         outputFormat = theOutputFormat;
+      }
+      if (!Constants.CTS_NDJSON.contains(outputFormat)) {
+         throw new InvalidRequestException("Invalid output format: " + theOutputFormat);
+      }
+      StringBuilder requestBuilder = new StringBuilder();
+      requestBuilder.append("/").append(JpaConstants.OPERATION_EXPORT);
+      requestBuilder.append("?").append(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT).append("=").append(escapeUrlParam(outputFormat));
+      Set<String> resourceTypes = theResourceTypes;
+      if (resourceTypes != null) {
+         requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_TYPE).append("=").append(String.join(",", resourceTypes));
+      }
+      Date since = theSince;
+      if (since != null) {
+         requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_SINCE).append("=").append(new InstantType(since).setTimeZoneZulu(true).getValueAsString());
+      }
+      if (theFilters != null && theFilters.size() > 0) {
+         requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_TYPE_FILTER).append("=").append(String.join(",", theFilters));
+      }
+      String request = requestBuilder.toString();
+      Date cutoff = DateUtils.addMilliseconds(new Date(), -theReuseMillis);
+      Pageable page = PageRequest.of(0, 10);
+      Slice<BulkExportJobEntity> existing = myBulkExportJobDao.findExistingJob(page, request, cutoff, BulkJobStatusEnum.ERROR);
+      if (!existing.isEmpty()) {
+         return toSubmittedJobInfo(existing.iterator().next());
+      }
+      if (resourceTypes != null && resourceTypes.contains("Binary")) {
+         String msg = myFhirContext.getLocalizer().getMessage(BulkDataExportSvcImpl.class, "onlyBinarySelected");
+         throw new InvalidRequestException(msg);
+      }
+      if (resourceTypes == null || resourceTypes.isEmpty()) {
+         // This is probably not a useful default, but having the default be "download the whole
+         // server" seems like a risky default too. We'll deal with that by having the default involve
+         // only returning a small time span
+         resourceTypes = myFhirContext.getResourceTypes();
+         if (since == null) {
+            since = DateUtils.addDays(new Date(), -1);
+         }
+      }
+      resourceTypes =
+         resourceTypes
+            .stream()
+            .filter(t -> !"Binary".equals(t))
+            .collect(Collectors.toSet());
+      BulkExportJobEntity job = new BulkExportJobEntity();
+      job.setJobId(UUID.randomUUID().toString());
+      job.setStatus(BulkJobStatusEnum.SUBMITTED);
+      job.setSince(since);
+      job.setCreated(new Date());
+      job.setRequest(request);
+      updateExpiry(job);
+      myBulkExportJobDao.save(job);
+      for (String nextType : resourceTypes) {
+         if (!myDaoRegistry.isResourceTypeSupported(nextType)) {
+            String msg = myFhirContext.getLocalizer().getMessage(BulkDataExportSvcImpl.class, "unknownResourceType", nextType);
+            throw new InvalidRequestException(msg);
+         }
+         BulkExportCollectionEntity collection = new BulkExportCollectionEntity();
+         collection.setJob(job);
+         collection.setResourceType(nextType);
+         job.getCollections().add(collection);
+         myBulkExportCollectionDao.save(collection);
+      }
+      ourLog.info("Bulk export job submitted: {}", job.toString());
+      return toSubmittedJobInfo(job);
+   }
+   private void updateExpiry(BulkExportJobEntity theJob) {
+      theJob.setExpiry(DateUtils.addMilliseconds(new Date(), myRetentionPeriod));
+   }
+   private IBulkDataExportSvc.JobInfo toSubmittedJobInfo(BulkExportJobEntity theJob) {
+      return new IBulkDataExportSvc.JobInfo().setJobId(theJob.getJobId());
+   }
 }
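
For reference, submitJob() de-duplicates recent submissions by the request string it builds. Assuming the JpaConstants.PARAM_EXPORT_* constants resolve to the standard FHIR bulk export parameter names (an assumption, not shown in this diff), a submission for Patient and Observation in NDJSON format would produce a request roughly like the following.

// Illustrative only; the actual parameter names come from JpaConstants.
// /$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Patient,Observation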

View File

@@ -8,6 +8,7 @@ import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
 import ca.uhn.fhir.jpa.api.dao.IDao;
 import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
 import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
+import ca.uhn.fhir.jpa.batch.config.InMemoryJobRepositoryBatchConfig;
 import ca.uhn.fhir.jpa.batch.svc.BatchJobSubmitterImpl;
 import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider;
 import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor;
@@ -114,10 +115,11 @@ import java.util.Date;
    @ComponentScan.Filter(type = FilterType.REGEX, pattern = ".*Test.*"),
    @ComponentScan.Filter(type = FilterType.REGEX, pattern = "ca.uhn.fhir.jpa.subscription.*"),
    @ComponentScan.Filter(type = FilterType.REGEX, pattern = "ca.uhn.fhir.jpa.searchparam.*"),
-   @ComponentScan.Filter(type = FilterType.REGEX, pattern = "ca.uhn.fhir.jpa.empi.*")
+   @ComponentScan.Filter(type = FilterType.REGEX, pattern = "ca.uhn.fhir.jpa.empi.*"),
+   @ComponentScan.Filter(type = FilterType.REGEX, pattern = "ca.uhn.fhir.jpa.batch.*")
 })
 @Import({
-   SearchParamConfig.class, BatchJobsConfig.class
+   SearchParamConfig.class, BatchJobsConfig.class, InMemoryJobRepositoryBatchConfig.class
 })
 public abstract class BaseConfig {
@@ -151,6 +153,8 @@ public abstract class BaseConfig {
       return new BatchJobSubmitterImpl();
    }
    /**
     * This method should be overridden to provide an actual completed
     * bean, but it provides a partially completed entity manager

View File

@@ -2,6 +2,7 @@ package ca.uhn.fhir.jpa.bulk;
 import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
 import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
+import ca.uhn.fhir.jpa.bulk.job.BulkExportJobParametersBuilder;
 import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
 import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
 import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
@@ -24,12 +25,15 @@ import org.hl7.fhir.r4.model.Patient;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.BatchStatus;
 import org.springframework.batch.core.Job;
+import org.springframework.batch.core.JobExecution;
 import org.springframework.batch.core.JobParametersBuilder;
 import org.springframework.batch.core.JobParametersInvalidException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.UUID;
@@ -60,7 +64,6 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
    @Qualifier("bulkExportJob")
    private Job myBulkJob;
    @Test
    public void testPurgeExpiredJobs() {
@@ -187,6 +190,26 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
       }
    }
+   @Test
+   public void testBatchJobIsCapableOfCreatingAnExportEntityIfNoJobIsProvided() throws Exception {
+      createResources();
+      //Add the UUID to the job
+      BulkExportJobParametersBuilder paramBuilder = new BulkExportJobParametersBuilder();
+      paramBuilder.setReadChunkSize(100L)
+         .setOutputFormat(Constants.CT_FHIR_NDJSON)
+         .setResourceTypes(Arrays.asList("Patient", "Observation"));
+      JobExecution jobExecution = myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
+      awaitJobCompletion(jobExecution);
+      String jobUUID = (String)jobExecution.getExecutionContext().get("jobUUID");
+      IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobUUID);
+      assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
+      assertThat(jobInfo.getFiles().size(), equalTo(2));
+   }
    @Test
    public void testSubmitWithoutSpecificResources() {
@@ -260,12 +283,13 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
       IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(null, Sets.newHashSet("Patient", "Observation"), null, null);
       //Add the UUID to the job
-      JobParametersBuilder paramBuilder = new JobParametersBuilder()
-         .addString("jobUUID", jobDetails.getJobId())
-         .addLong("readChunkSize", 10L);
-      myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
-      awaitJobCompletion(jobDetails.getJobId());
+      BulkExportJobParametersBuilder paramBuilder = new BulkExportJobParametersBuilder()
+         .setJobUUID(jobDetails.getJobId())
+         .setReadChunkSize(10L);
+      JobExecution jobExecution = myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
+      awaitJobCompletion(jobExecution);
       IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
       assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
@@ -283,8 +307,14 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
    }
-   public void awaitJobCompletion(String theJobId) throws InterruptedException {
-      await().until(() -> myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(theJobId).getStatus() == BulkJobStatusEnum.COMPLETE);
+   //Note that if the job is generated, and doesnt rely on an existed persisted BulkExportJobEntity, it will need to
+   //create one itself, which means that its jobUUID isnt known until it starts. to get around this, we move
+   public void awaitJobCompletion(JobExecution theJobExecution) throws InterruptedException {
+      await().until(() -> {
+         return theJobExecution.getStatus() == BatchStatus.COMPLETED;
+         //String jobUUID = theJobExecution.getExecutionContext().getString("jobUUID");
+         //return myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobUUID).getStatus() == BulkJobStatusEnum.COMPLETE;
+      });
    }
    @Test

View File

@@ -20,7 +20,6 @@ abstract public class BaseBatchR4Test {
    @Autowired
    protected PlatformTransactionManager myPlatformTransactionManager;
    @Autowired
    protected JobLauncher myJobLauncher;
    @Autowired

View File

@@ -72,7 +72,7 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
       init510(); // 20200516 - present
    }
-   private void init510() {
+   protected void init510() {
       Builder version = forVersion(VersionEnum.V5_1_0);
       // NPM Packages