3846 migrate bulk import pull to batch2 (#3853)

* still fixing tests

* migrated a bulk import pull job to batch2

* changelog

* cleanup

* review fixes

* minor tweaks

* cleaning up

* tra la la

* blah

* blah

* updating msg

Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-MacBook-Pro.local>
TipzCM 2022-08-03 10:50:51 -04:00 committed by GitHub
parent dcfaf42e73
commit eae1c0ffb8
37 changed files with 686 additions and 915 deletions

View File

@@ -25,7 +25,7 @@ public final class Msg {
/**
* IMPORTANT: Please update the following comment after you add a new code
* Last code value: 2123
* Last code value: 2124
*/
private Msg() {}

View File

@@ -6,7 +6,6 @@ import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;

View File

@@ -0,0 +1,4 @@
---
type: fix
issue: 3846
title: "Migrated Bulk Import Pull to Batch2 Framework"

View File

@@ -22,8 +22,6 @@ package ca.uhn.fhir.jpa.batch;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.batch.svc.BatchJobSubmitterImpl;
import ca.uhn.fhir.jpa.bulk.export.job.BulkExportJobConfig;
import ca.uhn.fhir.jpa.bulk.imprt.job.BulkImportJobConfig;
import ca.uhn.fhir.jpa.config.BatchJobRegisterer;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
@@ -38,9 +36,7 @@ import org.springframework.context.annotation.Import;
@Configuration
@EnableBatchProcessing
@Import({
CommonBatchJobConfig.class,
BulkExportJobConfig.class,
BulkImportJobConfig.class
CommonBatchJobConfig.class
// When you define a new batch job, add it here.
})
@Deprecated

View File

@@ -26,6 +26,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@Deprecated
public class CommonBatchJobConfig {
public static final int MINUTES_IN_FUTURE_TO_PROCESS_FROM = 1;

View File

@@ -49,6 +49,7 @@ import static ca.uhn.fhir.jpa.batch.config.BatchConstants.PATIENT_BULK_EXPORT_FO
* Reusable Item Processor which attaches an extension to any outgoing resource. This extension will contain a resource
* reference to the golden resource patient of the given resources' patient. (e.g. Observation.subject, Immunization.patient, etc)
*/
@Deprecated
public class GoldenResourceAnnotatingProcessor implements ItemProcessor<List<IBaseResource>, List<IBaseResource>> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();

View File

@@ -23,13 +23,18 @@ package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.config.BaseBatch2Config;
import ca.uhn.fhir.batch2.coordinator.SynchronizedJobPersistenceWrapper;
import ca.uhn.fhir.jpa.bulk.export.job.BulkExportJobConfig;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
@Configuration
@Import({
BulkExportJobConfig.class
})
public class JpaBatch2Config extends BaseBatch2Config {
@Bean

View File

@@ -30,8 +30,6 @@ import org.springframework.context.annotation.Configuration;
*/
@Configuration
public class BulkExportJobConfig {
public static final int CHUNK_SIZE = 100;
@Bean
public MdmExpansionCacheSvc mdmExpansionCacheSvc() {

View File

@@ -1,51 +0,0 @@
package ca.uhn.fhir.jpa.bulk.imprt.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
/**
* Will run before and after a job to set the status to whatever is appropriate.
*/
public class ActivateBulkImportEntityStepListener implements StepExecutionListener {
@Autowired
private IBulkDataImportSvc myBulkImportDaoSvc;
@Override
public void beforeStep(StepExecution theStepExecution) {
String jobUuid = theStepExecution.getJobExecution().getJobParameters().getString(BatchConstants.JOB_UUID_PARAMETER);
if (jobUuid != null) {
myBulkImportDaoSvc.setJobToStatus(jobUuid, BulkImportJobStatusEnum.RUNNING);
}
}
@Override
public ExitStatus afterStep(StepExecution theStepExecution) {
return ExitStatus.EXECUTING;
}
}

View File

@@ -1,76 +0,0 @@
package ca.uhn.fhir.jpa.bulk.imprt.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.ParsedBulkImportRecord;
import ca.uhn.fhir.util.IoUtil;
import com.google.common.io.LineReader;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import java.io.StringReader;
@SuppressWarnings("UnstableApiUsage")
public class BulkImportFileReader implements ItemReader<ParsedBulkImportRecord> {
@Autowired
private IBulkDataImportSvc myBulkDataImportSvc;
@Autowired
private FhirContext myFhirContext;
@Value("#{stepExecutionContext['" + BatchConstants.JOB_UUID_PARAMETER + "']}")
private String myJobUuid;
@Value("#{stepExecutionContext['" + BulkImportPartitioner.FILE_INDEX + "']}")
private int myFileIndex;
private StringReader myReader;
private LineReader myLineReader;
private int myLineIndex;
private String myTenantName;
@Override
public ParsedBulkImportRecord read() throws Exception {
if (myReader == null) {
BulkImportJobFileJson file = myBulkDataImportSvc.fetchFile(myJobUuid, myFileIndex);
myTenantName = file.getTenantName();
myReader = new StringReader(file.getContents());
myLineReader = new LineReader(myReader);
}
String nextLine = myLineReader.readLine();
if (nextLine == null) {
IoUtil.closeQuietly(myReader);
return null;
}
Logs.getBatchTroubleshootingLog().debug("Reading line {} file index {} for job: {}", myLineIndex++, myFileIndex, myJobUuid);
IBaseResource parsed = myFhirContext.newJsonParser().parseResource(nextLine);
return new ParsedBulkImportRecord(myTenantName, parsed, myLineIndex);
}
}

View File

@@ -1,85 +0,0 @@
package ca.uhn.fhir.jpa.bulk.imprt.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum;
import ca.uhn.fhir.jpa.bulk.imprt.model.ParsedBulkImportRecord;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.util.StopWatch;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.util.List;
public class BulkImportFileWriter implements ItemWriter<ParsedBulkImportRecord> {
private static final Logger ourLog = LoggerFactory.getLogger(BulkImportFileWriter.class);
@Value("#{stepExecutionContext['" + BatchConstants.JOB_UUID_PARAMETER + "']}")
private String myJobUuid;
@Value("#{stepExecutionContext['" + BulkImportPartitioner.FILE_INDEX + "']}")
private int myFileIndex;
@Value("#{stepExecutionContext['" + BulkImportPartitioner.ROW_PROCESSING_MODE + "']}")
private JobFileRowProcessingModeEnum myRowProcessingMode;
@Autowired
private DaoRegistry myDaoRegistry;
@SuppressWarnings({"SwitchStatementWithTooFewBranches", "rawtypes", "unchecked"})
@Override
public void write(List<? extends ParsedBulkImportRecord> theItemLists) throws Exception {
assert TransactionSynchronizationManager.isActualTransactionActive();
String offsets = "unknown";
if (theItemLists.size() > 0) {
offsets = theItemLists.get(0).getLineIndex() + " - " + theItemLists.get(theItemLists.size()-1).getLineIndex();
}
ourLog.info("Beginning bulk import write {} rows Job[{}] FileIndex[{}] Offset[{}]", theItemLists.size(), myJobUuid, myFileIndex, offsets);
StopWatch sw = new StopWatch();
for (ParsedBulkImportRecord nextItem : theItemLists) {
SystemRequestDetails requestDetails = new SystemRequestDetails();
requestDetails.setTenantId(nextItem.getTenantName());
// Yeah this is a lame switch - We'll add more later I swear
switch (myRowProcessingMode) {
default:
case FHIR_TRANSACTION:
IFhirSystemDao systemDao = myDaoRegistry.getSystemDao();
IBaseResource inputBundle = nextItem.getRowContent();
systemDao.transactionNested(requestDetails, inputBundle);
break;
}
}
ourLog.info("Completed bulk import write {} rows Job[{}] FileIndex[{}] Offset[{}] in {}", theItemLists.size(), myJobUuid, myFileIndex, offsets, sw);
}
}

View File

@@ -1,57 +0,0 @@
package ca.uhn.fhir.jpa.bulk.imprt.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
/**
* Will run before and after a job to set the status to whatever is appropriate.
*/
public class BulkImportJobCloser implements Tasklet {
@Value("#{jobParameters['" + BatchConstants.JOB_UUID_PARAMETER + "']}")
private String myJobUUID;
@Autowired
private IBulkDataImportSvc myBulkDataImportSvc;
@Override
public RepeatStatus execute(StepContribution theStepContribution, ChunkContext theChunkContext) {
BatchStatus executionStatus = theChunkContext.getStepContext().getStepExecution().getJobExecution().getStatus();
if (executionStatus == BatchStatus.STARTED) {
myBulkDataImportSvc.setJobToStatus(myJobUUID, BulkImportJobStatusEnum.COMPLETE);
myBulkDataImportSvc.deleteJobFiles(myJobUUID);
} else {
myBulkDataImportSvc.setJobToStatus(myJobUUID, BulkImportJobStatusEnum.ERROR, "Found job in status: " + executionStatus);
myBulkDataImportSvc.deleteJobFiles(myJobUUID);
}
return RepeatStatus.FINISHED;
}
}

View File

@@ -1,204 +0,0 @@
package ca.uhn.fhir.jpa.bulk.imprt.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.bulk.imprt.model.ParsedBulkImportRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersValidator;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.core.step.item.KeyGenerator;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.CompletionPolicy;
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.exception.ExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.task.TaskExecutor;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.CompositeRetryPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.policy.TimeoutRetryPolicy;
import javax.batch.api.chunk.listener.RetryProcessListener;
import static ca.uhn.fhir.jpa.batch.config.BatchConstants.BULK_IMPORT_JOB_NAME;
import static ca.uhn.fhir.jpa.batch.config.BatchConstants.BULK_IMPORT_PROCESSING_STEP;
/**
* Spring batch Job configuration file. Contains all necessary plumbing to run a
* Bulk Export job.
*/
@Configuration
public class BulkImportJobConfig {
public static final String JOB_PARAM_COMMIT_INTERVAL = "commitInterval";
private static final Logger ourLog = LoggerFactory.getLogger(BulkImportJobConfig.class);
@Autowired
private StepBuilderFactory myStepBuilderFactory;
@Autowired
private JobBuilderFactory myJobBuilderFactory;
@Autowired
@Qualifier(BatchConstants.JOB_LAUNCHING_TASK_EXECUTOR)
private TaskExecutor myTaskExecutor;
@Autowired
private DaoConfig myDaoConfig;
@Bean(name = BULK_IMPORT_JOB_NAME)
@Lazy
public Job bulkImportJob() throws Exception {
return myJobBuilderFactory.get(BULK_IMPORT_JOB_NAME)
.validator(bulkImportJobParameterValidator())
.start(bulkImportProcessingStep())
.next(bulkImportCloseJobStep())
.build();
}
@Bean
public JobParametersValidator bulkImportJobParameterValidator() {
return new BulkImportJobParameterValidator();
}
@Bean
public CreateBulkImportEntityTasklet createBulkImportEntityTasklet() {
return new CreateBulkImportEntityTasklet();
}
@Bean
@JobScope
public ActivateBulkImportEntityStepListener activateBulkImportEntityStepListener() {
return new ActivateBulkImportEntityStepListener();
}
@Bean
public Step bulkImportProcessingStep() throws Exception {
return myStepBuilderFactory.get(BULK_IMPORT_PROCESSING_STEP)
.partitioner(BULK_IMPORT_PROCESSING_STEP, bulkImportPartitioner())
.partitionHandler(partitionHandler())
.listener(activateBulkImportEntityStepListener())
.listener(errorLisener())
.gridSize(10)
.build();
}
private ChunkAroundListener errorLisener() {
return new ChunkAroundListener();
}
private PartitionHandler partitionHandler() throws Exception {
assert myTaskExecutor != null;
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setStep(bulkImportProcessFilesStep());
retVal.setTaskExecutor(myTaskExecutor);
retVal.afterPropertiesSet();
return retVal;
}
@Bean
public Step bulkImportCloseJobStep() {
return myStepBuilderFactory.get("bulkImportCloseJobStep")
.tasklet(bulkImportJobCloser())
.build();
}
@Bean
@JobScope
public BulkImportJobCloser bulkImportJobCloser() {
return new BulkImportJobCloser();
}
@Bean
@JobScope
public BulkImportPartitioner bulkImportPartitioner() {
return new BulkImportPartitioner();
}
@Bean
public Step bulkImportProcessFilesStep() {
return myStepBuilderFactory.get("bulkImportProcessFilesStep")
.<ParsedBulkImportRecord, ParsedBulkImportRecord>chunk(completionPolicy())
.reader(bulkImportFileReader())
.writer(bulkImportFileWriter())
.listener(bulkImportStepListener())
.listener(completionPolicy())
.faultTolerant()
.retryPolicy(bulkImportProcessFilesStepRetryPolicy())
.build();
}
private RetryPolicy bulkImportProcessFilesStepRetryPolicy() {
TimeoutRetryPolicy timeoutPolicy = new TimeoutRetryPolicy();
timeoutPolicy.setTimeout(10000);
SimpleRetryPolicy countRetryPolicy = new SimpleRetryPolicy(myDaoConfig.getBulkImportMaxRetryCount());
CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
compositeRetryPolicy.setPolicies(new RetryPolicy[]{timeoutPolicy, countRetryPolicy});
return compositeRetryPolicy;
}
@Bean
@StepScope
public CompletionPolicy completionPolicy() {
return new BulkImportProcessStepCompletionPolicy();
}
@Bean
@StepScope
public BulkImportFileWriter bulkImportFileWriter() {
return new BulkImportFileWriter();
}
@Bean
@StepScope
public BulkImportFileReader bulkImportFileReader() {
return new BulkImportFileReader();
}
@Bean
@StepScope
public BulkImportStepListener bulkImportStepListener() {
return new BulkImportStepListener();
}
public static class ChunkAroundListener implements RetryProcessListener {
@Override
public void onRetryProcessException(Object item, Exception ex) throws Exception {
throw ex;
}
}
}

View File

@@ -1,71 +0,0 @@
package ca.uhn.fhir.jpa.bulk.imprt.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.dao.data.IBulkImportJobDao;
import ca.uhn.fhir.jpa.entity.BulkImportJobEntity;
import org.apache.commons.lang3.StringUtils;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.JobParametersValidator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.Optional;
/**
* This class will prevent a job from running if the UUID does not exist or is invalid.
*/
public class BulkImportJobParameterValidator implements JobParametersValidator {
@Autowired
private IBulkImportJobDao myBulkImportJobDao;
@Autowired
private PlatformTransactionManager myTransactionManager;
@Override
public void validate(JobParameters theJobParameters) throws JobParametersInvalidException {
if (theJobParameters == null) {
throw new JobParametersInvalidException(Msg.code(784) + "This job needs Parameters: [jobUUID]");
}
TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager);
String errorMessage = txTemplate.execute(tx -> {
StringBuilder errorBuilder = new StringBuilder();
String jobUUID = theJobParameters.getString(BatchConstants.JOB_UUID_PARAMETER);
Optional<BulkImportJobEntity> oJob = myBulkImportJobDao.findByJobId(jobUUID);
if (!StringUtils.isBlank(jobUUID) && !oJob.isPresent()) {
errorBuilder.append("There is no persisted job that exists with UUID: ");
errorBuilder.append(jobUUID);
errorBuilder.append(". ");
}
return errorBuilder.toString();
});
if (!StringUtils.isEmpty(errorMessage)) {
throw new JobParametersInvalidException(Msg.code(785) + errorMessage);
}
}
}

View File

@@ -1,78 +0,0 @@
package ca.uhn.fhir.jpa.bulk.imprt.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson;
import org.slf4j.Logger;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import javax.annotation.Nonnull;
import java.util.HashMap;
import java.util.Map;
import static org.slf4j.LoggerFactory.getLogger;
public class BulkImportPartitioner implements Partitioner {
public static final String FILE_INDEX = "fileIndex";
public static final String FILE_DESCRIPTION = "fileDescription";
public static final String JOB_DESCRIPTION = "jobDescription";
public static final String ROW_PROCESSING_MODE = "rowProcessingMode";
private static final Logger ourLog = getLogger(BulkImportPartitioner.class);
@Value("#{jobParameters['" + BatchConstants.JOB_UUID_PARAMETER + "']}")
private String myJobUUID;
@Autowired
private IBulkDataImportSvc myBulkDataImportSvc;
@Nonnull
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> retVal = new HashMap<>();
BulkImportJobJson job = myBulkDataImportSvc.fetchJob(myJobUUID);
for (int i = 0; i < job.getFileCount(); i++) {
String fileDescription = myBulkDataImportSvc.getFileDescription(myJobUUID, i);
ExecutionContext context = new ExecutionContext();
context.putString(BatchConstants.JOB_UUID_PARAMETER, myJobUUID);
context.putInt(FILE_INDEX, i);
context.put(ROW_PROCESSING_MODE, job.getProcessingMode());
context.put(JOB_DESCRIPTION, job.getJobDescription());
context.put(FILE_DESCRIPTION, fileDescription);
String key = "FILE" + i + ":" + fileDescription;
retVal.put(key, context);
}
return retVal;
}
}

View File

@@ -1,41 +0,0 @@
package ca.uhn.fhir.jpa.bulk.imprt.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.policy.CompletionPolicySupport;
import org.springframework.beans.factory.annotation.Value;
import static ca.uhn.fhir.jpa.bulk.imprt.job.BulkImportJobConfig.JOB_PARAM_COMMIT_INTERVAL;
public class BulkImportProcessStepCompletionPolicy extends CompletionPolicySupport {
@Value("#{jobParameters['" + JOB_PARAM_COMMIT_INTERVAL + "']}")
private int myChunkSize;
@Override
public boolean isComplete(RepeatContext context) {
if (context.getStartedCount() < myChunkSize) {
return false;
}
return true;
}
}

View File

@@ -1,85 +0,0 @@
package ca.uhn.fhir.jpa.bulk.imprt.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.jsr.RetryListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.retry.ExhaustedRetryException;
import javax.annotation.Nonnull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
/**
* This class sets the job status to ERROR if any failures occur while actually
* generating the export files.
*/
public class BulkImportStepListener implements StepExecutionListener, RetryListener {
@Autowired
private IBulkDataImportSvc myBulkDataImportSvc;
@Override
public void beforeStep(@Nonnull StepExecution stepExecution) {
// nothing
}
@Override
public ExitStatus afterStep(StepExecution theStepExecution) {
if (theStepExecution.getExitStatus().getExitCode().equals(ExitStatus.FAILED.getExitCode())) {
//Try to fetch it from the parameters first, and if it doesn't exist, fetch it from the context.
String jobUuid = theStepExecution.getJobExecution().getJobParameters().getString(BatchConstants.JOB_UUID_PARAMETER);
if (jobUuid == null) {
jobUuid = theStepExecution.getJobExecution().getExecutionContext().getString(BatchConstants.JOB_UUID_PARAMETER);
}
assert isNotBlank(jobUuid);
StringBuilder message = new StringBuilder();
message.append("Job: ").append(theStepExecution.getExecutionContext().getString(BulkImportPartitioner.JOB_DESCRIPTION)).append("\n");
message.append("File: ").append(theStepExecution.getExecutionContext().getString(BulkImportPartitioner.FILE_DESCRIPTION)).append("\n");
for (Throwable next : theStepExecution.getFailureExceptions()) {
if (next instanceof ExhaustedRetryException) {
next = next.getCause(); // ExhaustedRetryException is a spring exception that wraps the real one
}
String nextErrorMessage = next.toString();
message.append("Error: ").append(nextErrorMessage).append("\n");
}
theStepExecution.addFailureException(new RuntimeException(message.toString()));
myBulkDataImportSvc.setJobToStatus(jobUuid, BulkImportJobStatusEnum.ERROR, message.toString());
ExitStatus exitStatus = ExitStatus.FAILED.addExitDescription(message.toString());
theStepExecution.setExitStatus(exitStatus);
// Replace the built-in error message with a better one
return exitStatus;
}
return theStepExecution.getExitStatus();
}
}

View File

@@ -1,52 +0,0 @@
package ca.uhn.fhir.jpa.bulk.imprt.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.util.ValidateUtil;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import java.util.Map;
public class CreateBulkImportEntityTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution theStepContribution, ChunkContext theChunkContext) throws Exception {
Map<String, Object> jobParameters = theChunkContext.getStepContext().getJobParameters();
//We can leave early if they provided us with an existing job.
ValidateUtil.isTrueOrThrowInvalidRequest(jobParameters.containsKey(BatchConstants.JOB_UUID_PARAMETER), "Job doesn't have a UUID");
addUUIDToJobContext(theChunkContext, (String) jobParameters.get(BatchConstants.JOB_UUID_PARAMETER));
return RepeatStatus.FINISHED;
}
public void addUUIDToJobContext(ChunkContext theChunkContext, String theJobUUID) {
theChunkContext
.getStepContext()
.getStepExecution()
.getJobExecution()
.getExecutionContext()
.putString(BatchConstants.JOB_UUID_PARAMETER, theJobUUID);
}
}

View File

@@ -20,15 +20,17 @@ package ca.uhn.fhir.jpa.bulk.imprt.svc;
* #L%
*/
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.importpull.models.Batch2BulkImportPullJobParameters;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.job.BulkImportJobConfig;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.imprt.model.ActivateJobResult;
import ca.uhn.fhir.jpa.dao.data.IBulkImportJobDao;
import ca.uhn.fhir.jpa.dao.data.IBulkImportJobFileDao;
import ca.uhn.fhir.jpa.entity.BulkImportJobEntity;
@@ -42,10 +44,7 @@ import org.apache.commons.lang3.time.DateUtils;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
@@ -61,8 +60,6 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class BulkDataImportSvcImpl implements IBulkDataImportSvc {
private static final Logger ourLog = LoggerFactory.getLogger(BulkDataImportSvcImpl.class);
private final Semaphore myRunningJobSemaphore = new Semaphore(1);
@@ -73,13 +70,13 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc {
@Autowired
private PlatformTransactionManager myTxManager;
private TransactionTemplate myTxTemplate;
@Autowired
private ISchedulerService mySchedulerService;
@Autowired
private IBatchJobSubmitter myJobSubmitter;
@Autowired
@Qualifier(BatchConstants.BULK_IMPORT_JOB_NAME)
private org.springframework.batch.core.Job myBulkImportJob;
private IJobCoordinator myJobCoordinator;
@Autowired
private DaoConfig myDaoConfig;
@@ -159,15 +156,15 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc {
*/
@Transactional(value = Transactional.TxType.NEVER)
@Override
public boolean activateNextReadyJob() {
public ActivateJobResult activateNextReadyJob() {
if (!myDaoConfig.isEnableTaskBulkImportJobExecution()) {
Logs.getBatchTroubleshootingLog().trace("Bulk import job execution is not enabled on this server. No action taken.");
return false;
return new ActivateJobResult(false, null);
}
if (!myRunningJobSemaphore.tryAcquire()) {
Logs.getBatchTroubleshootingLog().trace("Already have a running batch job, not going to check for more");
return false;
return new ActivateJobResult(false, null);
}
try {
@@ -177,7 +174,7 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc {
}
}
private boolean doActivateNextReadyJob() {
private ActivateJobResult doActivateNextReadyJob() {
Optional<BulkImportJobEntity> jobToProcessOpt = Objects.requireNonNull(myTxTemplate.execute(t -> {
Pageable page = PageRequest.of(0, 1);
Slice<BulkImportJobEntity> submittedJobs = myJobDao.findByStatus(page, BulkImportJobStatusEnum.READY);
@@ -188,14 +185,15 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc {
}));
if (!jobToProcessOpt.isPresent()) {
return false;
return new ActivateJobResult(false, null);
}
BulkImportJobEntity bulkImportJobEntity = jobToProcessOpt.get();
String jobUuid = bulkImportJobEntity.getJobId();
String batchJobId = null;
try {
processJob(bulkImportJobEntity);
batchJobId = processJob(bulkImportJobEntity);
} catch (Exception e) {
ourLog.error("Failure while preparing bulk export extract", e);
myTxTemplate.execute(t -> {
@@ -206,11 +204,11 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc {
jobEntity.setStatusMessage(e.getMessage());
myJobDao.save(jobEntity);
}
return false;
return new ActivateJobResult(false, null);
});
}
return true;
return new ActivateJobResult(true, batchJobId);
}
@Override
@@ -273,22 +271,21 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc {
myJobDao.delete(job);
}
private void processJob(BulkImportJobEntity theBulkExportJobEntity) throws JobParametersInvalidException {
private String processJob(BulkImportJobEntity theBulkExportJobEntity) {
String jobId = theBulkExportJobEntity.getJobId();
int batchSize = theBulkExportJobEntity.getBatchSize();
ValidateUtil.isTrueOrThrowInvalidRequest(batchSize > 0, "Batch size must be positive");
JobParametersBuilder parameters = new JobParametersBuilder()
.addString(BatchConstants.JOB_UUID_PARAMETER, jobId)
.addLong(BulkImportJobConfig.JOB_PARAM_COMMIT_INTERVAL, (long) batchSize);
Batch2BulkImportPullJobParameters jobParameters = new Batch2BulkImportPullJobParameters();
jobParameters.setJobId(jobId);
jobParameters.setBatchSize(batchSize);
if (isNotBlank(theBulkExportJobEntity.getJobDescription())) {
parameters.addString(BatchConstants.JOB_DESCRIPTION, theBulkExportJobEntity.getJobDescription());
}
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(BatchConstants.BULK_IMPORT_JOB_NAME);
request.setParameters(jobParameters);
ourLog.info("Submitting bulk import job {} to job scheduler", jobId);
myJobSubmitter.runJob(myBulkImportJob, parameters.toJobParameters());
return myJobCoordinator.startInstance(request).getJobId();
}
private void addFilesToJob(@Nonnull List<BulkImportJobFileJson> theInitialFiles, BulkImportJobEntity job, int nextSequence) {

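The new ActivateJobResult model is referenced throughout this change but its source is not part of the diff as shown. Judging purely by its usage here (a two-argument constructor, plus public isActivated and jobId reads in the test changes below), a minimal sketch might look like this:

package ca.uhn.fhir.jpa.bulk.imprt.model;

/**
 * Hypothetical reconstruction, not shown in this diff. Shape is inferred
 * from usage in this commit: new ActivateJobResult(false, null),
 * outcome.isActivated, outcome.jobId.
 */
public class ActivateJobResult {
	// whether a READY job was found and handed to the batch2 coordinator
	public final boolean isActivated;
	// the batch2 job instance id, or null when nothing was activated
	public final String jobId;

	public ActivateJobResult(boolean theIsActivated, String theJobId) {
		isActivated = theIsActivated;
		jobId = theJobId;
	}
}
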
View File

@@ -718,7 +718,7 @@ public class IdHelperService implements IIdHelperService {
public IIdType resourceIdFromPidOrThrowException(ResourcePersistentId thePid, String theResourceType) {
Optional<ResourceTable> optionalResource = myResourceTableDao.findById(thePid.getIdAsLong());
if (!optionalResource.isPresent()) {
throw new ResourceNotFoundException(Msg.code(2107) + "Requested resource not found");
throw new ResourceNotFoundException(Msg.code(2124) + "Requested resource not found");
}
return optionalResource.get().getIdDt().toVersionless();
}

View File

@@ -1,30 +1,38 @@
package ca.uhn.fhir.jpa.bulk.imprt.svc;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.model.ActivateJobResult;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkImportJobDao;
import ca.uhn.fhir.jpa.dao.data.IBulkImportJobFileDao;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.entity.BulkImportJobEntity;
import ca.uhn.fhir.jpa.entity.BulkImportJobFileEntity;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.test.utilities.BatchJobHelper;
import ca.uhn.fhir.test.utilities.ITestDataBuilder;
import ca.uhn.fhir.util.BundleBuilder;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
@@ -32,27 +40,25 @@ import org.junit.jupiter.api.TestMethodOrder;
import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.ExecutorChannelInterceptor;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static ca.uhn.fhir.batch2.config.BaseBatch2Config.CHANNEL_NAME;
import static ca.uhn.fhir.jpa.batch.config.BatchConstants.BULK_IMPORT_JOB_NAME;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -63,18 +69,32 @@ import static org.mockito.Mockito.verify;
public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuilder {
private static final Logger ourLog = LoggerFactory.getLogger(BulkDataImportR4Test.class);
@Autowired
private IBulkDataImportSvc mySvc;
@Autowired
private IBulkImportJobDao myBulkImportJobDao;
@Autowired
private IBulkImportJobFileDao myBulkImportJobFileDao;
@Autowired
private JobExplorer myJobExplorer;
private IJobCoordinator myJobCoordinator;
@Autowired
private JobRegistry myJobRegistry;
private Batch2JobHelper myBatch2JobHelper;
@Autowired
private BatchJobHelper myBatchJobHelper;
private JobDefinitionRegistry myJobDefinitionRegistry;
@Autowired
private IChannelFactory myChannelFactory;
private LinkedBlockingChannel myWorkChannel;
@BeforeEach
public void before() {
myWorkChannel = (LinkedBlockingChannel) myChannelFactory.getOrCreateReceiver(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, new ChannelConsumerSettings());
}
@AfterEach
public void after() {
@@ -82,6 +102,21 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil
myInterceptorRegistry.unregisterInterceptorsIf(t -> t instanceof MyFailAfterThreeCreatesInterceptor);
}
/**
* Sets up a message retry interceptor for when errors are encountered
*/
private void setupRetryFailures() {
myWorkChannel.addInterceptor(new ExecutorChannelInterceptor() {
@Override
public void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler, Exception ex) {
if (ex != null) {
ourLog.info("Work channel received exception {}", ex.getMessage());
channel.send(message);
}
}
});
}
@Order(-1)
@Test
public void testFlow_ErrorDuringWrite() {
@@ -91,6 +126,9 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil
int fileCount = 5;
List<BulkImportJobFileJson> files = createInputFiles(transactionsPerFile, fileCount);
try {
setupRetryFailures();
BulkImportJobJson job = new BulkImportJobJson();
job.setProcessingMode(JobFileRowProcessingModeEnum.FHIR_TRANSACTION);
job.setJobDescription("This is the job description");
@@ -98,30 +136,23 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil
String jobId = mySvc.createNewJob(job, files);
mySvc.markJobAsReadyForActivation(jobId);
boolean activateJobOutcome = mySvc.activateNextReadyJob();
assertTrue(activateJobOutcome);
ActivateJobResult activateJobOutcome = mySvc.activateNextReadyJob();
assertTrue(activateJobOutcome.isActivated);
String[] jobNames = new String[]{BULK_IMPORT_JOB_NAME};
assert jobNames.length > 0;
await().until(() -> runInTransaction(() -> {
JobInstance jobInstance = myJobExplorer.getLastJobInstance(BULK_IMPORT_JOB_NAME);
JobExecution jobExecution = myJobExplorer.getLastJobExecution(jobInstance);
ourLog.info("Exit status: {}", jobExecution.getExitStatus());
return jobExecution.getExitStatus().getExitCode().equals(ExitStatus.FAILED.getExitCode());
}));
JobInstance jobInstance = myJobExplorer.getLastJobInstance(BULK_IMPORT_JOB_NAME);
JobExecution jobExecution = myJobExplorer.getLastJobExecution(jobInstance);
List<StepExecution> failedExecutions = jobExecution
.getStepExecutions()
.stream()
.filter(t -> t.getExitStatus().getExitCode().equals("FAILED"))
.collect(Collectors.toList());
assertEquals(2, failedExecutions.size());
assertThat(failedExecutions.get(1).getStepName(), containsString(":File With Description"));
JobInstance instance = myBatch2JobHelper.awaitJobHitsStatusInTime(activateJobOutcome.jobId,
60,
StatusEnum.FAILED);
HashSet<StatusEnum> failed = new HashSet<>();
failed.add(StatusEnum.FAILED);
failed.add(StatusEnum.ERRORED);
assertTrue(failed.contains(instance.getStatus()), instance.getStatus() + " is the actual status");
String errorMsg = instance.getErrorMessage();
assertTrue(errorMsg.contains("Too many errors"), errorMsg);
assertTrue(errorMsg.contains("Too many errors"), MyFailAfterThreeCreatesInterceptor.ERROR_MESSAGE);
} finally {
myWorkChannel.clearInterceptorsForUnitTest();
}
}
@Order(0)
@@ -138,20 +169,12 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil
String jobId = mySvc.createNewJob(job, files);
mySvc.markJobAsReadyForActivation(jobId);
boolean activateJobOutcome = mySvc.activateNextReadyJob();
assertTrue(activateJobOutcome);
ActivateJobResult activateJobOutcome = mySvc.activateNextReadyJob();
assertTrue(activateJobOutcome.isActivated);
List<JobExecution> executions = awaitAllBulkImportJobCompletion();
assertEquals("testFlow_TransactionRows", executions.get(0).getJobParameters().getString(BatchConstants.JOB_DESCRIPTION));
runInTransaction(() -> {
List<BulkImportJobEntity> jobs = myBulkImportJobDao.findAll();
assertEquals(0, jobs.size());
List<BulkImportJobFileEntity> jobFiles = myBulkImportJobFileDao.findAll();
assertEquals(0, jobFiles.size());
});
JobInstance instance = myBatch2JobHelper.awaitJobCompletion(activateJobOutcome.jobId);
assertNotNull(instance);
assertEquals(StatusEnum.COMPLETED, instance.getStatus());
IBundleProvider searchResults = myPatientDao.search(SearchParameterMap.newSynchronous());
assertEquals(transactionsPerFile * fileCount, searchResults.sizeOrThrowNpe());
@@ -176,10 +199,11 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil
String jobId = mySvc.createNewJob(job, files);
mySvc.markJobAsReadyForActivation(jobId);
boolean activateJobOutcome = mySvc.activateNextReadyJob();
assertTrue(activateJobOutcome);
ActivateJobResult activateJobOutcome = mySvc.activateNextReadyJob();
assertTrue(activateJobOutcome.isActivated);
awaitAllBulkImportJobCompletion();
JobInstance instance = myBatch2JobHelper.awaitJobCompletion(activateJobOutcome.jobId);
assertNotNull(instance);
ArgumentCaptor<HookParams> paramsCaptor = ArgumentCaptor.forClass(HookParams.class);
verify(interceptor, times(50)).invoke(any(), paramsCaptor.capture());
@@ -218,15 +242,11 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil
return files;
}
@Order(3)
@Test
public void testJobsAreRegisteredWithJobRegistry() throws NoSuchJobException {
Job job = myJobRegistry.getJob(BULK_IMPORT_JOB_NAME);
assertEquals(true, job.isRestartable());
}
public void testJobsAreRegisteredWithJobRegistry() {
Optional<JobDefinition<?>> jobDefinitionOp = myJobDefinitionRegistry.getLatestJobDefinition(BULK_IMPORT_JOB_NAME);
protected List<JobExecution> awaitAllBulkImportJobCompletion() {
return myBatchJobHelper.awaitAllBulkJobCompletions(BatchConstants.BULK_IMPORT_JOB_NAME);
assertTrue(jobDefinitionOp.isPresent());
}
@Interceptor

View File

@@ -8,18 +8,15 @@ import ca.uhn.fhir.jpa.dao.data.ITermConceptDao;
import ca.uhn.fhir.jpa.entity.TermCodeSystemVersion;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink;
import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.batch.core.JobExecution;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertFalse;

View File

@@ -22,6 +22,7 @@ package ca.uhn.fhir.batch2.jobs.config;
import ca.uhn.fhir.batch2.jobs.export.BulkExportAppCtx;
import ca.uhn.fhir.batch2.jobs.expunge.DeleteExpungeAppCtx;
import ca.uhn.fhir.batch2.jobs.importpull.BulkImportPullConfig;
import ca.uhn.fhir.batch2.jobs.imprt.BulkImportAppCtx;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import ca.uhn.fhir.batch2.jobs.termcodesystem.TermCodeSystemJobConfig;
@@ -36,7 +37,8 @@ import org.springframework.context.annotation.Import;
ReindexAppCtx.class,
DeleteExpungeAppCtx.class,
BulkExportAppCtx.class,
TermCodeSystemJobConfig.class
TermCodeSystemJobConfig.class,
BulkImportPullConfig.class
})
public class Batch2JobsConfig {
}

View File

@@ -0,0 +1,52 @@
package ca.uhn.fhir.batch2.jobs.importpull;
import ca.uhn.fhir.batch2.api.IJobParametersValidator;
import ca.uhn.fhir.batch2.importpull.models.Batch2BulkImportPullJobParameters;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
import static org.slf4j.LoggerFactory.getLogger;
public class BulkImportParameterValidator implements IJobParametersValidator<Batch2BulkImportPullJobParameters> {
private static final Logger ourLog = getLogger(BulkImportParameterValidator.class);
private final IBulkDataImportSvc myBulkDataImportSvc;
public BulkImportParameterValidator(IBulkDataImportSvc theIBulkDataImportSvc) {
myBulkDataImportSvc = theIBulkDataImportSvc;
}
@Nullable
@Override
public List<String> validate(@NotNull Batch2BulkImportPullJobParameters theParameters) {
ourLog.info("BulkImportPull parameter validation begin");
ArrayList<String> errors = new ArrayList<>();
if (theParameters.getBatchSize() <= 0) {
errors.add("Batch size must be positive");
}
String jobId = theParameters.getJobId();
if (StringUtils.isEmpty(jobId)) {
errors.add("Bulk Import Pull requires an existing job id");
} else {
BulkImportJobJson job = myBulkDataImportSvc.fetchJob(jobId);
if (job == null) {
errors.add("There is no persistent job that exists with UUID: " + jobId);
}
}
ourLog.info("BulkImportPull parameter validation end");
return errors;
}
}

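Batch2BulkImportPullJobParameters is likewise referenced but not shown. From the accessors used in this commit (setJobId/getJobId, setBatchSize/getBatchSize) and the setParametersType(Batch2BulkImportPullJobParameters.class) call below, a plausible sketch follows; the IModelJson interface and Jackson annotations are assumptions modeled on how other batch2 parameter classes are serialized:

package ca.uhn.fhir.batch2.importpull.models;

import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * Hypothetical sketch, not part of this diff. Accessors inferred from
 * usage; the interface and serialization annotations are assumptions.
 */
public class Batch2BulkImportPullJobParameters implements IModelJson {
	@JsonProperty("jobId")
	private String myJobId;

	@JsonProperty("batchSize")
	private long myBatchSize;

	public String getJobId() {
		return myJobId;
	}

	public void setJobId(String theJobId) {
		myJobId = theJobId;
	}

	public long getBatchSize() {
		return myBatchSize;
	}

	public void setBatchSize(long theBatchSize) {
		myBatchSize = theBatchSize;
	}
}
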
View File

@@ -0,0 +1,76 @@
package ca.uhn.fhir.batch2.jobs.importpull;
import ca.uhn.fhir.batch2.importpull.models.Batch2BulkImportPullJobParameters;
import ca.uhn.fhir.batch2.importpull.models.BulkImportFilePartitionResult;
import ca.uhn.fhir.batch2.importpull.models.BulkImportRecord;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BulkImportPullConfig {
@Autowired
private FhirContext myFhirContext;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private IBulkDataImportSvc myBulkDataImportSvc;
@Bean
public JobDefinition bulkImportPullJobDefinition() {
return JobDefinition
.newBuilder()
.setJobDefinitionId(BatchConstants.BULK_IMPORT_JOB_NAME)
.setJobDescription("Performs bulk import pull job")
.setJobDefinitionVersion(1)
.gatedExecution()
.setParametersType(Batch2BulkImportPullJobParameters.class)
.setParametersValidator(importParameterValidator())
.addFirstStep(
"ReadInResourcesStep",
"Reads an import file and extracts the resources",
BulkImportFilePartitionResult.class,
fetchPartitionedFilesStep()
)
.addIntermediateStep(
"ReadInResourcesFromFileStep",
"Reads the import file to get the serialized bundles",
BulkImportRecord.class,
readInResourcesFromFileStep()
)
.addLastStep(
"WriteBundleStep",
"Parses the bundle from previous step and writes it to the dv",
writeBundleForImportStep()
)
.build();
}
@Bean
public BulkImportParameterValidator importParameterValidator() {
return new BulkImportParameterValidator(myBulkDataImportSvc);
}
@Bean
public FetchPartitionedFilesStep fetchPartitionedFilesStep() {
return new FetchPartitionedFilesStep(myBulkDataImportSvc);
}
@Bean
public ReadInResourcesFromFileStep readInResourcesFromFileStep() {
return new ReadInResourcesFromFileStep(myBulkDataImportSvc);
}
@Bean
public WriteBundleForImportStep writeBundleForImportStep() {
return new WriteBundleForImportStep(myFhirContext, myDaoRegistry);
}
}

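For reference, this job definition is driven from BulkDataImportSvcImpl.processJob() earlier in this commit; the start sequence there boils down to the following (all names as used in that file):

// Build the batch2 parameters for the persisted bulk import job entity
Batch2BulkImportPullJobParameters jobParameters = new Batch2BulkImportPullJobParameters();
jobParameters.setJobId(jobId);          // UUID of the persisted job entity
jobParameters.setBatchSize(batchSize);  // validated as positive by the validator above

// Submit the job to the batch2 coordinator under the definition id above
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(BatchConstants.BULK_IMPORT_JOB_NAME);
request.setParameters(jobParameters);
String batchJobId = myJobCoordinator.startInstance(request).getJobId();
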
View File

@@ -0,0 +1,55 @@
package ca.uhn.fhir.batch2.jobs.importpull;
import ca.uhn.fhir.batch2.api.IFirstJobStepWorker;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.importpull.models.Batch2BulkImportPullJobParameters;
import ca.uhn.fhir.batch2.importpull.models.BulkImportFilePartitionResult;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import static org.slf4j.LoggerFactory.getLogger;
public class FetchPartitionedFilesStep implements IFirstJobStepWorker<Batch2BulkImportPullJobParameters, BulkImportFilePartitionResult> {
private static final Logger ourLog = getLogger(FetchPartitionedFilesStep.class);
private final IBulkDataImportSvc myBulkDataImportSvc;
public FetchPartitionedFilesStep(IBulkDataImportSvc theBulkDataImportSvc) {
myBulkDataImportSvc = theBulkDataImportSvc;
}
@NotNull
@Override
public RunOutcome run(
@NotNull StepExecutionDetails<Batch2BulkImportPullJobParameters, VoidModel> theStepExecutionDetails,
@NotNull IJobDataSink<BulkImportFilePartitionResult> theDataSink
) throws JobExecutionFailedException {
String jobId = theStepExecutionDetails.getParameters().getJobId();
ourLog.info("Start FetchPartitionedFilesStep for jobID {} ", jobId);
BulkImportJobJson job = myBulkDataImportSvc.fetchJob(jobId);
for (int i = 0; i < job.getFileCount(); i++) {
String fileDescription = myBulkDataImportSvc.getFileDescription(jobId, i);
BulkImportFilePartitionResult result = new BulkImportFilePartitionResult();
result.setFileIndex(i);
result.setProcessingMode(job.getProcessingMode());
result.setFileDescription(fileDescription);
result.setJobDescription(job.getJobDescription());
theDataSink.accept(result);
}
ourLog.info("FetchPartitionedFilesStep complete for jobID {}", jobId);
return RunOutcome.SUCCESS;
}
}

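BulkImportFilePartitionResult, the per-file chunk emitted above, is another model this diff references without showing. A minimal sketch, with setters inferred from this step and getters inferred from ReadInResourcesFromFileStep below:

package ca.uhn.fhir.batch2.importpull.models;

import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum;

/**
 * Hypothetical sketch, not part of this diff; one work chunk per input file.
 */
public class BulkImportFilePartitionResult {
	private int myFileIndex;
	private JobFileRowProcessingModeEnum myProcessingMode;
	private String myFileDescription;
	private String myJobDescription;

	public int getFileIndex() {
		return myFileIndex;
	}

	public void setFileIndex(int theFileIndex) {
		myFileIndex = theFileIndex;
	}

	public JobFileRowProcessingModeEnum getProcessingMode() {
		return myProcessingMode;
	}

	public void setProcessingMode(JobFileRowProcessingModeEnum theProcessingMode) {
		myProcessingMode = theProcessingMode;
	}

	public void setFileDescription(String theFileDescription) {
		myFileDescription = theFileDescription;
	}

	public void setJobDescription(String theJobDescription) {
		myJobDescription = theJobDescription;
	}
}
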
View File

@@ -0,0 +1,88 @@
package ca.uhn.fhir.batch2.jobs.importpull;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.importpull.models.Batch2BulkImportPullJobParameters;
import ca.uhn.fhir.batch2.importpull.models.BulkImportFilePartitionResult;
import ca.uhn.fhir.batch2.importpull.models.BulkImportRecord;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum;
import ca.uhn.fhir.util.IoUtil;
import com.google.common.io.LineReader;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.StringReader;
public class ReadInResourcesFromFileStep implements IJobStepWorker<Batch2BulkImportPullJobParameters, BulkImportFilePartitionResult, BulkImportRecord> {
private static final Logger ourLog = LoggerFactory.getLogger(ReadInResourcesFromFileStep.class);
private final IBulkDataImportSvc myBulkDataImportSvc;
public ReadInResourcesFromFileStep(IBulkDataImportSvc theBulkDataImportSvc) {
myBulkDataImportSvc = theBulkDataImportSvc;
}
// Suppressed because Guava's LineReader is still marked @Beta (unstable API)
@SuppressWarnings("UnstableApiUsage")
@NotNull
@Override
public RunOutcome run(
@NotNull StepExecutionDetails<Batch2BulkImportPullJobParameters, BulkImportFilePartitionResult> theStepExecutionDetails,
@NotNull IJobDataSink<BulkImportRecord> theDataSink
) throws JobExecutionFailedException {
String jobId = theStepExecutionDetails.getParameters().getJobId();
int fileIndex = theStepExecutionDetails.getData().getFileIndex();
JobFileRowProcessingModeEnum mode = theStepExecutionDetails.getData().getProcessingMode();
ourLog.info("ReadInResourcesFromFileStep for jobId {} begin", jobId);
BulkImportJobFileJson file = myBulkDataImportSvc.fetchFile(jobId, fileIndex);
String tenantName = file.getTenantName();
String contents = file.getContents();
StringReader reader = new StringReader(contents);
// Fan-out: every line in the file becomes its own work chunk downstream
LineReader lineReader = new LineReader(reader);
try {
int lineIndex = 0;
String nextLine;
do {
nextLine = lineReader.readLine();
if (nextLine != null) {
BulkImportRecord record = new BulkImportRecord();
record.setResourceString(nextLine);
record.setLineIndex(lineIndex);
record.setTenantName(tenantName);
record.setProcessingMode(mode);
record.setFileIndex(fileIndex);
theDataSink.accept(record);
}
lineIndex++;
} while (nextLine != null);
} catch (IOException ex) {
ourLog.error("Failed to read file : " + ex.getMessage());
throw new JobExecutionFailedException(Msg.code(2107)
+ " : Could not read file"
);
} finally {
IoUtil.closeQuietly(reader);
}
ourLog.info("ReadInResourcesFromFileStep for jobId {} end", jobId);
return RunOutcome.SUCCESS;
}
}
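
The step above suppresses UnstableApiUsage because Guava's LineReader is @Beta. Below is a minimal JDK-only sketch of the same line-splitting loop, should the Guava dependency ever need to go away; the helper name and the record-building comment are illustrative.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

public class JdkLineSplitSketch {

	// Same loop shape as ReadInResourcesFromFileStep, using only the JDK.
	// BufferedReader#readLine treats \n, \r and \r\n as line terminators,
	// much like Guava's LineReader does.
	public static int forEachLine(String theContents) throws IOException {
		int lineIndex = 0;
		try (BufferedReader reader = new BufferedReader(new StringReader(theContents))) {
			String nextLine;
			while ((nextLine = reader.readLine()) != null) {
				// real code would build and sink a BulkImportRecord here
				lineIndex++;
			}
		}
		return lineIndex;
	}
}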

@@ -0,0 +1,83 @@
package ca.uhn.fhir.batch2.jobs.importpull;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.ILastJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.importpull.models.Batch2BulkImportPullJobParameters;
import ca.uhn.fhir.batch2.importpull.models.BulkImportRecord;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.parser.IParser;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WriteBundleForImportStep implements ILastJobStepWorker<Batch2BulkImportPullJobParameters, BulkImportRecord> {
private static final Logger ourLog = LoggerFactory.getLogger(WriteBundleForImportStep.class);
private final FhirContext myFhirContext;
private final DaoRegistry myDaoRegistry;
public WriteBundleForImportStep(FhirContext theFhirContext, DaoRegistry theDaoRegistry) {
myFhirContext = theFhirContext;
myDaoRegistry = theDaoRegistry;
}
@SuppressWarnings({"SwitchStatementWithTooFewBranches", "rawtypes", "unchecked"})
@NotNull
@Override
public RunOutcome run(
@NotNull StepExecutionDetails<Batch2BulkImportPullJobParameters, BulkImportRecord> theStepExecutionDetails,
@NotNull IJobDataSink<VoidModel> theDataSink
) throws JobExecutionFailedException {
BulkImportRecord record = theStepExecutionDetails.getData();
JobFileRowProcessingModeEnum mode = record.getProcessingMode();
int fileIndex = record.getFileIndex();
String content = record.getResourceString();
String tenantName = record.getTenantName();
int lineIndex = record.getLineIndex();
String jobId = theStepExecutionDetails.getParameters().getJobId();
ourLog.info(
"Beginning bulk import write row {} for Job[{}] FileIndex[{}]",
lineIndex,
jobId,
fileIndex
);
IParser parser = myFhirContext.newJsonParser();
SystemRequestDetails requestDetails = new SystemRequestDetails();
requestDetails.setTenantId(tenantName);
IBaseResource bundle = parser.parseResource(content);
// Only FHIR_TRANSACTION is supported today; add new processing modes here as they arrive
switch (mode) {
default:
case FHIR_TRANSACTION:
IFhirSystemDao systemDao = myDaoRegistry.getSystemDao();
systemDao.transaction(requestDetails, bundle);
break;
}
ourLog.info(
"Completed bulk import write for row {} Job[{}] FileIndex[{}]",
lineIndex,
jobId,
fileIndex
);
return RunOutcome.SUCCESS;
}
}
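
For reference, a row consumed in FHIR_TRANSACTION mode is a complete FHIR transaction Bundle serialized onto a single NDJSON line. A made-up example row follows; the resource ids and content are illustrative only.

public final class SampleImportRow {

	// Illustrative only: one NDJSON row as consumed by WriteBundleForImportStep
	// in FHIR_TRANSACTION mode -- a self-contained transaction Bundle on one line
	public static final String ROW =
		"{\"resourceType\":\"Bundle\",\"type\":\"transaction\",\"entry\":[{"
			+ "\"resource\":{\"resourceType\":\"Patient\",\"id\":\"pat-1\",\"active\":true},"
			+ "\"request\":{\"method\":\"PUT\",\"url\":\"Patient/pat-1\"}}]}";

	private SampleImportRow() {}
}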

@@ -24,7 +24,6 @@ import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry,
import ca.uhn.fhir.batch2.model.JobDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.configuration.DuplicateJobException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
@@ -39,8 +38,7 @@ public class Batch2JobRegisterer {
@PostConstruct
public void start() throws DuplicateJobException {
public void start() {
Map<String, JobDefinition> batchJobs = myApplicationContext.getBeansOfType(JobDefinition.class);
JobDefinitionRegistry jobRegistry = myApplicationContext.getBean(JobDefinitionRegistry.class);

@@ -327,7 +327,11 @@ public class StepExecutionSvc {
// TODO - marking for posterity
// see comments on MAX_CHUNK_ERROR_COUNT
if (chunk.getErrorCount() > MAX_CHUNK_ERROR_COUNT) {
myJobPersistence.markWorkChunkAsFailed(chunkId, "Too many errors: " + chunk.getErrorCount());
String errorMsg = "Too many errors: "
+ chunk.getErrorCount()
+ ". Last error msg was "
+ e.getMessage();
myJobPersistence.markWorkChunkAsFailed(chunkId, errorMsg);
return false;
}
}
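
Distilled, the gate above fails a chunk permanently once its error count exceeds the retry threshold, and the failure message now carries the last exception's text. A minimal sketch of that logic, under the simplifying assumption that the names map one-to-one onto the fields used above:

public final class ChunkFailureGate {

	public static boolean shouldFailPermanently(int theErrorCount, int theMaxErrorCount) {
		return theErrorCount > theMaxErrorCount;
	}

	// Mirrors the message built above: the error count plus the last exception text
	public static String buildFailureMessage(int theErrorCount, Exception theLastError) {
		return "Too many errors: " + theErrorCount
			+ ". Last error message was: " + theLastError.getMessage();
	}

	private ChunkFailureGate() {}
}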

@@ -0,0 +1,29 @@
package ca.uhn.fhir.batch2.importpull.models;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
public class Batch2BulkImportPullJobParameters implements IModelJson {
@JsonProperty("jobId")
private String myJobId;
@JsonProperty("batchSize")
private long myBatchSize;
public String getJobId() {
return myJobId;
}
public void setJobId(String theJobId) {
myJobId = theJobId;
}
public long getBatchSize() {
return myBatchSize;
}
public void setBatchSize(long theBatchSize) {
myBatchSize = theBatchSize;
}
}
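
These parameters travel through the framework as JSON, with field names taken from the @JsonProperty annotations above. A round-trip sketch, assuming ca.uhn.fhir.util.JsonUtil (which HAPI uses elsewhere for IModelJson serialization); output whitespace may differ:

import ca.uhn.fhir.batch2.importpull.models.Batch2BulkImportPullJobParameters;
import ca.uhn.fhir.util.JsonUtil;

public class ParameterShapeExample {

	public static void main(String[] args) {
		Batch2BulkImportPullJobParameters parameters = new Batch2BulkImportPullJobParameters();
		parameters.setJobId("my-import-job");
		parameters.setBatchSize(500);

		// Prints (modulo whitespace): {"jobId":"my-import-job","batchSize":500}
		System.out.println(JsonUtil.serialize(parameters));
	}
}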

@@ -0,0 +1,64 @@
package ca.uhn.fhir.batch2.importpull.models;
import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
public class BulkImportFilePartitionResult implements IModelJson {
/**
* The file index for the import job
*/
@JsonProperty("fileIndex")
private int myFileIndex;
/**
* Row processing mode
*/
@JsonProperty("rowProcessingMode")
private JobFileRowProcessingModeEnum myProcessingMode;
/**
* The job description
*/
@JsonProperty("jobDescription")
private String myJobDescription;
/**
* The file description
*/
@JsonProperty("fileDescription")
private String myFileDescription;
public int getFileIndex() {
return myFileIndex;
}
public void setFileIndex(int theFileIndex) {
myFileIndex = theFileIndex;
}
public JobFileRowProcessingModeEnum getProcessingMode() {
return myProcessingMode;
}
public void setProcessingMode(JobFileRowProcessingModeEnum theProcessingMode) {
myProcessingMode = theProcessingMode;
}
public String getJobDescription() {
return myJobDescription;
}
public void setJobDescription(String theJobDescription) {
myJobDescription = theJobDescription;
}
public String getFileDescription() {
return myFileDescription;
}
public void setFileDescription(String theFileDescription) {
myFileDescription = theFileDescription;
}
}

@@ -0,0 +1,78 @@
package ca.uhn.fhir.batch2.importpull.models;
import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
public class BulkImportRecord implements IModelJson {
/**
* Stringified version of the resource
*/
@JsonProperty("resource")
private String myResourceString;
/**
* Name of the tenant from the bulk import job file
*/
@JsonProperty("tenantName")
private String myTenantName;
/**
* The 0-based line index of the import file
* from which the resource was read.
*/
@JsonProperty("lineIndex")
private int myLineIndex;
/**
* The file index for the import job
*/
@JsonProperty("fileIndex")
private int myFileIndex;
/**
* Row processing mode
*/
@JsonProperty("rowProcessingMode")
private JobFileRowProcessingModeEnum myProcessingMode;
public String getResourceString() {
return myResourceString;
}
public void setResourceString(String theResourceString) {
myResourceString = theResourceString;
}
public String getTenantName() {
return myTenantName;
}
public void setTenantName(String theTenantName) {
myTenantName = theTenantName;
}
public int getLineIndex() {
return myLineIndex;
}
public void setLineIndex(int theLineIndex) {
myLineIndex = theLineIndex;
}
public int getFileIndex() {
return myFileIndex;
}
public void setFileIndex(int theFileIndex) {
myFileIndex = theFileIndex;
}
public JobFileRowProcessingModeEnum getProcessingMode() {
return myProcessingMode;
}
public void setProcessingMode(JobFileRowProcessingModeEnum theProcessingMode) {
myProcessingMode = theProcessingMode;
}
}

@@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.bulk.imprt.api;
* #L%
*/
import ca.uhn.fhir.jpa.bulk.imprt.model.ActivateJobResult;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum;
@@ -91,7 +92,7 @@ public interface IBulkDataImportSvc {
*
* @return Returns {@literal true} if a job was activated
*/
boolean activateNextReadyJob();
ActivateJobResult activateNextReadyJob();
/**
* Updates the job status for the given job

@@ -0,0 +1,23 @@
package ca.uhn.fhir.jpa.bulk.imprt.model;
import javax.annotation.Nullable;
public class ActivateJobResult {
/**
* Whether the job is activated or not
*/
public final boolean isActivated;
/**
* The newly created job id
*/
@Nullable
public final String jobId;
public ActivateJobResult(boolean theIsActivated, @Nullable String theJobId) {
isActivated = theIsActivated;
jobId = theJobId;
}
}
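
With activateNextReadyJob() widened from a bare boolean to ActivateJobResult, callers can now observe the batch2 instance id that activation spawned. A minimal consumer sketch; the poller shape is illustrative, not taken from this commit:

import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.model.ActivateJobResult;

public class ActivationPollerSketch {

	// A scheduled poller can now surface the batch2 instance id that the
	// activation spawned, instead of just a true/false flag
	public static void pollOnce(IBulkDataImportSvc theBulkImportSvc) {
		ActivateJobResult result = theBulkImportSvc.activateNextReadyJob();
		if (result.isActivated) {
			System.out.println("Activated bulk import; batch2 instance id: " + result.jobId);
		}
	}
}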