Merge pull request #1906 from jamesagnew/spring-batch-integration

Spring batch integration
This commit is contained in:
Tadgh 2020-06-28 16:10:43 -07:00 committed by GitHub
commit e365e643ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
82 changed files with 2259 additions and 285 deletions

View File

@ -1,34 +0,0 @@
# Use docker-based build environment (instead of openvz)
#sudo: false
#dist: trusty
# Use VM based build environment
sudo: required
dist: trusty
language: java
jdk:
- openjdk11
#- oraclejdk9
env:
global:
- MAVEN_OPTS="-Xmx10244M -Xss128M -XX:MetaspaceSize=512M -XX:MaxMetaspaceSize=1024M -XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC"
cache:
directories:
- '$HOME/.m2/repository'
install: /bin/true
before_script:
# This seems to be required to get travis to set Xmx4g, per https://github.com/travis-ci/travis-ci/issues/3893
- export MAVEN_SKIP_RC=true
# Sometimes things get restored from the cache with bad permissions. See https://github.com/travis-ci/travis-ci/issues/9630
- sudo chmod -R 777 "$HOME/.m2/repository";
- sudo chown -R travis:travis "$HOME/.m2/repository";
script:
# - mvn -e -B clean install && cd hapi-fhir-ra && mvn -e -B -DTRAVIS_JOB_ID=$TRAVIS_JOB_ID clean test jacoco:report coveralls:report
# - mvn -Dci=true -e -B -P ALLMODULES,NOPARALLEL,ERRORPRONE clean install && cd hapi-fhir-jacoco && mvn -e -B -DTRAVIS_JOB_ID=$TRAVIS_JOB_ID jacoco:report coveralls:report
- mvn -Dci=true -e -B -P ALLMODULES,REDUCED_JPA_TESTS,JACOCO clean install && cd hapi-fhir-jacoco && mvn -e -B -DTRAVIS_JOB_ID=$TRAVIS_JOB_ID jacoco:report coveralls:report;

View File

@ -25,16 +25,16 @@ jobs:
inputs:
targetType: 'inline'
script: mkdir -p $(MAVEN_CACHE_FOLDER); pwd; ls -al $(MAVEN_CACHE_FOLDER)
- task: Maven@3
env:
JAVA_HOME_11_X64: /usr/local/openjdk-11
inputs:
goals: 'dependency:go-offline'
# These are Maven CLI options (and show up in the build logs) - "-nsu"=Don't update snapshots. We can remove this when Maven OSS is more healthy
options: '-P ALLMODULES,JACOCO,CI,ERRORPRONE -e -B -Dmaven.repo.local=$(MAVEN_CACHE_FOLDER)'
# These are JVM options (and don't show up in the build logs)
mavenOptions: '-Xmx1024m $(MAVEN_OPTS) -Dorg.slf4j.simpleLogger.showDateTime=true -Dorg.slf4j.simpleLogger.dateTimeFormat=HH:mm:ss,SSS -Duser.timezone=America/Toronto'
jdkVersionOption: 1.11
# - task: Maven@3
#env:
# JAVA_HOME_11_X64: /usr/local/openjdk-11
# inputs:
# goals: 'dependency:go-offline'
# # These are Maven CLI options (and show up in the build logs) - "-nsu"=Don't update snapshots. We can remove this when Maven OSS is more healthy
# options: '-P ALLMODULES,JACOCO,CI,ERRORPRONE -e -B -Dmaven.repo.local=$(MAVEN_CACHE_FOLDER)'
# # These are JVM options (and don't show up in the build logs)
# mavenOptions: '-Xmx1024m $(MAVEN_OPTS) -Dorg.slf4j.simpleLogger.showDateTime=true -Dorg.slf4j.simpleLogger.dateTimeFormat=HH:mm:ss,SSS -Duser.timezone=America/Toronto'
# jdkVersionOption: 1.11
- task: Maven@3
env:
JAVA_HOME_11_X64: /usr/local/openjdk-11

View File

@ -63,8 +63,8 @@ ca.uhn.fhir.validation.ValidationResult.noIssuesDetected=No issues detected duri
# JPA Messages
ca.uhn.fhir.jpa.bulk.BulkDataExportSvcImpl.onlyBinarySelected=Binary resources may not be exported with bulk export
ca.uhn.fhir.jpa.bulk.BulkDataExportSvcImpl.unknownResourceType=Unknown or unsupported resource type: {0}
ca.uhn.fhir.jpa.bulk.svc.BulkDataExportSvcImpl.onlyBinarySelected=Binary resources may not be exported with bulk export
ca.uhn.fhir.jpa.bulk.svc.BulkDataExportSvcImpl.unknownResourceType=Unknown or unsupported resource type: {0}
ca.uhn.fhir.jpa.config.HapiFhirHibernateJpaDialect.resourceVersionConstraintFailure=The operation has failed with a version constraint failure. This generally means that two clients/threads were trying to update the same resource at the same time, and this request was chosen as the failing request.
ca.uhn.fhir.jpa.config.HapiFhirHibernateJpaDialect.resourceIndexedCompositeStringUniqueConstraintFailure=The operation has failed with a unique index constraint failure. This probably means that the operation was trying to create/update a resource that would have resulted in a duplicate value for a unique index.
ca.uhn.fhir.jpa.config.HapiFhirHibernateJpaDialect.forcedIdConstraintFailure=The operation has failed with a client-assigned ID constraint failure. This typically means that multiple client threads are trying to create a new resource with the same client-assigned ID at the same time, and this thread was chosen to be rejected.

View File

@ -1,6 +1,6 @@
---
type: add
issue: 1826
title: "A new servce has been added to the JPA server that fetches FHIR Implementation Guide NPM
title: "A new service has been added to the JPA server that fetches FHIR Implementation Guide NPM
packages and installs the contained conformance resources into the JPA repository. Thanks to
Martin Zacho Grønhøj for the pull request!"

View File

@ -0,0 +1,4 @@
---
type: add
issue: 1926
title: "Spring Batch has been added to HAPI FHIR in the `hapi-fhir-jpaserver-batch` project. `hapi-fhir-jpaserver-base` will make use of it for batch jobs"

View File

@ -145,6 +145,11 @@
<artifactId>hapi-fhir-validation-resources-r5</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-jpaserver-batch</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-elasticsearch-6</artifactId>
@ -159,6 +164,7 @@
</dependency>
<dependency>
<groupId>net.ttddyy</groupId>
<artifactId>datasource-proxy</artifactId>

View File

@ -0,0 +1,34 @@
package ca.uhn.fhir.jpa.batch;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.bulk.job.BulkExportJobConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@Configuration
//When you define a new batch job, add it here.
@Import({
CommonBatchJobConfig.class,
BulkExportJobConfig.class,})
public class BatchJobsConfig {
//Empty config, as this is just an aggregator for all the various batch jobs defined around the system.
}

View File

@ -0,0 +1,37 @@
package ca.uhn.fhir.jpa.batch;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.batch.processors.PidToIBaseResourceProcessor;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CommonBatchJobConfig {
@Bean
@StepScope
public PidToIBaseResourceProcessor pidToResourceProcessor() {
return new PidToIBaseResourceProcessor();
}
}

View File

@ -0,0 +1,32 @@
package ca.uhn.fhir.jpa.batch.log;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Logs {
private static final Logger ourBatchTroubleshootingLog = LoggerFactory.getLogger("ca.uhn.fhir.log.batch_troubleshooting");
public static Logger getBatchTroubleshootingLog() {
return ourBatchTroubleshootingLog;
}
}

View File

@ -0,0 +1,73 @@
package ca.uhn.fhir.jpa.batch.processors;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* Reusable Item Processor which converts ResourcePersistentIds to their IBaseResources
*/
public class PidToIBaseResourceProcessor implements ItemProcessor<List<ResourcePersistentId>, List<IBaseResource>> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@Autowired
private SearchBuilderFactory mySearchBuilderFactory;
@Autowired
private DaoRegistry myDaoRegistry;
@Value("#{stepExecutionContext['resourceType']}")
private String myResourceType;
@Autowired
private FhirContext myContext;
@Override
public List<IBaseResource> process(List<ResourcePersistentId> theResourcePersistentId) throws Exception {
IFhirResourceDao dao = myDaoRegistry.getResourceDao(myResourceType);
Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, resourceTypeClass);
List<IBaseResource> outgoing = new ArrayList<>();
sb.loadResourcesByPid(theResourcePersistentId, Collections.emptyList(), outgoing, false, null);
ourLog.trace("Loaded resources: {}", outgoing.stream().map(Object::toString).collect(Collectors.joining(", ")));
return outgoing;
}
}

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.bulk;
package ca.uhn.fhir.jpa.bulk.api;
/*-
* #%L
@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.bulk;
* #L%
*/
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import org.hl7.fhir.instance.model.api.IIdType;
import javax.transaction.Transactional;
@ -36,7 +37,7 @@ public interface IBulkDataExportSvc {
JobInfo submitJob(String theOutputFormat, Set<String> theResourceTypes, Date theSince, Set<String> theFilters);
JobInfo getJobStatusOrThrowResourceNotFound(String theJobId);
JobInfo getJobInfoOrThrowResourceNotFound(String theJobId);
void cancelAndPurgeAllJobs();
@ -79,6 +80,7 @@ public interface IBulkDataExportSvc {
myFiles = new ArrayList<>();
}
return myFiles;
}
public BulkJobStatusEnum getStatus() {
@ -131,4 +133,5 @@ public interface IBulkDataExportSvc {
}
}

View File

@ -0,0 +1,53 @@
package ca.uhn.fhir.jpa.bulk.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
/**
* Will run before and after a job to set the status to whatever is appropriate.
*/
public class BulkExportJobCloser implements Tasklet {
@Value("#{jobExecutionContext['jobUUID']}")
private String myJobUUID;
@Autowired
private BulkExportDaoSvc myBulkExportDaoSvc;
@Override
public RepeatStatus execute(StepContribution theStepContribution, ChunkContext theChunkContext) throws Exception {
if (theChunkContext.getStepContext().getStepExecution().getJobExecution().getStatus() == BatchStatus.STARTED) {
myBulkExportDaoSvc.setJobToStatus(myJobUUID, BulkJobStatusEnum.COMPLETE);
} else {
myBulkExportDaoSvc.setJobToStatus(myJobUUID, BulkJobStatusEnum.ERROR);
}
return RepeatStatus.FINISHED;
}
}

View File

