Add bulk import instrumentation (#2685)

* Add batch job instrumentation

* Add bulk import instrumentation

* Fix build
This commit is contained in:
James Agnew 2021-05-30 11:47:51 -04:00 committed by GitHub
parent f34dd7ecd8
commit 8d377fbf76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 414 additions and 89 deletions

View File

@ -17,7 +17,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.validation.constraints.Null;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -84,6 +83,7 @@ public class DaoConfig {
* By default, all are enabled.
*/
public static final boolean DEFAULT_ENABLE_TASKS = true;
public static final int DEFAULT_MAXIMUM_INCLUDES_TO_LOAD_PER_PAGE = 1000;
/**
* Default value for {@link #setMaximumSearchResultCountInTransaction(Integer)}
*
@ -98,7 +98,6 @@ public class DaoConfig {
* Child Configurations
*/
private static final Integer DEFAULT_INTERNAL_SYNCHRONOUS_SEARCH_SIZE = 10000;
public static final int DEFAULT_MAXIMUM_INCLUDES_TO_LOAD_PER_PAGE = 1000;
private final ModelConfig myModelConfig = new ModelConfig();
/**
* Do not change default of {@code 0}!
@ -129,6 +128,7 @@ public class DaoConfig {
private boolean myEnforceReferentialIntegrityOnWrite = true;
private SearchTotalModeEnum myDefaultTotalMode = null;
private int myEverythingIncludesFetchPageSize = 50;
private int myBulkImportMaxRetryCount = 10;
/**
* update setter javadoc if default changes
*/
@ -266,6 +266,26 @@ public class DaoConfig {
}
}
/**
* Specifies the maximum number of times that a chunk will be retried during bulk import
* processes before giving up.
*
* @since 5.5.0
*/
public int getBulkImportMaxRetryCount() {
return myBulkImportMaxRetryCount;
}
/**
* Specifies the maximum number of times that a chunk will be retried during bulk import
* processes before giving up.
*
* @since 5.5.0
*/
public void setBulkImportMaxRetryCount(int theBulkImportMaxRetryCount) {
myBulkImportMaxRetryCount = theBulkImportMaxRetryCount;
}
/**
* Specifies the maximum number of <code>_include</code> and <code>_revinclude</code> results to return in a
* single page of results. The default is <code>1000</code>, and <code>null</code> may be used
@ -2367,29 +2387,22 @@ public class DaoConfig {
}
/**
* If this is enabled (disabled by default), Mass Ingestion Mode is enabled. In this mode, a number of
* runtime checks are disabled. This mode is designed for rapid backloading of data while the system is not
* being otherwise used.
*
* In this mode:
*
* - Tags/Profiles/Security Labels will not be updated on existing resources that already have them
* - Resources modification checks will be skipped in favour of a simple hash check
* - Extra resource ID caching is enabled
* If this is enabled (this is the default), this server will attempt to run resource reindexing jobs.
* Otherwise, this server will not.
*
* @since 5.5.0
*/
public void setMassIngestionMode(boolean theMassIngestionMode) {
myMassIngestionMode = theMassIngestionMode;
public void setEnableTaskResourceReindexing(boolean theEnableTaskResourceReindexing) {
myEnableTaskResourceReindexing = theEnableTaskResourceReindexing;
}
/**
* If this is enabled (disabled by default), Mass Ingestion Mode is enabled. In this mode, a number of
* runtime checks are disabled. This mode is designed for rapid backloading of data while the system is not
* being otherwise used.
*
* <p>
* In this mode:
*
* <p>
* - Tags/Profiles/Security Labels will not be updated on existing resources that already have them
* - Resources modification checks will be skipped in favour of a simple hash check
* - Extra resource ID caching is enabled
@ -2401,24 +2414,20 @@ public class DaoConfig {
}
/**
* If this is enabled (this is the default), this server will attempt to run resource reindexing jobs.
* Otherwise, this server will not.
* If this is enabled (disabled by default), Mass Ingestion Mode is enabled. In this mode, a number of
* runtime checks are disabled. This mode is designed for rapid backloading of data while the system is not
* being otherwise used.
* <p>
* In this mode:
* <p>
* - Tags/Profiles/Security Labels will not be updated on existing resources that already have them
* - Resources modification checks will be skipped in favour of a simple hash check
* - Extra resource ID caching is enabled
*
* @since 5.5.0
*/
public void setEnableTaskResourceReindexing(boolean theEnableTaskResourceReindexing) {
myEnableTaskResourceReindexing = theEnableTaskResourceReindexing;
}
/**
* If set to true (default is false), date indexes will account for null values in the range columns. As of 5.3.0
* we no longer place null values in these columns, but legacy data may exist that still has these values. Note that
* enabling this results in more complexity in the search SQL.
*
* @since 5.5.0
*/
public void setAccountForDateIndexNulls(boolean theAccountForDateIndexNulls) {
myAccountForDateIndexNulls = theAccountForDateIndexNulls;
public void setMassIngestionMode(boolean theMassIngestionMode) {
myMassIngestionMode = theMassIngestionMode;
}
/**
@ -2432,6 +2441,17 @@ public class DaoConfig {
return myAccountForDateIndexNulls;
}
/**
* If set to true (default is false), date indexes will account for null values in the range columns. As of 5.3.0
* we no longer place null values in these columns, but legacy data may exist that still has these values. Note that
* enabling this results in more complexity in the search SQL.
*
* @since 5.5.0
*/
public void setAccountForDateIndexNulls(boolean theAccountForDateIndexNulls) {
myAccountForDateIndexNulls = theAccountForDateIndexNulls;
}
/**
* If set to true (default is false) then subscriptions will be triggered for resource updates even if they
* do not trigger a new version (e.g. $meta-add and $meta-delete).

View File

@ -25,6 +25,10 @@ import ca.uhn.fhir.jpa.bulk.imprt.job.BulkImportJobConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@Configuration
//When you define a new batch job, add it here.
@Import({
@ -33,8 +37,40 @@ import org.springframework.context.annotation.Import;
BulkImportJobConfig.class
})
public class BatchJobsConfig {
public static final String BULK_IMPORT_JOB_NAME = "bulkImportJob";
/*
* Bulk Export
*/
public static final String BULK_EXPORT_JOB_NAME = "bulkExportJob";
public static final String GROUP_BULK_EXPORT_JOB_NAME = "groupBulkExportJob";
public static final String PATIENT_BULK_EXPORT_JOB_NAME = "patientBulkExportJob";
public static final String BULK_EXPORT_GENERATE_RESOURCE_FILES_STEP = "bulkExportGenerateResourceFilesStep";
/*
* Bulk Import
*/
public static final String BULK_IMPORT_JOB_NAME = "bulkImportJob";
public static final String BULK_IMPORT_PROCESSING_STEP = "bulkImportProcessingStep";
/**
* This Set contains the step names across all job types that are appropriate for
* someone to look at the write count for that given step in order to determine the
* number of processed records.
*
* This is provided since a job might have multiple steps that the same data passes
* through, so you can't just sum up the total of all of them.
*
* For any given batch job type, there should only be one step name in this set
*/
public static final Set<String> RECORD_PROCESSING_STEP_NAMES;
static {
HashSet<String> recordProcessingStepNames = new HashSet<>();
recordProcessingStepNames.add(BULK_IMPORT_PROCESSING_STEP);
recordProcessingStepNames.add(BULK_EXPORT_GENERATE_RESOURCE_FILES_STEP);
RECORD_PROCESSING_STEP_NAMES = Collections.unmodifiableSet(recordProcessingStepNames);
}
}

View File

@ -169,7 +169,7 @@ public class BulkExportJobConfig {
@Bean
public Step bulkExportGenerateResourceFilesStep() {
return myStepBuilderFactory.get("bulkExportGenerateResourceFilesStep")
return myStepBuilderFactory.get(BatchJobsConfig.BULK_EXPORT_GENERATE_RESOURCE_FILES_STEP)
.<List<ResourcePersistentId>, List<IBaseResource>>chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time.
.reader(bulkItemReader())
.processor(myPidToIBaseResourceProcessor)
@ -217,7 +217,7 @@ public class BulkExportJobConfig {
@Bean
public Step bulkExportPartitionStep() {
return myStepBuilderFactory.get("partitionStep")
.partitioner("bulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner())
.partitioner(BatchJobsConfig.BULK_EXPORT_GENERATE_RESOURCE_FILES_STEP, bulkExportResourceTypePartitioner())
.step(bulkExportGenerateResourceFilesStep())
.build();
}

View File

@ -90,4 +90,9 @@ public interface IBulkDataImportSvc {
* Delete all input files associated with a particular job
*/
void deleteJobFiles(String theJobId);
/**
* Fetch just the file description for the given file
*/
String getFileDescription(String theJobId, int theFileIndex);
}

View File

@ -23,6 +23,7 @@ package ca.uhn.fhir.jpa.bulk.imprt.job;
import ca.uhn.fhir.jpa.bulk.export.job.BulkExportJobConfig;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum;
import org.elasticsearch.client.enrich.ExecutePolicyResponse;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;

View File

@ -20,11 +20,15 @@ package ca.uhn.fhir.jpa.bulk.imprt.job;
* #L%
*/
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.batch.BatchConstants;
import ca.uhn.fhir.jpa.bulk.imprt.model.ParsedBulkImportRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersValidator;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
@ -39,8 +43,15 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.task.TaskExecutor;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.CompositeRetryPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.policy.TimeoutRetryPolicy;
import javax.batch.api.chunk.listener.RetryProcessListener;
import static ca.uhn.fhir.jpa.batch.BatchJobsConfig.BULK_IMPORT_JOB_NAME;
import static ca.uhn.fhir.jpa.batch.BatchJobsConfig.BULK_IMPORT_PROCESSING_STEP;
/**
* Spring batch Job configuration file. Contains all necessary plumbing to run a
@ -50,23 +61,23 @@ import static ca.uhn.fhir.jpa.batch.BatchJobsConfig.BULK_IMPORT_JOB_NAME;
public class BulkImportJobConfig {
public static final String JOB_PARAM_COMMIT_INTERVAL = "commitInterval";
private static final Logger ourLog = LoggerFactory.getLogger(BulkImportJobConfig.class);
@Autowired
private StepBuilderFactory myStepBuilderFactory;
@Autowired
private JobBuilderFactory myJobBuilderFactory;
@Autowired
@Qualifier(BatchConstants.JOB_LAUNCHING_TASK_EXECUTOR)
private TaskExecutor myTaskExecutor;
@Autowired
private DaoConfig myDaoConfig;
@Bean(name = BULK_IMPORT_JOB_NAME)
@Lazy
public Job bulkImportJob() throws Exception {
return myJobBuilderFactory.get(BULK_IMPORT_JOB_NAME)
.validator(bulkImportJobParameterValidator())
.start(bulkImportPartitionStep())
.start(bulkImportProcessingStep())
.next(bulkImportCloseJobStep())
.build();
}
@ -88,15 +99,20 @@ public class BulkImportJobConfig {
}
@Bean
public Step bulkImportPartitionStep() throws Exception {
return myStepBuilderFactory.get("bulkImportPartitionStep")
.partitioner("bulkImportPartitionStep", bulkImportPartitioner())
public Step bulkImportProcessingStep() throws Exception {
return myStepBuilderFactory.get(BULK_IMPORT_PROCESSING_STEP)
.partitioner(BULK_IMPORT_PROCESSING_STEP, bulkImportPartitioner())
.partitionHandler(partitionHandler())
.listener(activateBulkImportEntityStepListener())
.listener(errorLisener())
.gridSize(10)
.build();
}
private ChunkAroundListener errorLisener() {
return new ChunkAroundListener();
}
private PartitionHandler partitionHandler() throws Exception {
assert myTaskExecutor != null;
@ -126,20 +142,31 @@ public class BulkImportJobConfig {
return new BulkImportPartitioner();
}
@Bean
public Step bulkImportProcessFilesStep() {
CompletionPolicy completionPolicy = completionPolicy();
return myStepBuilderFactory.get("bulkImportProcessFilesStep")
.<ParsedBulkImportRecord, ParsedBulkImportRecord>chunk(completionPolicy)
.<ParsedBulkImportRecord, ParsedBulkImportRecord>chunk(completionPolicy())
.reader(bulkImportFileReader())
.writer(bulkImportFileWriter())
.listener(bulkImportStepListener())
.listener(completionPolicy)
.listener(completionPolicy())
.faultTolerant()
.retryPolicy(bulkImportProcessFilesStepRetryPolicy())
.build();
}
private RetryPolicy bulkImportProcessFilesStepRetryPolicy() {
TimeoutRetryPolicy timeoutPolicy = new TimeoutRetryPolicy();
timeoutPolicy.setTimeout(10000);
SimpleRetryPolicy countRetryPolicy = new SimpleRetryPolicy(myDaoConfig.getBulkImportMaxRetryCount());
CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
compositeRetryPolicy.setPolicies(new RetryPolicy[]{timeoutPolicy, countRetryPolicy});
return compositeRetryPolicy;
}
@Bean
@StepScope
public CompletionPolicy completionPolicy() {
@ -152,7 +179,6 @@ public class BulkImportJobConfig {
return new BulkImportFileWriter();
}
@Bean
@StepScope
public BulkImportFileReader bulkImportFileReader() {
@ -165,5 +191,12 @@ public class BulkImportJobConfig {
return new BulkImportStepListener();
}
public static class ChunkAroundListener implements RetryProcessListener {
@Override
public void onRetryProcessException(Object item, Exception ex) throws Exception {
throw ex;
}
}
}

View File

@ -37,6 +37,8 @@ import static org.slf4j.LoggerFactory.getLogger;
public class BulkImportPartitioner implements Partitioner {
public static final String FILE_INDEX = "fileIndex";
public static final String FILE_DESCRIPTION = "fileDescription";
public static final String JOB_DESCRIPTION = "jobDescription";
public static final String ROW_PROCESSING_MODE = "rowProcessingMode";
private static final Logger ourLog = getLogger(BulkImportPartitioner.class);
@ -56,12 +58,16 @@ public class BulkImportPartitioner implements Partitioner {
for (int i = 0; i < job.getFileCount(); i++) {
String fileDescription = myBulkDataImportSvc.getFileDescription(myJobUUID, i);
ExecutionContext context = new ExecutionContext();
context.putString(BulkExportJobConfig.JOB_UUID_PARAMETER, myJobUUID);
context.putInt(FILE_INDEX, i);
context.put(ROW_PROCESSING_MODE, job.getProcessingMode());
context.put(JOB_DESCRIPTION, job.getJobDescription());
context.put(FILE_DESCRIPTION, fileDescription);
String key = "FILE" + i;
String key = "FILE" + i + ":" + fileDescription;
retVal.put(key, context);
}

View File

@ -26,7 +26,9 @@ import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.jsr.RetryListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.retry.ExhaustedRetryException;
import javax.annotation.Nonnull;
@ -36,7 +38,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
* This class sets the job status to ERROR if any failures occur while actually
* generating the export files.
*/
public class BulkImportStepListener implements StepExecutionListener {
public class BulkImportStepListener implements StepExecutionListener, RetryListener {
@Autowired
private IBulkDataImportSvc myBulkDataImportSvc;
@ -55,9 +57,24 @@ public class BulkImportStepListener implements StepExecutionListener {
jobUuid = theStepExecution.getJobExecution().getExecutionContext().getString(BulkExportJobConfig.JOB_UUID_PARAMETER);
}
assert isNotBlank(jobUuid);
String exitDescription = theStepExecution.getExitStatus().getExitDescription();
myBulkDataImportSvc.setJobToStatus(jobUuid, BulkImportJobStatusEnum.ERROR, exitDescription);
StringBuilder message = new StringBuilder();
message.append("Job: ").append(theStepExecution.getExecutionContext().getString(BulkImportPartitioner.JOB_DESCRIPTION)).append("\n");
message.append("File: ").append(theStepExecution.getExecutionContext().getString(BulkImportPartitioner.FILE_DESCRIPTION)).append("\n");
for (Throwable next : theStepExecution.getFailureExceptions()) {
if (next instanceof ExhaustedRetryException) {
next = next.getCause(); // ExhaustedRetryException is a spring exception that wraps the real one
}
String nextErrorMessage = next.toString();
message.append("Error: ").append(nextErrorMessage).append("\n");
}
myBulkDataImportSvc.setJobToStatus(jobUuid, BulkImportJobStatusEnum.ERROR, message.toString());
// Replace the built-in error message with a better one
return ExitStatus.FAILED.addExitDescription(message.toString());
}
return theStepExecution.getExitStatus();
}
}

View File

@ -29,6 +29,16 @@ public class BulkImportJobFileJson implements IModelJson {
private String myTenantName;
@JsonProperty("contents")
private String myContents;
@JsonProperty("description")
private String myDescription;
public String getDescription() {
return myDescription;
}
public void setDescription(String theDescription) {
myDescription = theDescription;
}
public String getTenantName() {
return myTenantName;

View File

@ -245,6 +245,14 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc {
.orElseThrow(() -> new IllegalArgumentException("Invalid index " + theFileIndex + " for job " + theJobId));
}
@Transactional
@Override
public String getFileDescription(String theJobId, int theFileIndex) {
BulkImportJobEntity job = findJobByJobId(theJobId);
return myJobFileDao.findFileDescriptionForJob(job, theFileIndex).orElse("");
}
@Override
@Transactional
public void deleteJobFiles(String theJobId) {
@ -282,6 +290,7 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc {
jobFile.setJob(job);
jobFile.setContents(nextFile.getContents());
jobFile.setTenantName(nextFile.getTenantName());
jobFile.setFileDescription(nextFile.getDescription());
jobFile.setFileSequence(nextSequence++);
myJobFileDao.save(jobFile);
}

View File

@ -253,6 +253,10 @@ public abstract class BaseConfig {
return new BatchJobSubmitterImpl();
}
@Bean
public BatchJobRegisterer batchJobRegisterer() {
return new BatchJobRegisterer();
}
@Lazy
@Bean

View File

@ -0,0 +1,62 @@
package ca.uhn.fhir.jpa.config;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.springframework.batch.core.Job;
import org.springframework.batch.core.configuration.DuplicateJobException;
import org.springframework.batch.core.configuration.JobFactory;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.support.ApplicationContextJobFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import javax.annotation.PostConstruct;
import java.util.Map;
public class BatchJobRegisterer {
@Autowired
private ApplicationContext myApplicationContext;
@PostConstruct
public void start() throws DuplicateJobException {
Map<String, Job> batchJobs = myApplicationContext.getBeansOfType(Job.class);
JobRegistry jobRegistry = myApplicationContext.getBean(JobRegistry.class);
for (Map.Entry<String, Job> next : batchJobs.entrySet()) {
jobRegistry.register(new JobFactory() {
@Override
public Job createJob() {
return next.getValue();
}
@Override
public String getJobName() {
return next.getKey();
}
});
}
}
}

View File

@ -37,6 +37,9 @@ public interface IBulkImportJobFileDao extends JpaRepository<BulkImportJobFileEn
@Query("SELECT f FROM BulkImportJobFileEntity f WHERE f.myJob = :job AND f.myFileSequence = :fileIndex")
Optional<BulkImportJobFileEntity> findForJob(@Param("job") BulkImportJobEntity theJob, @Param("fileIndex") int theFileIndex);
@Query("SELECT f.myFileDescription FROM BulkImportJobFileEntity f WHERE f.myJob = :job AND f.myFileSequence = :fileIndex")
Optional<String> findFileDescriptionForJob(@Param("job") BulkImportJobEntity theJob, @Param("fileIndex") int theFileIndex);
@Query("SELECT f.myId FROM BulkImportJobFileEntity f WHERE f.myJob.myJobId = :jobId ORDER BY f.myFileSequence ASC")
List<Long> findAllIdsForJob(@Param("jobId") String theJobId);

View File

@ -37,12 +37,15 @@ import javax.persistence.Table;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import static org.apache.commons.lang3.StringUtils.left;
@Entity
@Table(name = "HFJ_BLK_IMPORT_JOBFILE", indexes = {
@Index(name = "IDX_BLKIM_JOBFILE_JOBID", columnList = "JOB_PID")
@Index(name = "IDX_BLKIM_JOBFILE_JOBID", columnList = "JOB_PID")
})
public class BulkImportJobFileEntity implements Serializable {
public static final int MAX_DESCRIPTION_LENGTH = 500;
@Id
@GeneratedValue(strategy = GenerationType.AUTO, generator = "SEQ_BLKIMJOBFILE_PID")
@SequenceGenerator(name = "SEQ_BLKIMJOBFILE_PID", sequenceName = "SEQ_BLKIMJOBFILE_PID")
@ -55,14 +58,22 @@ public class BulkImportJobFileEntity implements Serializable {
@Column(name = "FILE_SEQ", nullable = false)
private int myFileSequence;
@Column(name = "FILE_DESCRIPTION", nullable = true, length = MAX_DESCRIPTION_LENGTH)
private String myFileDescription;
@Lob
@Column(name = "JOB_CONTENTS", nullable = false)
private byte[] myContents;
@Column(name = "TENANT_NAME", nullable = true, length = PartitionEntity.MAX_NAME_LENGTH)
private String myTenantName;
public String getFileDescription() {
return myFileDescription;
}
public void setFileDescription(String theFileDescription) {
myFileDescription = left(theFileDescription, MAX_DESCRIPTION_LENGTH);
}
public BulkImportJobEntity getJob() {
return myJob;
}
@ -94,11 +105,11 @@ public class BulkImportJobFileEntity implements Serializable {
.setTenantName(getTenantName());
}
public void setTenantName(String theTenantName) {
myTenantName = theTenantName;
}
public String getTenantName() {
return myTenantName;
}
public void setTenantName(String theTenantName) {
myTenantName = theTenantName;
}
}

View File

@ -43,7 +43,16 @@ public class PerformanceTracingLoggingInterceptor {
* Constructor that logs to this class with a level of INFO
*/
public PerformanceTracingLoggingInterceptor() {
this(ourLog, Level.INFO);
this(Level.INFO);
}
/**
* Constructor that logs with a specific level
*
* @since 5.5.0
*/
public PerformanceTracingLoggingInterceptor(Level theLevel) {
this(ourLog, theLevel);
}
/**

View File

@ -1,12 +1,17 @@
package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.jpa.dao.r4.BaseJpaR4Test;
import org.junit.jupiter.api.AfterEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.repository.dao.JobExecutionDao;
import org.springframework.batch.core.repository.dao.JobInstanceDao;
import org.springframework.batch.core.repository.dao.MapJobExecutionDao;
import org.springframework.batch.core.repository.dao.MapJobInstanceDao;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
@ -24,6 +29,16 @@ public class BaseBatchJobR4Test extends BaseJpaR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(BaseBatchJobR4Test.class);
@Autowired
private JobExplorer myJobExplorer;
// @Autowired
// private JobExecutionDao myMapJobExecutionDao;
// @Autowired
// private JobInstanceDao myMapJobInstanceDao;
//
// @AfterEach
// public void after() {
// ((MapJobExecutionDao)myMapJobExecutionDao).clear();
// ((MapJobInstanceDao)myMapJobInstanceDao).clear();
// }
protected List<JobExecution> awaitAllBulkJobCompletions(String... theJobNames) {
assert theJobNames.length > 0;
@ -40,6 +55,8 @@ public class BaseBatchJobR4Test extends BaseJpaR4Test {
List<JobExecution> bulkExportExecutions = bulkExport.stream().flatMap(jobInstance -> myJobExplorer.getJobExecutions(jobInstance).stream()).collect(Collectors.toList());
awaitJobCompletions(bulkExportExecutions);
// Return the final state
bulkExportExecutions = bulkExport.stream().flatMap(jobInstance -> myJobExplorer.getJobExecutions(jobInstance).stream()).collect(Collectors.toList());
return bulkExportExecutions;
}

View File

@ -1,15 +1,15 @@
package ca.uhn.fhir.jpa.bulk.imprt.svc;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
import ca.uhn.fhir.jpa.bulk.BaseBatchJobR4Test;
import ca.uhn.fhir.jpa.bulk.export.job.BulkExportJobConfig;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkImportJobDao;
import ca.uhn.fhir.jpa.dao.data.IBulkImportJobFileDao;
@ -18,13 +18,23 @@ import ca.uhn.fhir.jpa.entity.BulkImportJobFileEntity;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.test.utilities.ITestDataBuilder;
import ca.uhn.fhir.util.BundleBuilder;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
@ -32,8 +42,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import static ca.uhn.fhir.jpa.batch.BatchJobsConfig.BULK_IMPORT_JOB_NAME;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@ -43,16 +56,22 @@ import static org.mockito.Mockito.verify;
public class BulkDataImportR4Test extends BaseBatchJobR4Test implements ITestDataBuilder {
private static final Logger ourLog = LoggerFactory.getLogger(BulkDataImportR4Test.class);
@Autowired
private IBulkDataImportSvc mySvc;
@Autowired
private IBulkImportJobDao myBulkImportJobDao;
@Autowired
private IBulkImportJobFileDao myBulkImportJobFileDao;
@Autowired
private JobExplorer myJobExplorer;
@Autowired
private JobRegistry myJobRegistry;
@AfterEach
public void after() {
myInterceptorRegistry.unregisterInterceptorsIf(t -> t instanceof IAnonymousInterceptor);
myInterceptorRegistry.unregisterInterceptorsIf(t -> t instanceof MyFailAfterThreeCreatesInterceptor);
}
@Test
@ -63,7 +82,7 @@ public class BulkDataImportR4Test extends BaseBatchJobR4Test implements ITestDat
BulkImportJobJson job = new BulkImportJobJson();
job.setProcessingMode(JobFileRowProcessingModeEnum.FHIR_TRANSACTION);
job.setJobDescription("This is the description");
job.setJobDescription("testFlow_TransactionRows");
job.setBatchSize(3);
String jobId = mySvc.createNewJob(job, files);
mySvc.markJobAsReadyForActivation(jobId);
@ -72,8 +91,7 @@ public class BulkDataImportR4Test extends BaseBatchJobR4Test implements ITestDat
assertTrue(activateJobOutcome);
List<JobExecution> executions = awaitAllBulkJobCompletions();
assertEquals(1, executions.size());
assertEquals("This is the description", executions.get(0).getJobParameters().getString(BulkExportJobConfig.JOB_DESCRIPTION));
assertEquals("testFlow_TransactionRows", executions.get(0).getJobParameters().getString(BulkExportJobConfig.JOB_DESCRIPTION));
runInTransaction(() -> {
List<BulkImportJobEntity> jobs = myBulkImportJobDao.findAll();
@ -86,7 +104,6 @@ public class BulkDataImportR4Test extends BaseBatchJobR4Test implements ITestDat
IBundleProvider searchResults = myPatientDao.search(SearchParameterMap.newSynchronous());
assertEquals(transactionsPerFile * fileCount, searchResults.sizeOrThrowNpe());
}
@Test
@ -126,16 +143,51 @@ public class BulkDataImportR4Test extends BaseBatchJobR4Test implements ITestDat
));
}
@Test
public void testFlow_ErrorDuringWrite() {
myInterceptorRegistry.registerInterceptor(new MyFailAfterThreeCreatesInterceptor());
int transactionsPerFile = 10;
int fileCount = 10;
List<BulkImportJobFileJson> files = createInputFiles(transactionsPerFile, fileCount);
BulkImportJobJson job = new BulkImportJobJson();
job.setProcessingMode(JobFileRowProcessingModeEnum.FHIR_TRANSACTION);
job.setJobDescription("This is the job description");
job.setBatchSize(3);
String jobId = mySvc.createNewJob(job, files);
mySvc.markJobAsReadyForActivation(jobId);
boolean activateJobOutcome = mySvc.activateNextReadyJob();
assertTrue(activateJobOutcome);
String[] jobNames = new String[]{BULK_IMPORT_JOB_NAME};
assert jobNames.length > 0;
await().until(() -> runInTransaction(() -> {
JobInstance jobInstance = myJobExplorer.getLastJobInstance(BULK_IMPORT_JOB_NAME);
JobExecution jobExecution = myJobExplorer.getLastJobExecution(jobInstance);
ourLog.info("Exit status: {}", jobExecution.getExitStatus());
return jobExecution.getExitStatus().getExitCode().equals(ExitStatus.FAILED.getExitCode());
}));
JobInstance jobInstance = myJobExplorer.getLastJobInstance(BULK_IMPORT_JOB_NAME);
JobExecution jobExecution = myJobExplorer.getLastJobExecution(jobInstance);
String exitDescription = jobExecution.getExitStatus().getExitDescription();
assertThat(exitDescription, containsString("File: File With Description"));
}
@Nonnull
private List<BulkImportJobFileJson> createInputFiles(int transactionsPerFile, int fileCount) {
List<BulkImportJobFileJson> files = new ArrayList<>();
int counter = 0;
for (int fileIndex = 0; fileIndex < fileCount; fileIndex++) {
StringBuilder fileContents = new StringBuilder();
for (int transactionIdx = 0; transactionIdx < transactionsPerFile; transactionIdx++) {
BundleBuilder bundleBuilder = new BundleBuilder(myFhirCtx);
IBaseResource patient = buildPatient(withFamily("FAM " + fileIndex + " " + transactionIdx));
IBaseResource patient = buildPatient(withFamily("FAM " + fileIndex + " " + transactionIdx), withIdentifier(null, "patient" + counter++));
bundleBuilder.addTransactionCreateEntry(patient);
fileContents.append(myFhirCtx.newJsonParser().setPrettyPrint(false).encodeResourceToString(bundleBuilder.getBundle()));
fileContents.append("\n");
@ -143,13 +195,35 @@ public class BulkDataImportR4Test extends BaseBatchJobR4Test implements ITestDat
BulkImportJobFileJson nextFile = new BulkImportJobFileJson();
nextFile.setContents(fileContents.toString());
nextFile.setDescription("File With Description " + fileIndex);
files.add(nextFile);
}
return files;
}
@Test
public void testJobsAreRegisteredWithJobRegistry() throws NoSuchJobException {
Job job = myJobRegistry.getJob(BULK_IMPORT_JOB_NAME);
assertEquals(true, job.isRestartable());
}
protected List<JobExecution> awaitAllBulkJobCompletions() {
return awaitAllBulkJobCompletions(BatchJobsConfig.BULK_IMPORT_JOB_NAME);
return awaitAllBulkJobCompletions(BULK_IMPORT_JOB_NAME);
}
@Interceptor
public class MyFailAfterThreeCreatesInterceptor {
public static final String ERROR_MESSAGE = "This is an error message";
@Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED)
public void create(IBaseResource thePatient) {
Patient patient = (Patient) thePatient;
if (patient.getIdentifierFirstRep().getValue().equals("patient10")) {
throw new InternalErrorException(ERROR_MESSAGE);
}
}
}
}

View File

@ -164,6 +164,7 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.event.Level;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
@ -531,7 +532,7 @@ public abstract class BaseJpaR4Test extends BaseJpaTest implements ITestDataBuil
public void beforeCreateInterceptor() {
myInterceptor = mock(IServerInterceptor.class);
myPerformanceTracingLoggingInterceptor = new PerformanceTracingLoggingInterceptor();
myPerformanceTracingLoggingInterceptor = new PerformanceTracingLoggingInterceptor(Level.DEBUG);
myInterceptorRegistry.registerInterceptor(myPerformanceTracingLoggingInterceptor);
}

View File

@ -1,14 +1,5 @@
package ca.uhn.fhir.jpa.provider.dstu3;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.stringContainsInOrder;
import static org.junit.jupiter.api.Assertions.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import ca.uhn.fhir.rest.openapi.OpenApiInterceptor;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
@ -16,11 +7,16 @@ import org.apache.http.client.methods.HttpGet;
import org.hl7.fhir.dstu3.model.CapabilityStatement;
import org.hl7.fhir.dstu3.model.CapabilityStatement.CapabilityStatementRestResourceComponent;
import org.hl7.fhir.dstu3.model.CapabilityStatement.CapabilityStatementRestResourceSearchParamComponent;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import ca.uhn.fhir.util.TestUtil;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
public class ServerDstu3Test extends BaseResourceProviderDstu3Test {
@ -30,10 +26,10 @@ public class ServerDstu3Test extends BaseResourceProviderDstu3Test {
@AfterEach
public void after() throws Exception {
super.after();
ourRestServer.getInterceptorService().unregisterInterceptorsIf(t->t instanceof OpenApiInterceptor);
ourRestServer.getInterceptorService().unregisterInterceptorsIf(t -> t instanceof OpenApiInterceptor);
}
/**
* See #519
*/
@ -44,12 +40,12 @@ public class ServerDstu3Test extends BaseResourceProviderDstu3Test {
try {
ourLog.info(resp.toString());
assertEquals(200, resp.getStatusLine().getStatusCode());
String respString = IOUtils.toString(resp.getEntity().getContent(), StandardCharsets.UTF_8);
ourLog.info(respString);
CapabilityStatement cs = myFhirCtx.newXmlParser().parseResource(CapabilityStatement.class, respString);
for (CapabilityStatementRestResourceComponent nextResource : cs.getRest().get(0).getResource()) {
ourLog.info("Testing resource: " + nextResource.getType());
Set<String> sps = new HashSet<String>();
@ -58,13 +54,13 @@ public class ServerDstu3Test extends BaseResourceProviderDstu3Test {
fail("Duplicate search parameter " + nextSp.getName() + " for resource " + nextResource.getType());
}
}
if (!sps.contains("_id")) {
fail("No search parameter _id for resource " + nextResource.getType());
}
}
}
} finally {
IOUtils.closeQuietly(resp.getEntity().getContent());
IOUtils.closeQuietly(resp.getEntity().getContent());
}
}

View File

@ -19,12 +19,10 @@
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>${spring_batch_version}</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-infrastructure</artifactId>
<version>${spring_batch_version}</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>

View File

@ -86,6 +86,9 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
forcedId.dropIndex("20210516.1", "IDX_FORCEDID_TYPE_FID").onlyAppliesToPlatforms(DriverTypeEnum.MSSQL_2012).runEvenDuringSchemaInitialization();
forcedId.addIndex("20210516.2", "IDX_FORCEDID_TYPE_FID").unique(true).includeColumns("RESOURCE_PID").withColumns("RESOURCE_TYPE", "FORCED_ID").onlyAppliesToPlatforms(DriverTypeEnum.MSSQL_2012).runEvenDuringSchemaInitialization();
// Add bulk import file description
version.onTable("HFJ_BLK_IMPORT_JOBFILE")
.addColumn("20210528.1", "FILE_DESCRIPTION").nullable().type(ColumnTypeEnum.STRING, 500);
}
private void init540() {

14
pom.xml
View File

@ -814,7 +814,7 @@
<spring_version>5.3.6</spring_version>
<!-- FYI: Spring Data JPA 2.1.9 causes test failures due to unexpected cascading deletes -->
<spring_data_version>2.5.0</spring_data_version>
<spring_batch_version>4.3.2</spring_batch_version>
<spring_batch_version>4.3.3</spring_batch_version>
<spring_boot_version>2.4.4</spring_boot_version>
<spring_retry_version>1.2.2.RELEASE</spring_retry_version>
@ -1763,7 +1763,17 @@
<artifactId>spring-websocket</artifactId>
<version>${spring_version}</version>
</dependency>
<dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>${spring_batch_version}</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-infrastructure</artifactId>
<version>${spring_batch_version}</version>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>${spring_retry_version}</version>