From 5dfdc91682b09c07185c436bc1088acd57e437d5 Mon Sep 17 00:00:00 2001 From: James Agnew Date: Mon, 18 Apr 2022 20:20:52 -0400 Subject: [PATCH] Add gated execution mode for Batch2 (#3545) * Add gated execution mode for Batcvh2 * Allow for async processing * Test fixes * Cleanup * Add javadoc * Fixes --- .../jpa/batch2/JpaJobPersistenceImpl.java | 4 + .../delete/job/DeleteExpungeProcessor.java | 4 +- .../jpa/entity/Batch2JobInstanceEntity.java | 11 ++ .../tasks/HapiFhirJpaMigrationTasks.java | 21 ++- .../ca/uhn/fhir/jpa/test/Batch2JobHelper.java | 8 +- .../jpa/bulk/imprt2/BulkImportR4Test.java | 15 +- .../fhir/jpa/delete/job/ReindexJobTest.java | 1 + .../src/test/resources/logback-test.xml | 10 +- .../batch2/jobs/reindex/ReindexAppCtx.java | 1 + .../batch2/api/IJobCompletionHandler.java | 9 + .../uhn/fhir/batch2/api/IJobCoordinator.java | 1 - ...rvice.java => IJobMaintenanceService.java} | 4 +- .../uhn/fhir/batch2/api/IJobPersistence.java | 2 +- .../fhir/batch2/api/JobCompletionDetails.java | 56 ++++++ .../ca/uhn/fhir/batch2/api/package-info.java | 2 +- .../fhir/batch2/config/BaseBatch2Config.java | 17 +- .../fhir/batch2/impl/JobCoordinatorImpl.java | 85 ++++----- .../ca/uhn/fhir/batch2/impl/JobDataSink.java | 10 +- ...pl.java => JobMaintenanceServiceImpl.java} | 176 ++++++++++++++++-- .../SynchronizedJobPersistenceWrapper.java | 2 + .../uhn/fhir/batch2/model/JobDefinition.java | 93 ++++++++- .../ca/uhn/fhir/batch2/model/JobInstance.java | 21 ++- .../ca/uhn/fhir/batch2/model/StatusEnum.java | 40 +++- .../uhn/fhir/batch2/impl/BaseBatch2Test.java | 74 ++++++++ .../batch2/impl/JobCoordinatorImplTest.java | 106 +++++------ .../uhn/fhir/batch2/impl/JobDataSinkTest.java | 2 +- ...ava => JobMaintenanceServiceImplTest.java} | 82 ++++++-- 27 files changed, 657 insertions(+), 200 deletions(-) create mode 100644 hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobCompletionHandler.java rename hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/{IJobCleanerService.java => IJobMaintenanceService.java} (91%) create mode 100644 hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/JobCompletionDetails.java rename hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/{JobCleanerServiceImpl.java => JobMaintenanceServiceImpl.java} (53%) create mode 100644 hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/BaseBatch2Test.java rename hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/{JobCleanerServiceImplTest.java => JobMaintenanceServiceImplTest.java} (77%) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java index 1e694f6c04a..34235dc89df 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java @@ -35,6 +35,7 @@ import org.springframework.data.domain.PageRequest; import javax.annotation.Nonnull; import javax.transaction.Transactional; import java.util.Date; +import java.util.EnumSet; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -91,6 +92,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence { entity.setDefinitionVersion(theInstance.getJobDefinitionVersion()); entity.setStatus(theInstance.getStatus()); entity.setParams(theInstance.getParameters()); + entity.setCurrentGatedStepId(theInstance.getCurrentGatedStepId()); entity.setCreateTime(new Date()); entity = myJobInstanceRepository.save(entity); @@ -156,6 +158,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence { retVal.setErrorCount(theEntity.getErrorCount()); retVal.setEstimatedTimeRemaining(theEntity.getEstimatedTimeRemaining()); retVal.setParameters(theEntity.getParams()); + retVal.setCurrentGatedStepId(theEntity.getCurrentGatedStepId()); return retVal; } @@ -202,6 +205,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence { instance.setErrorMessage(theInstance.getErrorMessage()); instance.setErrorCount(theInstance.getErrorCount()); instance.setEstimatedTimeRemaining(theInstance.getEstimatedTimeRemaining()); + instance.setCurrentGatedStepId(theInstance.getCurrentGatedStepId()); myJobInstanceRepository.save(instance); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/delete/job/DeleteExpungeProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/delete/job/DeleteExpungeProcessor.java index bffee9b7d9a..16ba8832fb3 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/delete/job/DeleteExpungeProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/delete/job/DeleteExpungeProcessor.java @@ -37,6 +37,7 @@ import org.springframework.beans.factory.annotation.Autowired; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.function.Consumer; import java.util.stream.Collectors; /** @@ -85,7 +86,8 @@ public class DeleteExpungeProcessor implements ItemProcessor, List conflictResourceLinks = Collections.synchronizedList(new ArrayList<>()); PartitionRunner partitionRunner = new PartitionRunner(PROCESS_NAME, THREAD_PREFIX, myDaoConfig.getExpungeBatchSize(), myDaoConfig.getExpungeThreadCount()); - partitionRunner.runInPartitionedThreads(thePids, someTargetPids -> findResourceLinksWithTargetPidIn(thePids, someTargetPids, conflictResourceLinks)); + Consumer> listConsumer = someTargetPids -> findResourceLinksWithTargetPidIn(thePids, someTargetPids, conflictResourceLinks); + partitionRunner.runInPartitionedThreads(thePids, listConsumer); if (conflictResourceLinks.isEmpty()) { return; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Batch2JobInstanceEntity.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Batch2JobInstanceEntity.java index 362d0a569a5..f2c2e7da4a5 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Batch2JobInstanceEntity.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Batch2JobInstanceEntity.java @@ -38,6 +38,7 @@ import javax.persistence.TemporalType; import java.io.Serializable; import java.util.Date; +import static ca.uhn.fhir.batch2.model.JobDefinition.ID_MAX_LENGTH; import static ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity.ERROR_MSG_MAX_LENGTH; import static org.apache.commons.lang3.StringUtils.left; @@ -101,6 +102,16 @@ public class Batch2JobInstanceEntity implements Serializable { private int myErrorCount; @Column(name = "EST_REMAINING", length = TIME_REMAINING_LENGTH, nullable = true) private String myEstimatedTimeRemaining; + @Column(name = "CUR_GATED_STEP_ID", length = ID_MAX_LENGTH, nullable = true) + private String myCurrentGatedStepId; + + public String getCurrentGatedStepId() { + return myCurrentGatedStepId; + } + + public void setCurrentGatedStepId(String theCurrentGatedStepId) { + myCurrentGatedStepId = theCurrentGatedStepId; + } public boolean isCancelled() { return myCancelled; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/migrate/tasks/HapiFhirJpaMigrationTasks.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/migrate/tasks/HapiFhirJpaMigrationTasks.java index e1186bc9113..1e93e373f1a 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/migrate/tasks/HapiFhirJpaMigrationTasks.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/migrate/tasks/HapiFhirJpaMigrationTasks.java @@ -49,11 +49,10 @@ import java.util.stream.Collectors; @SuppressWarnings({"SqlNoDataSourceInspection", "SpellCheckingInspection"}) public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks { - private final Set myFlags; - // H2, Derby, MariaDB, and MySql automatically add indexes to foreign keys public static final DriverTypeEnum[] NON_AUTOMATIC_FK_INDEX_PLATFORMS = new DriverTypeEnum[]{ DriverTypeEnum.POSTGRES_9_4, DriverTypeEnum.ORACLE_12C, DriverTypeEnum.MSSQL_2012}; + private final Set myFlags; /** @@ -273,15 +272,22 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks { replaceNumericSPIndices(version); replaceQuantitySPIndices(version); - - // Drop Index on HFJ_RESOURCE.INDEX_STATUS - version - .onTable("HFJ_RESOURCE") - .dropIndex("20220314.1", "IDX_INDEXSTATUS"); + + // Drop Index on HFJ_RESOURCE.INDEX_STATUS + version + .onTable("HFJ_RESOURCE") + .dropIndex("20220314.1", "IDX_INDEXSTATUS"); + + version + .onTable("BT2_JOB_INSTANCE") + .addColumn("20220416.1", "CUR_GATED_STEP_ID") + .nullable() + .type(ColumnTypeEnum.STRING, 100); } /** * new numeric search indexing + * * @see ca.uhn.fhir.jpa.search.builder.predicate.NumberPredicateBuilder * @see ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamNumber */ @@ -320,6 +326,7 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks { /** * new quantity search indexing + * * @see ca.uhn.fhir.jpa.search.builder.predicate.QuantityPredicateBuilder * @see ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamQuantity * @see ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamQuantityNormalized diff --git a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/Batch2JobHelper.java b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/Batch2JobHelper.java index 0bf99717a52..8bd86bc1fa5 100644 --- a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/Batch2JobHelper.java +++ b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/Batch2JobHelper.java @@ -20,7 +20,7 @@ package ca.uhn.fhir.jpa.test; * #L% */ -import ca.uhn.fhir.batch2.api.IJobCleanerService; +import ca.uhn.fhir.batch2.api.IJobMaintenanceService; import ca.uhn.fhir.batch2.api.IJobCoordinator; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.StatusEnum; @@ -33,21 +33,21 @@ import static org.hamcrest.Matchers.equalTo; public class Batch2JobHelper { @Autowired - private IJobCleanerService myJobCleanerService; + private IJobMaintenanceService myJobCleanerService; @Autowired private IJobCoordinator myJobCoordinator; public void awaitJobCompletion(String theId) { await().until(() -> { - myJobCleanerService.runCleanupPass(); + myJobCleanerService.runMaintenancePass(); return myJobCoordinator.getInstance(theId).getStatus(); }, equalTo(StatusEnum.COMPLETED)); } public JobInstance awaitJobFailure(String theId) { await().until(() -> { - myJobCleanerService.runCleanupPass(); + myJobCleanerService.runMaintenancePass(); return myJobCoordinator.getInstance(theId).getStatus(); }, Matchers.anyOf(equalTo(StatusEnum.ERRORED),equalTo(StatusEnum.FAILED))); return myJobCoordinator.getInstance(theId); diff --git a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/bulk/imprt2/BulkImportR4Test.java b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/bulk/imprt2/BulkImportR4Test.java index 8874bfd3268..6276b7850ac 100644 --- a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/bulk/imprt2/BulkImportR4Test.java +++ b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/bulk/imprt2/BulkImportR4Test.java @@ -1,6 +1,6 @@ package ca.uhn.fhir.jpa.bulk.imprt2; -import ca.uhn.fhir.batch2.api.IJobCleanerService; +import ca.uhn.fhir.batch2.api.IJobMaintenanceService; import ca.uhn.fhir.batch2.api.IJobCoordinator; import ca.uhn.fhir.batch2.jobs.imprt.BulkImportAppCtx; import ca.uhn.fhir.batch2.jobs.imprt.BulkImportFileServlet; @@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; -import java.io.StringReader; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -56,7 +55,7 @@ public class BulkImportR4Test extends BaseJpaR4Test { @Autowired private IJobCoordinator myJobCoordinator; @Autowired - private IJobCleanerService myJobCleanerService; + private IJobMaintenanceService myJobCleanerService; @Autowired private IBatch2JobInstanceRepository myJobInstanceRepository; @Autowired @@ -99,7 +98,7 @@ public class BulkImportR4Test extends BaseJpaR4Test { // Verify await().until(() -> { - myJobCleanerService.runCleanupPass(); + myJobCleanerService.runMaintenancePass(); JobInstance instance = myJobCoordinator.getInstance(instanceId); return instance.getStatus(); }, equalTo(StatusEnum.COMPLETED)); @@ -152,7 +151,7 @@ public class BulkImportR4Test extends BaseJpaR4Test { // Verify await().until(() -> { - myJobCleanerService.runCleanupPass(); + myJobCleanerService.runMaintenancePass(); JobInstance instance = myJobCoordinator.getInstance(instanceId); return instance.getStatus(); }, equalTo(StatusEnum.ERRORED)); @@ -174,7 +173,7 @@ public class BulkImportR4Test extends BaseJpaR4Test { }); await().until(() -> { - myJobCleanerService.runCleanupPass(); + myJobCleanerService.runMaintenancePass(); JobInstance instance = myJobCoordinator.getInstance(instanceId); return instance.getErrorCount(); }, equalTo(3)); @@ -224,7 +223,7 @@ public class BulkImportR4Test extends BaseJpaR4Test { // Verify await().until(() -> { - myJobCleanerService.runCleanupPass(); + myJobCleanerService.runMaintenancePass(); JobInstance instance = myJobCoordinator.getInstance(instanceId); return instance.getStatus(); }, equalTo(StatusEnum.FAILED)); @@ -267,7 +266,7 @@ public class BulkImportR4Test extends BaseJpaR4Test { // Verify await().until(() -> { - myJobCleanerService.runCleanupPass(); + myJobCleanerService.runMaintenancePass(); JobInstance instance = myJobCoordinator.getInstance(instanceId); return instance.getStatus(); }, equalTo(StatusEnum.FAILED)); diff --git a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java index 7e3bf0d7c61..1c5d228dd06 100644 --- a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java +++ b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java @@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.junit.jupiter.api.Assertions.assertEquals; public class ReindexJobTest extends BaseJpaR4Test { + @Autowired private IJobCoordinator myJobCoordinator; diff --git a/hapi-fhir-jpaserver-test-utilities/src/test/resources/logback-test.xml b/hapi-fhir-jpaserver-test-utilities/src/test/resources/logback-test.xml index 548c75951db..6411e7da93a 100644 --- a/hapi-fhir-jpaserver-test-utilities/src/test/resources/logback-test.xml +++ b/hapi-fhir-jpaserver-test-utilities/src/test/resources/logback-test.xml @@ -1,7 +1,7 @@ - TRACE + INFO %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} [%file:%line] %msg%n @@ -23,14 +23,6 @@ - - - - - - - - j diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexAppCtx.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexAppCtx.java index 4e74a2e67a4..f2b64b6321f 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexAppCtx.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexAppCtx.java @@ -41,6 +41,7 @@ public class ReindexAppCtx { .setJobDefinitionVersion(1) .setParametersType(ReindexJobParameters.class) .setParametersValidator(reindexJobParametersValidator()) + .gatedExecution() .addFirstStep( "generate-ranges", "Generate data ranges to reindex", diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobCompletionHandler.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobCompletionHandler.java new file mode 100644 index 00000000000..b2082f7decb --- /dev/null +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobCompletionHandler.java @@ -0,0 +1,9 @@ +package ca.uhn.fhir.batch2.api; + +import ca.uhn.fhir.model.api.IModelJson; + +public interface IJobCompletionHandler { + + void jobComplete(JobCompletionDetails theDetails); + +} diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobCoordinator.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobCoordinator.java index 131dde2039f..d4cbecdc7fe 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobCoordinator.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobCoordinator.java @@ -54,5 +54,4 @@ public interface IJobCoordinator { void cancelInstance(String theInstanceId) throws ResourceNotFoundException; - } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobCleanerService.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobMaintenanceService.java similarity index 91% rename from hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobCleanerService.java rename to hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobMaintenanceService.java index a8643fed467..8026eaec5e5 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobCleanerService.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobMaintenanceService.java @@ -20,8 +20,8 @@ package ca.uhn.fhir.batch2.api; * #L% */ -public interface IJobCleanerService { +public interface IJobMaintenanceService { - void runCleanupPass(); + void runMaintenancePass(); } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java index 3e802c40976..55dc6f00072 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java @@ -101,7 +101,7 @@ public interface IJobPersistence { /** * Increments the work chunk error count by the given amount * - * @param theChunkId The chunk ID + * @param theChunkId The chunk ID * @param theIncrementBy The number to increment the error count by */ void incrementWorkChunkErrorCount(String theChunkId, int theIncrementBy); diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/JobCompletionDetails.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/JobCompletionDetails.java new file mode 100644 index 00000000000..470954b674a --- /dev/null +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/JobCompletionDetails.java @@ -0,0 +1,56 @@ +package ca.uhn.fhir.batch2.api; + +/*- + * #%L + * HAPI FHIR JPA Server - Batch2 Task Processor + * %% + * Copyright (C) 2014 - 2022 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import ca.uhn.fhir.model.api.IModelJson; +import org.apache.commons.lang3.Validate; + +import javax.annotation.Nonnull; + +public class JobCompletionDetails { + + private final PT myParameters; + private final String myInstanceId; + + public JobCompletionDetails(@Nonnull PT theParameters, @Nonnull String theInstanceId) { + Validate.notNull(theParameters); + myParameters = theParameters; + myInstanceId = theInstanceId; + } + + /** + * Returns the parameters associated with this job instance. Note that parameters + * are set when the job instance is created and can not be modified after that. + */ + @Nonnull + public PT getParameters() { + return myParameters; + } + + /** + * Returns the job instance ID being executed + */ + @Nonnull + public String getInstanceId() { + return myInstanceId; + } + +} diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/package-info.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/package-info.java index 5d0257cb196..42ff73ff781 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/package-info.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/package-info.java @@ -6,7 +6,7 @@ * framework, used to start and stop jobs and inquire about status. * *
    - * {@link ca.uhn.fhir.batch2.api.IJobCleanerService} is a background processor that + * {@link ca.uhn.fhir.batch2.api.IJobMaintenanceService} is a background processor that * updates statistics and clean up stale data *
*
    diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java index 30e8f035801..54569124be8 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java @@ -20,11 +20,11 @@ package ca.uhn.fhir.batch2.config; * #L% */ -import ca.uhn.fhir.batch2.api.IJobCleanerService; +import ca.uhn.fhir.batch2.api.IJobMaintenanceService; import ca.uhn.fhir.batch2.api.IJobCoordinator; import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.impl.BatchJobSender; -import ca.uhn.fhir.batch2.impl.JobCleanerServiceImpl; +import ca.uhn.fhir.batch2.impl.JobMaintenanceServiceImpl; import ca.uhn.fhir.batch2.impl.JobCoordinatorImpl; import ca.uhn.fhir.batch2.impl.JobDefinitionRegistry; import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage; @@ -48,9 +48,14 @@ public abstract class BaseBatch2Config { } @Bean - public IJobCoordinator batch2JobCoordinator(IChannelFactory theChannelFactory, IJobPersistence theJobInstancePersister, JobDefinitionRegistry theJobDefinitionRegistry) { + public BatchJobSender batchJobSender(IChannelFactory theChannelFactory) { + return new BatchJobSender(batch2ProcessingChannelProducer(theChannelFactory)); + } + + @Bean + public IJobCoordinator batch2JobCoordinator(IChannelFactory theChannelFactory, IJobPersistence theJobInstancePersister, JobDefinitionRegistry theJobDefinitionRegistry, BatchJobSender theBatchJobSender) { return new JobCoordinatorImpl( - new BatchJobSender(batch2ProcessingChannelProducer(theChannelFactory)), + theBatchJobSender, batch2ProcessingChannelReceiver(theChannelFactory), theJobInstancePersister, theJobDefinitionRegistry @@ -58,8 +63,8 @@ public abstract class BaseBatch2Config { } @Bean - public IJobCleanerService batch2JobCleaner(ISchedulerService theSchedulerService, IJobPersistence theJobPersistence) { - return new JobCleanerServiceImpl(theSchedulerService, theJobPersistence); + public IJobMaintenanceService batch2JobMaintenanceService(ISchedulerService theSchedulerService, IJobPersistence theJobPersistence, JobDefinitionRegistry theJobDefinitionRegistry, BatchJobSender theBatchJobSender) { + return new JobMaintenanceServiceImpl(theSchedulerService, theJobPersistence, theJobDefinitionRegistry, theBatchJobSender); } @Bean diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImpl.java index 639503fcfe5..e291d1de9f0 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImpl.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImpl.java @@ -37,6 +37,7 @@ import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage; import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.i18n.Msg; +import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.model.api.annotation.PasswordField; @@ -49,6 +50,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.commons.lang3.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; @@ -78,15 +80,13 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato private final JobDefinitionRegistry myJobDefinitionRegistry; private final MessageHandler myReceiverHandler = new WorkChannelMessageHandler(); private final ValidatorFactory myValidatorFactory = Validation.buildDefaultValidatorFactory(); + @Autowired + private ISchedulerService mySchedulerService; /** * Constructor */ - public JobCoordinatorImpl( - @Nonnull BatchJobSender theBatchJobSender, - @Nonnull IChannelReceiver theWorkChannelReceiver, - @Nonnull IJobPersistence theJobPersistence, - @Nonnull JobDefinitionRegistry theJobDefinitionRegistry) { + public JobCoordinatorImpl(@Nonnull BatchJobSender theBatchJobSender, @Nonnull IChannelReceiver theWorkChannelReceiver, @Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinitionRegistry theJobDefinitionRegistry) { super(theJobPersistence); myBatchJobSender = theBatchJobSender; myWorkChannelReceiver = theWorkChannelReceiver; @@ -95,9 +95,7 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato @Override public String startInstance(JobInstanceStartRequest theStartRequest) { - JobDefinition jobDefinition = myJobDefinitionRegistry - .getLatestJobDefinition(theStartRequest.getJobDefinitionId()) - .orElseThrow(() -> new IllegalArgumentException(Msg.code(2063) + "Unknown job definition ID: " + theStartRequest.getJobDefinitionId())); + JobDefinition jobDefinition = myJobDefinitionRegistry.getLatestJobDefinition(theStartRequest.getJobDefinitionId()).orElseThrow(() -> new IllegalArgumentException(Msg.code(2063) + "Unknown job definition ID: " + theStartRequest.getJobDefinitionId())); if (isBlank(theStartRequest.getParameters())) { throw new InvalidRequestException(Msg.code(2065) + "No parameters supplied"); @@ -115,6 +113,10 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato instance.setStatus(StatusEnum.QUEUED); instance.setParameters(theStartRequest.getParameters()); + if (jobDefinition.isGatedExecution()) { + instance.setCurrentGatedStepId(firstStepId); + } + String instanceId = myJobPersistence.storeNewInstance(instance); BatchWorkChunk batchWorkChunk = new BatchWorkChunk(jobDefinitionId, jobDefinitionVersion, firstStepId, instanceId, 0, null); @@ -132,11 +134,7 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato Validator validator = myValidatorFactory.getValidator(); PT parameters = theStartRequest.getParameters(theJobDefinition.getParametersType()); Set> constraintErrors = validator.validate(parameters); - List errorStrings = constraintErrors - .stream() - .map(t -> t.getPropertyPath() + " - " + t.getMessage()) - .sorted() - .collect(Collectors.toList()); + List errorStrings = constraintErrors.stream().map(t -> t.getPropertyPath() + " - " + t.getMessage()).sorted().collect(Collectors.toList()); // Programmatic Validator IJobParametersValidator parametersValidator = theJobDefinition.getParametersValidator(); @@ -147,13 +145,7 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato } if (!errorStrings.isEmpty()) { - String message = "Failed to validate parameters for job of type " + - theJobDefinition.getJobDefinitionId() + - ": " + - errorStrings - .stream() - .map(t -> "\n * " + t) - .collect(Collectors.joining()); + String message = "Failed to validate parameters for job of type " + theJobDefinition.getJobDefinitionId() + ": " + errorStrings.stream().map(t -> "\n * " + t).collect(Collectors.joining()); throw new InvalidRequestException(Msg.code(2039) + message); } @@ -161,19 +153,12 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato @Override public JobInstance getInstance(String theInstanceId) { - return myJobPersistence - .fetchInstance(theInstanceId) - .map(t -> massageInstanceForUserAccess(t)) - .orElseThrow(() -> new ResourceNotFoundException(Msg.code(2040) + "Unknown instance ID: " + UrlUtil.escapeUrlParam(theInstanceId))); + return myJobPersistence.fetchInstance(theInstanceId).map(t -> massageInstanceForUserAccess(t)).orElseThrow(() -> new ResourceNotFoundException(Msg.code(2040) + "Unknown instance ID: " + UrlUtil.escapeUrlParam(theInstanceId))); } @Override public List getInstances(int thePageSize, int thePageIndex) { - return myJobPersistence - .fetchInstances(thePageSize, thePageIndex) - .stream() - .map(t -> massageInstanceForUserAccess(t)) - .collect(Collectors.toList()); + return myJobPersistence.fetchInstances(thePageSize, thePageIndex).stream().map(t -> massageInstanceForUserAccess(t)).collect(Collectors.toList()); } @Override @@ -195,7 +180,7 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato return retVal; } - private boolean executeStep(@Nonnull WorkChunk theWorkChunk, String jobDefinitionId, String targetStepId, Class theInputType, PT parameters, IJobStepWorker worker, BaseDataSink dataSink) { + private boolean executeStep(@Nonnull WorkChunk theWorkChunk, String theJobDefinitionId, String theTargetStepId, Class theInputType, PT theParameters, IJobStepWorker theWorker, BaseDataSink theDataSink) { IT data = null; if (!theInputType.equals(VoidModel.class)) { data = theWorkChunk.getData(theInputType); @@ -204,21 +189,21 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato String instanceId = theWorkChunk.getInstanceId(); String chunkId = theWorkChunk.getId(); - StepExecutionDetails stepExecutionDetails = new StepExecutionDetails<>(parameters, data, instanceId, chunkId); + StepExecutionDetails stepExecutionDetails = new StepExecutionDetails<>(theParameters, data, instanceId, chunkId); RunOutcome outcome; try { - outcome = worker.run(stepExecutionDetails, dataSink); - Validate.notNull(outcome, "Step worker returned null: %s", worker.getClass()); + outcome = theWorker.run(stepExecutionDetails, theDataSink); + Validate.notNull(outcome, "Step theWorker returned null: %s", theWorker.getClass()); } catch (JobExecutionFailedException e) { - ourLog.error("Unrecoverable failure executing job {} step {}", jobDefinitionId, targetStepId, e); + ourLog.error("Unrecoverable failure executing job {} step {}", theJobDefinitionId, theTargetStepId, e); myJobPersistence.markWorkChunkAsFailed(chunkId, e.toString()); return false; } catch (Exception e) { - ourLog.error("Failure executing job {} step {}", jobDefinitionId, targetStepId, e); + ourLog.error("Failure executing job {} step {}", theJobDefinitionId, theTargetStepId, e); myJobPersistence.markWorkChunkAsErroredAndIncrementErrorCount(chunkId, e.toString()); throw new JobExecutionFailedException(Msg.code(2041) + e.getMessage(), e); } catch (Throwable t) { - ourLog.error("Unexpected failure executing job {} step {}", jobDefinitionId, targetStepId, t); + ourLog.error("Unexpected failure executing job {} step {}", theJobDefinitionId, theTargetStepId, t); myJobPersistence.markWorkChunkAsFailed(chunkId, t.toString()); return false; } @@ -226,7 +211,7 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato int recordsProcessed = outcome.getRecordsProcessed(); myJobPersistence.markWorkChunkAsCompletedAndClearData(chunkId, recordsProcessed); - int recoveredErrorCount = dataSink.getRecoveredErrorCount(); + int recoveredErrorCount = theDataSink.getRecoveredErrorCount(); if (recoveredErrorCount > 0) { myJobPersistence.incrementWorkChunkErrorCount(chunkId, recoveredErrorCount); } @@ -296,32 +281,40 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato return; } - executeStep(chunk, jobDefinitionId, jobDefinitionVersion, definition, targetStep, nextStep, targetStepId, firstStep, instance, instanceId); + executeStep(chunk, jobDefinitionId, jobDefinitionVersion, definition, targetStep, nextStep, targetStepId, firstStep, instance); } @SuppressWarnings("unchecked") - private void executeStep(WorkChunk chunk, String jobDefinitionId, int jobDefinitionVersion, JobDefinition definition, JobDefinitionStep theStep, JobDefinitionStep theSubsequentStep, String targetStepId, boolean firstStep, JobInstance instance, String instanceId) { - PT parameters = (PT) instance.getParameters(definition.getParametersType()); + private void executeStep(WorkChunk theWorkChunk, String theJobDefinitionId, int theJobDefinitionVersion, JobDefinition theDefinition, JobDefinitionStep theStep, JobDefinitionStep theSubsequentStep, String theTargetStepId, boolean theFirstStep, JobInstance theInstance) { + String instanceId = theInstance.getInstanceId(); + PT parameters = theInstance.getParameters(theDefinition.getParametersType()); IJobStepWorker worker = theStep.getJobStepWorker(); BaseDataSink dataSink; - if (theSubsequentStep != null) { - dataSink = new JobDataSink<>(myBatchJobSender, myJobPersistence, jobDefinitionId, jobDefinitionVersion, theSubsequentStep, instanceId, theStep.getStepId()); + boolean finalStep = theSubsequentStep == null; + if (!finalStep) { + dataSink = new JobDataSink<>(myBatchJobSender, myJobPersistence, theJobDefinitionId, theJobDefinitionVersion, theSubsequentStep, instanceId, theStep.getStepId(), theDefinition.isGatedExecution()); } else { - dataSink = (BaseDataSink) new FinalStepDataSink(jobDefinitionId, instanceId, theStep.getStepId()); + dataSink = (BaseDataSink) new FinalStepDataSink(theJobDefinitionId, instanceId, theStep.getStepId()); } Class inputType = theStep.getInputType(); - boolean success = executeStep(chunk, jobDefinitionId, targetStepId, inputType, parameters, worker, dataSink); + boolean success = executeStep(theWorkChunk, theJobDefinitionId, theTargetStepId, inputType, parameters, worker, dataSink); if (!success) { return; } int workChunkCount = dataSink.getWorkChunkCount(); - if (firstStep && workChunkCount == 0) { - ourLog.info("First step of job instance {} produced no work chunks, marking as completed", instanceId); + if (theFirstStep && workChunkCount == 0) { + ourLog.info("First step of job theInstance {} produced no work chunks, marking as completed", instanceId); myJobPersistence.markInstanceAsCompleted(instanceId); } + + if (theDefinition.isGatedExecution() && theFirstStep) { + theInstance.setCurrentGatedStepId(theTargetStepId); + myJobPersistence.updateInstance(theInstance); + } + } private JobDefinition getDefinitionOrThrowException(String jobDefinitionId, int jobDefinitionVersion) { diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobDataSink.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobDataSink.java index 0cb78c78a1d..dadc5c0987e 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobDataSink.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobDataSink.java @@ -36,14 +36,16 @@ class JobDataSink extends BaseDataSink { private final int myJobDefinitionVersion; private final JobDefinitionStep myTargetStep; private final AtomicInteger myChunkCounter = new AtomicInteger(0); + private final boolean myGatedExecution; - JobDataSink(BatchJobSender theBatchJobSender, IJobPersistence theJobPersistence, String theJobDefinitionId, int theJobDefinitionVersion, JobDefinitionStep theTargetStep, String theInstanceId, String theCurrentStepId) { + JobDataSink(BatchJobSender theBatchJobSender, IJobPersistence theJobPersistence, String theJobDefinitionId, int theJobDefinitionVersion, JobDefinitionStep theTargetStep, String theInstanceId, String theCurrentStepId, boolean theGatedExecution) { super(theInstanceId, theCurrentStepId); myBatchJobSender = theBatchJobSender; myJobPersistence = theJobPersistence; myJobDefinitionId = theJobDefinitionId; myJobDefinitionVersion = theJobDefinitionVersion; myTargetStep = theTargetStep; + myGatedExecution = theGatedExecution; } @Override @@ -59,8 +61,10 @@ class JobDataSink extends BaseDataSink { BatchWorkChunk batchWorkChunk = new BatchWorkChunk(jobDefinitionId, jobDefinitionVersion, targetStepId, instanceId, sequence, dataValueString); String chunkId = myJobPersistence.storeWorkChunk(batchWorkChunk); - JobWorkNotification workNotification = new JobWorkNotification(jobDefinitionId, jobDefinitionVersion, instanceId, targetStepId, chunkId); - myBatchJobSender.sendWorkChannelMessage(workNotification); + if (!myGatedExecution) { + JobWorkNotification workNotification = new JobWorkNotification(jobDefinitionId, jobDefinitionVersion, instanceId, targetStepId, chunkId); + myBatchJobSender.sendWorkChannelMessage(workNotification); + } } @Override diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobCleanerServiceImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobMaintenanceServiceImpl.java similarity index 53% rename from hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobCleanerServiceImpl.java rename to hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobMaintenanceServiceImpl.java index 4bb6d7fb1f2..c8f89dd692e 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobCleanerServiceImpl.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobMaintenanceServiceImpl.java @@ -20,15 +20,22 @@ package ca.uhn.fhir.batch2.impl; * #L% */ -import ca.uhn.fhir.batch2.api.IJobCleanerService; +import ca.uhn.fhir.batch2.api.IJobCompletionHandler; +import ca.uhn.fhir.batch2.api.IJobMaintenanceService; import ca.uhn.fhir.batch2.api.IJobPersistence; +import ca.uhn.fhir.batch2.api.JobCompletionDetails; +import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobInstance; +import ca.uhn.fhir.batch2.model.JobWorkNotification; import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; +import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.util.StopWatch; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.time.DateUtils; import org.quartz.JobExecutionContext; @@ -36,16 +43,27 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import javax.annotation.Nonnull; import javax.annotation.PostConstruct; +import java.util.Collection; import java.util.Date; +import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyList; +import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; +import static org.apache.commons.lang3.StringUtils.isBlank; /** * This class performs regular polls of the stored jobs in order to - * calculate statistics and delete expired tasks. This class does + * perform maintenance. This includes two major functions. + * + *

    + * First, we calculate statistics and delete expired tasks. This class does * the following things: *

      *
    • For instances that are IN_PROGRESS, calculates throughput and percent complete
    • @@ -55,45 +73,60 @@ import java.util.concurrent.TimeUnit; *
    • For instances that are IN_PROGRESS with an error message set where no chunks are ERRORED or FAILED, clears the error message in the instance (meaning presumably there was an error but it cleared)
    • *
    • For instances that are COMPLETE or FAILED and are old, delete them entirely
    • *
    + *

    + * + *

    + * Second, we check for any job instances where the job is configured to + * have gated execution. For these instances, we check if the current step + * is complete (all chunks are in COMPLETE status) and trigger the next step. + *

    */ -public class JobCleanerServiceImpl extends BaseJobService implements IJobCleanerService { +public class JobMaintenanceServiceImpl extends BaseJobService implements IJobMaintenanceService { public static final int INSTANCES_PER_PASS = 100; public static final long PURGE_THRESHOLD = 7L * DateUtils.MILLIS_PER_DAY; - private static final Logger ourLog = LoggerFactory.getLogger(JobCleanerServiceImpl.class); + private static final Logger ourLog = LoggerFactory.getLogger(JobMaintenanceServiceImpl.class); private final ISchedulerService mySchedulerService; + private final JobDefinitionRegistry myJobDefinitionRegistry; + private final BatchJobSender myBatchJobSender; /** * Constructor */ - public JobCleanerServiceImpl(ISchedulerService theSchedulerService, IJobPersistence theJobPersistence) { + public JobMaintenanceServiceImpl(ISchedulerService theSchedulerService, IJobPersistence theJobPersistence, JobDefinitionRegistry theJobDefinitionRegistry, BatchJobSender theBatchJobSender) { super(theJobPersistence); Validate.notNull(theSchedulerService); + Validate.notNull(theJobDefinitionRegistry); + Validate.notNull(theBatchJobSender); mySchedulerService = theSchedulerService; + myJobDefinitionRegistry = theJobDefinitionRegistry; + myBatchJobSender = theBatchJobSender; } @PostConstruct public void start() { ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition(); - jobDefinition.setId(JobCleanerScheduledJob.class.getName()); - jobDefinition.setJobClass(JobCleanerScheduledJob.class); + jobDefinition.setId(JobMaintenanceScheduledJob.class.getName()); + jobDefinition.setJobClass(JobMaintenanceScheduledJob.class); mySchedulerService.scheduleClusteredJob(DateUtils.MILLIS_PER_MINUTE, jobDefinition); } @Override - public void runCleanupPass() { + public void runMaintenancePass() { // NB: If you add any new logic, update the class javadoc Set processedInstanceIds = new HashSet<>(); + JobChunkProgressAccumulator progressAccumulator = new JobChunkProgressAccumulator(); for (int page = 0; ; page++) { List instances = myJobPersistence.fetchInstances(INSTANCES_PER_PASS, page); for (JobInstance instance : instances) { if (processedInstanceIds.add(instance.getInstanceId())) { - cleanupInstance(instance); + cleanupInstance(instance, progressAccumulator); + triggerGatedExecutions(instance, progressAccumulator); } } @@ -103,13 +136,13 @@ public class JobCleanerServiceImpl extends BaseJobService implements IJobCleaner } } - private void cleanupInstance(JobInstance theInstance) { + private void cleanupInstance(JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) { switch (theInstance.getStatus()) { case QUEUED: break; case IN_PROGRESS: case ERRORED: - calculateInstanceProgress(theInstance); + calculateInstanceProgress(theInstance, theProgressAccumulator); break; case COMPLETED: case FAILED: @@ -132,7 +165,7 @@ public class JobCleanerServiceImpl extends BaseJobService implements IJobCleaner } - private void calculateInstanceProgress(JobInstance theInstance) { + private void calculateInstanceProgress(JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) { int resourcesProcessed = 0; int incompleteChunkCount = 0; int completeChunkCount = 0; @@ -146,6 +179,8 @@ public class JobCleanerServiceImpl extends BaseJobService implements IJobCleaner List chunks = myJobPersistence.fetchWorkChunksWithoutData(theInstance.getInstanceId(), INSTANCES_PER_PASS, page); for (WorkChunk chunk : chunks) { + theProgressAccumulator.addChunk(chunk.getInstanceId(), chunk.getId(), chunk.getTargetStepId(), chunk.getStatus()); + errorCountForAllStatuses += chunk.getErrorCount(); if (chunk.getRecordsProcessed() != null) { @@ -201,7 +236,12 @@ public class JobCleanerServiceImpl extends BaseJobService implements IJobCleaner changedStatus = false; if (incompleteChunkCount == 0 && erroredChunkCount == 0 && failedChunkCount == 0) { - changedStatus |= updateInstanceStatus(theInstance, StatusEnum.COMPLETED); + boolean completed = updateInstanceStatus(theInstance, StatusEnum.COMPLETED); + if (completed) { + JobDefinition definition = myJobDefinitionRegistry.getJobDefinition(theInstance.getJobDefinitionId(), theInstance.getJobDefinitionVersion()).orElseThrow(() -> new IllegalStateException("Unknown job " + theInstance.getJobDefinitionId() + "/" + theInstance.getJobDefinitionVersion())); + invokeJobCompletionHandler(theInstance, definition); + } + changedStatus |= completed; } if (erroredChunkCount > 0) { changedStatus |= updateInstanceStatus(theInstance, StatusEnum.ERRORED); @@ -246,6 +286,17 @@ public class JobCleanerServiceImpl extends BaseJobService implements IJobCleaner } + private void invokeJobCompletionHandler(JobInstance theInstance, JobDefinition definition) { + IJobCompletionHandler completionHandler = definition.getCompletionHandler(); + if (completionHandler != null) { + + String instanceId = theInstance.getInstanceId(); + PT jobParameters = theInstance.getParameters(definition.getParametersType()); + JobCompletionDetails completionDetails = new JobCompletionDetails<>(jobParameters, instanceId); + completionHandler.jobComplete(completionDetails); + } + } + private boolean updateInstanceStatus(JobInstance theInstance, StatusEnum newStatus) { if (theInstance.getStatus() != newStatus) { ourLog.info("Marking job instance {} of type {} as {}", theInstance.getInstanceId(), theInstance.getJobDefinitionId(), newStatus); @@ -255,16 +306,109 @@ public class JobCleanerServiceImpl extends BaseJobService implements IJobCleaner return false; } + private void triggerGatedExecutions(JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) { + if (!theInstance.isRunning()) { + return; + } - public static class JobCleanerScheduledJob implements HapiJob { + String jobDefinitionId = theInstance.getJobDefinitionId(); + int jobDefinitionVersion = theInstance.getJobDefinitionVersion(); + String instanceId = theInstance.getInstanceId(); + + JobDefinition definition = myJobDefinitionRegistry.getJobDefinition(jobDefinitionId, jobDefinitionVersion).orElseThrow(() -> new IllegalStateException("Unknown job definition: " + jobDefinitionId + " " + jobDefinitionVersion)); + if (!definition.isGatedExecution()) { + return; + } + + String currentStepId = theInstance.getCurrentGatedStepId(); + if (isBlank(currentStepId)) { + return; + } + + if (definition.isLastStep(currentStepId)) { + return; + } + + int incompleteChunks = theProgressAccumulator.countChunksWithStatus(instanceId, currentStepId, StatusEnum.getIncompleteStatuses()); + if (incompleteChunks == 0) { + + int currentStepIndex = definition.getStepIndex(currentStepId); + String nextStepId = definition.getSteps().get(currentStepIndex + 1).getStepId(); + + ourLog.info("All processing is complete for gated execution of instance {} step {}. Proceeding to step {}", instanceId, currentStepId, nextStepId); + List chunksForNextStep = theProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, EnumSet.of(StatusEnum.QUEUED)); + for (String nextChunkId : chunksForNextStep) { + JobWorkNotification workNotification = new JobWorkNotification(jobDefinitionId, jobDefinitionVersion, instanceId, nextStepId, nextChunkId); + myBatchJobSender.sendWorkChannelMessage(workNotification); + } + + theInstance.setCurrentGatedStepId(nextStepId); + myJobPersistence.updateInstance(theInstance); + } + + } + + + public static class JobMaintenanceScheduledJob implements HapiJob { @Autowired - private IJobCleanerService myTarget; + private IJobMaintenanceService myTarget; @Override public void execute(JobExecutionContext theContext) { - myTarget.runCleanupPass(); + myTarget.runMaintenancePass(); } } + /** + * While performing cleanup, the cleanup job loads all of the known + * work chunks to examine their status. This bean collects the counts that + * are found, so that they can be reused for maintenance jobs without + * needing to hit the database a second time. + */ + private static class JobChunkProgressAccumulator { + + private final Set myConsumedInstanceAndChunkIds = new HashSet<>(); + private final Multimap myInstanceIdToChunkStatuses = ArrayListMultimap.create(); + + public void addChunk(String theInstanceId, String theChunkId, String theStepId, StatusEnum theStatus) { + // Note: If chunks are being written while we're executing, we may see the same chunk twice. This + // check avoids adding it twice. + if (myConsumedInstanceAndChunkIds.add(theInstanceId + " " + theChunkId)) { + myInstanceIdToChunkStatuses.put(theInstanceId, new ChunkStatusCountKey(theChunkId, theStepId, theStatus)); + } + } + + public int countChunksWithStatus(String theInstanceId, String theStepId, Set theStatuses) { + return getChunkIdsWithStatus(theInstanceId, theStepId, theStatuses).size(); + } + + public List getChunkIdsWithStatus(String theInstanceId, String theStepId, Set theStatuses) { + return getChunkStatuses(theInstanceId).stream().filter(t -> t.myStepId.equals(theStepId)).filter(t -> theStatuses.contains(t.myStatus)).map(t -> t.myChunkId).collect(Collectors.toList()); + } + + @Nonnull + private Collection getChunkStatuses(String theInstanceId) { + Collection chunkStatuses = myInstanceIdToChunkStatuses.get(theInstanceId); + chunkStatuses = defaultIfNull(chunkStatuses, emptyList()); + return chunkStatuses; + } + + + private static class ChunkStatusCountKey { + public final String myChunkId; + public final String myStepId; + public final StatusEnum myStatus; + + private ChunkStatusCountKey(String theChunkId, String theStepId, StatusEnum theStatus) { + myChunkId = theChunkId; + myStepId = theStepId; + myStatus = theStatus; + } + } + + + } + + } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/SynchronizedJobPersistenceWrapper.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/SynchronizedJobPersistenceWrapper.java index 0276a24daf2..25925bd4335 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/SynchronizedJobPersistenceWrapper.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/SynchronizedJobPersistenceWrapper.java @@ -22,8 +22,10 @@ package ca.uhn.fhir.batch2.impl; import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.model.JobInstance; +import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.WorkChunk; +import java.util.EnumSet; import java.util.List; import java.util.Optional; diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobDefinition.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobDefinition.java index f22c324552e..5d18a70d7a7 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobDefinition.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobDefinition.java @@ -20,6 +20,7 @@ package ca.uhn.fhir.batch2.model; * #L% */ +import ca.uhn.fhir.batch2.api.IJobCompletionHandler; import ca.uhn.fhir.batch2.api.IJobParametersValidator; import ca.uhn.fhir.batch2.api.IJobStepWorker; import ca.uhn.fhir.batch2.api.VoidModel; @@ -31,6 +32,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; public class JobDefinition { @@ -42,11 +44,14 @@ public class JobDefinition { private final List> mySteps; private final String myJobDescription; private final IJobParametersValidator myParametersValidator; + private final boolean myGatedExecution; + private final List myStepIds; + private final IJobCompletionHandler myCompletionHandler; /** * Constructor */ - private JobDefinition(String theJobDefinitionId, int theJobDefinitionVersion, String theJobDescription, Class theParametersType, List> theSteps, IJobParametersValidator theParametersValidator) { + private JobDefinition(String theJobDefinitionId, int theJobDefinitionVersion, String theJobDescription, Class theParametersType, List> theSteps, IJobParametersValidator theParametersValidator, boolean theGatedExecution, IJobCompletionHandler theCompletionHandler) { Validate.isTrue(theJobDefinitionId.length() <= ID_MAX_LENGTH, "Maximum ID length is %d", ID_MAX_LENGTH); Validate.notBlank(theJobDefinitionId, "No job definition ID supplied"); Validate.notBlank(theJobDescription, "No job description supplied"); @@ -56,8 +61,16 @@ public class JobDefinition { myJobDefinitionVersion = theJobDefinitionVersion; myJobDescription = theJobDescription; mySteps = theSteps; + myStepIds = mySteps.stream().map(t -> t.getStepId()).collect(Collectors.toList()); myParametersType = theParametersType; myParametersValidator = theParametersValidator; + myGatedExecution = theGatedExecution; + myCompletionHandler = theCompletionHandler; + } + + @Nullable + public IJobCompletionHandler getCompletionHandler() { + return myCompletionHandler; } @Nullable @@ -97,8 +110,18 @@ public class JobDefinition { return mySteps; } - public static Builder newBuilder() { - return new Builder<>(); + public boolean isGatedExecution() { + return myGatedExecution; + } + + public int getStepIndex(String theStepId) { + int retVal = myStepIds.indexOf(theStepId); + Validate.isTrue(retVal != -1); + return retVal; + } + + public boolean isLastStep(String theStepId) { + return getStepIndex(theStepId) == (myStepIds.size() - 1); } public static class Builder { @@ -111,12 +134,14 @@ public class JobDefinition { private Class myNextInputType; @Nullable private IJobParametersValidator myParametersValidator; + private boolean myGatedExecution; + private IJobCompletionHandler myCompletionHandler; Builder() { mySteps = new ArrayList<>(); } - Builder(List> theSteps, String theJobDefinitionId, int theJobDefinitionVersion, String theJobDescription, Class theJobParametersType, Class theNextInputType, IJobParametersValidator theParametersValidator) { + Builder(List> theSteps, String theJobDefinitionId, int theJobDefinitionVersion, String theJobDescription, Class theJobParametersType, Class theNextInputType, IJobParametersValidator theParametersValidator, boolean theGatedExecution, IJobCompletionHandler theCompletionHandler) { mySteps = theSteps; myJobDefinitionId = theJobDefinitionId; myJobDefinitionVersion = theJobDefinitionVersion; @@ -124,6 +149,8 @@ public class JobDefinition { myJobParametersType = theJobParametersType; myNextInputType = theNextInputType; myParametersValidator = theParametersValidator; + myGatedExecution = theGatedExecution; + myCompletionHandler = theCompletionHandler; } /** @@ -154,7 +181,7 @@ public class JobDefinition { */ public Builder addFirstStep(String theStepId, String theStepDescription, Class theOutputType, IJobStepWorker theStepWorker) { mySteps.add(new JobDefinitionStep<>(theStepId, theStepDescription, theStepWorker, VoidModel.class, theOutputType)); - return new Builder<>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, theOutputType, myParametersValidator); + return new Builder<>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, theOutputType, myParametersValidator, myGatedExecution, myCompletionHandler); } /** @@ -168,7 +195,7 @@ public class JobDefinition { */ public Builder addIntermediateStep(String theStepId, String theStepDescription, Class theOutputType, IJobStepWorker theStepWorker) { mySteps.add(new JobDefinitionStep<>(theStepId, theStepDescription, theStepWorker, myNextInputType, theOutputType)); - return new Builder<>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, theOutputType, myParametersValidator); + return new Builder<>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, theOutputType, myParametersValidator, myGatedExecution, myCompletionHandler); } /** @@ -182,12 +209,12 @@ public class JobDefinition { */ public Builder addLastStep(String theStepId, String theStepDescription, IJobStepWorker theStepWorker) { mySteps.add(new JobDefinitionStep<>(theStepId, theStepDescription, theStepWorker, myNextInputType, VoidModel.class)); - return new Builder<>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, VoidModel.class, myParametersValidator); + return new Builder<>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, VoidModel.class, myParametersValidator, myGatedExecution, myCompletionHandler); } public JobDefinition build() { Validate.notNull(myJobParametersType, "No job parameters type was supplied"); - return new JobDefinition<>(myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, Collections.unmodifiableList(mySteps), myParametersValidator); + return new JobDefinition<>(myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, Collections.unmodifiableList(mySteps), myParametersValidator, myGatedExecution, myCompletionHandler); } public Builder setJobDescription(String theJobDescription) { @@ -233,12 +260,58 @@ public class JobDefinition { * @param theParametersValidator The validator (must not be null. Do not call this method at all if you do not want a parameters validator). */ @SuppressWarnings("unchecked") - public Builder setParametersValidator(@Nonnull IJobParametersValidator theParametersValidator) { + public Builder setParametersValidator(@Nonnull IJobParametersValidator theParametersValidator) { Validate.notNull(theParametersValidator, "theParametersValidator must not be null"); Validate.isTrue(myParametersValidator == null, "Can not supply multiple parameters validators. Already have: %s", myParametersValidator); myParametersValidator = theParametersValidator; - return (Builder) this; + return this; } + + /** + * If this is set, the framework will wait for all work chunks to be + * processed for an individual step before moving on to beginning + * processing on the next step. Otherwise, processing on subsequent + * steps may begin as soon as any data has been produced. + *

    + * This is useful in a few cases: + *

      + *
    • + * If there are potential constraint issues, e.g. data being + * written by the third step depends on all data from the + * second step already being written + *
    • + *
    • + * If multiple steps require expensive database queries, it may + * reduce the chances of timeouts to ensure that they are run + * discretely. + *
    • + *
    + *

    + *

    + * Setting this mode means the job may take longer, since it will + * rely on a polling mechanism to determine that one step is + * complete before beginning any processing for the next step. + *

    + */ + public Builder gatedExecution() { + myGatedExecution = true; + return this; + } + + /** + * Supplies an optional callback that will be invoked when the job is complete + */ + public Builder completionHandler(IJobCompletionHandler theCompletionHandler) { + Validate.isTrue(myCompletionHandler == null, "Can not supply multiple completion handlers"); + myCompletionHandler = theCompletionHandler; + return this; + } + + + } + + public static Builder newBuilder() { + return new Builder<>(); } } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobInstance.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobInstance.java index dd6b65553a1..6c25104e20e 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobInstance.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobInstance.java @@ -74,13 +74,12 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson { @JsonProperty(value = "progress", access = JsonProperty.Access.READ_ONLY) private double myProgress; - + @JsonProperty(value = "currentGatedStepId", access = JsonProperty.Access.READ_ONLY) + private String myCurrentGatedStepId; @JsonProperty(value = "errorMessage", access = JsonProperty.Access.READ_ONLY) private String myErrorMessage; - @JsonProperty(value = "errorCount", access = JsonProperty.Access.READ_ONLY) private int myErrorCount; - @JsonProperty(value = "estimatedCompletion", access = JsonProperty.Access.READ_ONLY) private String myEstimatedTimeRemaining; @@ -111,6 +110,15 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson { setStatus(theJobInstance.getStatus()); setTotalElapsedMillis(theJobInstance.getTotalElapsedMillis()); setWorkChunksPurged(theJobInstance.isWorkChunksPurged()); + setCurrentGatedStepId(theJobInstance.getCurrentGatedStepId()); + } + + public String getCurrentGatedStepId() { + return myCurrentGatedStepId; + } + + public void setCurrentGatedStepId(String theCurrentGatedStepId) { + myCurrentGatedStepId = theCurrentGatedStepId; } public int getErrorCount() { @@ -258,4 +266,11 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson { .append("estimatedTimeRemaining", myEstimatedTimeRemaining) .toString(); } + + /** + * Returns true if the job instance is in {@link StatusEnum#IN_PROGRESS} and is not cancelled + */ + public boolean isRunning() { + return getStatus() == StatusEnum.IN_PROGRESS && !isCancelled(); + } } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/StatusEnum.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/StatusEnum.java index dbef53b05bc..e645f12bb45 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/StatusEnum.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/StatusEnum.java @@ -20,33 +20,63 @@ package ca.uhn.fhir.batch2.model; * #L% */ +import java.util.Collections; +import java.util.EnumSet; +import java.util.Set; + public enum StatusEnum { /** * Task is waiting to execute and should begin with no intervention required. */ - QUEUED, + QUEUED(true), /** * Task is current executing */ - IN_PROGRESS, + IN_PROGRESS(true), /** * Task completed successfully */ - COMPLETED, + COMPLETED(false), /** * Task execution resulted in an error but the error may be transient (or transient status is unknown). * Retrying may result in success. */ - ERRORED, + ERRORED(true), /** * Task has failed and is known to be unrecoverable. There is no reason to believe that retrying will * result in a different outcome. */ - FAILED + FAILED(true); + + private final boolean myIncomplete; + private static Set ourIncompleteStatuses; + + StatusEnum(boolean theIncomplete) { + myIncomplete = theIncomplete; + } + + /** + * Statuses that represent a job that has not yet completed. I.e. + * all statuses except {@link #COMPLETED} + */ + public static Set getIncompleteStatuses() { + Set retVal = ourIncompleteStatuses; + if (retVal == null) { + EnumSet set = EnumSet.noneOf(StatusEnum.class); + for (StatusEnum next : values()) { + if (next.myIncomplete) { + set.add(next); + } + } + ourIncompleteStatuses = Collections.unmodifiableSet(set); + retVal = ourIncompleteStatuses; + } + return retVal; + } } diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/BaseBatch2Test.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/BaseBatch2Test.java new file mode 100644 index 00000000000..de6d94d74e6 --- /dev/null +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/BaseBatch2Test.java @@ -0,0 +1,74 @@ +package ca.uhn.fhir.batch2.impl; + +import ca.uhn.fhir.batch2.api.IJobStepWorker; +import ca.uhn.fhir.batch2.api.VoidModel; +import ca.uhn.fhir.batch2.model.JobDefinition; +import ca.uhn.fhir.batch2.model.JobInstance; +import ca.uhn.fhir.batch2.model.StatusEnum; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import javax.annotation.Nonnull; +import java.util.function.Consumer; + +@ExtendWith(MockitoExtension.class) +public abstract class BaseBatch2Test { + + public static final String JOB_DEFINITION_ID = "JOB_DEFINITION_ID"; + public static final String PARAM_1_VALUE = "PARAM 1 VALUE"; + public static final String PARAM_2_VALUE = "PARAM 2 VALUE"; + public static final String PASSWORD_VALUE = "PASSWORD VALUE"; + public static final String STEP_1 = "STEP_1"; + public static final String STEP_2 = "STEP_2"; + public static final String STEP_3 = "STEP_3"; + public static final String INSTANCE_ID = "INSTANCE-ID"; + public static final String CHUNK_ID = "CHUNK-ID"; + public static final String CHUNK_ID_2 = "CHUNK-ID-2"; + public static final String DATA_1_VALUE = "data 1 value"; + public static final String DATA_2_VALUE = "data 2 value"; + public static final String DATA_3_VALUE = "data 3 value"; + public static final String DATA_4_VALUE = "data 4 value"; + + @Mock + protected IJobStepWorker myStep1Worker; + @Mock + protected IJobStepWorker myStep2Worker; + @Mock + protected IJobStepWorker myStep3Worker; + + @Nonnull + static JobInstance createInstance() { + JobInstance instance = new JobInstance(); + instance.setInstanceId(INSTANCE_ID); + instance.setStatus(StatusEnum.IN_PROGRESS); + instance.setJobDefinitionId(JOB_DEFINITION_ID); + instance.setJobDefinitionVersion(1); + instance.setParameters(new TestJobParameters() + .setParam1(PARAM_1_VALUE) + .setParam2(PARAM_2_VALUE) + .setPassword(PASSWORD_VALUE) + ); + return instance; + } + + @SafeVarargs + final JobDefinition createJobDefinition(Consumer>... theModifiers) { + JobDefinition.Builder builder = JobDefinition + .newBuilder() + .setJobDefinitionId(JOB_DEFINITION_ID) + .setJobDescription("This is a job description") + .setJobDefinitionVersion(1) + .setParametersType(TestJobParameters.class) + .addFirstStep(STEP_1, "Step 1", TestJobStep2InputType.class, myStep1Worker) + .addIntermediateStep(STEP_2, "Step 2", TestJobStep3InputType.class, myStep2Worker) + .addLastStep(STEP_3, "Step 3", myStep3Worker); + + for (Consumer> next : theModifiers) { + next.accept(builder); + } + + return builder.build(); + } + +} diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImplTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImplTest.java index 889bc569589..624d9a049da 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImplTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImplTest.java @@ -3,7 +3,6 @@ package ca.uhn.fhir.batch2.impl; import ca.uhn.fhir.batch2.api.IJobDataSink; import ca.uhn.fhir.batch2.api.IJobParametersValidator; import ca.uhn.fhir.batch2.api.IJobPersistence; -import ca.uhn.fhir.batch2.api.IJobStepWorker; import ca.uhn.fhir.batch2.api.RunOutcome; import ca.uhn.fhir.batch2.api.StepExecutionDetails; import ca.uhn.fhir.batch2.api.VoidModel; @@ -35,12 +34,12 @@ import org.springframework.messaging.MessageDeliveryException; import javax.annotation.Nonnull; import java.util.List; import java.util.Optional; -import java.util.function.Consumer; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -49,21 +48,8 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @TestMethodOrder(MethodOrderer.MethodName.class) -public class JobCoordinatorImplTest { +public class JobCoordinatorImplTest extends BaseBatch2Test { - public static final String JOB_DEFINITION_ID = "JOB_DEFINITION_ID"; - public static final String PARAM_1_VALUE = "PARAM 1 VALUE"; - public static final String PARAM_2_VALUE = "PARAM 2 VALUE"; - public static final String PASSWORD_VALUE = "PASSWORD VALUE"; - public static final String STEP_1 = "STEP_1"; - public static final String STEP_2 = "STEP_2"; - public static final String STEP_3 = "STEP_3"; - public static final String INSTANCE_ID = "INSTANCE-ID"; - public static final String CHUNK_ID = "CHUNK-ID"; - public static final String DATA_1_VALUE = "data 1 value"; - public static final String DATA_2_VALUE = "data 2 value"; - public static final String DATA_3_VALUE = "data 3 value"; - public static final String DATA_4_VALUE = "data 4 value"; private static final Logger ourLog = LoggerFactory.getLogger(JobCoordinatorImplTest.class); private final IChannelReceiver myWorkChannelReceiver = LinkedBlockingChannel.newSynchronous("receiver"); private JobCoordinatorImpl mySvc; @@ -73,12 +59,6 @@ public class JobCoordinatorImplTest { private IJobPersistence myJobInstancePersister; @Mock private JobDefinitionRegistry myJobDefinitionRegistry; - @Mock - private IJobStepWorker myStep1Worker; - @Mock - private IJobStepWorker myStep2Worker; - @Mock - private IJobStepWorker myStep3Worker; @Captor private ArgumentCaptor> myStep1ExecutionDetailsCaptor; @Captor @@ -114,7 +94,7 @@ public class JobCoordinatorImplTest { // Setup - JobDefinition definition = createJobDefinition(); + JobDefinition definition = createJobDefinition(); JobInstance instance = createInstance(); when(myJobDefinitionRegistry.getJobDefinition(eq(JOB_DEFINITION_ID), eq(1))).thenReturn(Optional.of(definition)); @@ -160,7 +140,8 @@ public class JobCoordinatorImplTest { when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunkStep1())); when(myStep1Worker.run(any(), any())).thenAnswer(t -> { IJobDataSink sink = t.getArgument(1, IJobDataSink.class); - sink.accept(new TestJobStep2InputType("data value 1", "data value 2")); + sink.accept(new TestJobStep2InputType("data value 1a", "data value 2a")); + sink.accept(new TestJobStep2InputType("data value 1b", "data value 2b")); return new RunOutcome(50); }); mySvc.start(); @@ -178,6 +159,8 @@ public class JobCoordinatorImplTest { assertEquals(PASSWORD_VALUE, params.getPassword()); verify(myJobInstancePersister, times(1)).markWorkChunkAsCompletedAndClearData(any(), eq(50)); + verify(myJobInstancePersister, times(0)).fetchWorkChunksWithoutData(any(), anyInt(), anyInt()); + verify(myBatchJobSender, times(2)).sendWorkChannelMessage(any()); } /** @@ -210,6 +193,41 @@ public class JobCoordinatorImplTest { verify(myJobInstancePersister, times(1)).markInstanceAsCompleted(eq(INSTANCE_ID)); } + @Test + public void testPerformStep_FirstStep_GatedExecutionMode() { + + // Setup + + JobDefinition jobDefinition = createJobDefinition( + t -> t.gatedExecution() + ); + when(myJobDefinitionRegistry.getJobDefinition(eq(JOB_DEFINITION_ID), eq(1))).thenReturn(Optional.of(jobDefinition)); + when(myJobInstancePersister.fetchInstanceAndMarkInProgress(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance())); + when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunkStep1())); + when(myStep1Worker.run(any(), any())).thenAnswer(t -> { + IJobDataSink sink = t.getArgument(1, IJobDataSink.class); + sink.accept(new TestJobStep2InputType("data value 1a", "data value 2a")); + sink.accept(new TestJobStep2InputType("data value 1b", "data value 2b")); + return new RunOutcome(50); + }); + mySvc.start(); + + // Execute + + myWorkChannelReceiver.send(new JobWorkNotificationJsonMessage(createWorkNotification(STEP_1))); + + // Verify + + verify(myStep1Worker, times(1)).run(myStep1ExecutionDetailsCaptor.capture(), any()); + TestJobParameters params = myStep1ExecutionDetailsCaptor.getValue().getParameters(); + assertEquals(PARAM_1_VALUE, params.getParam1()); + assertEquals(PARAM_2_VALUE, params.getParam2()); + assertEquals(PASSWORD_VALUE, params.getPassword()); + + verify(myJobInstancePersister, times(1)).markWorkChunkAsCompletedAndClearData(any(), eq(50)); + verify(myBatchJobSender, times(0)).sendWorkChannelMessage(any()); + } + @Test public void testPerformStep_SecondStep() { @@ -276,7 +294,7 @@ public class JobCoordinatorImplTest { when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_2, new TestJobStep2InputType(DATA_1_VALUE, DATA_2_VALUE)))); when(myJobDefinitionRegistry.getJobDefinition(eq(JOB_DEFINITION_ID), eq(1))).thenReturn(Optional.of(createJobDefinition())); when(myJobInstancePersister.fetchInstanceAndMarkInProgress(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance())); - when(myStep2Worker.run(any(), any())).thenAnswer(t->{ + when(myStep2Worker.run(any(), any())).thenAnswer(t -> { IJobDataSink sink = t.getArgument(1, IJobDataSink.class); sink.recoveredError("Error message 1"); sink.recoveredError("Error message 2"); @@ -479,7 +497,7 @@ public class JobCoordinatorImplTest { } return null; }; - JobDefinition jobDefinition = createJobDefinition(t->t.setParametersValidator(v)); + JobDefinition jobDefinition = createJobDefinition(t -> t.setParametersValidator(v)); when(myJobDefinitionRegistry.getLatestJobDefinition(eq(JOB_DEFINITION_ID))).thenReturn(Optional.of(jobDefinition)); // Execute @@ -504,51 +522,17 @@ public class JobCoordinatorImplTest { } } - @SafeVarargs - private JobDefinition createJobDefinition(Consumer>... theModifiers) { - JobDefinition.Builder builder = JobDefinition - .newBuilder() - .setJobDefinitionId(JOB_DEFINITION_ID) - .setJobDescription("This is a job description") - .setJobDefinitionVersion(1) - .setParametersType(TestJobParameters.class) - .addFirstStep(STEP_1, "Step 1", TestJobStep2InputType.class, myStep1Worker) - .addIntermediateStep(STEP_2, "Step 2", TestJobStep3InputType.class, myStep2Worker) - .addLastStep(STEP_3, "Step 3", myStep3Worker); - - for (Consumer> next : theModifiers) { - next.accept(builder); - } - - return builder.build(); - } - @Nonnull private JobWorkNotification createWorkNotification(String theStepId) { JobWorkNotification payload = new JobWorkNotification(); payload.setJobDefinitionId(JOB_DEFINITION_ID); payload.setJobDefinitionVersion(1); payload.setInstanceId(INSTANCE_ID); - payload.setChunkId(JobCoordinatorImplTest.CHUNK_ID); + payload.setChunkId(BaseBatch2Test.CHUNK_ID); payload.setTargetStepId(theStepId); return payload; } - @Nonnull - static JobInstance createInstance() { - JobInstance instance = new JobInstance(); - instance.setInstanceId(INSTANCE_ID); - instance.setStatus(StatusEnum.IN_PROGRESS); - instance.setJobDefinitionId(JOB_DEFINITION_ID); - instance.setJobDefinitionVersion(1); - instance.setParameters(new TestJobParameters() - .setParam1(PARAM_1_VALUE) - .setParam2(PARAM_2_VALUE) - .setPassword(PASSWORD_VALUE) - ); - return instance; - } - @Nonnull static WorkChunk createWorkChunk(String theTargetStepId, IModelJson theData) { return new WorkChunk() diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDataSinkTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDataSinkTest.java index 1b05f82b61b..7b11e109798 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDataSinkTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDataSinkTest.java @@ -88,7 +88,7 @@ class JobDataSinkTest { // Let's test our first step worker by calling run on it: when(myJobPersistence.storeWorkChunk(myBatchWorkChunkCaptor.capture())).thenReturn(CHUNK_ID); StepExecutionDetails details = new StepExecutionDetails<>(new TestJobParameters().setParam1("" + PID_COUNT), null, JOB_INSTANCE_ID, CHUNK_ID); - JobDataSink sink = new JobDataSink<>(myBatchJobSender, myJobPersistence, JOB_DEF_ID, JOB_DEF_VERSION, job.getSteps().get(1), JOB_INSTANCE_ID, FIRST_STEP_ID); + JobDataSink sink = new JobDataSink<>(myBatchJobSender, myJobPersistence, JOB_DEF_ID, JOB_DEF_VERSION, job.getSteps().get(1), JOB_INSTANCE_ID, FIRST_STEP_ID, false); RunOutcome result = firstStepWorker.run(details, sink); diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobCleanerServiceImplTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobMaintenanceServiceImplTest.java similarity index 77% rename from hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobCleanerServiceImplTest.java rename to hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobMaintenanceServiceImplTest.java index 8c2318581b0..6f5ad680fdc 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobCleanerServiceImplTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobMaintenanceServiceImplTest.java @@ -1,9 +1,13 @@ package ca.uhn.fhir.batch2.impl; +import ca.uhn.fhir.batch2.api.IJobCompletionHandler; import ca.uhn.fhir.batch2.api.IJobPersistence; +import ca.uhn.fhir.batch2.api.JobCompletionDetails; import ca.uhn.fhir.batch2.model.JobInstance; +import ca.uhn.fhir.batch2.model.JobWorkNotification; import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; +import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer; import com.google.common.collect.Lists; import org.hl7.fhir.r4.model.DateTimeType; import org.junit.jupiter.api.BeforeEach; @@ -13,14 +17,10 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.messaging.Message; import java.util.Date; -import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.INSTANCE_ID; -import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.STEP_1; -import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.STEP_2; -import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.STEP_3; -import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.createInstance; import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.createWorkChunk; import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.createWorkChunkStep1; import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.createWorkChunkStep2; @@ -38,35 +38,48 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) -public class JobCleanerServiceImplTest { +public class JobMaintenanceServiceImplTest extends BaseBatch2Test { + @Mock + IJobCompletionHandler myCompletionHandler; @Mock private ISchedulerService mySchedulerService; @Mock private IJobPersistence myJobPersistence; - private JobCleanerServiceImpl mySvc; + private JobMaintenanceServiceImpl mySvc; @Captor private ArgumentCaptor myInstanceCaptor; + private JobDefinitionRegistry myJobDefinitionRegistry; + @Mock + private IChannelProducer myWorkChannelProducer; + @Captor + private ArgumentCaptor> myMessageCaptor; + @Captor + private ArgumentCaptor> myJobCompletionCaptor; @BeforeEach public void beforeEach() { - mySvc = new JobCleanerServiceImpl(mySchedulerService, myJobPersistence); + myJobDefinitionRegistry = new JobDefinitionRegistry(); + BatchJobSender batchJobSender = new BatchJobSender(myWorkChannelProducer); + mySvc = new JobMaintenanceServiceImpl(mySchedulerService, myJobPersistence, myJobDefinitionRegistry, batchJobSender); } @Test public void testInProgress_CalculateProgress_FirstCompleteButNoOtherStepsYetComplete() { + myJobDefinitionRegistry.addJobDefinition(createJobDefinition()); when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(createInstance())); when(myJobPersistence.fetchWorkChunksWithoutData(eq(INSTANCE_ID), anyInt(), eq(0))).thenReturn(Lists.newArrayList( createWorkChunk(STEP_1, null).setStatus(StatusEnum.COMPLETED) )); - mySvc.runCleanupPass(); + mySvc.runMaintenancePass(); verify(myJobPersistence, never()).updateInstance(any()); } @Test public void testInProgress_CalculateProgress_FirstStepComplete() { + myJobDefinitionRegistry.addJobDefinition(createJobDefinition()); when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(createInstance())); when(myJobPersistence.fetchWorkChunksWithoutData(eq(INSTANCE_ID), anyInt(), eq(0))).thenReturn(Lists.newArrayList( createWorkChunkStep1().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")), @@ -77,7 +90,7 @@ public class JobCleanerServiceImplTest { createWorkChunkStep3().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25) )); - mySvc.runCleanupPass(); + mySvc.runMaintenancePass(); verify(myJobPersistence, times(1)).updateInstance(myInstanceCaptor.capture()); JobInstance instance = myInstanceCaptor.getValue(); @@ -87,7 +100,7 @@ public class JobCleanerServiceImplTest { assertEquals(0.08333333333333333, instance.getCombinedRecordsProcessedPerSecond()); assertNotNull(instance.getStartTime()); assertEquals(parseTime("2022-02-12T14:00:00-04:00"), instance.getStartTime()); - assertEquals(null, instance.getEndTime()); + assertNull(instance.getEndTime()); assertEquals("00:10:00", instance.getEstimatedTimeRemaining()); verifyNoMoreInteractions(myJobPersistence); @@ -95,6 +108,8 @@ public class JobCleanerServiceImplTest { @Test public void testInProgress_CalculateProgress_InstanceHasErrorButNoChunksAreErrored() { + // Setup + myJobDefinitionRegistry.addJobDefinition(createJobDefinition()); JobInstance instance1 = createInstance(); instance1.setErrorMessage("This is an error message"); when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance1)); @@ -107,8 +122,10 @@ public class JobCleanerServiceImplTest { createWorkChunkStep3().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25) )); - mySvc.runCleanupPass(); + // Execute + mySvc.runMaintenancePass(); + // Verify verify(myJobPersistence, times(1)).updateInstance(myInstanceCaptor.capture()); JobInstance instance = myInstanceCaptor.getValue(); @@ -121,6 +138,30 @@ public class JobCleanerServiceImplTest { verifyNoMoreInteractions(myJobPersistence); } + @Test + public void testInProgress_GatedExecution_FirstStepComplete() { + // Setup + myJobDefinitionRegistry.addJobDefinition(createJobDefinition(t -> t.gatedExecution())); + when(myJobPersistence.fetchWorkChunksWithoutData(eq(INSTANCE_ID), eq(100), eq(0))).thenReturn(Lists.newArrayList( + createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID), + createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID_2) + )); + JobInstance instance1 = createInstance(); + instance1.setCurrentGatedStepId(STEP_1); + when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance1)); + + // Execute + mySvc.runMaintenancePass(); + + // Verify + verify(myWorkChannelProducer, times(2)).send(myMessageCaptor.capture()); + JobWorkNotification payload0 = myMessageCaptor.getAllValues().get(0).getPayload(); + assertEquals(STEP_2, payload0.getTargetStepId()); + assertEquals(CHUNK_ID, payload0.getChunkId()); + JobWorkNotification payload1 = myMessageCaptor.getAllValues().get(1).getPayload(); + assertEquals(STEP_2, payload1.getTargetStepId()); + assertEquals(CHUNK_ID_2, payload1.getChunkId()); + } @Test public void testFailed_PurgeOldInstance() { @@ -129,7 +170,7 @@ public class JobCleanerServiceImplTest { instance.setEndTime(parseTime("2001-01-01T12:12:12Z")); when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance)); - mySvc.runCleanupPass(); + mySvc.runMaintenancePass(); verify(myJobPersistence, times(1)).deleteInstanceAndChunks(eq(INSTANCE_ID)); verifyNoMoreInteractions(myJobPersistence); @@ -137,6 +178,9 @@ public class JobCleanerServiceImplTest { @Test public void testInProgress_CalculateProgress_AllStepsComplete() { + // Setup + + myJobDefinitionRegistry.addJobDefinition(createJobDefinition(t -> t.completionHandler(myCompletionHandler))); when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(createInstance())); when(myJobPersistence.fetchWorkChunksWithoutData(eq(INSTANCE_ID), anyInt(), eq(0))).thenReturn(Lists.newArrayList( createWorkChunkStep1().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:01:00-04:00")).setRecordsProcessed(25), @@ -147,7 +191,11 @@ public class JobCleanerServiceImplTest { createWorkChunkStep3().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25) )); - mySvc.runCleanupPass(); + // Execute + + mySvc.runMaintenancePass(); + + // Verify verify(myJobPersistence, times(2)).updateInstance(myInstanceCaptor.capture()); JobInstance instance = myInstanceCaptor.getAllValues().get(0); @@ -159,8 +207,12 @@ public class JobCleanerServiceImplTest { assertEquals(parseTime("2022-02-12T14:10:00-04:00"), instance.getEndTime()); verify(myJobPersistence, times(1)).deleteChunks(eq(INSTANCE_ID)); + verify(myCompletionHandler, times(1)).jobComplete(myJobCompletionCaptor.capture()); verifyNoMoreInteractions(myJobPersistence); + + assertEquals(INSTANCE_ID, myJobCompletionCaptor.getValue().getInstanceId()); + assertEquals(PARAM_1_VALUE, myJobCompletionCaptor.getValue().getParameters().getParam1()); } @Test @@ -175,7 +227,7 @@ public class JobCleanerServiceImplTest { createWorkChunkStep3().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25) )); - mySvc.runCleanupPass(); + mySvc.runMaintenancePass(); verify(myJobPersistence, times(2)).updateInstance(myInstanceCaptor.capture()); JobInstance instance = myInstanceCaptor.getAllValues().get(0);