@ -0,0 +1,145 @@
package ca.uhn.fhir.jpa.bulk.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.batch.processors.PidToIBaseResourceProcessor;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersValidator;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import java.util.List;
/**
* Spring batch Job configuration file. Contains all necessary plumbing to run a
* Bulk Export job.
*/
@Configuration
public class BulkExportJobConfig {
@Autowired
private StepBuilderFactory myStepBuilderFactory;
@Autowired
private JobBuilderFactory myJobBuilderFactory;
@Autowired
private PidToIBaseResourceProcessor myPidToIBaseResourceProcessor;
@Bean
@Lazy
public Job bulkExportJob() {
return myJobBuilderFactory.get("bulkExportJob")
.validator(bulkJobParameterValidator())
.start(createBulkExportEntityStep())
.next(partitionStep())
.next(closeJobStep())
.build();
}
@Bean
public Step createBulkExportEntityStep() {
return myStepBuilderFactory.get("createBulkExportEntityStep")
.tasklet(createBulkExportEntityTasklet())
.listener(bulkExportJobStartedListener())
.build();
}
@Bean
public CreateBulkExportEntityTasklet createBulkExportEntityTasklet() {
return new CreateBulkExportEntityTasklet();
}
@Bean
public JobParametersValidator bulkJobParameterValidator() {
return new BulkExportJobParameterValidator();
}
@Bean
public Step bulkExportGenerateResourceFilesStep() {
return myStepBuilderFactory.get("bulkExportGenerateResourceFilesStep")
.<List<ResourcePersistentId>, List<IBaseResource>> chunk(100) //1000 resources per generated file, as the reader returns 10 resources at a time.
.reader(bulkItemReader())
.processor(myPidToIBaseResourceProcessor)
.writer(resourceToFileWriter())
.build();
}
@Bean
@JobScope
public BulkExportJobCloser bulkExportJobCloser() {
return new BulkExportJobCloser();
}
@Bean
public Step closeJobStep() {
return myStepBuilderFactory.get("closeJobStep")
.tasklet(bulkExportJobCloser())
.build();
}
@Bean
@JobScope
public BulkExportJobStartedListener bulkExportJobStartedListener() {
return new BulkExportJobStartedListener();
}
@Bean
public Step partitionStep() {
return myStepBuilderFactory.get("partitionStep")
.partitioner("bulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner())
.step(bulkExportGenerateResourceFilesStep())
.build();
}
@Bean
@StepScope
public BulkItemReader bulkItemReader(){
return new BulkItemReader();
}
@Bean
@JobScope
public ResourceTypePartitioner bulkExportResourceTypePartitioner() {
return new ResourceTypePartitioner();
}
@Bean
@StepScope
public ItemWriter<List<IBaseResource>> resourceToFileWriter() {
return new ResourceToFileWriter();
}
}

View File

@ -0,0 +1,87 @@
package ca.uhn.fhir.jpa.bulk.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.rest.api.Constants;
import org.apache.commons.lang3.StringUtils;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.JobParametersValidator;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Arrays;
import java.util.Optional;
/**
* This class will prevent a job from running if the UUID does not exist or is invalid.
*/
public class BulkExportJobParameterValidator implements JobParametersValidator {
@Autowired
private IBulkExportJobDao myBulkExportJobDao;
@Override
public void validate(JobParameters theJobParameters) throws JobParametersInvalidException {
if (theJobParameters == null) {
throw new JobParametersInvalidException("This job needs Parameters: [readChunkSize], [jobUUID], [filters], [outputFormat], [resourceTypes]");
}
StringBuilder errorBuilder = new StringBuilder();
Long readChunkSize = theJobParameters.getLong("readChunkSize");
if (readChunkSize == null || readChunkSize < 1) {
errorBuilder.append("There must be a valid number for readChunkSize, which is at least 1. ");
}
String jobUUID = theJobParameters.getString("jobUUID");
Optional<BulkExportJobEntity> oJob = myBulkExportJobDao.findByJobId(jobUUID);
if (!StringUtils.isBlank(jobUUID) && !oJob.isPresent()) {
errorBuilder.append("There is no persisted job that exists with UUID: " + jobUUID + ". ");
}
boolean hasExistingJob = oJob.isPresent();
//Check for to-be-created parameters.
if (!hasExistingJob) {
String resourceTypes = theJobParameters.getString("resourceTypes");
if (StringUtils.isBlank(resourceTypes)) {
errorBuilder.append("You must include [resourceTypes] as a Job Parameter");
} else {
String[] resourceArray = resourceTypes.split(",");
Arrays.stream(resourceArray).filter(resourceType -> resourceType.equalsIgnoreCase("Binary"))
.findFirst()
.ifPresent(resourceType -> {
errorBuilder.append("Bulk export of Binary resources is forbidden");
});
}
String outputFormat = theJobParameters.getString("outputFormat");
if (!StringUtils.isBlank(outputFormat) && !Constants.CT_FHIR_NDJSON.equals(outputFormat)) {
errorBuilder.append("The only allowed format for Bulk Export is currently " + Constants.CT_FHIR_NDJSON);
}
}
String errorMessage = errorBuilder.toString();
if (!StringUtils.isEmpty(errorMessage)) {
throw new JobParametersInvalidException(errorMessage);
}
}
}

View File

@ -0,0 +1,48 @@
package ca.uhn.fhir.jpa.bulk.job;
import ca.uhn.fhir.rest.api.Constants;
import org.springframework.batch.core.JobParametersBuilder;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* This builder is a helper so you don't have to go lookup what job parameters are required for a bulk export job.
*
*/
public class BulkExportJobParametersBuilder extends JobParametersBuilder {
public BulkExportJobParametersBuilder setResourceTypes(List<String> resourceTypes) {
String resourceTypesString = resourceTypes.stream().collect(Collectors.joining(","));
this.addString("resourceTypes", resourceTypesString);
return this;
}
public BulkExportJobParametersBuilder setSince(Date theSince) {
this.addDate("since", theSince);
return this;
}
public BulkExportJobParametersBuilder setOutputFormat(String theOutputFormat) {
//TODO eventually we will support more types.
theOutputFormat = Constants.CT_FHIR_NDJSON;
this.addString("outputFormat", theOutputFormat);
return this;
}
public BulkExportJobParametersBuilder setFilters(Set<String> theFilters) {
this.addString("filters", theFilters.stream().collect(Collectors.joining(",")));
return this;
}
public BulkExportJobParametersBuilder setJobUUID(String theJobUUID) {
this.addString("jobUUID", theJobUUID);
return this;
}
public BulkExportJobParametersBuilder setReadChunkSize(Long theReadChunkSize) {
this.addLong("readChunkSize", theReadChunkSize);
return this;
}
}

View File

@ -0,0 +1,54 @@
package ca.uhn.fhir.jpa.bulk.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
/**
* Will run before and after a job to set the status to whatever is appropriate.
*/
public class BulkExportJobStartedListener implements StepExecutionListener {
@Value("#{jobExecutionContext['jobUUID']}")
private String myJobUUID;
@Autowired
private BulkExportDaoSvc myBulkExportDaoSvc;
@Override
public void beforeStep(StepExecution theStepExecution) {
}
@Override
public ExitStatus afterStep(StepExecution theStepExecution) {
if (theStepExecution.getStatus() == BatchStatus.STARTING) {
myBulkExportDaoSvc.setJobToStatus(myJobUUID, BulkJobStatusEnum.BUILDING);
}
return ExitStatus.EXECUTING;
}
}

View File

@ -0,0 +1,124 @@
package ca.uhn.fhir.jpa.bulk.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
public class BulkItemReader implements ItemReader<List<ResourcePersistentId>> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@Value("#{jobParameters['readChunkSize']}")
private Long READ_CHUNK_SIZE;
@Autowired
private IBulkExportJobDao myBulkExportJobDao;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private FhirContext myContext;
@Autowired
private SearchBuilderFactory mySearchBuilderFactory;
private BulkExportJobEntity myJobEntity;
@Value("#{jobExecutionContext['jobUUID']}")
private String myJobUUID;
@Value("#{stepExecutionContext['resourceType']}")
private String myResourceType;
Iterator<ResourcePersistentId> myPidIterator;
private void loadResourcePids() {
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
if (!jobOpt.isPresent()) {
ourLog.warn("Job appears to be deleted");
return;
}
myJobEntity = jobOpt.get();
ourLog.info("Bulk export starting generation for batch export job: {}", myJobEntity);
IFhirResourceDao dao = myDaoRegistry.getResourceDao(myResourceType);
ourLog.info("Bulk export assembling export of type {} for job {}", myResourceType, myJobUUID);
Class<? extends IBaseResource> nextTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, nextTypeClass);
SearchParameterMap map = new SearchParameterMap();
map.setLoadSynchronous(true);
if (myJobEntity.getSince() != null) {
map.setLastUpdated(new DateRangeParam(myJobEntity.getSince(), null));
}
IResultIterator myResultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
List<ResourcePersistentId> myReadPids = new ArrayList<>();
while (myResultIterator.hasNext()) {
myReadPids.add(myResultIterator.next());
}
myPidIterator = myReadPids.iterator();
}
@Override
public List<ResourcePersistentId> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (myPidIterator == null) {
loadResourcePids();
}
int count = 0;
List<ResourcePersistentId> outgoing = new ArrayList<>();
while (myPidIterator.hasNext() && count < READ_CHUNK_SIZE) {
outgoing.add(myPidIterator.next());
count += 1;
}
return outgoing.size() == 0 ? null : outgoing;
}
}

View File

@ -0,0 +1,63 @@
package ca.uhn.fhir.jpa.bulk.job;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.rest.api.Constants;
import org.apache.commons.lang3.StringUtils;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class CreateBulkExportEntityTasklet implements Tasklet {
@Autowired
private IBulkDataExportSvc myBulkDataExportSvc;
@Override
public RepeatStatus execute(StepContribution theStepContribution, ChunkContext theChunkContext) throws Exception {
Map<String, Object> jobParameters = theChunkContext.getStepContext().getJobParameters();
//We can leave early if they provided us with an existing job.
if (jobParameters.containsKey("jobUUID")) {
addUUIDToJobContext(theChunkContext, (String)jobParameters.get("jobUUID"));
return RepeatStatus.FINISHED;
} else {
String resourceTypes = (String)jobParameters.get("resourceTypes");
Date since = (Date)jobParameters.get("since");
String filters = (String)jobParameters.get("filters");
Set<String> filterSet;
if (StringUtils.isBlank(filters)) {
filterSet = null;
} else {
filterSet = Arrays.stream(filters.split(",")).collect(Collectors.toSet());
}
Set<String> resourceTypeSet = Arrays.stream(resourceTypes.split(",")).collect(Collectors.toSet());
String outputFormat = (String)jobParameters.get("outputFormat");
if (StringUtils.isBlank(outputFormat)) {
outputFormat = Constants.CT_FHIR_NDJSON;
}
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.submitJob(outputFormat, resourceTypeSet, since, filterSet);
addUUIDToJobContext(theChunkContext, jobInfo.getJobId());
return RepeatStatus.FINISHED;
}
}
private void addUUIDToJobContext(ChunkContext theChunkContext, String theJobUUID) {
theChunkContext
.getStepContext()
.getStepExecution()
.getJobExecution()
.getExecutionContext()
.putString("jobUUID", theJobUUID);
}
}

View File

