diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/i18n/Msg.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/i18n/Msg.java index 27b7dacee57..7dfb10a52c6 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/i18n/Msg.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/i18n/Msg.java @@ -25,7 +25,7 @@ public final class Msg { /** * IMPORTANT: Please update the following comment after you add a new code - * Last code value: 2123 + * Last code value: 2124 */ private Msg() {} diff --git a/hapi-fhir-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/BatchJobConfig.java b/hapi-fhir-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/BatchJobConfig.java index c856af092ca..23502a0dce2 100644 --- a/hapi-fhir-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/BatchJobConfig.java +++ b/hapi-fhir-batch/src/test/java/ca/uhn/fhir/jpa/batch/config/BatchJobConfig.java @@ -6,7 +6,6 @@ import org.springframework.batch.core.configuration.annotation.JobBuilderFactory import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.step.tasklet.Tasklet; -import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_1_0/3846-migrate-bulk-import-pull-to-batch2.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_1_0/3846-migrate-bulk-import-pull-to-batch2.yaml new file mode 100644 index 00000000000..0c2c82bed3a --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_1_0/3846-migrate-bulk-import-pull-to-batch2.yaml @@ -0,0 +1,4 @@ +--- +type: fix +issue: 3846 +title: "Migrated Bulk Import Pull to Batch2 Framework" diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/BatchJobsConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/BatchJobsConfig.java index af01472fba6..5e0398d9b00 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/BatchJobsConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/BatchJobsConfig.java @@ -22,8 +22,6 @@ package ca.uhn.fhir.jpa.batch; import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; import ca.uhn.fhir.jpa.batch.svc.BatchJobSubmitterImpl; -import ca.uhn.fhir.jpa.bulk.export.job.BulkExportJobConfig; -import ca.uhn.fhir.jpa.bulk.imprt.job.BulkImportJobConfig; import ca.uhn.fhir.jpa.config.BatchJobRegisterer; import org.springframework.batch.core.configuration.JobRegistry; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; @@ -38,9 +36,7 @@ import org.springframework.context.annotation.Import; @Configuration @EnableBatchProcessing @Import({ - CommonBatchJobConfig.class, - BulkExportJobConfig.class, - BulkImportJobConfig.class + CommonBatchJobConfig.class // When you define a new batch job, add it here. 
}) @Deprecated diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/CommonBatchJobConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/CommonBatchJobConfig.java index 5dfd4ff4bbd..857c41f51ff 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/CommonBatchJobConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/CommonBatchJobConfig.java @@ -26,6 +26,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration +@Deprecated public class CommonBatchJobConfig { public static final int MINUTES_IN_FUTURE_TO_PROCESS_FROM = 1; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processor/GoldenResourceAnnotatingProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processor/GoldenResourceAnnotatingProcessor.java index 78b5985d908..bf1dd6d11d8 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processor/GoldenResourceAnnotatingProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processor/GoldenResourceAnnotatingProcessor.java @@ -49,6 +49,7 @@ import static ca.uhn.fhir.jpa.batch.config.BatchConstants.PATIENT_BULK_EXPORT_FO * Reusable Item Processor which attaches an extension to any outgoing resource. This extension will contain a resource * reference to the golden resource patient of the given resources' patient. (e.g. Observation.subject, Immunization.patient, etc) */ +@Deprecated public class GoldenResourceAnnotatingProcessor implements ItemProcessor, List> { private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaBatch2Config.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaBatch2Config.java index 6dcd36085da..1949a0eabf6 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaBatch2Config.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaBatch2Config.java @@ -23,13 +23,18 @@ package ca.uhn.fhir.jpa.batch2; import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.config.BaseBatch2Config; import ca.uhn.fhir.batch2.coordinator.SynchronizedJobPersistenceWrapper; +import ca.uhn.fhir.jpa.bulk.export.job.BulkExportJobConfig; import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository; import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Primary; @Configuration +@Import({ + BulkExportJobConfig.class +}) public class JpaBatch2Config extends BaseBatch2Config { @Bean diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BulkExportJobConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BulkExportJobConfig.java index ebbbf6a32c2..3edf78610d3 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BulkExportJobConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BulkExportJobConfig.java @@ -30,8 +30,6 @@ import org.springframework.context.annotation.Configuration; */ @Configuration public class BulkExportJobConfig { - public static final int CHUNK_SIZE = 100; - @Bean public MdmExpansionCacheSvc mdmExpansionCacheSvc() { diff --git 
a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/.keep b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/.keep new file mode 100644 index 00000000000..e69de29bb2d diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/ActivateBulkImportEntityStepListener.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/ActivateBulkImportEntityStepListener.java deleted file mode 100644 index c3718152d15..00000000000 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/ActivateBulkImportEntityStepListener.java +++ /dev/null @@ -1,51 +0,0 @@ -package ca.uhn.fhir.jpa.bulk.imprt.job; - -/*- - * #%L - * HAPI FHIR JPA Server - * %% - * Copyright (C) 2014 - 2022 Smile CDR, Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import ca.uhn.fhir.jpa.batch.config.BatchConstants; -import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc; -import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum; -import org.springframework.batch.core.ExitStatus; -import org.springframework.batch.core.StepExecution; -import org.springframework.batch.core.StepExecutionListener; -import org.springframework.beans.factory.annotation.Autowired; - -/** - * Will run before and after a job to set the status to whatever is appropriate. - */ -public class ActivateBulkImportEntityStepListener implements StepExecutionListener { - - @Autowired - private IBulkDataImportSvc myBulkImportDaoSvc; - - @Override - public void beforeStep(StepExecution theStepExecution) { - String jobUuid = theStepExecution.getJobExecution().getJobParameters().getString(BatchConstants.JOB_UUID_PARAMETER); - if (jobUuid != null) { - myBulkImportDaoSvc.setJobToStatus(jobUuid, BulkImportJobStatusEnum.RUNNING); - } - } - - @Override - public ExitStatus afterStep(StepExecution theStepExecution) { - return ExitStatus.EXECUTING; - } -} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportFileReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportFileReader.java deleted file mode 100644 index 74ba8b27f9d..00000000000 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportFileReader.java +++ /dev/null @@ -1,76 +0,0 @@ -package ca.uhn.fhir.jpa.bulk.imprt.job; - -/*- - * #%L - * HAPI FHIR JPA Server - * %% - * Copyright (C) 2014 - 2022 Smile CDR, Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - -import ca.uhn.fhir.context.FhirContext; -import ca.uhn.fhir.jpa.batch.config.BatchConstants; -import ca.uhn.fhir.jpa.batch.log.Logs; -import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc; -import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson; -import ca.uhn.fhir.jpa.bulk.imprt.model.ParsedBulkImportRecord; -import ca.uhn.fhir.util.IoUtil; -import com.google.common.io.LineReader; -import org.hl7.fhir.instance.model.api.IBaseResource; -import org.springframework.batch.item.ItemReader; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; - -import java.io.StringReader; - -@SuppressWarnings("UnstableApiUsage") -public class BulkImportFileReader implements ItemReader { - - @Autowired - private IBulkDataImportSvc myBulkDataImportSvc; - @Autowired - private FhirContext myFhirContext; - @Value("#{stepExecutionContext['" + BatchConstants.JOB_UUID_PARAMETER + "']}") - private String myJobUuid; - @Value("#{stepExecutionContext['" + BulkImportPartitioner.FILE_INDEX + "']}") - private int myFileIndex; - - private StringReader myReader; - private LineReader myLineReader; - private int myLineIndex; - private String myTenantName; - - @Override - public ParsedBulkImportRecord read() throws Exception { - - if (myReader == null) { - BulkImportJobFileJson file = myBulkDataImportSvc.fetchFile(myJobUuid, myFileIndex); - myTenantName = file.getTenantName(); - myReader = new StringReader(file.getContents()); - myLineReader = new LineReader(myReader); - } - - String nextLine = myLineReader.readLine(); - if (nextLine == null) { - IoUtil.closeQuietly(myReader); - return null; - } - - Logs.getBatchTroubleshootingLog().debug("Reading line {} file index {} for job: {}", myLineIndex++, myFileIndex, myJobUuid); - - IBaseResource parsed = myFhirContext.newJsonParser().parseResource(nextLine); - return new ParsedBulkImportRecord(myTenantName, parsed, myLineIndex); - } -} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportFileWriter.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportFileWriter.java deleted file mode 100644 index ccaa2cc4ff0..00000000000 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportFileWriter.java +++ /dev/null @@ -1,85 +0,0 @@ -package ca.uhn.fhir.jpa.bulk.imprt.job; - -/*- - * #%L - * HAPI FHIR JPA Server - * %% - * Copyright (C) 2014 - 2022 Smile CDR, Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - -import ca.uhn.fhir.jpa.api.dao.DaoRegistry; -import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; -import ca.uhn.fhir.jpa.batch.config.BatchConstants; -import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum; -import ca.uhn.fhir.jpa.bulk.imprt.model.ParsedBulkImportRecord; -import ca.uhn.fhir.jpa.partition.SystemRequestDetails; -import ca.uhn.fhir.util.StopWatch; -import org.hl7.fhir.instance.model.api.IBaseResource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.batch.item.ItemWriter; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.transaction.support.TransactionSynchronizationManager; - -import java.util.List; - -public class BulkImportFileWriter implements ItemWriter { - - private static final Logger ourLog = LoggerFactory.getLogger(BulkImportFileWriter.class); - @Value("#{stepExecutionContext['" + BatchConstants.JOB_UUID_PARAMETER + "']}") - private String myJobUuid; - @Value("#{stepExecutionContext['" + BulkImportPartitioner.FILE_INDEX + "']}") - private int myFileIndex; - @Value("#{stepExecutionContext['" + BulkImportPartitioner.ROW_PROCESSING_MODE + "']}") - private JobFileRowProcessingModeEnum myRowProcessingMode; - @Autowired - private DaoRegistry myDaoRegistry; - - @SuppressWarnings({"SwitchStatementWithTooFewBranches", "rawtypes", "unchecked"}) - @Override - public void write(List theItemLists) throws Exception { - assert TransactionSynchronizationManager.isActualTransactionActive(); - - String offsets = "unknown"; - if (theItemLists.size() > 0) { - offsets = theItemLists.get(0).getLineIndex() + " - " + theItemLists.get(theItemLists.size()-1).getLineIndex(); - } - - ourLog.info("Beginning bulk import write {} rows Job[{}] FileIndex[{}] Offset[{}]", theItemLists.size(), myJobUuid, myFileIndex, offsets); - StopWatch sw = new StopWatch(); - - for (ParsedBulkImportRecord nextItem : theItemLists) { - - SystemRequestDetails requestDetails = new SystemRequestDetails(); - requestDetails.setTenantId(nextItem.getTenantName()); - - // Yeah this is a lame switch - We'll add more later I swear - switch (myRowProcessingMode) { - default: - case FHIR_TRANSACTION: - IFhirSystemDao systemDao = myDaoRegistry.getSystemDao(); - IBaseResource inputBundle = nextItem.getRowContent(); - systemDao.transactionNested(requestDetails, inputBundle); - break; - } - - } - - ourLog.info("Completed bulk import write {} rows Job[{}] FileIndex[{}] Offset[{}] in {}", theItemLists.size(), myJobUuid, myFileIndex, offsets, sw); - } - -} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportJobCloser.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportJobCloser.java deleted file mode 100644 index 1598d63a867..00000000000 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportJobCloser.java +++ /dev/null @@ -1,57 +0,0 @@ -package ca.uhn.fhir.jpa.bulk.imprt.job; - -/*- - * #%L - * HAPI FHIR JPA Server - * %% - * Copyright (C) 2014 - 2022 Smile CDR, Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import ca.uhn.fhir.jpa.batch.config.BatchConstants; -import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc; -import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum; -import org.springframework.batch.core.BatchStatus; -import org.springframework.batch.core.StepContribution; -import org.springframework.batch.core.scope.context.ChunkContext; -import org.springframework.batch.core.step.tasklet.Tasklet; -import org.springframework.batch.repeat.RepeatStatus; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; - -/** - * Will run before and after a job to set the status to whatever is appropriate. - */ -public class BulkImportJobCloser implements Tasklet { - - @Value("#{jobParameters['" + BatchConstants.JOB_UUID_PARAMETER + "']}") - private String myJobUUID; - - @Autowired - private IBulkDataImportSvc myBulkDataImportSvc; - - @Override - public RepeatStatus execute(StepContribution theStepContribution, ChunkContext theChunkContext) { - BatchStatus executionStatus = theChunkContext.getStepContext().getStepExecution().getJobExecution().getStatus(); - if (executionStatus == BatchStatus.STARTED) { - myBulkDataImportSvc.setJobToStatus(myJobUUID, BulkImportJobStatusEnum.COMPLETE); - myBulkDataImportSvc.deleteJobFiles(myJobUUID); - } else { - myBulkDataImportSvc.setJobToStatus(myJobUUID, BulkImportJobStatusEnum.ERROR, "Found job in status: " + executionStatus); - myBulkDataImportSvc.deleteJobFiles(myJobUUID); - } - return RepeatStatus.FINISHED; - } -} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportJobConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportJobConfig.java deleted file mode 100644 index 4fce96371b1..00000000000 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportJobConfig.java +++ /dev/null @@ -1,204 +0,0 @@ -package ca.uhn.fhir.jpa.bulk.imprt.job; - -/*- - * #%L - * HAPI FHIR JPA Server - * %% - * Copyright (C) 2014 - 2022 Smile CDR, Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - -import ca.uhn.fhir.jpa.api.config.DaoConfig; -import ca.uhn.fhir.jpa.batch.config.BatchConstants; -import ca.uhn.fhir.jpa.bulk.imprt.model.ParsedBulkImportRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.batch.core.Job; -import org.springframework.batch.core.JobParametersValidator; -import org.springframework.batch.core.Step; -import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; -import org.springframework.batch.core.configuration.annotation.JobScope; -import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; -import org.springframework.batch.core.configuration.annotation.StepScope; -import org.springframework.batch.core.partition.PartitionHandler; -import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler; -import org.springframework.batch.core.step.item.KeyGenerator; -import org.springframework.batch.item.ItemWriter; -import org.springframework.batch.repeat.CompletionPolicy; -import org.springframework.batch.repeat.RepeatContext; -import org.springframework.batch.repeat.exception.ExceptionHandler; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Lazy; -import org.springframework.core.task.TaskExecutor; -import org.springframework.retry.RetryPolicy; -import org.springframework.retry.policy.CompositeRetryPolicy; -import org.springframework.retry.policy.SimpleRetryPolicy; -import org.springframework.retry.policy.TimeoutRetryPolicy; - -import javax.batch.api.chunk.listener.RetryProcessListener; - -import static ca.uhn.fhir.jpa.batch.config.BatchConstants.BULK_IMPORT_JOB_NAME; -import static ca.uhn.fhir.jpa.batch.config.BatchConstants.BULK_IMPORT_PROCESSING_STEP; - -/** - * Spring batch Job configuration file. Contains all necessary plumbing to run a - * Bulk Export job. 
- */ -@Configuration -public class BulkImportJobConfig { - - public static final String JOB_PARAM_COMMIT_INTERVAL = "commitInterval"; - private static final Logger ourLog = LoggerFactory.getLogger(BulkImportJobConfig.class); - @Autowired - private StepBuilderFactory myStepBuilderFactory; - @Autowired - private JobBuilderFactory myJobBuilderFactory; - @Autowired - @Qualifier(BatchConstants.JOB_LAUNCHING_TASK_EXECUTOR) - private TaskExecutor myTaskExecutor; - @Autowired - private DaoConfig myDaoConfig; - - @Bean(name = BULK_IMPORT_JOB_NAME) - @Lazy - public Job bulkImportJob() throws Exception { - return myJobBuilderFactory.get(BULK_IMPORT_JOB_NAME) - .validator(bulkImportJobParameterValidator()) - .start(bulkImportProcessingStep()) - .next(bulkImportCloseJobStep()) - .build(); - } - - @Bean - public JobParametersValidator bulkImportJobParameterValidator() { - return new BulkImportJobParameterValidator(); - } - - @Bean - public CreateBulkImportEntityTasklet createBulkImportEntityTasklet() { - return new CreateBulkImportEntityTasklet(); - } - - @Bean - @JobScope - public ActivateBulkImportEntityStepListener activateBulkImportEntityStepListener() { - return new ActivateBulkImportEntityStepListener(); - } - - @Bean - public Step bulkImportProcessingStep() throws Exception { - return myStepBuilderFactory.get(BULK_IMPORT_PROCESSING_STEP) - .partitioner(BULK_IMPORT_PROCESSING_STEP, bulkImportPartitioner()) - .partitionHandler(partitionHandler()) - .listener(activateBulkImportEntityStepListener()) - .listener(errorLisener()) - .gridSize(10) - .build(); - } - - private ChunkAroundListener errorLisener() { - return new ChunkAroundListener(); - } - - private PartitionHandler partitionHandler() throws Exception { - assert myTaskExecutor != null; - - TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler(); - retVal.setStep(bulkImportProcessFilesStep()); - retVal.setTaskExecutor(myTaskExecutor); - retVal.afterPropertiesSet(); - return retVal; - } - - @Bean - public Step bulkImportCloseJobStep() { - return myStepBuilderFactory.get("bulkImportCloseJobStep") - .tasklet(bulkImportJobCloser()) - .build(); - } - - @Bean - @JobScope - public BulkImportJobCloser bulkImportJobCloser() { - return new BulkImportJobCloser(); - } - - @Bean - @JobScope - public BulkImportPartitioner bulkImportPartitioner() { - return new BulkImportPartitioner(); - } - - @Bean - public Step bulkImportProcessFilesStep() { - - return myStepBuilderFactory.get("bulkImportProcessFilesStep") - .chunk(completionPolicy()) - .reader(bulkImportFileReader()) - .writer(bulkImportFileWriter()) - .listener(bulkImportStepListener()) - .listener(completionPolicy()) - .faultTolerant() - .retryPolicy(bulkImportProcessFilesStepRetryPolicy()) - .build(); - } - - private RetryPolicy bulkImportProcessFilesStepRetryPolicy() { - TimeoutRetryPolicy timeoutPolicy = new TimeoutRetryPolicy(); - timeoutPolicy.setTimeout(10000); - - SimpleRetryPolicy countRetryPolicy = new SimpleRetryPolicy(myDaoConfig.getBulkImportMaxRetryCount()); - - CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy(); - compositeRetryPolicy.setPolicies(new RetryPolicy[]{timeoutPolicy, countRetryPolicy}); - return compositeRetryPolicy; - } - - @Bean - @StepScope - public CompletionPolicy completionPolicy() { - return new BulkImportProcessStepCompletionPolicy(); - } - - @Bean - @StepScope - public BulkImportFileWriter bulkImportFileWriter() { - return new BulkImportFileWriter(); - } - - @Bean - @StepScope - public BulkImportFileReader bulkImportFileReader() { - 
return new BulkImportFileReader(); - } - - @Bean - @StepScope - public BulkImportStepListener bulkImportStepListener() { - return new BulkImportStepListener(); - } - - public static class ChunkAroundListener implements RetryProcessListener { - - @Override - public void onRetryProcessException(Object item, Exception ex) throws Exception { - throw ex; - } - } - -} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportJobParameterValidator.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportJobParameterValidator.java deleted file mode 100644 index 280f1c04214..00000000000 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportJobParameterValidator.java +++ /dev/null @@ -1,71 +0,0 @@ -package ca.uhn.fhir.jpa.bulk.imprt.job; - -/*- - * #%L - * HAPI FHIR JPA Server - * %% - * Copyright (C) 2014 - 2022 Smile CDR, Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import ca.uhn.fhir.i18n.Msg; -import ca.uhn.fhir.jpa.batch.config.BatchConstants; -import ca.uhn.fhir.jpa.dao.data.IBulkImportJobDao; -import ca.uhn.fhir.jpa.entity.BulkImportJobEntity; -import org.apache.commons.lang3.StringUtils; -import org.springframework.batch.core.JobParameters; -import org.springframework.batch.core.JobParametersInvalidException; -import org.springframework.batch.core.JobParametersValidator; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.support.TransactionTemplate; - -import java.util.Optional; - -/** - * This class will prevent a job from running if the UUID does not exist or is invalid. - */ -public class BulkImportJobParameterValidator implements JobParametersValidator { - - @Autowired - private IBulkImportJobDao myBulkImportJobDao; - @Autowired - private PlatformTransactionManager myTransactionManager; - - @Override - public void validate(JobParameters theJobParameters) throws JobParametersInvalidException { - if (theJobParameters == null) { - throw new JobParametersInvalidException(Msg.code(784) + "This job needs Parameters: [jobUUID]"); - } - - TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager); - String errorMessage = txTemplate.execute(tx -> { - StringBuilder errorBuilder = new StringBuilder(); - String jobUUID = theJobParameters.getString(BatchConstants.JOB_UUID_PARAMETER); - Optional oJob = myBulkImportJobDao.findByJobId(jobUUID); - if (!StringUtils.isBlank(jobUUID) && !oJob.isPresent()) { - errorBuilder.append("There is no persisted job that exists with UUID: "); - errorBuilder.append(jobUUID); - errorBuilder.append(". 
"); - } - - return errorBuilder.toString(); - }); - - if (!StringUtils.isEmpty(errorMessage)) { - throw new JobParametersInvalidException(Msg.code(785) + errorMessage); - } - } -} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportPartitioner.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportPartitioner.java deleted file mode 100644 index 782daa89456..00000000000 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportPartitioner.java +++ /dev/null @@ -1,78 +0,0 @@ -package ca.uhn.fhir.jpa.bulk.imprt.job; - -/*- - * #%L - * HAPI FHIR JPA Server - * %% - * Copyright (C) 2014 - 2022 Smile CDR, Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import ca.uhn.fhir.jpa.batch.config.BatchConstants; -import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc; -import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson; -import org.slf4j.Logger; -import org.springframework.batch.core.partition.support.Partitioner; -import org.springframework.batch.item.ExecutionContext; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; - -import javax.annotation.Nonnull; -import java.util.HashMap; -import java.util.Map; - -import static org.slf4j.LoggerFactory.getLogger; - -public class BulkImportPartitioner implements Partitioner { - public static final String FILE_INDEX = "fileIndex"; - public static final String FILE_DESCRIPTION = "fileDescription"; - public static final String JOB_DESCRIPTION = "jobDescription"; - public static final String ROW_PROCESSING_MODE = "rowProcessingMode"; - - private static final Logger ourLog = getLogger(BulkImportPartitioner.class); - - @Value("#{jobParameters['" + BatchConstants.JOB_UUID_PARAMETER + "']}") - private String myJobUUID; - - @Autowired - private IBulkDataImportSvc myBulkDataImportSvc; - - @Nonnull - @Override - public Map partition(int gridSize) { - Map retVal = new HashMap<>(); - - BulkImportJobJson job = myBulkDataImportSvc.fetchJob(myJobUUID); - - for (int i = 0; i < job.getFileCount(); i++) { - - String fileDescription = myBulkDataImportSvc.getFileDescription(myJobUUID, i); - - ExecutionContext context = new ExecutionContext(); - context.putString(BatchConstants.JOB_UUID_PARAMETER, myJobUUID); - context.putInt(FILE_INDEX, i); - context.put(ROW_PROCESSING_MODE, job.getProcessingMode()); - context.put(JOB_DESCRIPTION, job.getJobDescription()); - context.put(FILE_DESCRIPTION, fileDescription); - - String key = "FILE" + i + ":" + fileDescription; - retVal.put(key, context); - } - - return retVal; - } - - -} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportProcessStepCompletionPolicy.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportProcessStepCompletionPolicy.java deleted file mode 100644 index 2f750be17da..00000000000 --- 
a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportProcessStepCompletionPolicy.java +++ /dev/null @@ -1,41 +0,0 @@ -package ca.uhn.fhir.jpa.bulk.imprt.job; - -/*- - * #%L - * HAPI FHIR JPA Server - * %% - * Copyright (C) 2014 - 2022 Smile CDR, Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.springframework.batch.repeat.RepeatContext; -import org.springframework.batch.repeat.policy.CompletionPolicySupport; -import org.springframework.beans.factory.annotation.Value; - -import static ca.uhn.fhir.jpa.bulk.imprt.job.BulkImportJobConfig.JOB_PARAM_COMMIT_INTERVAL; - -public class BulkImportProcessStepCompletionPolicy extends CompletionPolicySupport { - - @Value("#{jobParameters['" + JOB_PARAM_COMMIT_INTERVAL + "']}") - private int myChunkSize; - - @Override - public boolean isComplete(RepeatContext context) { - if (context.getStartedCount() < myChunkSize) { - return false; - } - return true; - } -} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportStepListener.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportStepListener.java deleted file mode 100644 index 20fb319c8a1..00000000000 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/BulkImportStepListener.java +++ /dev/null @@ -1,85 +0,0 @@ -package ca.uhn.fhir.jpa.bulk.imprt.job; - -/*- - * #%L - * HAPI FHIR JPA Server - * %% - * Copyright (C) 2014 - 2022 Smile CDR, Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import ca.uhn.fhir.jpa.batch.config.BatchConstants; -import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc; -import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum; -import org.springframework.batch.core.ExitStatus; -import org.springframework.batch.core.StepExecution; -import org.springframework.batch.core.StepExecutionListener; -import org.springframework.batch.core.jsr.RetryListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.retry.ExhaustedRetryException; - -import javax.annotation.Nonnull; - -import static org.apache.commons.lang3.StringUtils.isNotBlank; - -/** - * This class sets the job status to ERROR if any failures occur while actually - * generating the export files. 
- */ -public class BulkImportStepListener implements StepExecutionListener, RetryListener { - - @Autowired - private IBulkDataImportSvc myBulkDataImportSvc; - - @Override - public void beforeStep(@Nonnull StepExecution stepExecution) { - // nothing - } - - @Override - public ExitStatus afterStep(StepExecution theStepExecution) { - if (theStepExecution.getExitStatus().getExitCode().equals(ExitStatus.FAILED.getExitCode())) { - //Try to fetch it from the parameters first, and if it doesn't exist, fetch it from the context. - String jobUuid = theStepExecution.getJobExecution().getJobParameters().getString(BatchConstants.JOB_UUID_PARAMETER); - if (jobUuid == null) { - jobUuid = theStepExecution.getJobExecution().getExecutionContext().getString(BatchConstants.JOB_UUID_PARAMETER); - } - assert isNotBlank(jobUuid); - - StringBuilder message = new StringBuilder(); - message.append("Job: ").append(theStepExecution.getExecutionContext().getString(BulkImportPartitioner.JOB_DESCRIPTION)).append("\n"); - message.append("File: ").append(theStepExecution.getExecutionContext().getString(BulkImportPartitioner.FILE_DESCRIPTION)).append("\n"); - for (Throwable next : theStepExecution.getFailureExceptions()) { - if (next instanceof ExhaustedRetryException) { - next = next.getCause(); // ExhaustedRetryException is a spring exception that wraps the real one - } - String nextErrorMessage = next.toString(); - message.append("Error: ").append(nextErrorMessage).append("\n"); - } - - theStepExecution.addFailureException(new RuntimeException(message.toString())); - - myBulkDataImportSvc.setJobToStatus(jobUuid, BulkImportJobStatusEnum.ERROR, message.toString()); - - ExitStatus exitStatus = ExitStatus.FAILED.addExitDescription(message.toString()); - theStepExecution.setExitStatus(exitStatus); - - // Replace the built-in error message with a better one - return exitStatus; - } - - return theStepExecution.getExitStatus(); - } -} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/CreateBulkImportEntityTasklet.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/CreateBulkImportEntityTasklet.java deleted file mode 100644 index f4e465d8f67..00000000000 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/job/CreateBulkImportEntityTasklet.java +++ /dev/null @@ -1,52 +0,0 @@ -package ca.uhn.fhir.jpa.bulk.imprt.job; - -/*- - * #%L - * HAPI FHIR JPA Server - * %% - * Copyright (C) 2014 - 2022 Smile CDR, Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - -import ca.uhn.fhir.jpa.batch.config.BatchConstants; -import ca.uhn.fhir.util.ValidateUtil; -import org.springframework.batch.core.StepContribution; -import org.springframework.batch.core.scope.context.ChunkContext; -import org.springframework.batch.core.step.tasklet.Tasklet; -import org.springframework.batch.repeat.RepeatStatus; - -import java.util.Map; - -public class CreateBulkImportEntityTasklet implements Tasklet { - - @Override - public RepeatStatus execute(StepContribution theStepContribution, ChunkContext theChunkContext) throws Exception { - Map jobParameters = theChunkContext.getStepContext().getJobParameters(); - - //We can leave early if they provided us with an existing job. - ValidateUtil.isTrueOrThrowInvalidRequest(jobParameters.containsKey(BatchConstants.JOB_UUID_PARAMETER), "Job doesn't have a UUID"); - addUUIDToJobContext(theChunkContext, (String) jobParameters.get(BatchConstants.JOB_UUID_PARAMETER)); - return RepeatStatus.FINISHED; - } - - public void addUUIDToJobContext(ChunkContext theChunkContext, String theJobUUID) { - theChunkContext - .getStepContext() - .getStepExecution() - .getJobExecution() - .getExecutionContext() - .putString(BatchConstants.JOB_UUID_PARAMETER, theJobUUID); - } -} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/svc/BulkDataImportSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/svc/BulkDataImportSvcImpl.java index 54d64c97e05..4ecf33d0d7c 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/svc/BulkDataImportSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/svc/BulkDataImportSvcImpl.java @@ -20,15 +20,17 @@ package ca.uhn.fhir.jpa.bulk.imprt.svc; * #L% */ +import ca.uhn.fhir.batch2.api.IJobCoordinator; +import ca.uhn.fhir.batch2.importpull.models.Batch2BulkImportPullJobParameters; +import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; import ca.uhn.fhir.jpa.api.config.DaoConfig; -import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; import ca.uhn.fhir.jpa.batch.config.BatchConstants; import ca.uhn.fhir.jpa.batch.log.Logs; import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc; -import ca.uhn.fhir.jpa.bulk.imprt.job.BulkImportJobConfig; import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson; import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson; import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum; +import ca.uhn.fhir.jpa.bulk.imprt.model.ActivateJobResult; import ca.uhn.fhir.jpa.dao.data.IBulkImportJobDao; import ca.uhn.fhir.jpa.dao.data.IBulkImportJobFileDao; import ca.uhn.fhir.jpa.entity.BulkImportJobEntity; @@ -42,10 +44,7 @@ import org.apache.commons.lang3.time.DateUtils; import org.quartz.JobExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.batch.core.JobParametersBuilder; -import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Slice; @@ -61,8 +60,6 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.Semaphore; -import static org.apache.commons.lang3.StringUtils.isNotBlank; - public class BulkDataImportSvcImpl implements IBulkDataImportSvc { private static final Logger ourLog = 
LoggerFactory.getLogger(BulkDataImportSvcImpl.class); private final Semaphore myRunningJobSemaphore = new Semaphore(1); @@ -73,13 +70,13 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc { @Autowired private PlatformTransactionManager myTxManager; private TransactionTemplate myTxTemplate; + @Autowired private ISchedulerService mySchedulerService; + @Autowired - private IBatchJobSubmitter myJobSubmitter; - @Autowired - @Qualifier(BatchConstants.BULK_IMPORT_JOB_NAME) - private org.springframework.batch.core.Job myBulkImportJob; + private IJobCoordinator myJobCoordinator; + @Autowired private DaoConfig myDaoConfig; @@ -159,15 +156,15 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc { */ @Transactional(value = Transactional.TxType.NEVER) @Override - public boolean activateNextReadyJob() { + public ActivateJobResult activateNextReadyJob() { if (!myDaoConfig.isEnableTaskBulkImportJobExecution()) { Logs.getBatchTroubleshootingLog().trace("Bulk import job execution is not enabled on this server. No action taken."); - return false; + return new ActivateJobResult(false, null); } if (!myRunningJobSemaphore.tryAcquire()) { Logs.getBatchTroubleshootingLog().trace("Already have a running batch job, not going to check for more"); - return false; + return new ActivateJobResult(false, null); } try { @@ -177,7 +174,7 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc { } } - private boolean doActivateNextReadyJob() { + private ActivateJobResult doActivateNextReadyJob() { Optional jobToProcessOpt = Objects.requireNonNull(myTxTemplate.execute(t -> { Pageable page = PageRequest.of(0, 1); Slice submittedJobs = myJobDao.findByStatus(page, BulkImportJobStatusEnum.READY); @@ -188,14 +185,15 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc { })); if (!jobToProcessOpt.isPresent()) { - return false; + return new ActivateJobResult(false, null); } BulkImportJobEntity bulkImportJobEntity = jobToProcessOpt.get(); String jobUuid = bulkImportJobEntity.getJobId(); + String batchJobId = null; try { - processJob(bulkImportJobEntity); + batchJobId = processJob(bulkImportJobEntity); } catch (Exception e) { ourLog.error("Failure while preparing bulk export extract", e); myTxTemplate.execute(t -> { @@ -206,11 +204,11 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc { jobEntity.setStatusMessage(e.getMessage()); myJobDao.save(jobEntity); } - return false; + return new ActivateJobResult(false, null); }); } - return true; + return new ActivateJobResult(true, batchJobId); } @Override @@ -273,22 +271,21 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc { myJobDao.delete(job); } - private void processJob(BulkImportJobEntity theBulkExportJobEntity) throws JobParametersInvalidException { + private String processJob(BulkImportJobEntity theBulkExportJobEntity) { String jobId = theBulkExportJobEntity.getJobId(); int batchSize = theBulkExportJobEntity.getBatchSize(); - ValidateUtil.isTrueOrThrowInvalidRequest(batchSize > 0, "Batch size must be positive"); - JobParametersBuilder parameters = new JobParametersBuilder() - .addString(BatchConstants.JOB_UUID_PARAMETER, jobId) - .addLong(BulkImportJobConfig.JOB_PARAM_COMMIT_INTERVAL, (long) batchSize); + Batch2BulkImportPullJobParameters jobParameters = new Batch2BulkImportPullJobParameters(); + jobParameters.setJobId(jobId); + jobParameters.setBatchSize(batchSize); - if (isNotBlank(theBulkExportJobEntity.getJobDescription())) { - parameters.addString(BatchConstants.JOB_DESCRIPTION, 
theBulkExportJobEntity.getJobDescription()); - } + JobInstanceStartRequest request = new JobInstanceStartRequest(); + request.setJobDefinitionId(BatchConstants.BULK_IMPORT_JOB_NAME); + request.setParameters(jobParameters); ourLog.info("Submitting bulk import job {} to job scheduler", jobId); - myJobSubmitter.runJob(myBulkImportJob, parameters.toJobParameters()); + return myJobCoordinator.startInstance(request).getJobId(); } private void addFilesToJob(@Nonnull List theInitialFiles, BulkImportJobEntity job, int nextSequence) { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/index/IdHelperService.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/index/IdHelperService.java index 900e19e2da7..751b82b2fd6 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/index/IdHelperService.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/index/IdHelperService.java @@ -718,7 +718,7 @@ public class IdHelperService implements IIdHelperService { public IIdType resourceIdFromPidOrThrowException(ResourcePersistentId thePid, String theResourceType) { Optional optionalResource = myResourceTableDao.findById(thePid.getIdAsLong()); if (!optionalResource.isPresent()) { - throw new ResourceNotFoundException(Msg.code(2107) + "Requested resource not found"); + throw new ResourceNotFoundException(Msg.code(2124) + "Requested resource not found"); } return optionalResource.get().getIdDt().toVersionless(); } diff --git a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/bulk/imprt/svc/BulkDataImportR4Test.java b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/bulk/imprt/svc/BulkDataImportR4Test.java index 0bb343787fc..945017e8698 100644 --- a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/bulk/imprt/svc/BulkDataImportR4Test.java +++ b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/bulk/imprt/svc/BulkDataImportR4Test.java @@ -1,30 +1,38 @@ package ca.uhn.fhir.jpa.bulk.imprt.svc; +import ca.uhn.fhir.batch2.api.IJobCoordinator; +import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry; +import ca.uhn.fhir.batch2.model.JobDefinition; +import ca.uhn.fhir.batch2.model.JobInstance; +import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage; +import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.interceptor.api.Hook; import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor; import ca.uhn.fhir.interceptor.api.Interceptor; import ca.uhn.fhir.interceptor.api.Pointcut; -import ca.uhn.fhir.jpa.batch.config.BatchConstants; import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc; +import ca.uhn.fhir.jpa.bulk.imprt.model.ActivateJobResult; import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson; import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson; import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum; import ca.uhn.fhir.jpa.dao.data.IBulkImportJobDao; import ca.uhn.fhir.jpa.dao.data.IBulkImportJobFileDao; -import ca.uhn.fhir.jpa.test.BaseJpaR4Test; -import ca.uhn.fhir.jpa.entity.BulkImportJobEntity; -import ca.uhn.fhir.jpa.entity.BulkImportJobFileEntity; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; +import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings; +import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; +import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel; +import ca.uhn.fhir.jpa.test.BaseJpaR4Test; +import ca.uhn.fhir.jpa.test.Batch2JobHelper; 
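/*
 * The ActivateJobResult model used throughout this diff (imported in BulkDataImportSvcImpl and in the
 * test imports above) is not itself shown. A minimal sketch consistent with its usage here -- the
 * two-argument constructor and the public isActivated/jobId fields are inferred from the call sites,
 * not copied from the actual source:
 *
 *   public class ActivateJobResult {
 *       // true when a READY bulk import job was found and submitted to the batch2 coordinator
 *       public final boolean isActivated;
 *       // the batch2 instance id returned by IJobCoordinator.startInstance(), or null if nothing ran
 *       public final String jobId;
 *
 *       public ActivateJobResult(boolean theIsActivated, String theJobId) {
 *           isActivated = theIsActivated;
 *           jobId = theJobId;
 *       }
 *   }
 */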
import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; -import ca.uhn.fhir.test.utilities.BatchJobHelper; import ca.uhn.fhir.test.utilities.ITestDataBuilder; import ca.uhn.fhir.util.BundleBuilder; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.r4.model.Patient; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; @@ -32,27 +40,25 @@ import org.junit.jupiter.api.TestMethodOrder; import org.mockito.ArgumentCaptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.batch.core.ExitStatus; -import org.springframework.batch.core.Job; -import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.JobInstance; -import org.springframework.batch.core.StepExecution; -import org.springframework.batch.core.configuration.JobRegistry; -import org.springframework.batch.core.explore.JobExplorer; -import org.springframework.batch.core.launch.NoSuchJobException; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.support.ExecutorChannelInterceptor; import javax.annotation.Nonnull; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; +import static ca.uhn.fhir.batch2.config.BaseBatch2Config.CHANNEL_NAME; import static ca.uhn.fhir.jpa.batch.config.BatchConstants.BULK_IMPORT_JOB_NAME; -import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.containsString; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -63,18 +69,32 @@ import static org.mockito.Mockito.verify; public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuilder { private static final Logger ourLog = LoggerFactory.getLogger(BulkDataImportR4Test.class); + @Autowired private IBulkDataImportSvc mySvc; @Autowired private IBulkImportJobDao myBulkImportJobDao; @Autowired private IBulkImportJobFileDao myBulkImportJobFileDao; + @Autowired - private JobExplorer myJobExplorer; + private IJobCoordinator myJobCoordinator; + @Autowired - private JobRegistry myJobRegistry; + private Batch2JobHelper myBatch2JobHelper; + @Autowired - private BatchJobHelper myBatchJobHelper; + private JobDefinitionRegistry myJobDefinitionRegistry; + + @Autowired + private IChannelFactory myChannelFactory; + + private LinkedBlockingChannel myWorkChannel; + + @BeforeEach + public void before() { + myWorkChannel = (LinkedBlockingChannel) myChannelFactory.getOrCreateReceiver(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, new ChannelConsumerSettings()); + } @AfterEach public void after() { @@ -82,6 +102,21 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil myInterceptorRegistry.unregisterInterceptorsIf(t -> t instanceof 
MyFailAfterThreeCreatesInterceptor); } + /** + * Sets up a message retry interceptor that resubmits messages when errors are encountered + */ + private void setupRetryFailures() { + myWorkChannel.addInterceptor(new ExecutorChannelInterceptor() { + @Override + public void afterMessageHandled(Message message, MessageChannel channel, MessageHandler handler, Exception ex) { + if (ex != null) { + ourLog.info("Work channel received exception {}", ex.getMessage()); + channel.send(message); + } + } + }); + } + @Order(-1) @Test public void testFlow_ErrorDuringWrite() { @@ -91,37 +126,33 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil int fileCount = 5; List files = createInputFiles(transactionsPerFile, fileCount); - BulkImportJobJson job = new BulkImportJobJson(); - job.setProcessingMode(JobFileRowProcessingModeEnum.FHIR_TRANSACTION); - job.setJobDescription("This is the job description"); - job.setBatchSize(3); - String jobId = mySvc.createNewJob(job, files); - mySvc.markJobAsReadyForActivation(jobId); + try { + setupRetryFailures(); - boolean activateJobOutcome = mySvc.activateNextReadyJob(); - assertTrue(activateJobOutcome); + BulkImportJobJson job = new BulkImportJobJson(); + job.setProcessingMode(JobFileRowProcessingModeEnum.FHIR_TRANSACTION); + job.setJobDescription("This is the job description"); + job.setBatchSize(3); + String jobId = mySvc.createNewJob(job, files); + mySvc.markJobAsReadyForActivation(jobId); - String[] jobNames = new String[]{BULK_IMPORT_JOB_NAME}; - assert jobNames.length > 0; + ActivateJobResult activateJobOutcome = mySvc.activateNextReadyJob(); + assertTrue(activateJobOutcome.isActivated); - await().until(() -> runInTransaction(() -> { - JobInstance jobInstance = myJobExplorer.getLastJobInstance(BULK_IMPORT_JOB_NAME); - JobExecution jobExecution = myJobExplorer.getLastJobExecution(jobInstance); - ourLog.info("Exit status: {}", jobExecution.getExitStatus()); - return jobExecution.getExitStatus().getExitCode().equals(ExitStatus.FAILED.getExitCode()); - })); - - JobInstance jobInstance = myJobExplorer.getLastJobInstance(BULK_IMPORT_JOB_NAME); - JobExecution jobExecution = myJobExplorer.getLastJobExecution(jobInstance); - List failedExecutions = jobExecution - .getStepExecutions() - .stream() - .filter(t -> t.getExitStatus().getExitCode().equals("FAILED")) - .collect(Collectors.toList()); - - assertEquals(2, failedExecutions.size()); - assertThat(failedExecutions.get(1).getStepName(), containsString(":File With Description")); + JobInstance instance = myBatch2JobHelper.awaitJobHitsStatusInTime(activateJobOutcome.jobId, + 60, + StatusEnum.FAILED); + HashSet failed = new HashSet<>(); + failed.add(StatusEnum.FAILED); + failed.add(StatusEnum.ERRORED); + assertTrue(failed.contains(instance.getStatus()), instance.getStatus() + " is the actual status"); + String errorMsg = instance.getErrorMessage(); + assertTrue(errorMsg.contains("Too many errors"), errorMsg); + assertTrue(errorMsg.contains(MyFailAfterThreeCreatesInterceptor.ERROR_MESSAGE), errorMsg); + } finally { + myWorkChannel.clearInterceptorsForUnitTest(); + } } @Order(0) @@ -138,20 +169,12 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil String jobId = mySvc.createNewJob(job, files); mySvc.markJobAsReadyForActivation(jobId); - boolean activateJobOutcome = mySvc.activateNextReadyJob(); - assertTrue(activateJobOutcome); + ActivateJobResult activateJobOutcome = mySvc.activateNextReadyJob(); + assertTrue(activateJobOutcome.isActivated); - List executions = 
awaitAllBulkImportJobCompletion(); - assertEquals("testFlow_TransactionRows", executions.get(0).getJobParameters().getString(BatchConstants.JOB_DESCRIPTION)); - - runInTransaction(() -> { - List jobs = myBulkImportJobDao.findAll(); - assertEquals(0, jobs.size()); - - List jobFiles = myBulkImportJobFileDao.findAll(); - assertEquals(0, jobFiles.size()); - - }); + JobInstance instance = myBatch2JobHelper.awaitJobCompletion(activateJobOutcome.jobId); + assertNotNull(instance); + assertEquals(StatusEnum.COMPLETED, instance.getStatus()); IBundleProvider searchResults = myPatientDao.search(SearchParameterMap.newSynchronous()); assertEquals(transactionsPerFile * fileCount, searchResults.sizeOrThrowNpe()); @@ -176,10 +199,11 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil String jobId = mySvc.createNewJob(job, files); mySvc.markJobAsReadyForActivation(jobId); - boolean activateJobOutcome = mySvc.activateNextReadyJob(); - assertTrue(activateJobOutcome); + ActivateJobResult activateJobOutcome = mySvc.activateNextReadyJob(); + assertTrue(activateJobOutcome.isActivated); - awaitAllBulkImportJobCompletion(); + JobInstance instance = myBatch2JobHelper.awaitJobCompletion(activateJobOutcome.jobId); + assertNotNull(instance); ArgumentCaptor paramsCaptor = ArgumentCaptor.forClass(HookParams.class); verify(interceptor, times(50)).invoke(any(), paramsCaptor.capture()); @@ -218,15 +242,11 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil return files; } - @Order(3) @Test - public void testJobsAreRegisteredWithJobRegistry() throws NoSuchJobException { - Job job = myJobRegistry.getJob(BULK_IMPORT_JOB_NAME); - assertEquals(true, job.isRestartable()); - } + public void testJobsAreRegisteredWithJobRegistry() { + Optional> jobDefinitionOp = myJobDefinitionRegistry.getLatestJobDefinition(BULK_IMPORT_JOB_NAME); - protected List awaitAllBulkImportJobCompletion() { - return myBatchJobHelper.awaitAllBulkJobCompletions(BatchConstants.BULK_IMPORT_JOB_NAME); + assertTrue(jobDefinitionOp.isPresent()); } @Interceptor diff --git a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImplTest.java b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImplTest.java index 906ed4daa7a..57f73d82f47 100644 --- a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImplTest.java +++ b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImplTest.java @@ -8,18 +8,15 @@ import ca.uhn.fhir.jpa.dao.data.ITermConceptDao; import ca.uhn.fhir.jpa.entity.TermCodeSystemVersion; import ca.uhn.fhir.jpa.entity.TermConcept; import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink; -import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.springframework.batch.core.JobExecution; import org.springframework.test.util.ReflectionTestUtils; import org.springframework.transaction.PlatformTransactionManager; import java.util.ArrayList; -import java.util.Collections; import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertFalse; diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/config/Batch2JobsConfig.java 
diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/config/Batch2JobsConfig.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/config/Batch2JobsConfig.java index 4d2b26639d3..1f06a0603b1 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/config/Batch2JobsConfig.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/config/Batch2JobsConfig.java @@ -22,6 +22,7 @@ package ca.uhn.fhir.batch2.jobs.config; import ca.uhn.fhir.batch2.jobs.export.BulkExportAppCtx; import ca.uhn.fhir.batch2.jobs.expunge.DeleteExpungeAppCtx; +import ca.uhn.fhir.batch2.jobs.importpull.BulkImportPullConfig; import ca.uhn.fhir.batch2.jobs.imprt.BulkImportAppCtx; import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx; import ca.uhn.fhir.batch2.jobs.termcodesystem.TermCodeSystemJobConfig; @@ -36,7 +37,8 @@ import org.springframework.context.annotation.Import; ReindexAppCtx.class, DeleteExpungeAppCtx.class, BulkExportAppCtx.class, - TermCodeSystemJobConfig.class + TermCodeSystemJobConfig.class, + BulkImportPullConfig.class }) public class Batch2JobsConfig { } diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/importpull/BulkImportParameterValidator.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/importpull/BulkImportParameterValidator.java new file mode 100644 index 00000000000..f6b13feaea9 --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/importpull/BulkImportParameterValidator.java @@ -0,0 +1,52 @@ +package ca.uhn.fhir.batch2.jobs.importpull; + +import ca.uhn.fhir.batch2.api.IJobParametersValidator; +import ca.uhn.fhir.batch2.importpull.models.Batch2BulkImportPullJobParameters; +import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc; +import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson; +import org.apache.commons.lang3.StringUtils; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +import static org.slf4j.LoggerFactory.getLogger; + +public class BulkImportParameterValidator implements IJobParametersValidator<Batch2BulkImportPullJobParameters> { + private static final Logger ourLog = getLogger(BulkImportParameterValidator.class); + + private final IBulkDataImportSvc myBulkDataImportSvc; + + public BulkImportParameterValidator(IBulkDataImportSvc theIBulkDataImportSvc) { + myBulkDataImportSvc = theIBulkDataImportSvc; + } + + @Nullable + @Override + public List<String> validate(@NotNull Batch2BulkImportPullJobParameters theParameters) { + ourLog.info("BulkImportPull parameter validation begin"); + + ArrayList<String> errors = new ArrayList<>(); + + if (theParameters.getBatchSize() <= 0) { + errors.add("Batch size must be positive"); + } + + String jobId = theParameters.getJobId(); + if (StringUtils.isEmpty(jobId)) { + errors.add("Bulk Import Pull requires an existing job id"); + } else { + BulkImportJobJson job = myBulkDataImportSvc.fetchJob(jobId); + + if (job == null) { + errors.add("There is no persistent job that exists with UUID: " + jobId); + } + } + + ourLog.info("BulkImportPull parameter validation end"); + + return errors; + } +}
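
The validator above has a single collaborator, so it is cheap to unit test in isolation. A minimal sketch, assuming Mockito is on the test classpath and that both failure branches should fire (the test name and stub values are hypothetical):

    import static org.junit.jupiter.api.Assertions.assertEquals;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import java.util.List;

    import ca.uhn.fhir.batch2.importpull.models.Batch2BulkImportPullJobParameters;
    import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
    import org.junit.jupiter.api.Test;

    class BulkImportParameterValidatorTest {
        @Test
        void validateCollectsOneErrorPerProblem() {
            IBulkDataImportSvc svc = mock(IBulkDataImportSvc.class);
            when(svc.fetchJob("unknown-id")).thenReturn(null); // no persisted job

            Batch2BulkImportPullJobParameters params = new Batch2BulkImportPullJobParameters();
            params.setBatchSize(0);        // violates "must be positive"
            params.setJobId("unknown-id"); // violates "job must exist"

            List<String> errors = new BulkImportParameterValidator(svc).validate(params);
            assertEquals(2, errors.size());
        }
    }
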
diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/importpull/BulkImportPullConfig.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/importpull/BulkImportPullConfig.java new file mode 100644 index 00000000000..6a12670d608 --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/importpull/BulkImportPullConfig.java @@ -0,0 +1,76 @@ +package ca.uhn.fhir.batch2.jobs.importpull; + +import ca.uhn.fhir.batch2.importpull.models.Batch2BulkImportPullJobParameters; +import ca.uhn.fhir.batch2.importpull.models.BulkImportFilePartitionResult; +import ca.uhn.fhir.batch2.importpull.models.BulkImportRecord; +import ca.uhn.fhir.batch2.model.JobDefinition; +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.batch.config.BatchConstants; +import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class BulkImportPullConfig { + + @Autowired + private FhirContext myFhirContext; + + @Autowired + private DaoRegistry myDaoRegistry; + + @Autowired + private IBulkDataImportSvc myBulkDataImportSvc; + + @Bean + public JobDefinition<Batch2BulkImportPullJobParameters> bulkImportPullJobDefinition() { + return JobDefinition + .newBuilder() + .setJobDefinitionId(BatchConstants.BULK_IMPORT_JOB_NAME) + .setJobDescription("Performs bulk import pull job") + .setJobDefinitionVersion(1) + .gatedExecution() + .setParametersType(Batch2BulkImportPullJobParameters.class) + .setParametersValidator(importParameterValidator()) + .addFirstStep( + "ReadInResourcesStep", + "Reads an import file and extracts the resources", + BulkImportFilePartitionResult.class, + fetchPartitionedFilesStep() + ) + .addIntermediateStep( + "ReadInResourcesFromFileStep", + "Reads the import file to get the serialized bundles", + BulkImportRecord.class, + readInResourcesFromFileStep() + ) + .addLastStep( + "WriteBundleStep", + "Parses the bundle from the previous step and writes it to the database", + writeBundleForImportStep() + ) + .build(); + } + + @Bean + public BulkImportParameterValidator importParameterValidator() { + return new BulkImportParameterValidator(myBulkDataImportSvc); + } + + @Bean + public FetchPartitionedFilesStep fetchPartitionedFilesStep() { + return new FetchPartitionedFilesStep(myBulkDataImportSvc); + } + + @Bean + public ReadInResourcesFromFileStep readInResourcesFromFileStep() { + return new ReadInResourcesFromFileStep(myBulkDataImportSvc); + } + + @Bean + public WriteBundleForImportStep writeBundleForImportStep() { + return new WriteBundleForImportStep(myFhirContext, myDaoRegistry); + } +}
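
Nothing in this config starts the job itself; activation elsewhere submits parameters against the definition id registered above. A hedged sketch of that submission, assuming the batch2 coordinator API (IJobCoordinator and JobInstanceStartRequest) and a hypothetical myJobCoordinator field; the real call site lives in the bulk import service, which is not shown in this section:

    import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
    import ca.uhn.fhir.jpa.batch.config.BatchConstants;

    JobInstanceStartRequest request = new JobInstanceStartRequest();
    request.setJobDefinitionId(BatchConstants.BULK_IMPORT_JOB_NAME);

    Batch2BulkImportPullJobParameters params = new Batch2BulkImportPullJobParameters();
    params.setJobId(existingBulkImportJobId); // id of a previously staged pull job (hypothetical variable)
    params.setBatchSize(100);
    request.setParameters(params);

    // Because the definition uses gatedExecution(), batch2 only advances to the
    // next step once every chunk of the current step has completed.
    myJobCoordinator.startInstance(request);
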
diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/importpull/FetchPartitionedFilesStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/importpull/FetchPartitionedFilesStep.java new file mode 100644 index 00000000000..e0ebc906262 --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/importpull/FetchPartitionedFilesStep.java @@ -0,0 +1,55 @@ +package ca.uhn.fhir.batch2.jobs.importpull; + +import ca.uhn.fhir.batch2.api.IFirstJobStepWorker; +import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.api.JobExecutionFailedException; +import ca.uhn.fhir.batch2.api.RunOutcome; +import ca.uhn.fhir.batch2.api.StepExecutionDetails; +import ca.uhn.fhir.batch2.api.VoidModel; +import ca.uhn.fhir.batch2.importpull.models.Batch2BulkImportPullJobParameters; +import ca.uhn.fhir.batch2.importpull.models.BulkImportFilePartitionResult; +import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc; +import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; + +import static org.slf4j.LoggerFactory.getLogger; + +public class FetchPartitionedFilesStep implements IFirstJobStepWorker<Batch2BulkImportPullJobParameters, BulkImportFilePartitionResult> { + private static final Logger ourLog = getLogger(FetchPartitionedFilesStep.class); + + private final IBulkDataImportSvc myBulkDataImportSvc; + + public FetchPartitionedFilesStep(IBulkDataImportSvc theBulkDataImportSvc) { + myBulkDataImportSvc = theBulkDataImportSvc; + } + + @NotNull + @Override + public RunOutcome run( + @NotNull StepExecutionDetails<Batch2BulkImportPullJobParameters, VoidModel> theStepExecutionDetails, + @NotNull IJobDataSink<BulkImportFilePartitionResult> theDataSink + ) throws JobExecutionFailedException { + String jobId = theStepExecutionDetails.getParameters().getJobId(); + + ourLog.info("Start FetchPartitionedFilesStep for jobID {}", jobId); + + BulkImportJobJson job = myBulkDataImportSvc.fetchJob(jobId); + + for (int i = 0; i < job.getFileCount(); i++) { + String fileDescription = myBulkDataImportSvc.getFileDescription(jobId, i); + + BulkImportFilePartitionResult result = new BulkImportFilePartitionResult(); + result.setFileIndex(i); + result.setProcessingMode(job.getProcessingMode()); + result.setFileDescription(fileDescription); + result.setJobDescription(job.getJobDescription()); + + theDataSink.accept(result); + } + + ourLog.info("FetchPartitionedFilesStep complete for jobID {}", jobId); + + return RunOutcome.SUCCESS; + } +}
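
The first step's contract is one emitted work chunk per file. A hedged Mockito sketch of that fan-out, under the assumptions that StepExecutionDetails is mockable in this version and that BulkImportJobJson exposes a fileCount setter (all stub values hypothetical):

    import static org.mockito.ArgumentMatchers.any;
    import static org.mockito.ArgumentMatchers.anyInt;
    import static org.mockito.ArgumentMatchers.eq;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;
    import static org.mockito.Mockito.when;

    @SuppressWarnings("unchecked")
    void fanOutEmitsOneChunkPerFile() {
        IBulkDataImportSvc svc = mock(IBulkDataImportSvc.class);
        BulkImportJobJson job = new BulkImportJobJson();
        job.setFileCount(3); // assumed setter, mirroring getFileCount() used above
        when(svc.fetchJob("job-1")).thenReturn(job);
        when(svc.getFileDescription(eq("job-1"), anyInt())).thenReturn("file description");

        StepExecutionDetails<Batch2BulkImportPullJobParameters, VoidModel> details = mock(StepExecutionDetails.class);
        Batch2BulkImportPullJobParameters params = new Batch2BulkImportPullJobParameters();
        params.setJobId("job-1");
        when(details.getParameters()).thenReturn(params);

        IJobDataSink<BulkImportFilePartitionResult> sink = mock(IJobDataSink.class);
        new FetchPartitionedFilesStep(svc).run(details, sink);

        // Three files in the job -> three downstream work chunks.
        verify(sink, times(3)).accept(any(BulkImportFilePartitionResult.class));
    }
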
diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/importpull/ReadInResourcesFromFileStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/importpull/ReadInResourcesFromFileStep.java new file mode 100644 index 00000000000..004056bc192 --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/importpull/ReadInResourcesFromFileStep.java @@ -0,0 +1,88 @@ +package ca.uhn.fhir.batch2.jobs.importpull; + +import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.api.IJobStepWorker; +import ca.uhn.fhir.batch2.api.JobExecutionFailedException; +import ca.uhn.fhir.batch2.api.RunOutcome; +import ca.uhn.fhir.batch2.api.StepExecutionDetails; +import ca.uhn.fhir.batch2.importpull.models.Batch2BulkImportPullJobParameters; +import ca.uhn.fhir.batch2.importpull.models.BulkImportFilePartitionResult; +import ca.uhn.fhir.batch2.importpull.models.BulkImportRecord; +import ca.uhn.fhir.i18n.Msg; +import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc; +import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson; +import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum; +import ca.uhn.fhir.util.IoUtil; +import com.google.common.io.LineReader; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.StringReader; + +public class ReadInResourcesFromFileStep implements IJobStepWorker<Batch2BulkImportPullJobParameters, BulkImportFilePartitionResult, BulkImportRecord> { + + private static final Logger ourLog = LoggerFactory.getLogger(ReadInResourcesFromFileStep.class); + + private final IBulkDataImportSvc myBulkDataImportSvc; + + public ReadInResourcesFromFileStep(IBulkDataImportSvc theBulkDataImportSvc) { + myBulkDataImportSvc = theBulkDataImportSvc; + } + + // Guava's LineReader is still marked @Beta, hence the suppression + @SuppressWarnings("UnstableApiUsage") + @NotNull + @Override + public RunOutcome run( + @NotNull StepExecutionDetails<Batch2BulkImportPullJobParameters, BulkImportFilePartitionResult> theStepExecutionDetails, + @NotNull IJobDataSink<BulkImportRecord> theDataSink + ) throws JobExecutionFailedException { + String jobId = theStepExecutionDetails.getParameters().getJobId(); + int fileIndex = theStepExecutionDetails.getData().getFileIndex(); + JobFileRowProcessingModeEnum mode = theStepExecutionDetails.getData().getProcessingMode(); + + ourLog.info("ReadInResourcesFromFileStep for jobId {} begin", jobId); + + BulkImportJobFileJson file = myBulkDataImportSvc.fetchFile(jobId, fileIndex); + String tenantName = file.getTenantName(); + String contents = file.getContents(); + + StringReader reader = new StringReader(contents); + + // each line of the file fans out into its own work chunk + LineReader lineReader = new LineReader(reader); + try { + int lineIndex = 0; + String nextLine; + do { + nextLine = lineReader.readLine(); + + if (nextLine != null) { + lineIndex++; // 1-based line index, matching BulkImportRecord's contract + BulkImportRecord record = new BulkImportRecord(); + record.setResourceString(nextLine); + record.setLineIndex(lineIndex); + record.setTenantName(tenantName); + record.setProcessingMode(mode); + record.setFileIndex(fileIndex); + + theDataSink.accept(record); + } + } while (nextLine != null); + } catch (IOException ex) { + ourLog.error("Failed to read file: " + ex.getMessage()); + + throw new JobExecutionFailedException(Msg.code(2107) + + "Could not read file" + ); + } finally { + IoUtil.closeQuietly(reader); + } + + ourLog.info("ReadInResourcesFromFileStep for jobId {} end", jobId); + + return RunOutcome.SUCCESS; + } +}
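
The reader treats the fetched file contents as NDJSON, so every line becomes its own downstream work chunk. The splitting semantics can be shown with plain JDK classes (the sample rows are hypothetical):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.StringReader;

    public class NdjsonSplitExample {
        public static void main(String[] args) throws IOException {
            // Two NDJSON rows, i.e. two serialized bundles separated by a newline.
            String contents = "{\"resourceType\":\"Bundle\"}\n{\"resourceType\":\"Bundle\"}";
            try (BufferedReader reader = new BufferedReader(new StringReader(contents))) {
                String line;
                int lineIndex = 0; // becomes 1-based once incremented, as in BulkImportRecord
                while ((line = reader.readLine()) != null) {
                    lineIndex++;
                    System.out.println("chunk " + lineIndex + ": " + line);
                }
            }
        }
    }
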
diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/importpull/WriteBundleForImportStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/importpull/WriteBundleForImportStep.java new file mode 100644 index 00000000000..165a76a1f0a --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/importpull/WriteBundleForImportStep.java @@ -0,0 +1,83 @@ +package ca.uhn.fhir.batch2.jobs.importpull; + +import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.api.ILastJobStepWorker; +import ca.uhn.fhir.batch2.api.JobExecutionFailedException; +import ca.uhn.fhir.batch2.api.RunOutcome; +import ca.uhn.fhir.batch2.api.StepExecutionDetails; +import ca.uhn.fhir.batch2.api.VoidModel; +import ca.uhn.fhir.batch2.importpull.models.Batch2BulkImportPullJobParameters; +import ca.uhn.fhir.batch2.importpull.models.BulkImportRecord; +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; +import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum; +import ca.uhn.fhir.jpa.partition.SystemRequestDetails; +import ca.uhn.fhir.parser.IParser; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WriteBundleForImportStep implements ILastJobStepWorker<Batch2BulkImportPullJobParameters, BulkImportRecord> { + + private static final Logger ourLog = LoggerFactory.getLogger(WriteBundleForImportStep.class); + + private final FhirContext myFhirContext; + + private final DaoRegistry myDaoRegistry; + + public WriteBundleForImportStep(FhirContext theFhirContext, DaoRegistry theDaoRegistry) { + myFhirContext = theFhirContext; + myDaoRegistry = theDaoRegistry; + } + + @SuppressWarnings({"SwitchStatementWithTooFewBranches", "rawtypes", "unchecked"}) + @NotNull + @Override + public RunOutcome run( + @NotNull StepExecutionDetails<Batch2BulkImportPullJobParameters, BulkImportRecord> theStepExecutionDetails, + @NotNull IJobDataSink<VoidModel> theDataSink + ) throws JobExecutionFailedException { + + BulkImportRecord record = theStepExecutionDetails.getData(); + + JobFileRowProcessingModeEnum mode = record.getProcessingMode(); + int fileIndex = record.getFileIndex(); + String content = record.getResourceString(); + String tenantName = record.getTenantName(); + int lineIndex = record.getLineIndex(); + String jobId = theStepExecutionDetails.getParameters().getJobId(); + + ourLog.info( + "Beginning bulk import write row {} for Job[{}] FileIndex[{}]", + lineIndex, + jobId, + fileIndex + ); + + IParser parser = myFhirContext.newJsonParser(); + + SystemRequestDetails requestDetails = new SystemRequestDetails(); + requestDetails.setTenantId(tenantName); + + IBaseResource bundle = parser.parseResource(content); + + // Only FHIR_TRANSACTION is supported today; additional processing modes slot into this switch + switch (mode) { + default: + case FHIR_TRANSACTION: + IFhirSystemDao systemDao = myDaoRegistry.getSystemDao(); + systemDao.transaction(requestDetails, bundle); + break; + } + + ourLog.info( + "Completed bulk import write for row {} Job[{}] FileIndex[{}]", + lineIndex, + jobId, + fileIndex + ); + return RunOutcome.SUCCESS; + } +}
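
Each record carries one serialized Bundle that the step above replays as a FHIR transaction. A self-contained sketch of what a single row holds and how it parses (the row content is hypothetical):

    import ca.uhn.fhir.context.FhirContext;
    import ca.uhn.fhir.parser.IParser;
    import org.hl7.fhir.instance.model.api.IBaseResource;

    public class ParseImportRowExample {
        public static void main(String[] args) {
            // One NDJSON row: a self-contained transaction Bundle.
            String row = "{\"resourceType\":\"Bundle\",\"type\":\"transaction\",\"entry\":[{"
                + "\"resource\":{\"resourceType\":\"Patient\"},"
                + "\"request\":{\"method\":\"POST\",\"url\":\"Patient\"}}]}";

            IParser parser = FhirContext.forR4().newJsonParser();
            IBaseResource bundle = parser.parseResource(row);
            // The step hands a bundle like this to IFhirSystemDao#transaction(requestDetails, bundle).
            System.out.println(bundle.fhirType()); // prints "Bundle"
        }
    }
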
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/Batch2JobRegisterer.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/Batch2JobRegisterer.java index c42fc3c4908..1176772c406 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/Batch2JobRegisterer.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/Batch2JobRegisterer.java @@ -24,7 +24,6 @@ import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry; import ca.uhn.fhir.batch2.model.JobDefinition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.batch.core.configuration.DuplicateJobException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; @@ -39,8 +38,7 @@ public class Batch2JobRegisterer { @PostConstruct - public void start() throws DuplicateJobException { - + public void start() { Map<String, JobDefinition> batchJobs = myApplicationContext.getBeansOfType(JobDefinition.class); JobDefinitionRegistry jobRegistry = myApplicationContext.getBean(JobDefinitionRegistry.class); diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/StepExecutionSvc.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/StepExecutionSvc.java index 2a3c1bad6a5..9a699b6b8ed 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/StepExecutionSvc.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/StepExecutionSvc.java @@ -327,7 +327,11 @@ public class StepExecutionSvc { // TODO - marking for posterity // see comments on MAX_CHUNK_ERROR_COUNT if (chunk.getErrorCount() > MAX_CHUNK_ERROR_COUNT) { - myJobPersistence.markWorkChunkAsFailed(chunkId, "Too many errors: " + chunk.getErrorCount()); + String errorMsg = "Too many errors: " + + chunk.getErrorCount() + + ". Last error msg was " + + e.getMessage(); + myJobPersistence.markWorkChunkAsFailed(chunkId, errorMsg); return false; } } } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/batch2/importpull/.keep b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/batch2/importpull/.keep new file mode 100644 index 00000000000..e69de29bb2d diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/batch2/importpull/models/Batch2BulkImportPullJobParameters.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/batch2/importpull/models/Batch2BulkImportPullJobParameters.java new file mode 100644 index 00000000000..104c9e8fcd8 --- /dev/null +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/batch2/importpull/models/Batch2BulkImportPullJobParameters.java @@ -0,0 +1,29 @@ +package ca.uhn.fhir.batch2.importpull.models; + +import ca.uhn.fhir.model.api.IModelJson; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class Batch2BulkImportPullJobParameters implements IModelJson { + + @JsonProperty("jobId") + private String myJobId; + + @JsonProperty("batchSize") + private long myBatchSize; + + public String getJobId() { + return myJobId; + } + + public void setJobId(String theJobId) { + myJobId = theJobId; + } + + public long getBatchSize() { + return myBatchSize; + } + + public void setBatchSize(long theBatchSize) { + myBatchSize = theBatchSize; + } +} diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/batch2/importpull/models/BulkImportFilePartitionResult.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/batch2/importpull/models/BulkImportFilePartitionResult.java new file mode 100644 index 00000000000..407fc5888f8 --- /dev/null +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/batch2/importpull/models/BulkImportFilePartitionResult.java @@ -0,0 +1,64 @@ +package ca.uhn.fhir.batch2.importpull.models; + +import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum; +import ca.uhn.fhir.model.api.IModelJson; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class BulkImportFilePartitionResult implements IModelJson { + + /** + * The file index for the import job + */ + @JsonProperty("fileIndex") + private int myFileIndex; + + /** + * Row processing mode + */ + @JsonProperty("rowProcessingMode") + private JobFileRowProcessingModeEnum myProcessingMode; + + /** + * The job description + */ + @JsonProperty("jobDescription") + private String myJobDescription; + + /** + * The file description + */ + @JsonProperty("fileDescription") + private String myFileDescription; + + public int getFileIndex() { + return myFileIndex; + } + + public void setFileIndex(int theFileIndex) { + myFileIndex = theFileIndex; + } + + public JobFileRowProcessingModeEnum getProcessingMode() { + return myProcessingMode; + } + + public void setProcessingMode(JobFileRowProcessingModeEnum theProcessingMode) { + myProcessingMode = theProcessingMode; + } + + public String getJobDescription() { + return myJobDescription; + } + + public void setJobDescription(String theJobDescription) { + myJobDescription = theJobDescription; + } + + public String getFileDescription() { + return myFileDescription; + } + + public void setFileDescription(String theFileDescription) { + myFileDescription = theFileDescription; + } +}
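
These models implement IModelJson, so batch2 serializes them with Jackson when handing data between steps. A sketch of the parameters object's wire form (the values are hypothetical):

    import ca.uhn.fhir.batch2.importpull.models.Batch2BulkImportPullJobParameters;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class WireFormatExample {
        public static void main(String[] args) throws Exception {
            Batch2BulkImportPullJobParameters params = new Batch2BulkImportPullJobParameters();
            params.setJobId("example-job-id");
            params.setBatchSize(100);

            // Property names come from the @JsonProperty annotations above.
            String json = new ObjectMapper().writeValueAsString(params);
            System.out.println(json); // {"jobId":"example-job-id","batchSize":100}
        }
    }
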
diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/batch2/importpull/models/BulkImportRecord.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/batch2/importpull/models/BulkImportRecord.java new file mode 100644 index 00000000000..103f0b7bbe9 --- /dev/null +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/batch2/importpull/models/BulkImportRecord.java @@ -0,0 +1,78 @@ +package ca.uhn.fhir.batch2.importpull.models; + +import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum; +import ca.uhn.fhir.model.api.IModelJson; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class BulkImportRecord implements IModelJson { + /** + * Stringified version of the resource + */ + @JsonProperty("resource") + private String myResourceString; + + /** + * Name of the tenant from the bulk import job file + */ + @JsonProperty("tenantName") + private String myTenantName; + + /** + * The line index (starting at 1; for backwards compatibility) + * of the import file from which the resource was read. + */ + @JsonProperty("lineIndex") + private int myLineIndex; + + /** + * The file index for the import job + */ + @JsonProperty("fileIndex") + private int myFileIndex; + + /** + * Row processing mode + */ + @JsonProperty("rowProcessingMode") + private JobFileRowProcessingModeEnum myProcessingMode; + + public String getResourceString() { + return myResourceString; + } + + public void setResourceString(String theResourceString) { + myResourceString = theResourceString; + } + + public String getTenantName() { + return myTenantName; + } + + public void setTenantName(String theTenantName) { + myTenantName = theTenantName; + } + + public int getLineIndex() { + return myLineIndex; + } + + public void setLineIndex(int theLineIndex) { + myLineIndex = theLineIndex; + } + + public int getFileIndex() { + return myFileIndex; + } + + public void setFileIndex(int theFileIndex) { + myFileIndex = theFileIndex; + } + + public JobFileRowProcessingModeEnum getProcessingMode() { + return myProcessingMode; + } + + public void setProcessingMode(JobFileRowProcessingModeEnum theProcessingMode) { + myProcessingMode = theProcessingMode; + } +} diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/api/IBulkDataImportSvc.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/api/IBulkDataImportSvc.java index b0c9ee0b0a9..ea01d3e884c 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/api/IBulkDataImportSvc.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/api/IBulkDataImportSvc.java @@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.bulk.imprt.api; * #L% */ +import ca.uhn.fhir.jpa.bulk.imprt.model.ActivateJobResult; import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson; import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson; import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum; @@ -91,7 +92,7 @@ * - * @return Returns {@literal true} if a job was activated + * @return Returns an {@link ActivateJobResult} describing whether a job was activated and, if so, its new batch2 instance id */ - boolean activateNextReadyJob(); + ActivateJobResult activateNextReadyJob(); /** * Updates the job status for the given job diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/model/ActivateJobResult.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/model/ActivateJobResult.java new file mode 100644 index 00000000000..41ceccc1ce9 --- /dev/null +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/model/ActivateJobResult.java @@ -0,0 +1,23 @@ +package ca.uhn.fhir.jpa.bulk.imprt.model; + +import javax.annotation.Nullable; + +public class ActivateJobResult { + + /** + * Whether the job is activated or not + */ + public final boolean isActivated; + + /** + * The id of the newly created job, if one was activated + */ + @Nullable + public final String jobId; + + public ActivateJobResult(boolean 
theIsActivated, + @Nullable String theJobId) { + isActivated = theIsActivated; + jobId = theJobId; + } +}
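
With the richer return type, a caller can drain the ready queue and track each spawned batch2 instance. A hedged sketch of such a loop (the scheduling wrapper and logger are hypothetical; only the IBulkDataImportSvc API above is assumed):

    // Drain all READY bulk import jobs, activating each in turn.
    ActivateJobResult result = myBulkDataImportSvc.activateNextReadyJob();
    while (result.isActivated) {
        ourLog.info("Bulk import pull activated as batch2 instance {}", result.jobId);
        result = myBulkDataImportSvc.activateNextReadyJob();
    }
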