Final config layout of Bulk Export Batch job

This commit is contained in:
Tadgh 2020-06-08 15:59:08 -07:00
parent 4405e50db9
commit 44aa688a20
33 changed files with 295 additions and 422 deletions

View File

@ -63,8 +63,8 @@ ca.uhn.fhir.validation.ValidationResult.noIssuesDetected=No issues detected duri
# JPA Messages
ca.uhn.fhir.jpa.bulk.BulkDataExportSvcImpl.onlyBinarySelected=Binary resources may not be exported with bulk export
ca.uhn.fhir.jpa.bulk.BulkDataExportSvcImpl.unknownResourceType=Unknown or unsupported resource type: {0}
ca.uhn.fhir.jpa.bulk.svc.BulkDataExportSvcImpl.onlyBinarySelected=Binary resources may not be exported with bulk export
ca.uhn.fhir.jpa.bulk.svc.BulkDataExportSvcImpl.unknownResourceType=Unknown or unsupported resource type: {0}
ca.uhn.fhir.jpa.config.HapiFhirHibernateJpaDialect.resourceVersionConstraintFailure=The operation has failed with a version constraint failure. This generally means that two clients/threads were trying to update the same resource at the same time, and this request was chosen as the failing request.
ca.uhn.fhir.jpa.config.HapiFhirHibernateJpaDialect.resourceIndexedCompositeStringUniqueConstraintFailure=The operation has failed with a unique index constraint failure. This probably means that the operation was trying to create/update a resource that would have resulted in a duplicate value for a unique index.
ca.uhn.fhir.jpa.config.HapiFhirHibernateJpaDialect.forcedIdConstraintFailure=The operation has failed with a client-assigned ID constraint failure. This typically means that multiple client threads are trying to create a new resource with the same client-assigned ID at the same time, and this thread was chosen to be rejected.

View File

@ -0,0 +1,14 @@
package ca.uhn.fhir.jpa.batch;
import ca.uhn.fhir.jpa.bulk.job.BulkExportJobConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@Configuration
//When you define a new batch job, add it here.
@Import({
CommonBatchJobConfig.class,
BulkExportJobConfig.class,})
public class BatchJobsConfig {
//Empty config, as this is just an aggregator for all the various batch jobs defined around the system.
}

View File