@ -0,0 +1,129 @@
package ca.uhn.fhir.jpa.bulk.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.util.BinaryUtil;
import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import javax.annotation.PostConstruct;
import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.util.List;
import java.util.Optional;
public class ResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@Autowired
private FhirContext myFhirContext;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private BulkExportDaoSvc myBulkExportDaoSvc;
private ByteArrayOutputStream myOutputStream;
private OutputStreamWriter myWriter;
private IParser myParser;
@Value("#{stepExecutionContext['bulkExportCollectionEntityId']}")
private Long myBulkExportCollectionEntityId;
@Value("#{stepExecutionContext['resourceType']}")
private String myReosurceType;
private IFhirResourceDao<IBaseBinary> myBinaryDao;
public ResourceToFileWriter() {
myOutputStream = new ByteArrayOutputStream();
myWriter = new OutputStreamWriter(myOutputStream, Constants.CHARSET_UTF8);
}
@PostConstruct
public void start() {
myParser = myFhirContext.newJsonParser().setPrettyPrint(false);
myBinaryDao = getBinaryDao();
}
private Optional<IIdType> flushToFiles() {
if (myOutputStream.size() > 0) {
IIdType createdId = createBinaryFromOutputStream();
BulkExportCollectionFileEntity file = new BulkExportCollectionFileEntity();
file.setResource(createdId.getIdPart());
myBulkExportDaoSvc.addFileToCollectionWithId(myBulkExportCollectionEntityId, file);
myOutputStream.reset();
return Optional.of(createdId);
}
return Optional.empty();
}
private IIdType createBinaryFromOutputStream() {
IBaseBinary binary = BinaryUtil.newBinary(myFhirContext);
binary.setContentType(Constants.CT_FHIR_NDJSON);
binary.setContent(myOutputStream.toByteArray());
return myBinaryDao.create(binary).getResource().getIdElement();
}
@SuppressWarnings("unchecked")
private IFhirResourceDao<IBaseBinary> getBinaryDao() {
return myDaoRegistry.getResourceDao("Binary");
}
@Override
public void write(List<? extends List<IBaseResource>> theList) throws Exception {
int count = 0;
for (List<IBaseResource> resourceList : theList) {
for (IBaseResource nextFileResource : resourceList) {
myParser.encodeResourceToWriter(nextFileResource, myWriter);
myWriter.append("\n");
count++;
}
}
Optional<IIdType> createdId = flushToFiles();
if (createdId.isPresent()) {
ourLog.info("Created {} resources for bulk export file containing {} resources of type {} ", count, createdId.get().toUnqualifiedVersionless().getValue(), myReosurceType);
}
}
}

View File

@ -0,0 +1,75 @@
package ca.uhn.fhir.jpa.bulk.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc;
import org.slf4j.Logger;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import java.util.HashMap;
import java.util.Map;
import static org.slf4j.LoggerFactory.getLogger;
public class ResourceTypePartitioner implements Partitioner {
private static final Logger ourLog = getLogger(ResourceTypePartitioner.class);
@Value("#{jobExecutionContext['jobUUID']}")
private String myJobUUID;
@Autowired
private BulkExportDaoSvc myBulkExportDaoSvc;
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> partitionContextMap = new HashMap<>();
Map<Long, String> idToResourceType = myBulkExportDaoSvc.getBulkJobCollectionIdToResourceTypeMap(myJobUUID);
idToResourceType.entrySet().stream()
.forEach(entry -> {
String resourceType = entry.getValue();
Long collectionEntityId = entry.getKey();
ourLog.debug("Creating a partition step for CollectionEntity: [{}] processing resource type [{}]", collectionEntityId, resourceType);
ExecutionContext context = new ExecutionContext();
//The worker step needs to know what resource type it is looking for.
context.putString("resourceType", resourceType);
// The worker step needs to know which parent job it is processing for, and which collection entity it will be
// attaching its results to.
context.putString("jobUUID", myJobUUID);
context.putLong("bulkExportCollectionEntityId", collectionEntityId);
// Name the partition based on the resource type
partitionContextMap.put(resourceType, context);
});
return partitionContextMap;
}
}

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.bulk;
package ca.uhn.fhir.jpa.bulk.provider;
/*-
* #%L
@ -21,6 +21,8 @@ package ca.uhn.fhir.jpa.bulk;
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.model.BulkExportResponseJson;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.rest.annotation.Operation;
import ca.uhn.fhir.rest.annotation.OperationParam;
@ -123,7 +125,7 @@ public class BulkDataExportProvider {
HttpServletResponse response = theRequestDetails.getServletResponse();
theRequestDetails.getServer().addHeadersToResponse(response);
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(theJobId.getValueAsString());
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(theJobId.getValueAsString());
switch (status.getStatus()) {
case SUBMITTED:

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.bulk;
package ca.uhn.fhir.jpa.bulk.svc;
/*-
* #%L
@ -21,44 +21,37 @@ package ca.uhn.fhir.jpa.bulk;
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.util.BinaryUtil;
import ca.uhn.fhir.util.StopWatch;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.InstantType;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
@ -67,18 +60,10 @@ import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.PostConstruct;
import javax.transaction.Transactional;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static ca.uhn.fhir.util.UrlUtil.escapeUrlParam;
@ -86,6 +71,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
private static final Long READ_CHUNK_SIZE = 10L;
private static final Logger ourLog = LoggerFactory.getLogger(BulkDataExportSvcImpl.class);
private int myReuseBulkExportForMillis = (int) (60 * DateUtils.MILLIS_PER_MINUTE);
@ -103,11 +89,15 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
private FhirContext myContext;
@Autowired
private PlatformTransactionManager myTxManager;
@Autowired
private SearchBuilderFactory mySearchBuilderFactory;
private TransactionTemplate myTxTemplate;
private long myFileMaxChars = 500 * FileUtils.ONE_KB;
@Autowired
private IBatchJobSubmitter myJobSubmitter;
@Autowired
@Qualifier("bulkExportJob")
private org.springframework.batch.core.Job myBulkExportJob;
private int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR);
/**
@ -134,10 +124,7 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
String jobUuid = jobToProcessOpt.get().getJobId();
try {
myTxTemplate.execute(t -> {
processJob(jobUuid);
return null;
});
processJob(jobUuid);
} catch (Exception e) {
ourLog.error("Failure while preparing bulk export extract", e);
myTxTemplate.execute(t -> {
@ -203,112 +190,18 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
}
private void processJob(String theJobUuid) {
JobParameters parameters = new JobParametersBuilder()
.addString("jobUUID", theJobUuid)
.addLong("readChunkSize", READ_CHUNK_SIZE)
.toJobParameters();
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(theJobUuid);
if (!jobOpt.isPresent()) {
ourLog.info("Job appears to be deleted");
return;
}
StopWatch jobStopwatch = new StopWatch();
AtomicInteger jobResourceCounter = new AtomicInteger();
BulkExportJobEntity job = jobOpt.get();
ourLog.info("Bulk export starting generation for batch export job: {}", job);
for (BulkExportCollectionEntity nextCollection : job.getCollections()) {
String nextType = nextCollection.getResourceType();
IFhirResourceDao dao = myDaoRegistry.getResourceDao(nextType);
ourLog.info("Bulk export assembling export of type {} for job {}", nextType, theJobUuid);
Class<? extends IBaseResource> nextTypeClass = myContext.getResourceDefinition(nextType).getImplementingClass();
ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, nextType, nextTypeClass);
SearchParameterMap map = new SearchParameterMap();
map.setLoadSynchronous(true);
if (job.getSince() != null) {
map.setLastUpdated(new DateRangeParam(job.getSince(), null));
}
IResultIterator resultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, theJobUuid), null, RequestPartitionId.allPartitions());
storeResultsToFiles(nextCollection, sb, resultIterator, jobResourceCounter, jobStopwatch);
}
job.setStatus(BulkJobStatusEnum.COMPLETE);
updateExpiry(job);
myBulkExportJobDao.save(job);
ourLog.info("Bulk export completed job in {}: {}", jobStopwatch, job);
}
private void storeResultsToFiles(BulkExportCollectionEntity theExportCollection, ISearchBuilder theSearchBuilder, IResultIterator theResultIterator, AtomicInteger theJobResourceCounter, StopWatch theJobStopwatch) {
try (IResultIterator query = theResultIterator) {
if (!query.hasNext()) {
return;
}
AtomicInteger fileCounter = new AtomicInteger(0);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
OutputStreamWriter writer = new OutputStreamWriter(outputStream, Constants.CHARSET_UTF8);
IParser parser = myContext.newJsonParser().setPrettyPrint(false);
List<ResourcePersistentId> pidsSpool = new ArrayList<>();
List<IBaseResource> resourcesSpool = new ArrayList<>();
while (query.hasNext()) {
pidsSpool.add(query.next());
fileCounter.incrementAndGet();
theJobResourceCounter.incrementAndGet();
if (pidsSpool.size() >= 10 || !query.hasNext()) {
theSearchBuilder.loadResourcesByPid(pidsSpool, Collections.emptyList(), resourcesSpool, false, null);
for (IBaseResource nextFileResource : resourcesSpool) {
parser.encodeResourceToWriter(nextFileResource, writer);
writer.append("\n");
}
pidsSpool.clear();
resourcesSpool.clear();
if (outputStream.size() >= myFileMaxChars || !query.hasNext()) {
Optional<IIdType> createdId = flushToFiles(theExportCollection, fileCounter, outputStream);
createdId.ifPresent(theIIdType -> ourLog.info("Created resource {} for bulk export file containing {} resources of type {} - Total {} resources ({}/sec)", theIIdType.toUnqualifiedVersionless().getValue(), fileCounter.get(), theExportCollection.getResourceType(), theJobResourceCounter.get(), theJobStopwatch.formatThroughput(theJobResourceCounter.get(), TimeUnit.SECONDS)));
fileCounter.set(0);
}
}
}
} catch (IOException e) {
throw new InternalErrorException(e);
try {
myJobSubmitter.runJob(myBulkExportJob, parameters);
} catch (JobParametersInvalidException theE) {
ourLog.error("Unable to start job with UUID: {}, the parameters are invalid. {}", theJobUuid, theE.getMessage());
}
}
private Optional<IIdType> flushToFiles(BulkExportCollectionEntity theCollection, AtomicInteger theCounter, ByteArrayOutputStream theOutputStream) {
if (theOutputStream.size() > 0) {
IBaseBinary binary = BinaryUtil.newBinary(myContext);
binary.setContentType(Constants.CT_FHIR_NDJSON);
binary.setContent(theOutputStream.toByteArray());
IIdType createdId = getBinaryDao().create(binary).getResource().getIdElement();
BulkExportCollectionFileEntity file = new BulkExportCollectionFileEntity();
theCollection.getFiles().add(file);
file.setCollection(theCollection);
file.setResource(createdId.getIdPart());
myBulkExportCollectionFileDao.saveAndFlush(file);
theOutputStream.reset();
return Optional.of(createdId);
}
return Optional.empty();
}
@SuppressWarnings("unchecked")
private IFhirResourceDao<IBaseBinary> getBinaryDao() {
@ -429,7 +322,7 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
@Transactional
@Override
public JobInfo getJobStatusOrThrowResourceNotFound(String theJobId) {
public JobInfo getJobInfoOrThrowResourceNotFound(String theJobId) {
BulkExportJobEntity job = myBulkExportJobDao
.findByJobId(theJobId)
.orElseThrow(() -> new ResourceNotFoundException(theJobId));

View File

@ -0,0 +1,41 @@
package ca.uhn.fhir.jpa.bulk.svc;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.transaction.Transactional;
@Service
public class BulkExportCollectionFileDaoSvc {
@Autowired
private IBulkExportCollectionFileDao myBulkExportCollectionFileDao;
@Transactional
public void save(BulkExportCollectionFileEntity theBulkExportCollectionEntity) {
myBulkExportCollectionFileDao.saveAndFlush(theBulkExportCollectionEntity);
}
}

View File

@ -0,0 +1,215 @@
package ca.uhn.fhir.jpa.bulk.svc;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.r4.model.InstantType;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.stereotype.Service;
import javax.transaction.Transactional;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import static ca.uhn.fhir.util.UrlUtil.escapeUrlParam;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.slf4j.LoggerFactory.getLogger;
@Service
public class BulkExportDaoSvc {
private static final Logger ourLog = getLogger(BulkExportDaoSvc.class);
private int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR);
@Autowired
private FhirContext myFhirContext;
@Autowired
IBulkExportJobDao myBulkExportJobDao;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
IBulkExportCollectionDao myBulkExportCollectionDao;
@Autowired
IBulkExportCollectionFileDao myBulkExportCollectionFileDao;
@Transactional
public void addFileToCollectionWithId(Long theCollectionEntityId, BulkExportCollectionFileEntity theFile) {
Optional<BulkExportCollectionEntity> byId = myBulkExportCollectionDao.findById(theCollectionEntityId);
if (byId.isPresent()) {
BulkExportCollectionEntity exportCollectionEntity = byId.get();
theFile.setCollection(exportCollectionEntity);
exportCollectionEntity.getFiles().add(theFile);
myBulkExportCollectionFileDao.saveAndFlush(theFile);
myBulkExportCollectionDao.saveAndFlush(exportCollectionEntity);
}
}
@Transactional
public Map<Long, String> getBulkJobCollectionIdToResourceTypeMap(String theJobUUID) {
BulkExportJobEntity bulkExportJobEntity = loadJob(theJobUUID);
Collection<BulkExportCollectionEntity> collections = bulkExportJobEntity.getCollections();
return collections.stream()
.collect(Collectors.toMap(
BulkExportCollectionEntity::getId,
BulkExportCollectionEntity::getResourceType
));
}
private BulkExportJobEntity loadJob(String theJobUUID) {
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(theJobUUID);
if (!jobOpt.isPresent()) {
ourLog.warn("Job with UUID {} appears to be deleted", theJobUUID);
return null;
}
return jobOpt.get();
}
@Transactional
public void setJobToStatus(String theJobUUID, BulkJobStatusEnum theStatus) {
Optional<BulkExportJobEntity> oJob = myBulkExportJobDao.findByJobId(theJobUUID);
if (!oJob.isPresent()) {
ourLog.error("Job with UUID {} doesn't exist!", theJobUUID);
return;
}
ourLog.info("Setting job with UUID {} to {}", theJobUUID, theStatus);
BulkExportJobEntity bulkExportJobEntity = oJob.get();
bulkExportJobEntity.setStatus(theStatus);
myBulkExportJobDao.save(bulkExportJobEntity);
}
public IBulkDataExportSvc.JobInfo submitJob(String theOutputFormat, Set<String> theResourceTypes, Date theSince, Set<String> theFilters, int theReuseMillis) {
String outputFormat = Constants.CT_FHIR_NDJSON;
if (isNotBlank(theOutputFormat)) {
outputFormat = theOutputFormat;
}
if (!Constants.CTS_NDJSON.contains(outputFormat)) {
throw new InvalidRequestException("Invalid output format: " + theOutputFormat);
}
StringBuilder requestBuilder = new StringBuilder();
requestBuilder.append("/").append(JpaConstants.OPERATION_EXPORT);
requestBuilder.append("?").append(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT).append("=").append(escapeUrlParam(outputFormat));
Set<String> resourceTypes = theResourceTypes;
if (resourceTypes != null) {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_TYPE).append("=").append(String.join(",", resourceTypes));
}
Date since = theSince;
if (since != null) {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_SINCE).append("=").append(new InstantType(since).setTimeZoneZulu(true).getValueAsString());
}
if (theFilters != null && theFilters.size() > 0) {
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_TYPE_FILTER).append("=").append(String.join(",", theFilters));
}
String request = requestBuilder.toString();
Date cutoff = DateUtils.addMilliseconds(new Date(), -theReuseMillis);
Pageable page = PageRequest.of(0, 10);
Slice<BulkExportJobEntity> existing = myBulkExportJobDao.findExistingJob(page, request, cutoff, BulkJobStatusEnum.ERROR);
if (!existing.isEmpty()) {
return toSubmittedJobInfo(existing.iterator().next());
}
if (resourceTypes != null && resourceTypes.contains("Binary")) {
String msg = myFhirContext.getLocalizer().getMessage(BulkDataExportSvcImpl.class, "onlyBinarySelected");
throw new InvalidRequestException(msg);
}
if (resourceTypes == null || resourceTypes.isEmpty()) {
// This is probably not a useful default, but having the default be "download the whole
// server" seems like a risky default too. We'll deal with that by having the default involve
// only returning a small time span
resourceTypes = myFhirContext.getResourceTypes();
if (since == null) {
since = DateUtils.addDays(new Date(), -1);
}
}
resourceTypes =
resourceTypes
.stream()
.filter(t -> !"Binary".equals(t))
.collect(Collectors.toSet());
BulkExportJobEntity job = new BulkExportJobEntity();
job.setJobId(UUID.randomUUID().toString());
job.setStatus(BulkJobStatusEnum.SUBMITTED);
job.setSince(since);
job.setCreated(new Date());
job.setRequest(request);
updateExpiry(job);
myBulkExportJobDao.save(job);
for (String nextType : resourceTypes) {
if (!myDaoRegistry.isResourceTypeSupported(nextType)) {
String msg = myFhirContext.getLocalizer().getMessage(BulkDataExportSvcImpl.class, "unknownResourceType", nextType);
throw new InvalidRequestException(msg);
}
BulkExportCollectionEntity collection = new BulkExportCollectionEntity();
collection.setJob(job);
collection.setResourceType(nextType);
job.getCollections().add(collection);
myBulkExportCollectionDao.save(collection);
}
ourLog.info("Bulk export job submitted: {}", job.toString());
return toSubmittedJobInfo(job);
}
private void updateExpiry(BulkExportJobEntity theJob) {
theJob.setExpiry(DateUtils.addMilliseconds(new Date(), myRetentionPeriod));
}
private IBulkDataExportSvc.JobInfo toSubmittedJobInfo(BulkExportJobEntity theJob) {
return new IBulkDataExportSvc.JobInfo().setJobId(theJob.getJobId());
}
}

View File

@ -6,11 +6,15 @@ import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.interceptor.executor.InterceptorService;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IDao;
import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.batch.config.NonPersistedBatchConfigurer;
import ca.uhn.fhir.jpa.batch.svc.BatchJobSubmitterImpl;
import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider;
import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor;
import ca.uhn.fhir.jpa.bulk.BulkDataExportProvider;
import ca.uhn.fhir.jpa.bulk.BulkDataExportSvcImpl;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.provider.BulkDataExportProvider;
import ca.uhn.fhir.jpa.bulk.svc.BulkDataExportSvcImpl;
import ca.uhn.fhir.jpa.dao.HistoryBuilder;
import ca.uhn.fhir.jpa.dao.HistoryBuilderFactory;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
@ -24,8 +28,8 @@ import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.packages.IHapiPackageCacheManager;
import ca.uhn.fhir.jpa.packages.IPackageInstallerSvc;
import ca.uhn.fhir.jpa.packages.JpaPackageCache;
import ca.uhn.fhir.jpa.packages.PackageInstallerSvcImpl;
import ca.uhn.fhir.jpa.packages.NpmJpaValidationSupport;
import ca.uhn.fhir.jpa.packages.PackageInstallerSvcImpl;
import ca.uhn.fhir.jpa.partition.IPartitionLookupSvc;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.partition.PartitionLookupSvcImpl;
@ -60,6 +64,8 @@ import org.hibernate.jpa.HibernatePersistenceProvider;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.utilities.cache.FilesystemPackageCacheManager;
import org.hl7.fhir.utilities.graphql.IGraphQLStorageServices;
import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
@ -71,12 +77,14 @@ import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Scope;
import org.springframework.core.env.Environment;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.dao.annotation.PersistenceExceptionTranslationPostProcessor;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
import org.springframework.scheduling.concurrent.ScheduledExecutorFactoryBean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import javax.annotation.Nullable;
@ -112,11 +120,13 @@ import java.util.Date;
@ComponentScan.Filter(type = FilterType.REGEX, pattern = ".*Test.*"),
@ComponentScan.Filter(type = FilterType.REGEX, pattern = "ca.uhn.fhir.jpa.subscription.*"),
@ComponentScan.Filter(type = FilterType.REGEX, pattern = "ca.uhn.fhir.jpa.searchparam.*"),
@ComponentScan.Filter(type = FilterType.REGEX, pattern = "ca.uhn.fhir.jpa.empi.*")
@ComponentScan.Filter(type = FilterType.REGEX, pattern = "ca.uhn.fhir.jpa.empi.*"),
@ComponentScan.Filter(type = FilterType.REGEX, pattern = "ca.uhn.fhir.jpa.batch.*")
})
@Import({
SearchParamConfig.class
SearchParamConfig.class, BatchJobsConfig.class
})
@EnableBatchProcessing
public abstract class BaseConfig {
public static final String JPA_VALIDATION_SUPPORT_CHAIN = "myJpaValidationSupportChain";
@ -129,11 +139,18 @@ public abstract class BaseConfig {
public static final String SEARCH_BUILDER = "SearchBuilder";
public static final String HISTORY_BUILDER = "HistoryBuilder";
private static final String HAPI_DEFAULT_SCHEDULER_GROUP = "HAPI";
@Autowired
protected Environment myEnv;
@Autowired
private DaoRegistry myDaoRegistry;
@Bean
public BatchConfigurer batchConfigurer() {
return new NonPersistedBatchConfigurer();
}
@Bean("myDaoRegistry")
public DaoRegistry daoRegistry() {
return new DaoRegistry();
@ -144,6 +161,11 @@ public abstract class BaseConfig {
return new DatabaseBackedPagingProvider();
}
@Bean
public IBatchJobSubmitter batchJobSubmitter() {
return new BatchJobSubmitterImpl();
}
/**
* This method should be overridden to provide an actual completed
* bean, but it provides a partially completed entity manager
@ -245,6 +267,18 @@ public abstract class BaseConfig {
return retVal;
}
@Bean
public TaskExecutor jobLaunchingTaskExecutor() {
ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
asyncTaskExecutor.setCorePoolSize(5);
asyncTaskExecutor.setMaxPoolSize(10);
asyncTaskExecutor.setQueueCapacity(500);
asyncTaskExecutor.setThreadNamePrefix("JobLauncher-");
asyncTaskExecutor.initialize();
return asyncTaskExecutor;
}
@Bean
public IResourceReindexingSvc resourceReindexingSvc() {
return new ResourceReindexingSvcImpl();

View File

@ -52,6 +52,7 @@ public class EmpiLinkDaoSvc {
@Autowired
private IdHelperService myIdHelperService;
@Transactional
public EmpiLink createOrUpdateLinkEntity(IBaseResource thePerson, IBaseResource theTarget, EmpiMatchResultEnum theMatchResult, EmpiLinkSourceEnum theLinkSource, @Nullable EmpiTransactionContext theEmpiTransactionContext) {
Long personPid = myIdHelperService.getPidOrNull(thePerson);
Long resourcePid = myIdHelperService.getPidOrNull(theTarget);

View File

@ -35,9 +35,9 @@ import ca.uhn.fhir.jpa.dao.data.IResourceTagDao;
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
import ca.uhn.fhir.jpa.dao.predicate.PredicateBuilder;
import ca.uhn.fhir.jpa.dao.predicate.PredicateBuilderFactory;
import ca.uhn.fhir.jpa.dao.predicate.querystack.QueryStack;
import ca.uhn.fhir.jpa.dao.predicate.SearchBuilderJoinEnum;
import ca.uhn.fhir.jpa.dao.predicate.SearchBuilderJoinKey;
import ca.uhn.fhir.jpa.dao.predicate.querystack.QueryStack;
import ca.uhn.fhir.jpa.entity.ResourceSearchView;
import ca.uhn.fhir.jpa.interceptor.JpaPreResourceAccessDetails;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;

View File

@ -35,5 +35,4 @@ public interface IBulkExportCollectionDao extends JpaRepository<BulkExportCollec
@Modifying
@Query("DELETE FROM BulkExportCollectionEntity t WHERE t.myId = :pid")
void deleteByPid(@Param("pid") Long theId);
}

View File

@ -1,6 +1,6 @@
package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.jpa.bulk.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;

View File

@ -22,7 +22,17 @@ package ca.uhn.fhir.jpa.entity;
import ca.uhn.fhir.jpa.model.entity.ForcedId;
import javax.persistence.*;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.FetchType;
import javax.persistence.ForeignKey;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.ManyToOne;
import javax.persistence.SequenceGenerator;
import javax.persistence.Table;
import java.io.Serializable;
@Entity
@ -34,9 +44,11 @@ public class BulkExportCollectionFileEntity implements Serializable {
@SequenceGenerator(name = "SEQ_BLKEXCOLFILE_PID", sequenceName = "SEQ_BLKEXCOLFILE_PID")
@Column(name = "PID")
private Long myId;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "COLLECTION_PID", referencedColumnName = "PID", nullable = false, foreignKey = @ForeignKey(name="FK_BLKEXCOLFILE_COLLECT"))
private BulkExportCollectionEntity myCollection;
@Column(name = "RES_ID", length = ForcedId.MAX_FORCED_ID_LENGTH, nullable = false)
private String myResourceId;

View File

@ -20,12 +20,27 @@ package ca.uhn.fhir.jpa.entity;
* #L%
*/
import ca.uhn.fhir.jpa.bulk.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.hl7.fhir.r5.model.InstantType;
import javax.persistence.*;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.FetchType;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Index;
import javax.persistence.OneToMany;
import javax.persistence.SequenceGenerator;
import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
import javax.persistence.UniqueConstraint;
import javax.persistence.Version;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;