@ -0,0 +1,17 @@
package ca.uhn.fhir.jpa.batch;
import ca.uhn.fhir.jpa.batch.processors.PidToIBaseResourceProcessor;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CommonBatchJobConfig {
@Bean
@StepScope
public PidToIBaseResourceProcessor pidToResourceProcessor() {
return new PidToIBaseResourceProcessor();
}
}

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.bulk.batch;
package ca.uhn.fhir.jpa.batch.processors;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
@ -7,7 +7,6 @@ import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@ -16,11 +15,10 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.slf4j.LoggerFactory.getLogger;
public class BulkItemResourceLoaderProcessor implements ItemProcessor<ResourcePersistentId, IBaseResource> {
private static final Logger ourLog = getLogger(BulkItemResourceLoaderProcessor.class);
/**
* Reusable Item Processor which converts a ResourcePersistentId to its IBaseResource
*/
public class PidToIBaseResourceProcessor implements ItemProcessor<ResourcePersistentId, IBaseResource> {
@Autowired
private SearchBuilderFactory mySearchBuilderFactory;
@ -34,8 +32,6 @@ public class BulkItemResourceLoaderProcessor implements ItemProcessor<ResourcePe
@Autowired
private FhirContext myContext;
@Override
public IBaseResource process(ResourcePersistentId theResourcePersistentId) throws Exception {

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.bulk;
package ca.uhn.fhir.jpa.bulk.api;
/*-
* #%L
@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.bulk;
* #L%
*/
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import org.hl7.fhir.instance.model.api.IIdType;
import javax.transaction.Transactional;
@ -36,7 +37,7 @@ public interface IBulkDataExportSvc {
JobInfo submitJob(String theOutputFormat, Set<String> theResourceTypes, Date theSince, Set<String> theFilters);
JobInfo getJobStatusOrThrowResourceNotFound(String theJobId);
JobInfo getJobInfoOrThrowResourceNotFound(String theJobId);
void cancelAndPurgeAllJobs();

View File

@ -1,39 +0,0 @@
package ca.uhn.fhir.jpa.bulk.batch;
import ca.uhn.fhir.jpa.bulk.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import java.util.Optional;
public class BulkExportJobCompletionListener implements JobExecutionListener {
@Value("#{jobParameters['jobUUID']}")
private String myJobUUID;
@Autowired
private IBulkExportJobDao myBulkExportJobDao;
@Override
public void beforeJob(JobExecution theJobExecution) {
}
@Override
public void afterJob(JobExecution theJobExecution) {
if (theJobExecution.getStatus() == BatchStatus.COMPLETED) {
Optional<BulkExportJobEntity> byJobId = myBulkExportJobDao.findByJobId(myJobUUID);
if (byJobId.isPresent()) {
BulkExportJobEntity bulkExportJobEntity = byJobId.get();
bulkExportJobEntity.setStatus(BulkJobStatusEnum.COMPLETE);
myBulkExportJobDao.save(bulkExportJobEntity);
}
}
}
}

View File

@ -0,0 +1,82 @@
package ca.uhn.fhir.jpa.bulk.job;
import ca.uhn.fhir.jpa.batch.processors.PidToIBaseResourceProcessor;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BulkExportJobConfig {
@Autowired
private StepBuilderFactory myStepBuilderFactory;
@Autowired
private JobBuilderFactory myJobBuilderFactory;
@Autowired
private PidToIBaseResourceProcessor myPidToIBaseResourceProcessor;
@Bean
public Job bulkExportJob() {
return myJobBuilderFactory.get("bulkExportJob")
.start(partitionStep())
.listener(bulkExportJobCompletionListener())
.build();
}
@Bean
public Step bulkExportGenerateResourceFilesStep() {
return myStepBuilderFactory.get("bulkExportGenerateResourceFilesStep")
.<ResourcePersistentId, IBaseResource> chunk(1000) //1000 resources per generated file
.reader(bulkItemReader(null))
.processor(myPidToIBaseResourceProcessor)
.writer(resourceToFileWriter())
.build();
}
@Bean
@JobScope
public BulkExportJobStatusChangeListener bulkExportJobCompletionListener() {
return new BulkExportJobStatusChangeListener();
}
@Bean
public Step partitionStep() {
return myStepBuilderFactory.get("partitionStep")
.partitioner("bulkExportGenerateResourceFilesStep", partitioner(null))
.step(bulkExportGenerateResourceFilesStep())
.build();
}
@Bean
@StepScope
public BulkItemReader bulkItemReader(@Value("#{jobParameters['jobUUID']}") String theJobUUID) {
BulkItemReader bulkItemReader = new BulkItemReader();
bulkItemReader.setJobUUID(theJobUUID);
return bulkItemReader;
}
@Bean
@JobScope
public ResourceTypePartitioner partitioner(@Value("#{jobParameters['jobUUID']}") String theJobUUID) {
return new ResourceTypePartitioner(theJobUUID);
}
@Bean
@StepScope
public ItemWriter<IBaseResource> resourceToFileWriter() {
return new ResourceToFileWriter();
}
}

View File

@ -0,0 +1,39 @@
package ca.uhn.fhir.jpa.bulk.job;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
/**
* Will run before and after a job to set the status to whatever is appropriate.
*/
public class BulkExportJobStatusChangeListener implements JobExecutionListener {
@Value("#{jobParameters['jobUUID']}")
private String myJobUUID;
@Autowired
private BulkExportDaoSvc myBulkExportDaoSvc;
@Override
public void beforeJob(JobExecution theJobExecution) {
if (theJobExecution.getStatus() == BatchStatus.STARTING) {
myBulkExportDaoSvc.setJobToStatus(myJobUUID, BulkJobStatusEnum.BUILDING);
}
}
@Override
public void afterJob(JobExecution theJobExecution) {
if (theJobExecution.getStatus() == BatchStatus.COMPLETED) {
myBulkExportDaoSvc.setJobToStatus(myJobUUID, BulkJobStatusEnum.COMPLETE);
} else {
//If the job didn't complete successfully, just set it back to submitted so that it gets picked up again by the scheduler.
myBulkExportDaoSvc.setJobToStatus(myJobUUID, BulkJobStatusEnum.SUBMITTED);
}
}
}

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.bulk.batch;
package ca.uhn.fhir.jpa.bulk.job;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
@ -16,7 +16,10 @@ import ca.uhn.fhir.rest.param.DateRangeParam;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@ -25,7 +28,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
public class BulkItemReader extends AbstractItemCountingItemStreamItemReader<ResourcePersistentId> {
public class BulkItemReader implements ItemReader<ResourcePersistentId> {
private static final Logger ourLog = LoggerFactory.getLogger(BulkItemReader.class);
@Autowired
@ -47,32 +50,8 @@ public class BulkItemReader extends AbstractItemCountingItemStreamItemReader<Res
@Value("#{stepExecutionContext['resourceType']}")
private String myResourceType;
Iterator<ResourcePersistentId> myPidIterator;
protected ResourcePersistentId doRead() throws Exception {
if (myPidIterator == null) {
loadResourcePids();
}
if (myPidIterator.hasNext()) {
return myPidIterator.next();
} else {
return null;
}
}
@Override
protected void doOpen() throws Exception {
}
@Override
protected void doClose() throws Exception {
}
private void loadResourcePids() {
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
if (!jobOpt.isPresent()) {
@ -107,4 +86,15 @@ public class BulkItemReader extends AbstractItemCountingItemStreamItemReader<Res
this.myJobUUID = theUUID;
}
@Override
public ResourcePersistentId read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (myPidIterator == null) {
loadResourcePids();
}
if (myPidIterator.hasNext()) {
return myPidIterator.next();
} else {
return null;
}
}
}

View File

@ -1,10 +1,9 @@
package ca.uhn.fhir.jpa.bulk.batch;
package ca.uhn.fhir.jpa.bulk.job;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.Constants;
@ -31,19 +30,14 @@ import static org.slf4j.LoggerFactory.getLogger;
public class ResourceToFileWriter implements ItemWriter<IBaseResource>, CompletionPolicy {
private static final Logger ourLog = getLogger(ResourceToFileWriter.class);
@Autowired
private FhirContext myContext;
@Autowired
private IBulkExportCollectionDao myBulkExportCollectionDao;
@Autowired
private DaoRegistry myDaoRegistry;
private BulkExportCollectionEntity myBulkExportCollectionEntity;
@Autowired
private BulkExportDaoSvc myBulkExportDaoSvc;
private ByteArrayOutputStream myOutputStream;
private OutputStreamWriter myWriter;
@ -54,8 +48,6 @@ public class ResourceToFileWriter implements ItemWriter<IBaseResource>, Completi
private IFhirResourceDao<IBaseBinary> myBinaryDao;
@Autowired
private BulkExportDaoSvc myBulkExportDaoSvc;
public ResourceToFileWriter() {
myOutputStream = new ByteArrayOutputStream();
@ -92,19 +84,6 @@ public class ResourceToFileWriter implements ItemWriter<IBaseResource>, Completi
return myBinaryDao.create(binary).getResource().getIdElement();
}
private BulkExportCollectionEntity getOrLoadBulkExportCollectionEntity() {
if (myBulkExportCollectionEntity == null) {
Optional<BulkExportCollectionEntity> oBulkExportCollectionEntity = myBulkExportCollectionDao.findById(myBulkExportCollectionEntityId);
if (!oBulkExportCollectionEntity.isPresent()) {
throw new IllegalArgumentException("This BulkExportCollectionEntity doesn't exist!");
} else {
myBulkExportCollectionEntity = oBulkExportCollectionEntity.get();
}
}
return myBulkExportCollectionEntity;
}
@Override
public void write(List<? extends IBaseResource> resources) throws Exception {
@ -114,7 +93,7 @@ public class ResourceToFileWriter implements ItemWriter<IBaseResource>, Completi
}
Optional<IIdType> createdId = flushToFiles();
createdId.ifPresent(theIIdType -> ourLog.warn("Created resources for bulk export file containing {}:{} resources of type ", theIIdType.toUnqualifiedVersionless().getValue(), myBulkExportCollectionEntity.getResourceType()));
createdId.ifPresent(theIIdType -> ourLog.warn("Created resources for bulk export file containing {} resources of type ", theIIdType.toUnqualifiedVersionless().getValue()));
}
@SuppressWarnings("unchecked")

View File

@ -1,5 +1,6 @@
package ca.uhn.fhir.jpa.bulk.batch;
package ca.uhn.fhir.jpa.bulk.job;
import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc;
import org.slf4j.Logger;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.bulk;
package ca.uhn.fhir.jpa.bulk.provider;
/*-
* #%L
@ -21,6 +21,8 @@ package ca.uhn.fhir.jpa.bulk;
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.model.BulkExportResponseJson;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.rest.annotation.Operation;
import ca.uhn.fhir.rest.annotation.OperationParam;
@ -123,7 +125,7 @@ public class BulkDataExportProvider {
HttpServletResponse response = theRequestDetails.getServletResponse();
theRequestDetails.getServer().addHeadersToResponse(response);
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(theJobId.getValueAsString());
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(theJobId.getValueAsString());
switch (status.getStatus()) {
case SUBMITTED:

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.bulk;
package ca.uhn.fhir.jpa.bulk.svc;
/*-
* #%L
@ -21,13 +21,12 @@ package ca.uhn.fhir.jpa.bulk;
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
@ -37,28 +36,21 @@ import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.util.BinaryUtil;
import ca.uhn.fhir.util.StopWatch;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.InstantType;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
@ -67,18 +59,10 @@ import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.PostConstruct;
import javax.transaction.Transactional;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static ca.uhn.fhir.util.UrlUtil.escapeUrlParam;
@ -103,11 +87,15 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
private FhirContext myContext;
@Autowired
private PlatformTransactionManager myTxManager;
@Autowired
private SearchBuilderFactory mySearchBuilderFactory;
private TransactionTemplate myTxTemplate;
private long myFileMaxChars = 500 * FileUtils.ONE_KB;
@Autowired
private IBatchJobSubmitter myJobSubmitter;
@Autowired
@Qualifier("bulkExportJob")
private org.springframework.batch.core.Job myBulkExportJob;
private int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR);
/**
@ -134,10 +122,7 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
String jobUuid = jobToProcessOpt.get().getJobId();
try {
myTxTemplate.execute(t -> {
processJob(jobUuid);
return null;
});
processJob(jobUuid);
} catch (Exception e) {
ourLog.error("Failure while preparing bulk export extract", e);
myTxTemplate.execute(t -> {
@ -203,112 +188,13 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
}
private void processJob(String theJobUuid) {
JobParameters parameters = new JobParametersBuilder()
.addString("jobUUID", theJobUuid)
.toJobParameters();
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(theJobUuid);
if (!jobOpt.isPresent()) {
ourLog.info("Job appears to be deleted");
return;
}
StopWatch jobStopwatch = new StopWatch();
AtomicInteger jobResourceCounter = new AtomicInteger();
BulkExportJobEntity job = jobOpt.get();
ourLog.info("Bulk export starting generation for batch export job: {}", job);
for (BulkExportCollectionEntity nextCollection : job.getCollections()) {
String nextType = nextCollection.getResourceType();
IFhirResourceDao dao = myDaoRegistry.getResourceDao(nextType);
ourLog.info("Bulk export assembling export of type {} for job {}", nextType, theJobUuid);
Class<? extends IBaseResource> nextTypeClass = myContext.getResourceDefinition(nextType).getImplementingClass();
ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, nextType, nextTypeClass);
SearchParameterMap map = new SearchParameterMap();
map.setLoadSynchronous(true);
if (job.getSince() != null) {
map.setLastUpdated(new DateRangeParam(job.getSince(), null));
}
IResultIterator resultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, theJobUuid), null, RequestPartitionId.allPartitions());
storeResultsToFiles(nextCollection, sb, resultIterator, jobResourceCounter, jobStopwatch);
}
job.setStatus(BulkJobStatusEnum.COMPLETE);
updateExpiry(job);
myBulkExportJobDao.save(job);
ourLog.info("Bulk export completed job in {}: {}", jobStopwatch, job);
myJobSubmitter.runJob(myBulkExportJob, parameters);
}
private void storeResultsToFiles(BulkExportCollectionEntity theExportCollection, ISearchBuilder theSearchBuilder, IResultIterator theResultIterator, AtomicInteger theJobResourceCounter, StopWatch theJobStopwatch) {
try (IResultIterator query = theResultIterator) {
if (!query.hasNext()) {
return;
}
AtomicInteger fileCounter = new AtomicInteger(0);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
OutputStreamWriter writer = new OutputStreamWriter(outputStream, Constants.CHARSET_UTF8);
IParser parser = myContext.newJsonParser().setPrettyPrint(false);
List<ResourcePersistentId> pidsSpool = new ArrayList<>();
List<IBaseResource> resourcesSpool = new ArrayList<>();
while (query.hasNext()) {
pidsSpool.add(query.next());
fileCounter.incrementAndGet();
theJobResourceCounter.incrementAndGet();
if (pidsSpool.size() >= 10 || !query.hasNext()) {
theSearchBuilder.loadResourcesByPid(pidsSpool, Collections.emptyList(), resourcesSpool, false, null);
for (IBaseResource nextFileResource : resourcesSpool) {
parser.encodeResourceToWriter(nextFileResource, writer);
writer.append("\n");
}
pidsSpool.clear();
resourcesSpool.clear();
if (outputStream.size() >= myFileMaxChars || !query.hasNext()) {
Optional<IIdType> createdId = flushToFiles(theExportCollection, fileCounter, outputStream);
createdId.ifPresent(theIIdType -> ourLog.info("Created resource {} for bulk export file containing {} resources of type {} - Total {} resources ({}/sec)", theIIdType.toUnqualifiedVersionless().getValue(), fileCounter.get(), theExportCollection.getResourceType(), theJobResourceCounter.get(), theJobStopwatch.formatThroughput(theJobResourceCounter.get(), TimeUnit.SECONDS)));
fileCounter.set(0);
}
}
}
} catch (IOException e) {
throw new InternalErrorException(e);
}
}
private Optional<IIdType> flushToFiles(BulkExportCollectionEntity theCollection, AtomicInteger theCounter, ByteArrayOutputStream theOutputStream) {
if (theOutputStream.size() > 0) {
IBaseBinary binary = BinaryUtil.newBinary(myContext);
binary.setContentType(Constants.CT_FHIR_NDJSON);
binary.setContent(theOutputStream.toByteArray());
IIdType createdId = getBinaryDao().create(binary).getResource().getIdElement();
BulkExportCollectionFileEntity file = new BulkExportCollectionFileEntity();
theCollection.getFiles().add(file);
file.setCollection(theCollection);
file.setResource(createdId.getIdPart());
myBulkExportCollectionFileDao.saveAndFlush(file);
theOutputStream.reset();
return Optional.of(createdId);
}
return Optional.empty();
}
@SuppressWarnings("unchecked")
private IFhirResourceDao<IBaseBinary> getBinaryDao() {
@ -429,7 +315,7 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
@Transactional
@Override
public JobInfo getJobStatusOrThrowResourceNotFound(String theJobId) {
public JobInfo getJobInfoOrThrowResourceNotFound(String theJobId) {
BulkExportJobEntity job = myBulkExportJobDao
.findByJobId(theJobId)
.orElseThrow(() -> new ResourceNotFoundException(theJobId));

View File

@ -1,18 +1,14 @@
package ca.uhn.fhir.jpa.bulk.batch;
package ca.uhn.fhir.jpa.bulk.svc;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.transaction.Transactional;
import static org.slf4j.LoggerFactory.getLogger;
@Service
public class BulkExportCollectionFileDaoSvc {
private static final Logger ourLog = getLogger(BulkExportCollectionFileDaoSvc.class);
@Autowired
private IBulkExportCollectionFileDao myBulkExportCollectionFileDao;

View File

@ -1,5 +1,6 @@
package ca.uhn.fhir.jpa.bulk.batch;
package ca.uhn.fhir.jpa.bulk.svc;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
@ -63,4 +64,18 @@ public class BulkExportDaoSvc {
return jobOpt.get();
}
@Transactional
public void setJobToStatus(String theJobUUID, BulkJobStatusEnum theStatus) {
Optional<BulkExportJobEntity> oJob = myBulkExportJobDao.findByJobId(theJobUUID);
if (!oJob.isPresent()) {
ourLog.error("Job doesn't exist!");
} else {
ourLog.info("Setting job with UUID {} to {}", theJobUUID, theStatus);
BulkExportJobEntity bulkExportJobEntity = oJob.get();
bulkExportJobEntity.setStatus(theStatus);
myBulkExportJobDao.save(bulkExportJobEntity);
}
}
}

View File

@ -6,11 +6,13 @@ import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.interceptor.executor.InterceptorService;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IDao;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.batch.svc.BatchJobSubmitterImpl;
import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider;
import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor;
import ca.uhn.fhir.jpa.bulk.BulkDataExportProvider;
import ca.uhn.fhir.jpa.bulk.BulkDataExportSvcImpl;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.provider.BulkDataExportProvider;
import ca.uhn.fhir.jpa.bulk.svc.BulkDataExportSvcImpl;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.dao.HistoryBuilder;
import ca.uhn.fhir.jpa.dao.HistoryBuilderFactory;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
@ -46,6 +48,7 @@ import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.search.reindex.ResourceReindexingSvcImpl;
import ca.uhn.fhir.jpa.searchparam.config.SearchParamConfig;
import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
import ca.uhn.fhir.jpa.searchparam.extractor.IResourceLinkResolver;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.interceptor.consent.IConsentContextServices;
@ -53,7 +56,6 @@ import ca.uhn.fhir.rest.server.interceptor.partition.RequestTenantPartitionInter
import org.hibernate.jpa.HibernatePersistenceProvider;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.utilities.graphql.IGraphQLStorageServices;
import org.springframework.batch.core.Job;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
@ -109,7 +111,7 @@ import java.util.Date;
@ComponentScan.Filter(type = FilterType.REGEX, pattern = "ca.uhn.fhir.jpa.empi.*")
})
@Import({
SearchParamConfig.class
SearchParamConfig.class, BatchJobsConfig.class
})
public abstract class BaseConfig {
@ -138,6 +140,11 @@ public abstract class BaseConfig {
return new DatabaseBackedPagingProvider();
}
@Bean
public IBatchJobSubmitter batchJobSubmitter() {
return new BatchJobSubmitterImpl();
}
/**
* This method should be overridden to provide an actual completed
* bean, but it provides a partially completed entity manager
@ -306,7 +313,6 @@ public abstract class BaseConfig {
return new BulkDataExportSvcImpl();
}
@Bean
@Lazy
public BulkDataExportProvider bulkDataExportProvider() {

View File

@ -1,6 +1,6 @@
package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.jpa.bulk.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;

View File

@ -20,7 +20,7 @@ package ca.uhn.fhir.jpa.entity;
* #L%
*/
import ca.uhn.fhir.jpa.bulk.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.hl7.fhir.r5.model.InstantType;

View File

@ -1,6 +1,10 @@
package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.model.BulkExportResponseJson;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.provider.BulkDataExportProvider;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.client.apache.ResourceEntity;
@ -179,7 +183,7 @@ public class BulkDataExportProviderTest {
.setJobId(A_JOB_ID)
.setStatus(BulkJobStatusEnum.BUILDING)
.setStatusTime(InstantType.now().getValue());
when(myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(eq(A_JOB_ID))).thenReturn(jobInfo);
when(myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(eq(A_JOB_ID))).thenReturn(jobInfo);
String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" +
JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID;
@ -204,7 +208,7 @@ public class BulkDataExportProviderTest {
.setStatus(BulkJobStatusEnum.ERROR)
.setStatusTime(InstantType.now().getValue())
.setStatusMessage("Some Error Message");
when(myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(eq(A_JOB_ID))).thenReturn(jobInfo);
when(myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(eq(A_JOB_ID))).thenReturn(jobInfo);
String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" +
JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID;
@ -233,7 +237,7 @@ public class BulkDataExportProviderTest {
jobInfo.addFile().setResourceType("Patient").setResourceId(new IdType("Binary/111"));
jobInfo.addFile().setResourceType("Patient").setResourceId(new IdType("Binary/222"));
jobInfo.addFile().setResourceType("Patient").setResourceId(new IdType("Binary/333"));
when(myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(eq(A_JOB_ID))).thenReturn(jobInfo);
when(myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(eq(A_JOB_ID))).thenReturn(jobInfo);
String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" +
JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID;
@ -263,7 +267,7 @@ public class BulkDataExportProviderTest {
@Test
public void testPollForStatus_Gone() throws IOException {
when(myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(eq(A_JOB_ID))).thenThrow(new ResourceNotFoundException("Unknown job: AAA"));
when(myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(eq(A_JOB_ID))).thenThrow(new ResourceNotFoundException("Unknown job: AAA"));
String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" +
JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID;

View File

@ -1,6 +1,8 @@
package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
@ -17,17 +19,15 @@ import org.hamcrest.Matchers;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Binary;
import org.hl7.fhir.r4.model.InstantType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import java.util.Date;
import java.util.UUID;
@ -53,7 +53,9 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
private IBulkDataExportSvc myBulkDataExportSvc;
@Autowired
private IBatchJobSubmitter myBatchJobSubmitter;
@Autowired
@Qualifier("bulkExportJob")
private Job myBulkJob;
@ -151,7 +153,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertNotNull(jobDetails.getJobId());
// Check the status
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId());
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.SUBMITTED, status.getStatus());
assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Observation,Patient&_typeFilter="+TEST_FILTER, status.getRequest());
@ -159,7 +161,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
myBulkDataExportSvc.buildExportFiles();
// Fetch the job again
status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId());
status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.COMPLETE, status.getStatus());
assertEquals(2, status.getFiles().size());
@ -201,7 +203,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertNotNull(jobDetails.getJobId());
// Check the status
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId());
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.SUBMITTED, status.getStatus());
assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson", status.getRequest());
@ -209,7 +211,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
myBulkDataExportSvc.buildExportFiles();
// Fetch the job again
status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId());
status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.COMPLETE, status.getStatus());
assertEquals(2, status.getFiles().size());
@ -249,7 +251,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Test
public void testBatchJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
public void testBatchJob() throws InterruptedException {
createResources();
// Create a bulk job
@ -258,10 +260,19 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
JobParametersBuilder paramBuilder = new JobParametersBuilder().addString("jobUUID", jobDetails.getJobId());
myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
IBulkDataExportSvc.JobInfo jobStatusOrThrowResourceNotFound = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobStatusOrThrowResourceNotFound.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
IBulkDataExportSvc.JobInfo jobInfo;
while(true) {
jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
if (jobInfo.getStatus() != BulkJobStatusEnum.COMPLETE) {
Thread.sleep(1000L);
ourLog.warn("waiting..");
} else {
break;
}
}
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
}
@Test
public void testSubmit_WithSince() throws InterruptedException {
@ -284,7 +295,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertNotNull(jobDetails.getJobId());
// Check the status
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId());
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.SUBMITTED, status.getStatus());
assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Observation,Patient&_since=" + cutoff.setTimeZoneZulu(true).getValueAsString(), status.getRequest());
@ -292,7 +303,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
myBulkDataExportSvc.buildExportFiles();
// Fetch the job again
status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId());
status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.COMPLETE, status.getStatus());
assertEquals(1, status.getFiles().size());
@ -320,11 +331,11 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i);
IIdType patId = myPatientDao.update(patient).getId().toUnqualifiedVersionless();
//Observation obs = new Observation();
//obs.setId("OBS" + i);
//obs.setStatus(Observation.ObservationStatus.FINAL);
//obs.getSubject().setReference(patId.getValue());
// myObservationDao.update(obs);
Observation obs = new Observation();
obs.setId("OBS" + i);
obs.setStatus(Observation.ObservationStatus.FINAL);
obs.getSubject().setReference(patId.getValue());
myObservationDao.update(obs);
}
}
}