View File

@ -88,7 +88,6 @@ public class PartitionLookupSvcImpl implements IPartitionLookupSvc {
myPartitionDao.save(partitionEntity);
}
});
}
@Override

View File

@ -5,12 +5,13 @@ import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.model.api.annotation.Description;
import ca.uhn.fhir.model.valueset.BundleTypeEnum;
import ca.uhn.fhir.rest.annotation.*;
import ca.uhn.fhir.rest.annotation.Operation;
import ca.uhn.fhir.rest.annotation.OperationParam;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.SortOrderEnum;
import ca.uhn.fhir.rest.api.SortSpec;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.*;
import ca.uhn.fhir.rest.param.DateAndListParam;
import ca.uhn.fhir.rest.param.ReferenceAndListParam;
import ca.uhn.fhir.rest.param.TokenAndListParam;
import org.hl7.fhir.dstu3.model.Observation;
import org.hl7.fhir.dstu3.model.UnsignedIntType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;

View File

@ -5,14 +5,16 @@ import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.model.api.annotation.Description;
import ca.uhn.fhir.model.valueset.BundleTypeEnum;
import ca.uhn.fhir.rest.annotation.*;
import ca.uhn.fhir.rest.annotation.Operation;
import ca.uhn.fhir.rest.annotation.OperationParam;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.SortOrderEnum;
import ca.uhn.fhir.rest.api.SortSpec;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.*;
import ca.uhn.fhir.rest.param.DateAndListParam;
import ca.uhn.fhir.rest.param.ReferenceAndListParam;
import ca.uhn.fhir.rest.param.TokenAndListParam;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.*;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.UnsignedIntType;
/*
* #%L

View File

@ -5,15 +5,16 @@ import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.model.api.annotation.Description;
import ca.uhn.fhir.model.valueset.BundleTypeEnum;
import ca.uhn.fhir.rest.annotation.*;
import ca.uhn.fhir.rest.annotation.Operation;
import ca.uhn.fhir.rest.annotation.OperationParam;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.SortOrderEnum;
import ca.uhn.fhir.rest.api.SortSpec;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.*;
import ca.uhn.fhir.rest.param.DateAndListParam;
import ca.uhn.fhir.rest.param.ReferenceAndListParam;
import ca.uhn.fhir.rest.param.TokenAndListParam;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r5.model.UnsignedIntType;
import org.hl7.fhir.r5.model.Observation;
import org.hl7.fhir.r5.model.UnsignedIntType;
/*
* #%L

View File

@ -713,6 +713,7 @@ public class TermCodeSystemStorageSvcImpl implements ITermCodeSystemStorageSvc {
txTemplate.execute(t -> {
link.forEach(id -> theDao.deleteByPid(id));
theDao.flush();
return null;
});
@ -720,7 +721,7 @@ public class TermCodeSystemStorageSvcImpl implements ITermCodeSystemStorageSvc {
ourLog.info(" * {} {} deleted ({}/{}) remaining - {}/sec - ETA: {}", count, theDescriptor, count, totalCount, sw.formatThroughput(count, TimeUnit.SECONDS), sw.getEstimatedTimeRemaining(count, totalCount));
}
theDao.flush();
}

View File

@ -23,7 +23,14 @@ import java.sql.SQLException;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.matchesPattern;
import static org.junit.Assert.*;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

View File

@ -1,6 +1,10 @@
package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.model.BulkExportResponseJson;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.provider.BulkDataExportProvider;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.client.apache.ResourceEntity;
@ -179,7 +183,7 @@ public class BulkDataExportProviderTest {
.setJobId(A_JOB_ID)
.setStatus(BulkJobStatusEnum.BUILDING)
.setStatusTime(InstantType.now().getValue());
when(myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(eq(A_JOB_ID))).thenReturn(jobInfo);
when(myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(eq(A_JOB_ID))).thenReturn(jobInfo);
String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" +
JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID;
@ -204,7 +208,7 @@ public class BulkDataExportProviderTest {
.setStatus(BulkJobStatusEnum.ERROR)
.setStatusTime(InstantType.now().getValue())
.setStatusMessage("Some Error Message");
when(myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(eq(A_JOB_ID))).thenReturn(jobInfo);
when(myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(eq(A_JOB_ID))).thenReturn(jobInfo);
String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" +
JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID;
@ -233,7 +237,7 @@ public class BulkDataExportProviderTest {
jobInfo.addFile().setResourceType("Patient").setResourceId(new IdType("Binary/111"));
jobInfo.addFile().setResourceType("Patient").setResourceId(new IdType("Binary/222"));
jobInfo.addFile().setResourceType("Patient").setResourceId(new IdType("Binary/333"));
when(myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(eq(A_JOB_ID))).thenReturn(jobInfo);
when(myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(eq(A_JOB_ID))).thenReturn(jobInfo);
String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" +
JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID;
@ -263,7 +267,7 @@ public class BulkDataExportProviderTest {
@Test
public void testPollForStatus_Gone() throws IOException {
when(myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(eq(A_JOB_ID))).thenThrow(new ResourceNotFoundException("Unknown job: AAA"));
when(myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(eq(A_JOB_ID))).thenThrow(new ResourceNotFoundException("Unknown job: AAA"));
String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" +
JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID;

View File

@ -1,5 +1,9 @@
package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.job.BulkExportJobParametersBuilder;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
@ -21,13 +25,30 @@ import org.hl7.fhir.r4.model.Patient;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.*;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@ -41,7 +62,14 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
private IBulkExportCollectionFileDao myBulkExportCollectionFileDao;
@Autowired
private IBulkDataExportSvc myBulkDataExportSvc;
@Autowired
private IBatchJobSubmitter myBatchJobSubmitter;
@Autowired
private JobExplorer myJobExplorer;
@Autowired
@Qualifier("bulkExportJob")
private Job myBulkJob;
@Test
public void testPurgeExpiredJobs() {
@ -137,15 +165,17 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertNotNull(jobDetails.getJobId());
// Check the status
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId());
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.SUBMITTED, status.getStatus());
assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Observation,Patient&_typeFilter="+TEST_FILTER, status.getRequest());
// Run a scheduled pass to build the export
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
// Fetch the job again
status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId());
status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.COMPLETE, status.getStatus());
assertEquals(2, status.getFiles().size());
@ -169,6 +199,26 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
}
@Test
public void testBatchJobIsCapableOfCreatingAnExportEntityIfNoJobIsProvided() throws Exception {
createResources();
//Add the UUID to the job
BulkExportJobParametersBuilder paramBuilder = new BulkExportJobParametersBuilder();
paramBuilder.setReadChunkSize(100L)
.setOutputFormat(Constants.CT_FHIR_NDJSON)
.setResourceTypes(Arrays.asList("Patient", "Observation"));
JobExecution jobExecution = myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
awaitJobCompletion(jobExecution);
String jobUUID = (String)jobExecution.getExecutionContext().get("jobUUID");
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobUUID);
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(2));
}
@Test
public void testSubmitWithoutSpecificResources() {
@ -187,15 +237,17 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertNotNull(jobDetails.getJobId());
// Check the status
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId());
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.SUBMITTED, status.getStatus());
assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson", status.getRequest());
// Run a scheduled pass to build the export
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
// Fetch the job again
status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId());
status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.COMPLETE, status.getStatus());
assertEquals(2, status.getFiles().size());
@ -219,6 +271,25 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
}
public void awaitAllBulkJobCompletions() {
List<JobInstance> bulkExport = myJobExplorer.findJobInstancesByJobName("bulkExportJob", 0, 100);
if (bulkExport.isEmpty()) {
fail("There are no bulk export jobs running!");
}
List<JobExecution> bulkExportExecutions = bulkExport.stream().flatMap(jobInstance -> myJobExplorer.getJobExecutions(jobInstance).stream()).collect(Collectors.toList());
awaitJobCompletions(bulkExportExecutions);
}
public void awaitJobCompletions(Collection<JobExecution> theJobs) {
theJobs.stream().forEach(jobExecution -> {
try {
awaitJobCompletion(jobExecution);
} catch (InterruptedException theE) {
fail();
}
});
}
@Test
public void testSubmitReusesExisting() {
@ -233,8 +304,48 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertEquals(jobDetails1.getJobId(), jobDetails2.getJobId());
}
@Test
public void testBatchJobSubmitsAndRuns() throws Exception {
createResources();
@Test
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(null, Sets.newHashSet("Patient", "Observation"), null, null);
//Add the UUID to the job
BulkExportJobParametersBuilder paramBuilder = new BulkExportJobParametersBuilder()
.setJobUUID(jobDetails.getJobId())
.setReadChunkSize(10L);
JobExecution jobExecution = myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
awaitJobCompletion(jobExecution);
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(2));
}
@Test
public void testJobParametersValidatorRejectsInvalidParameters() {
JobParametersBuilder paramBuilder = new JobParametersBuilder().addString("jobUUID", "I'm not real!");
try {
myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
fail("Should have had invalid parameter execption!");
} catch (JobParametersInvalidException e) {
}
}
//Note that if the job is generated, and doesnt rely on an existed persisted BulkExportJobEntity, it will need to
//create one itself, which means that its jobUUID isnt known until it starts. to get around this, we move
public void awaitJobCompletion(JobExecution theJobExecution) throws InterruptedException {
await().until(() -> {
JobExecution jobExecution = myJobExplorer.getJobExecution(theJobExecution.getId());
return jobExecution.getStatus() == BatchStatus.COMPLETED;
});
}
@Test
public void testSubmit_WithSince() throws InterruptedException {
// Create some resources to load
@ -256,15 +367,17 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertNotNull(jobDetails.getJobId());
// Check the status
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId());
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.SUBMITTED, status.getStatus());
assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Observation,Patient&_since=" + cutoff.setTimeZoneZulu(true).getValueAsString(), status.getRequest());
// Run a scheduled pass to build the export
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
// Fetch the job again
status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId());
status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.COMPLETE, status.getStatus());
assertEquals(1, status.getFiles().size());
@ -282,7 +395,6 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
} else {
fail(next.getResourceType());
}
}
}

View File

@ -10,7 +10,11 @@ import ca.uhn.fhir.validation.ResultSeverityEnum;
import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder;
import org.apache.commons.dbcp2.BasicDataSource;
import org.hibernate.dialect.H2Dialect;
import org.springframework.context.annotation.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Primary;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@ -30,11 +34,13 @@ public class TestDstu3Config extends BaseJavaConfigDstu3 {
static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(TestDstu3Config.class);
private Exception myLastStackTrace;
@Bean
public CircularQueueCaptureQueriesListener captureQueriesListener() {
return new CircularQueueCaptureQueriesListener();
}
@Bean
public BasicDataSource basicDataSource() {
BasicDataSource retVal = new BasicDataSource() {

View File

@ -40,8 +40,13 @@ public class TestJPAConfig {
return daoConfig().getModelConfig();
}
/*
Please do not rename this bean to "transactionManager()" as this will conflict with the transactionManager
provided by Spring Batch.
*/
@Bean
public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
@Primary
public JpaTransactionManager hapiTransactionManager(EntityManagerFactory entityManagerFactory) {
JpaTransactionManager retVal = new JpaTransactionManager();
retVal.setEntityManagerFactory(entityManagerFactory);
return retVal;

View File

@ -1,7 +1,11 @@
package ca.uhn.fhir.jpa.config;
import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.batch.svc.BatchJobSubmitterImpl;
import ca.uhn.fhir.jpa.binstore.IBinaryStorageSvc;
import ca.uhn.fhir.jpa.binstore.MemoryBinaryStorageSvcImpl;
import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc;
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
@ -11,12 +15,10 @@ import net.ttddyy.dsproxy.listener.logging.SLF4JLogLevel;
import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder;
import org.apache.commons.dbcp2.BasicDataSource;
import org.hibernate.dialect.H2Dialect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.env.Environment;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@ -28,7 +30,7 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.fail;
@Configuration
@Import(TestJPAConfig.class)
@Import({TestJPAConfig.class, BatchJobsConfig.class})
@EnableTransactionManagement()
public class TestR4Config extends BaseJavaConfigR4 {
@ -42,7 +44,7 @@ public class TestR4Config extends BaseJavaConfigR4 {
* starvation
*/
if (ourMaxThreads == null) {
ourMaxThreads = (int) (Math.random() * 6.0) + 1;
ourMaxThreads = (int) (Math.random() * 6.0) + 3;
if ("true".equals(System.getProperty("single_db_connection"))) {
ourMaxThreads = 1;
@ -50,16 +52,25 @@ public class TestR4Config extends BaseJavaConfigR4 {
}
}
@Autowired
private Environment myEnvironment;
private Exception myLastStackTrace;
@Bean
public IBatchJobSubmitter batchJobSubmitter() {
return new BatchJobSubmitterImpl();
}
@Bean
public CircularQueueCaptureQueriesListener captureQueriesListener() {
return new CircularQueueCaptureQueriesListener();
}
@Bean
public BulkExportDaoSvc bulkExportDaoSvc() {
return new BulkExportDaoSvc();
}
@Bean
public DataSource dataSource() {
BasicDataSource retVal = new BasicDataSource() {

View File

@ -9,8 +9,8 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.BaseConfig;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.model.util.JpaConstants;

View File

@ -8,7 +8,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoSubscription;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestDstu2Config;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;

View File

@ -12,7 +12,8 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoSubscription;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestDstu3Config;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;
@ -133,6 +134,8 @@ public abstract class BaseJpaDstu3Test extends BaseJpaTest {
private static IValidationSupport ourJpaValidationSupportChainDstu3;
private static IFhirResourceDaoValueSet<ValueSet, Coding, CodeableConcept> ourValueSetDao;
@Autowired
protected IBatchJobSubmitter myBatchJobSubmitter;
@Autowired
protected ITermDeferredStorageSvc myTerminologyDeferredStorageSvc;
@Autowired

View File

@ -14,9 +14,10 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoSubscription;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider;
import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestR4Config;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;
@ -181,7 +182,6 @@ public abstract class BaseJpaR4Test extends BaseJpaTest implements ITestDataBui
private static IValidationSupport ourJpaValidationSupportChainR4;
private static IFhirResourceDaoValueSet<ValueSet, Coding, CodeableConcept> ourValueSetDao;
@Autowired
protected IPartitionLookupSvc myPartitionConfigSvc;
@Autowired
@ -460,6 +460,8 @@ public abstract class BaseJpaR4Test extends BaseJpaTest implements ITestDataBui
private IBulkDataExportSvc myBulkDataExportSvc;
@Autowired
private IdHelperService myIdHelperService;
@Autowired
protected IBatchJobSubmitter myBatchJobSubmitter;
@After()
public void afterCleanupDao() {

View File

@ -6,14 +6,13 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestR4ConfigWithElasticSearch;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.entity.TermCodeSystemVersion;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
@ -23,6 +22,7 @@ import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc;
import ca.uhn.fhir.jpa.term.api.ITermReadSvcR4;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.util.TestUtil;
import ca.uhn.fhir.validation.FhirValidator;

View File

@ -7,7 +7,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestR4WithLuceneDisabledConfig;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
import ca.uhn.fhir.jpa.dao.dstu2.FhirResourceDaoDstu2SearchNoFtTest;

View File

@ -16,7 +16,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider;
import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.TestR5Config;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;

View File

@ -1,26 +1,49 @@
package ca.uhn.fhir.jpa.search.lastn;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.param.*;
import ca.uhn.fhir.jpa.search.lastn.config.TestElasticsearchConfig;
import ca.uhn.fhir.jpa.search.lastn.json.CodeJson;
import ca.uhn.fhir.jpa.search.lastn.json.ObservationJson;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.param.DateAndListParam;
import ca.uhn.fhir.rest.param.DateOrListParam;
import ca.uhn.fhir.rest.param.DateParam;
import ca.uhn.fhir.rest.param.ParamPrefixEnum;
import ca.uhn.fhir.rest.param.ReferenceAndListParam;
import ca.uhn.fhir.rest.param.ReferenceOrListParam;
import ca.uhn.fhir.rest.param.ReferenceParam;
import ca.uhn.fhir.rest.param.TokenAndListParam;
import ca.uhn.fhir.rest.param.TokenOrListParam;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.param.TokenParamModifier;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.hl7.fhir.r4.model.Observation;
import org.junit.*;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {TestElasticsearchConfig.class})

View File

@ -0,0 +1,100 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.1.0-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<artifactId>hapi-fhir-jpaserver-batch</artifactId>
<packaging>jar</packaging>
<name>HAPI FHIR JPA Server - Batch Task Processor</name>
<dependencies>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>${spring_batch_version}</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-infrastructure</artifactId>
<version>${spring_batch_version}</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-jpa</artifactId>
</dependency>
<!--test dependencies -->
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<version>${spring_batch_version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-base</artifactId>
<version>${project.version}</version>
</dependency>
<!-- test -->
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>java-hamcrest</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-test-utilities</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
<configuration>
<skipDeploy>true</skipDeploy>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<executions>
<execution>
<id>default-prepare-agent</id>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,39 @@
package ca.uhn.fhir.jpa.batch.api;
/*-
* #%L
* HAPI FHIR JPA Server - Batch Task Processor
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
public interface IBatchJobSubmitter {
/**
* Given a {@link Job} and a {@link JobParameters}, execute the job with the given parameters.
*
* @param theJob the job to run.
* @param theJobParameters A collection of key-value pairs that are used to parameterize the job.
* @return A {@link JobExecution} representing the job.
* @throws JobParametersInvalidException If validation on the parameters fails.
*/
JobExecution runJob(Job theJob, JobParameters theJobParameters) throws JobParametersInvalidException;
}

View File

@ -0,0 +1,85 @@
package ca.uhn.fhir.jpa.batch.config;
/*-
* #%L
* HAPI FHIR JPA Server - Batch Task Processor
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.MapJobExplorerFactoryBean;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import javax.annotation.PostConstruct;
@Component
@Configuration
public class InMemoryJobRepositoryBatchConfig implements BatchConfigurer {
private PlatformTransactionManager myPlatformTransactionManager;
private JobLauncher myJobLauncher;
private JobRepository myJobRepository;
private JobExplorer myJobExplorer;
@Override
public PlatformTransactionManager getTransactionManager() {
return myPlatformTransactionManager;
}
@Override
public JobRepository getJobRepository() {
return myJobRepository;
}
@Override
public JobLauncher getJobLauncher() {
return myJobLauncher;
}
@Override
public JobExplorer getJobExplorer() {
return myJobExplorer;
}
@PostConstruct
public void setup() throws Exception{
if (myPlatformTransactionManager == null) {
myPlatformTransactionManager = new ResourcelessTransactionManager();
}
MapJobRepositoryFactoryBean jobRepositoryFactoryBean = new MapJobRepositoryFactoryBean(myPlatformTransactionManager);
jobRepositoryFactoryBean.afterPropertiesSet();
myJobRepository = jobRepositoryFactoryBean.getObject();
MapJobExplorerFactoryBean jobExplorerFactoryBean = new MapJobExplorerFactoryBean(jobRepositoryFactoryBean);
jobExplorerFactoryBean.afterPropertiesSet();
myJobExplorer = jobExplorerFactoryBean.getObject();
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(myJobRepository);
jobLauncher.afterPropertiesSet();
myJobLauncher = jobLauncher;
}
}

View File

@ -0,0 +1,77 @@
package ca.uhn.fhir.jpa.batch.config;
/*-
* #%L
* HAPI FHIR JPA Server - Batch Task Processor
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.MapJobExplorerFactoryBean;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.task.TaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
public class NonPersistedBatchConfigurer extends DefaultBatchConfigurer {
@Autowired
@Qualifier("hapiTransactionManager")
private PlatformTransactionManager myHapiPlatformTransactionManager;
@Autowired
@Qualifier("jobLaunchingTaskExecutor")
private TaskExecutor myTaskExecutor;
private MapJobRepositoryFactoryBean myJobRepositoryFactory;
@Override
public PlatformTransactionManager getTransactionManager() {
return myHapiPlatformTransactionManager;
}
@Override
protected JobRepository createJobRepository() throws Exception {
MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
factory.setTransactionManager(this.getTransactionManager());
factory.afterPropertiesSet();
myJobRepositoryFactory = factory;
return factory.getObject();
}
@Override
public JobExplorer createJobExplorer() throws Exception {
MapJobExplorerFactoryBean jobExplorerFactoryBean = new MapJobExplorerFactoryBean(myJobRepositoryFactory);
jobExplorerFactoryBean.afterPropertiesSet();
return jobExplorerFactoryBean.getObject();
}
@Override
protected JobLauncher createJobLauncher() throws Exception {
SimpleJobLauncher launcher = new SimpleJobLauncher();
launcher.setTaskExecutor(myTaskExecutor);
launcher.setJobRepository(getJobRepository());
launcher.afterPropertiesSet();
return launcher;
}
}

View File

@ -0,0 +1,60 @@
package ca.uhn.fhir.jpa.batch.svc;
/*-
* #%L
* HAPI FHIR JPA Server - Batch Task Processor
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import org.slf4j.Logger;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import static org.slf4j.LoggerFactory.getLogger;
public class BatchJobSubmitterImpl implements IBatchJobSubmitter {
private static final Logger ourLog = getLogger(BatchJobSubmitterImpl.class);
@Autowired
private JobLauncher myJobLauncher;
@Autowired
private JobRepository myJobRepository;
@Override
public JobExecution runJob(Job theJob, JobParameters theJobParameters) throws JobParametersInvalidException{
try {
return myJobLauncher.run(theJob, theJobParameters);
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException theE) {
ourLog.warn("Job {} was already running, ignoring the call to start.", theJob.getName());
return myJobRepository.getLastJobExecution(theJob.getName(), theJobParameters);
} catch (JobParametersInvalidException theE) {
ourLog.error("Job Parameters passed to this job were invalid: {}", theE.getMessage());
throw theE;
}
}
}

View File

@ -0,0 +1,22 @@
package ca.uhn.fhir.jpa.batch.svc;
/*-
* #%L
* HAPI FHIR JPA Server - Batch Task Processor
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/

View File

@ -0,0 +1,25 @@
package ca.uhn.fhir.jpa.batch;
import ca.uhn.fhir.jpa.batch.config.BatchJobConfig;
import ca.uhn.fhir.jpa.batch.config.TestBatchConfig;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import static org.slf4j.LoggerFactory.getLogger;
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {BatchJobConfig.class, TestBatchConfig.class})
abstract public class BaseBatchR4Test {
private static final Logger ourLog = getLogger(BaseBatchR4Test.class);
@Autowired
protected JobLauncher myJobLauncher;
@Autowired
protected Job myJob;
}

View File

@ -0,0 +1,55 @@
package ca.uhn.fhir.jpa.batch.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BatchJobConfig {
@Autowired
private JobBuilderFactory myJobBuilderFactory;
@Autowired
private StepBuilderFactory myStepBuilderFactory;
@Bean
public Job testJob() {
return myJobBuilderFactory.get("testJob")
.start(testStep())
.build();
}
@Bean
public Step testStep() {
return myStepBuilderFactory.get("testStep")
.tasklet(sampleTasklet())
.build();
}
@Bean
@StepScope
public Tasklet sampleTasklet() {
return new SampleTasklet();
}
@Bean
@StepScope
public ItemReader<String> reader() {
return new SampleItemReader();
}
@Bean
public ItemWriter<String> simpleWriter() {
return theList -> theList.forEach(System.out::println);
}
}

View File

@ -0,0 +1,15 @@
package ca.uhn.fhir.jpa.batch.config;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
public class SampleItemReader implements ItemReader<String> {
@Override
public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
return "zoop";
}
}

View File

@ -0,0 +1,14 @@
package ca.uhn.fhir.jpa.batch.config;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
public class SampleTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution theStepContribution, ChunkContext theChunkContext) throws Exception {
System.out.println("woo");
return RepeatStatus.FINISHED;
}
}

View File

@ -0,0 +1,35 @@
package ca.uhn.fhir.jpa.batch.config;
import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
@EnableBatchProcessing
public class TestBatchConfig {
@Bean
public PlatformTransactionManager hapiTransactionManager() {
return new ResourcelessTransactionManager();
}
@Bean
public TaskExecutor jobLaunchingTaskExecutor() {
ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
asyncTaskExecutor.setCorePoolSize(5);
asyncTaskExecutor.setMaxPoolSize(10);
asyncTaskExecutor.setQueueCapacity(500);
asyncTaskExecutor.setThreadNamePrefix("JobLauncher-");
asyncTaskExecutor.initialize();
return asyncTaskExecutor;
}
@Bean
public BatchConfigurer batchConfigurer() {
return new NonPersistedBatchConfigurer();
}
}

View File

@ -0,0 +1,18 @@
package ca.uhn.fhir.jpa.batch.svc;
import ca.uhn.fhir.jpa.batch.BaseBatchR4Test;
import org.junit.Test;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
public class BatchSvcTest extends BaseBatchR4Test {
@Test
public void testApplicationContextLoads() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, InterruptedException {
myJobLauncher.run(myJob, new JobParameters());
}
}

View File

@ -0,0 +1,74 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} [%file:%line] %msg%n</pattern>
</encoder>
</appender>
<logger name="org.springframework.web.socket.handler.ExceptionWebSocketHandlerDecorator" additivity="false" level="info">
<appender-ref ref="STDOUT" />
</logger>
<logger name="ca.uhn.fhir.jpa.dao.FhirResourceDaoSubscriptionDstu2" additivity="false" level="info">
<appender-ref ref="STDOUT" />
</logger>
<logger name="org.eclipse.jetty.websocket" additivity="false" level="info">
<appender-ref ref="STDOUT" />
</logger>
<logger name="org.eclipse" additivity="false" level="error">
</logger>
<logger name="ca.uhn.fhir.rest.client" additivity="false" level="info">
<appender-ref ref="STDOUT" />
</logger>
<logger name="ca.uhn.fhir.jpa.dao" additivity="false" level="info">
<appender-ref ref="STDOUT" />
</logger>
<!-- Set to 'trace' to enable SQL logging -->
<logger name="org.hibernate.SQL" additivity="false" level="info">
<appender-ref ref="STDOUT" />
</logger>
<!-- Set to 'trace' to enable SQL Value logging -->
<logger name="org.hibernate.type" additivity="false" level="info">
<appender-ref ref="STDOUT" />
</logger>
<logger name="org.hibernate.search.elasticsearch.request" additivity="false" level="trace">
<appender-ref ref="STDOUT" />
</logger>
<logger name="org.hibernate" additivity="false" level="info">
<appender-ref ref="STDOUT" />
</logger>
<appender name="BATCH_TROUBLESHOOTING" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter"><level>INFO</level></filter>
<file>${smile.basedir}/log/batch-troubleshooting.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>${smile.basedir}/log/batch-troubleshooting.log.%i.gz</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>9</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>5MB</maxFileSize>
</triggeringPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n${log.stackfilter.pattern}</pattern>
</encoder>
</appender>
<logger name="ca.uhn.fhir.log.batch_troubleshooting" level="TRACE">
<appender-ref ref="BATCH_TROUBLESHOOTING"/>
</logger>
<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View File

@ -106,9 +106,14 @@ abstract public class BaseEmpiR4Test extends BaseJpaR4Test {
@After
public void after() {
myEmpiLinkDao.deleteAll();
assertEquals(0, myEmpiLinkDao.count());
super.after();
}
protected void saveLink(EmpiLink theEmpiLink) {
myEmpiLinkDaoSvc.save(theEmpiLink);
}
@Nonnull
protected Person createUnmanagedPerson() {
return createPerson(new Person(), false);

View File

@ -50,7 +50,7 @@ public class EmpiExpungeTest extends BaseEmpiR4Test {
empiLink.setMatchResult(EmpiMatchResultEnum.MATCH);
empiLink.setPersonPid(myPersonEntity.getId());
empiLink.setTargetPid(myTargetEntity.getId());
myEmpiLinkDaoSvc.save(empiLink);
saveLink(empiLink);
}
@Test

View File

@ -41,7 +41,7 @@ public abstract class BaseLinkR4Test extends BaseProviderR4Test {
myLink = getOnlyPatientLink();
// Tests require our initial link to be a POSSIBLE_MATCH
myLink.setMatchResult(EmpiMatchResultEnum.POSSIBLE_MATCH);
myEmpiLinkDao.save(myLink);
saveLink(myLink);
assertEquals(EmpiLinkSourceEnum.AUTO, myLink.getLinkSource());
}

View File

@ -47,7 +47,7 @@ public class EmpiProviderQueryLinkR4Test extends BaseLinkR4Test {
myPerson2Id = new StringType(person2.getIdElement().toVersionless().getValue());
Long person2Pid = myIdHelperService.getPidOrNull(person2);
EmpiLink possibleDuplicateEmpiLink = new EmpiLink().setPersonPid(person1Pid).setTargetPid(person2Pid).setMatchResult(EmpiMatchResultEnum.POSSIBLE_DUPLICATE).setLinkSource(EmpiLinkSourceEnum.AUTO);
myEmpiLinkDaoSvc.save(possibleDuplicateEmpiLink);
saveLink(possibleDuplicateEmpiLink);
}
@Test

View File

@ -116,7 +116,7 @@ public class EmpiLinkSvcTest extends BaseEmpiR4Test {
.setTargetPid(theTargetPid)
.setLinkSource(EmpiLinkSourceEnum.MANUAL)
.setMatchResult(EmpiMatchResultEnum.NO_MATCH);
myEmpiLinkDaoSvc.save(noMatchLink);
saveLink(noMatchLink);
}
@Test

View File

@ -108,7 +108,7 @@ public class EmpiPersonMergerSvcTest extends BaseEmpiR4Test {
@Test
public void mergeRemovesPossibleDuplicatesLink() {
EmpiLink empiLink = new EmpiLink().setPersonPid(myToPersonPid).setTargetPid(myFromPersonPid).setMatchResult(EmpiMatchResultEnum.POSSIBLE_DUPLICATE).setLinkSource(EmpiLinkSourceEnum.AUTO);
myEmpiLinkDaoSvc.save(empiLink);
saveLink(empiLink);
assertEquals(1, myEmpiLinkDao.count());
mergePersons();
assertEquals(0, myEmpiLinkDao.count());
@ -164,7 +164,7 @@ public class EmpiPersonMergerSvcTest extends BaseEmpiR4Test {
EmpiLink fromLink = createEmpiLink(myFromPerson, myTargetPatient1);
fromLink.setLinkSource(EmpiLinkSourceEnum.MANUAL);
fromLink.setMatchResult(EmpiMatchResultEnum.MATCH);
myEmpiLinkDaoSvc.save(fromLink);
saveLink(fromLink);
createEmpiLink(myToPerson, myTargetPatient1);
@ -179,7 +179,8 @@ public class EmpiPersonMergerSvcTest extends BaseEmpiR4Test {
EmpiLink fromLink = createEmpiLink(myFromPerson, myTargetPatient1);
fromLink.setLinkSource(EmpiLinkSourceEnum.MANUAL);
fromLink.setMatchResult(EmpiMatchResultEnum.NO_MATCH);
myEmpiLinkDaoSvc.save(fromLink);
saveLink(fromLink);
createEmpiLink(myToPerson, myTargetPatient1);
@ -196,7 +197,7 @@ public class EmpiPersonMergerSvcTest extends BaseEmpiR4Test {
EmpiLink toLink = createEmpiLink(myToPerson, myTargetPatient1);
toLink.setLinkSource(EmpiLinkSourceEnum.MANUAL);
toLink.setMatchResult(EmpiMatchResultEnum.NO_MATCH);
myEmpiLinkDaoSvc.save(toLink);
saveLink(toLink);
mergePersons();
List<EmpiLink> links = myEmpiLinkDaoSvc.findEmpiLinksByPersonId(myToPerson);
@ -209,12 +210,12 @@ public class EmpiPersonMergerSvcTest extends BaseEmpiR4Test {
EmpiLink fromLink = createEmpiLink(myFromPerson, myTargetPatient1);
fromLink.setLinkSource(EmpiLinkSourceEnum.MANUAL);
fromLink.setMatchResult(EmpiMatchResultEnum.NO_MATCH);
myEmpiLinkDaoSvc.save(fromLink);
saveLink(fromLink);
EmpiLink toLink = createEmpiLink(myToPerson, myTargetPatient1);
toLink.setLinkSource(EmpiLinkSourceEnum.MANUAL);
toLink.setMatchResult(EmpiMatchResultEnum.MATCH);
myEmpiLinkDaoSvc.save(toLink);
saveLink(toLink);
try {
mergePersons();
@ -229,12 +230,12 @@ public class EmpiPersonMergerSvcTest extends BaseEmpiR4Test {
EmpiLink fromLink = createEmpiLink(myFromPerson, myTargetPatient1);
fromLink.setLinkSource(EmpiLinkSourceEnum.MANUAL);
fromLink.setMatchResult(EmpiMatchResultEnum.MATCH);
myEmpiLinkDaoSvc.save(fromLink);
saveLink(fromLink);
EmpiLink toLink = createEmpiLink(myToPerson, myTargetPatient1);
toLink.setLinkSource(EmpiLinkSourceEnum.MANUAL);
toLink.setMatchResult(EmpiMatchResultEnum.NO_MATCH);
myEmpiLinkDaoSvc.save(toLink);
saveLink(toLink);
try {
mergePersons();
@ -249,12 +250,12 @@ public class EmpiPersonMergerSvcTest extends BaseEmpiR4Test {
EmpiLink fromLink = createEmpiLink(myFromPerson, myTargetPatient1);
fromLink.setLinkSource(EmpiLinkSourceEnum.MANUAL);
fromLink.setMatchResult(EmpiMatchResultEnum.NO_MATCH);
myEmpiLinkDaoSvc.save(fromLink);
saveLink(fromLink);
EmpiLink toLink = createEmpiLink(myToPerson, myTargetPatient2);
toLink.setLinkSource(EmpiLinkSourceEnum.MANUAL);
toLink.setMatchResult(EmpiMatchResultEnum.MATCH);
myEmpiLinkDaoSvc.save(toLink);
saveLink(toLink);
mergePersons();
assertEquals(1, myToPerson.getLink().size());

View File

@ -47,6 +47,11 @@
<logger name="org.hibernate" additivity="false" level="info">
<appender-ref ref="STDOUT" />
</logger>
<!--
Configuration for EMPI troubleshooting log
-->
<appender name="EMPI_TROUBLESHOOTING" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter"><level>DEBUG</level></filter>
<file>${smile.basedir}/log/empi-troubleshooting.log</file>
@ -67,6 +72,7 @@
</logger>
<root level="info">
<appender-ref ref="STDOUT" />
</root>

View File

@ -42,7 +42,6 @@
<artifactId>hapi-fhir-jpaserver-base</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Test Database -->
<dependency>
<groupId>com.h2database</groupId>

View File

@ -71,7 +71,7 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
init510(); // 20200516 - present
}
private void init510() {
protected void init510() {
Builder version = forVersion(VersionEnum.V5_1_0);
// NPM Packages

View File

@ -24,10 +24,13 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.orm.jpa.JpaTransactionManager;
import javax.persistence.EntityManagerFactory;
// TODO RC1 can use in jpa tests?
@Configuration
public class TestJpaConfig {
@Bean
public DaoConfig daoConfig() {
@ -41,7 +44,8 @@ public class TestJpaConfig {
}
@Bean
public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
@Primary
public JpaTransactionManager hapiTransactionManager(EntityManagerFactory entityManagerFactory) {
JpaTransactionManager retVal = new JpaTransactionManager();
retVal.setEntityManagerFactory(entityManagerFactory);
return retVal;

View File

@ -25,5 +25,4 @@ import org.springframework.test.context.ContextConfiguration;
@ContextConfiguration(classes = {TestJpaR4Config.class})
public abstract class BaseJpaR4Test extends BaseJpaTest {
}

View File

@ -5,10 +5,9 @@ import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.bulk.BulkDataExportProvider;
import ca.uhn.fhir.jpa.provider.DiffProvider;
import ca.uhn.fhir.jpa.subscription.match.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.bulk.provider.BulkDataExportProvider;
import ca.uhn.fhir.jpa.interceptor.CascadingDeleteInterceptor;
import ca.uhn.fhir.jpa.provider.DiffProvider;
import ca.uhn.fhir.jpa.provider.GraphQLProvider;
import ca.uhn.fhir.jpa.provider.JpaConformanceProviderDstu2;
import ca.uhn.fhir.jpa.provider.JpaSystemProviderDstu2;

View File

@ -68,9 +68,13 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.core.annotation.AnnotationAwareOrderComparator;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.CollectionUtils;
import javax.persistence.EntityManagerFactory;
import javax.servlet.ServletException;
import javax.sql.DataSource;
import java.util.List;
@ -86,6 +90,7 @@ import java.util.concurrent.ScheduledExecutorService;
@EnableConfigurationProperties(FhirProperties.class)
public class FhirAutoConfiguration {
private final FhirProperties properties;
public FhirAutoConfiguration(FhirProperties properties) {
@ -178,6 +183,15 @@ public class FhirAutoConfiguration {
})
static class FhirJpaDaoConfiguration {
@Autowired
private EntityManagerFactory emf;
@Bean
@Primary
public PlatformTransactionManager hapiTransactionManager() {
return new JpaTransactionManager(emf);
}
@Bean
@ConditionalOnMissingBean
@ConfigurationProperties("hapi.fhir.jpa")

View File

@ -16,7 +16,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.EmbeddedDataSourceConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
@ -81,15 +80,11 @@ public class FhirAutoConfigurationTest {
@Test
public void withJpaServer() {
load(
Arrays.array(
EmbeddedDataSourceConfiguration.class,
HibernateJpaAutoConfiguration.class,
PropertyPlaceholderAutoConfiguration.class,
FhirAutoConfiguration.class),
"hapi.fhir.version:DSTU3",
"spring.jpa.properties.hibernate.search.default.indexBase:target/lucenefiles",
"spring.jpa.properties.hibernate.search.model_mapping:ca.uhn.fhir.jpa.search.LuceneSearchMappingFactory");
load(Arrays.array(EmbeddedDataSourceConfiguration.class,
HibernateJpaAutoConfiguration.class,
FhirAutoConfiguration.class),
"hapi.fhir.version:DSTU3", "spring.jpa.properties.hibernate.search.default.indexBase:target/lucenefiles", "spring.jpa.properties.hibernate.search.model_mapping:ca.uhn.fhir.jpa.search.LuceneSearchMappingFactory");
assertThat(this.context.getBeansOfType(DaoConfig.class)).hasSize(1);
assertThat(this.context.getBeansOfType(Dstu3.class)).hasSize(1);
}
@ -149,14 +144,15 @@ public class FhirAutoConfigurationTest {
}
private void load(Class<?>[] configs, ClassLoader classLoader, String... environment) {
MockEnvironment env = new MockEnvironment();
for (String next : environment) {
String nextKey = next.substring(0, next.indexOf(':'));
String nextValue = next.substring(next.indexOf(':') + 1);
env.setProperty(nextKey, nextValue);
}
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
applicationContext.setEnvironment(env);
if (classLoader != null) {

View File

@ -13,9 +13,14 @@ spring:
hibernate.search.lucene_version: LUCENE_CURRENT
hibernate.search.model_mapping: ca.uhn.fhir.jpa.search.LuceneSearchMappingFactory
main:
allow-bean-definition-overriding: true
h2:
console:
enabled: true
batch:
job:
enabled: false
hapi:
fhir:
version: DSTU3

View File

@ -19,17 +19,17 @@
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-base</artifactId>
<version>5.1.0-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-server</artifactId>
<version>5.1.0-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-client</artifactId>
<version>5.1.0-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<!-- General -->

View File

@ -726,6 +726,7 @@
<spring_version>5.2.3.RELEASE</spring_version>
<!-- FYI: Spring Data JPA 2.1.9 causes test failures due to unexpected cascading deletes -->
<spring_data_version>2.2.0.RELEASE</spring_data_version>
<spring_batch_version>4.2.2.RELEASE</spring_batch_version>
<spring_boot_version>2.2.6.RELEASE</spring_boot_version>
<spring_retry_version>1.2.2.RELEASE</spring_retry_version>
@ -2081,6 +2082,9 @@
<copy todir="target/site/apidocs-jpaserver-empi">
<fileset dir="hapi-fhir-jpaserver-empi/target/site/apidocs"/>
</copy>
<copy todir="target/site/apidocs-jpaserver-batch">
<fileset dir="hapi-fhir-jpaserver-batch/target/site/apidocs"/>
</copy>
<copy todir="target/site/apidocs-client">
<fileset dir="hapi-fhir-client/target/site/apidocs"/>
</copy>
@ -2570,6 +2574,7 @@
<module>hapi-fhir-jpaserver-subscription</module>
<module>hapi-fhir-jaxrsserver-base</module>
<module>hapi-fhir-jaxrsserver-example</module>
<module>hapi-fhir-jpaserver-batch</module>
<module>hapi-fhir-jpaserver-base</module>
<module>hapi-fhir-jpaserver-empi</module>
<module>hapi-fhir-jpaserver-migrate</module>