View File

@ -1,29 +1,15 @@
package ca.uhn.fhir.jpa.config;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.batch.svc.BatchJobSubmitterImpl;
import ca.uhn.fhir.jpa.bulk.batch.BulkItemReader;
import ca.uhn.fhir.jpa.search.LuceneSearchMappingFactory;
import ca.uhn.fhir.jpa.subscription.match.deliver.email.IEmailSender;
import ca.uhn.fhir.jpa.subscription.match.deliver.email.JavaMailEmailSender;
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
import ca.uhn.fhir.validation.ResultSeverityEnum;
import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder;
import org.apache.commons.dbcp2.BasicDataSource;
import org.hibernate.dialect.H2Dialect;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemStreamWriter;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.*;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;

View File

@ -1,18 +1,13 @@
package ca.uhn.fhir.jpa.config;
import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.batch.svc.BatchJobSubmitterImpl;
import ca.uhn.fhir.jpa.binstore.IBinaryStorageSvc;
import ca.uhn.fhir.jpa.binstore.MemoryBinaryStorageSvcImpl;
import ca.uhn.fhir.jpa.bulk.batch.BulkExportDaoSvc;
import ca.uhn.fhir.jpa.bulk.batch.BulkExportJobCompletionListener;
import ca.uhn.fhir.jpa.bulk.batch.BulkItemReader;
import ca.uhn.fhir.jpa.bulk.batch.BulkItemResourceLoaderProcessor;
import ca.uhn.fhir.jpa.bulk.batch.ResourceToFileWriter;
import ca.uhn.fhir.jpa.bulk.batch.ResourceTypePartitioner;
import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc;
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
import ca.uhn.fhir.validation.ResultSeverityEnum;
import net.ttddyy.dsproxy.listener.SingleQueryCountHolder;
@ -20,18 +15,6 @@ import net.ttddyy.dsproxy.listener.logging.SLF4JLogLevel;
import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder;
import org.apache.commons.dbcp2.BasicDataSource;
import org.hibernate.dialect.H2Dialect;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@ -42,20 +25,17 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.sql.DataSource;
import java.sql.Connection;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.fail;
@Configuration
@Import(TestJPAConfig.class)
@Import({TestJPAConfig.class, BatchJobsConfig.class})
@EnableTransactionManagement()
@EnableBatchProcessing
public class TestR4Config extends BaseJavaConfigR4 {
/**
* NANI
*/
public static final String WILL_LATE_BIND = null;
CountDownLatch jobLatch = new CountDownLatch(1);
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(TestR4Config.class);
public static Integer ourMaxThreads;
@ -67,15 +47,10 @@ public class TestR4Config extends BaseJavaConfigR4 {
* starvation
*/
if (ourMaxThreads == null) {
ourMaxThreads = (int) (Math.random() * 6.0) + 1;
ourMaxThreads = (int) (Math.random() * 6.0) + 3;
}
}
@Autowired
private StepBuilderFactory myStepBuilderFactory;
@Autowired
private JobBuilderFactory myJobBuilderFactory;
private Exception myLastStackTrace;
@ -90,73 +65,11 @@ public class TestR4Config extends BaseJavaConfigR4 {
return new CircularQueueCaptureQueriesListener();
}
@Bean
public Job bulkExportJob() {
return myJobBuilderFactory.get("bulkExportJob")
.start(partitionStep())
.listener(bulkExportJobCompletionListener())
.build();
}
@Bean
public Step workerResourceStep() {
return myStepBuilderFactory.get("workerResourceStep")
.<ResourcePersistentId, IBaseResource> chunk(2)
.reader(myBulkItemReader(WILL_LATE_BIND))
.processor(pidToResourceProcessor())
.writer(resourceToFileWriter())
.build();
}
@Bean
@JobScope
public BulkExportJobCompletionListener bulkExportJobCompletionListener() {
return new BulkExportJobCompletionListener();
}
@Bean
@StepScope
public ItemWriter<IBaseResource> resourceToFileWriter() {
return new ResourceToFileWriter();
}
@Bean
public Step partitionStep() {
return myStepBuilderFactory.get("partitionStep")
.partitioner("workerResourceStep", partitioner(null))
.step(workerResourceStep())
.build();
}
@Bean
public BulkExportDaoSvc bulkExportDaoSvc() {
return new BulkExportDaoSvc();
}
@Bean
@JobScope
public ResourceTypePartitioner partitioner(@Value("#{jobParameters['jobUUID']}") String theJobUUID) {
return new ResourceTypePartitioner(theJobUUID);
}
@Bean
@StepScope
public ItemProcessor<ResourcePersistentId, IBaseResource> pidToResourceProcessor() {
return new BulkItemResourceLoaderProcessor();
}
@Bean
@StepScope
public BulkItemReader myBulkItemReader(@Value("#{jobParameters['jobUUID']}") String theJobUUID) {
BulkItemReader bulkItemReader = new BulkItemReader();
bulkItemReader.setJobUUID(theJobUUID);
bulkItemReader.setName("bulkItemReader");
return bulkItemReader;
}
@Bean
public DataSource dataSource() {
BasicDataSource retVal = new BasicDataSource() {

View File

@ -8,8 +8,7 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.model.util.JpaConstants;

View File

@ -8,7 +8,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoSubscription;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestDstu2Config;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;

View File

@ -13,7 +13,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestDstu3Config;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;
@ -107,7 +107,6 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;

View File

@ -17,7 +17,7 @@ import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider;
import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestR4Config;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;

View File

@ -6,7 +6,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestR4ConfigWithElasticSearch;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;

View File

@ -7,7 +7,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestR4WithLuceneDisabledConfig;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
import ca.uhn.fhir.jpa.dao.dstu2.FhirResourceDaoDstu2SearchNoFtTest;

View File

@ -16,7 +16,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider;
import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestR5Config;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;

View File

@ -1,23 +0,0 @@
package ca.uhn.fhir.jpa.batch.config;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
/*//
@Component
public class JpaBatchConfigurer extends DefaultBatchConfigurer {
@Autowired
@Qualifier
private JpaTransactionManager myPlatformTransactionManager;
@Override
public PlatformTransactionManager getTransactionManager() {
return myPlatformTransactionManager;
}
}
*/

View File

@ -5,7 +5,7 @@ import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.bulk.BulkDataExportProvider;
import ca.uhn.fhir.jpa.bulk.provider.BulkDataExportProvider;
import ca.uhn.fhir.jpa.provider.DiffProvider;
import ca.uhn.fhir.jpa.subscription.match.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.interceptor.CascadingDeleteInterceptor;
@ -21,7 +21,6 @@ import ca.uhn.fhir.jpa.provider.r5.JpaConformanceProviderR5;
import ca.uhn.fhir.jpa.provider.r5.JpaSystemProviderR5;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
import ca.uhn.fhir.jpa.subscription.match.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.narrative.DefaultThymeleafNarrativeGenerator;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.server.ETagSupportEnum;