Add gated execution mode for Batch2 (#3545)

* Add gated execution mode for Batcvh2

* Allow for async processing

* Test fixes

* Cleanup

* Add javadoc

* Fixes
This commit is contained in:
James Agnew 2022-04-18 20:20:52 -04:00 committed by GitHub
parent 51f38eae98
commit 5dfdc91682
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 657 additions and 200 deletions

View File

@ -35,6 +35,7 @@ import org.springframework.data.domain.PageRequest;
import javax.annotation.Nonnull;
import javax.transaction.Transactional;
import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@ -91,6 +92,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
entity.setDefinitionVersion(theInstance.getJobDefinitionVersion());
entity.setStatus(theInstance.getStatus());
entity.setParams(theInstance.getParameters());
entity.setCurrentGatedStepId(theInstance.getCurrentGatedStepId());
entity.setCreateTime(new Date());
entity = myJobInstanceRepository.save(entity);
@ -156,6 +158,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
retVal.setErrorCount(theEntity.getErrorCount());
retVal.setEstimatedTimeRemaining(theEntity.getEstimatedTimeRemaining());
retVal.setParameters(theEntity.getParams());
retVal.setCurrentGatedStepId(theEntity.getCurrentGatedStepId());
return retVal;
}
@ -202,6 +205,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
instance.setErrorMessage(theInstance.getErrorMessage());
instance.setErrorCount(theInstance.getErrorCount());
instance.setEstimatedTimeRemaining(theInstance.getEstimatedTimeRemaining());
instance.setCurrentGatedStepId(theInstance.getCurrentGatedStepId());
myJobInstanceRepository.save(instance);
}

View File

@ -37,6 +37,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
@ -85,7 +86,8 @@ public class DeleteExpungeProcessor implements ItemProcessor<List<Long>, List<St
List<ResourceLink> conflictResourceLinks = Collections.synchronizedList(new ArrayList<>());
PartitionRunner partitionRunner = new PartitionRunner(PROCESS_NAME, THREAD_PREFIX, myDaoConfig.getExpungeBatchSize(), myDaoConfig.getExpungeThreadCount());
partitionRunner.runInPartitionedThreads(thePids, someTargetPids -> findResourceLinksWithTargetPidIn(thePids, someTargetPids, conflictResourceLinks));
Consumer<List<Long>> listConsumer = someTargetPids -> findResourceLinksWithTargetPidIn(thePids, someTargetPids, conflictResourceLinks);
partitionRunner.runInPartitionedThreads(thePids, listConsumer);
if (conflictResourceLinks.isEmpty()) {
return;

View File

@ -38,6 +38,7 @@ import javax.persistence.TemporalType;
import java.io.Serializable;
import java.util.Date;
import static ca.uhn.fhir.batch2.model.JobDefinition.ID_MAX_LENGTH;
import static ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity.ERROR_MSG_MAX_LENGTH;
import static org.apache.commons.lang3.StringUtils.left;
@ -101,6 +102,16 @@ public class Batch2JobInstanceEntity implements Serializable {
private int myErrorCount;
@Column(name = "EST_REMAINING", length = TIME_REMAINING_LENGTH, nullable = true)
private String myEstimatedTimeRemaining;
@Column(name = "CUR_GATED_STEP_ID", length = ID_MAX_LENGTH, nullable = true)
private String myCurrentGatedStepId;
public String getCurrentGatedStepId() {
return myCurrentGatedStepId;
}
public void setCurrentGatedStepId(String theCurrentGatedStepId) {
myCurrentGatedStepId = theCurrentGatedStepId;
}
public boolean isCancelled() {
return myCancelled;

View File

@ -49,11 +49,10 @@ import java.util.stream.Collectors;
@SuppressWarnings({"SqlNoDataSourceInspection", "SpellCheckingInspection"})
public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
private final Set<FlagEnum> myFlags;
// H2, Derby, MariaDB, and MySql automatically add indexes to foreign keys
public static final DriverTypeEnum[] NON_AUTOMATIC_FK_INDEX_PLATFORMS = new DriverTypeEnum[]{
DriverTypeEnum.POSTGRES_9_4, DriverTypeEnum.ORACLE_12C, DriverTypeEnum.MSSQL_2012};
private final Set<FlagEnum> myFlags;
/**
@ -278,10 +277,17 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
version
.onTable("HFJ_RESOURCE")
.dropIndex("20220314.1", "IDX_INDEXSTATUS");
version
.onTable("BT2_JOB_INSTANCE")
.addColumn("20220416.1", "CUR_GATED_STEP_ID")
.nullable()
.type(ColumnTypeEnum.STRING, 100);
}
/**
* new numeric search indexing
*
* @see ca.uhn.fhir.jpa.search.builder.predicate.NumberPredicateBuilder
* @see ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamNumber
*/
@ -320,6 +326,7 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
/**
* new quantity search indexing
*
* @see ca.uhn.fhir.jpa.search.builder.predicate.QuantityPredicateBuilder
* @see ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamQuantity
* @see ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamQuantityNormalized

View File

@ -20,7 +20,7 @@ package ca.uhn.fhir.jpa.test;
* #L%
*/
import ca.uhn.fhir.batch2.api.IJobCleanerService;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
@ -33,21 +33,21 @@ import static org.hamcrest.Matchers.equalTo;
public class Batch2JobHelper {
@Autowired
private IJobCleanerService myJobCleanerService;
private IJobMaintenanceService myJobCleanerService;
@Autowired
private IJobCoordinator myJobCoordinator;
public void awaitJobCompletion(String theId) {
await().until(() -> {
myJobCleanerService.runCleanupPass();
myJobCleanerService.runMaintenancePass();
return myJobCoordinator.getInstance(theId).getStatus();
}, equalTo(StatusEnum.COMPLETED));
}
public JobInstance awaitJobFailure(String theId) {
await().until(() -> {
myJobCleanerService.runCleanupPass();
myJobCleanerService.runMaintenancePass();
return myJobCoordinator.getInstance(theId).getStatus();
}, Matchers.anyOf(equalTo(StatusEnum.ERRORED),equalTo(StatusEnum.FAILED)));
return myJobCoordinator.getInstance(theId);

View File

@ -1,6 +1,6 @@
package ca.uhn.fhir.jpa.bulk.imprt2;
import ca.uhn.fhir.batch2.api.IJobCleanerService;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.jobs.imprt.BulkImportAppCtx;
import ca.uhn.fhir.batch2.jobs.imprt.BulkImportFileServlet;
@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@ -56,7 +55,7 @@ public class BulkImportR4Test extends BaseJpaR4Test {
@Autowired
private IJobCoordinator myJobCoordinator;
@Autowired
private IJobCleanerService myJobCleanerService;
private IJobMaintenanceService myJobCleanerService;
@Autowired
private IBatch2JobInstanceRepository myJobInstanceRepository;
@Autowired
@ -99,7 +98,7 @@ public class BulkImportR4Test extends BaseJpaR4Test {
// Verify
await().until(() -> {
myJobCleanerService.runCleanupPass();
myJobCleanerService.runMaintenancePass();
JobInstance instance = myJobCoordinator.getInstance(instanceId);
return instance.getStatus();
}, equalTo(StatusEnum.COMPLETED));
@ -152,7 +151,7 @@ public class BulkImportR4Test extends BaseJpaR4Test {
// Verify
await().until(() -> {
myJobCleanerService.runCleanupPass();
myJobCleanerService.runMaintenancePass();
JobInstance instance = myJobCoordinator.getInstance(instanceId);
return instance.getStatus();
}, equalTo(StatusEnum.ERRORED));
@ -174,7 +173,7 @@ public class BulkImportR4Test extends BaseJpaR4Test {
});
await().until(() -> {
myJobCleanerService.runCleanupPass();
myJobCleanerService.runMaintenancePass();
JobInstance instance = myJobCoordinator.getInstance(instanceId);
return instance.getErrorCount();
}, equalTo(3));
@ -224,7 +223,7 @@ public class BulkImportR4Test extends BaseJpaR4Test {
// Verify
await().until(() -> {
myJobCleanerService.runCleanupPass();
myJobCleanerService.runMaintenancePass();
JobInstance instance = myJobCoordinator.getInstance(instanceId);
return instance.getStatus();
}, equalTo(StatusEnum.FAILED));
@ -267,7 +266,7 @@ public class BulkImportR4Test extends BaseJpaR4Test {
// Verify
await().until(() -> {
myJobCleanerService.runCleanupPass();
myJobCleanerService.runMaintenancePass();
JobInstance instance = myJobCoordinator.getInstance(instanceId);
return instance.getStatus();
}, equalTo(StatusEnum.FAILED));

View File

@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ReindexJobTest extends BaseJpaR4Test {
@Autowired
private IJobCoordinator myJobCoordinator;

View File

@ -1,7 +1,7 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>TRACE</level>
<level>INFO</level>
</filter>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} [%file:%line] %msg%n</pattern>
@ -23,14 +23,6 @@
<logger name="org.eclipse.jetty.websocket" additivity="false" level="info">
<appender-ref ref="STDOUT" />
</logger>
<!-- TODO GGG HS can probably eventually disable this. -->
<logger name="org.hibernate.search.backend.lucene.infostream" additivity="true" level="trace">
<appender-ref ref="STDOUT" />
</logger>
<logger name="org.hibernate.search.query" additivity="true" level="trace">
<appender-ref ref="STDOUT" />
</logger>
j
<logger name="org.hibernate.event.internal.DefaultPersistEventListener" additivity="true" level="info">
<appender-ref ref="STDOUT" />
</logger>

View File

@ -41,6 +41,7 @@ public class ReindexAppCtx {
.setJobDefinitionVersion(1)
.setParametersType(ReindexJobParameters.class)
.setParametersValidator(reindexJobParametersValidator())
.gatedExecution()
.addFirstStep(
"generate-ranges",
"Generate data ranges to reindex",

View File

@ -0,0 +1,9 @@
package ca.uhn.fhir.batch2.api;
import ca.uhn.fhir.model.api.IModelJson;
public interface IJobCompletionHandler<PT extends IModelJson> {
void jobComplete(JobCompletionDetails<PT> theDetails);
}

View File

@ -54,5 +54,4 @@ public interface IJobCoordinator {
void cancelInstance(String theInstanceId) throws ResourceNotFoundException;
}

View File

@ -20,8 +20,8 @@ package ca.uhn.fhir.batch2.api;
* #L%
*/
public interface IJobCleanerService {
public interface IJobMaintenanceService {
void runCleanupPass();
void runMaintenancePass();
}

View File

@ -0,0 +1,56 @@
package ca.uhn.fhir.batch2.api;
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.model.api.IModelJson;
import org.apache.commons.lang3.Validate;
import javax.annotation.Nonnull;
public class JobCompletionDetails<PT extends IModelJson> {
private final PT myParameters;
private final String myInstanceId;
public JobCompletionDetails(@Nonnull PT theParameters, @Nonnull String theInstanceId) {
Validate.notNull(theParameters);
myParameters = theParameters;
myInstanceId = theInstanceId;
}
/**
* Returns the parameters associated with this job instance. Note that parameters
* are set when the job instance is created and can not be modified after that.
*/
@Nonnull
public PT getParameters() {
return myParameters;
}
/**
* Returns the job instance ID being executed
*/
@Nonnull
public String getInstanceId() {
return myInstanceId;
}
}

View File

@ -6,7 +6,7 @@
* framework, used to start and stop jobs and inquire about status.
* </ol>
* <ol>
* {@link ca.uhn.fhir.batch2.api.IJobCleanerService} is a background processor that
* {@link ca.uhn.fhir.batch2.api.IJobMaintenanceService} is a background processor that
* updates statistics and clean up stale data
* </ol>
* <ol>

View File

@ -20,11 +20,11 @@ package ca.uhn.fhir.batch2.config;
* #L%
*/
import ca.uhn.fhir.batch2.api.IJobCleanerService;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.impl.BatchJobSender;
import ca.uhn.fhir.batch2.impl.JobCleanerServiceImpl;
import ca.uhn.fhir.batch2.impl.JobMaintenanceServiceImpl;
import ca.uhn.fhir.batch2.impl.JobCoordinatorImpl;
import ca.uhn.fhir.batch2.impl.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
@ -48,9 +48,14 @@ public abstract class BaseBatch2Config {
}
@Bean
public IJobCoordinator batch2JobCoordinator(IChannelFactory theChannelFactory, IJobPersistence theJobInstancePersister, JobDefinitionRegistry theJobDefinitionRegistry) {
public BatchJobSender batchJobSender(IChannelFactory theChannelFactory) {
return new BatchJobSender(batch2ProcessingChannelProducer(theChannelFactory));
}
@Bean
public IJobCoordinator batch2JobCoordinator(IChannelFactory theChannelFactory, IJobPersistence theJobInstancePersister, JobDefinitionRegistry theJobDefinitionRegistry, BatchJobSender theBatchJobSender) {
return new JobCoordinatorImpl(
new BatchJobSender(batch2ProcessingChannelProducer(theChannelFactory)),
theBatchJobSender,
batch2ProcessingChannelReceiver(theChannelFactory),
theJobInstancePersister,
theJobDefinitionRegistry
@ -58,8 +63,8 @@ public abstract class BaseBatch2Config {
}
@Bean
public IJobCleanerService batch2JobCleaner(ISchedulerService theSchedulerService, IJobPersistence theJobPersistence) {
return new JobCleanerServiceImpl(theSchedulerService, theJobPersistence);
public IJobMaintenanceService batch2JobMaintenanceService(ISchedulerService theSchedulerService, IJobPersistence theJobPersistence, JobDefinitionRegistry theJobDefinitionRegistry, BatchJobSender theBatchJobSender) {
return new JobMaintenanceServiceImpl(theSchedulerService, theJobPersistence, theJobDefinitionRegistry, theBatchJobSender);
}
@Bean

View File

@ -37,6 +37,7 @@ import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.model.api.annotation.PasswordField;
@ -49,6 +50,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
@ -78,15 +80,13 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato
private final JobDefinitionRegistry myJobDefinitionRegistry;
private final MessageHandler myReceiverHandler = new WorkChannelMessageHandler();
private final ValidatorFactory myValidatorFactory = Validation.buildDefaultValidatorFactory();
@Autowired
private ISchedulerService mySchedulerService;
/**
* Constructor
*/
public JobCoordinatorImpl(
@Nonnull BatchJobSender theBatchJobSender,
@Nonnull IChannelReceiver theWorkChannelReceiver,
@Nonnull IJobPersistence theJobPersistence,
@Nonnull JobDefinitionRegistry theJobDefinitionRegistry) {
public JobCoordinatorImpl(@Nonnull BatchJobSender theBatchJobSender, @Nonnull IChannelReceiver theWorkChannelReceiver, @Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinitionRegistry theJobDefinitionRegistry) {
super(theJobPersistence);
myBatchJobSender = theBatchJobSender;
myWorkChannelReceiver = theWorkChannelReceiver;
@ -95,9 +95,7 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato
@Override
public String startInstance(JobInstanceStartRequest theStartRequest) {
JobDefinition<?> jobDefinition = myJobDefinitionRegistry
.getLatestJobDefinition(theStartRequest.getJobDefinitionId())
.orElseThrow(() -> new IllegalArgumentException(Msg.code(2063) + "Unknown job definition ID: " + theStartRequest.getJobDefinitionId()));
JobDefinition<?> jobDefinition = myJobDefinitionRegistry.getLatestJobDefinition(theStartRequest.getJobDefinitionId()).orElseThrow(() -> new IllegalArgumentException(Msg.code(2063) + "Unknown job definition ID: " + theStartRequest.getJobDefinitionId()));
if (isBlank(theStartRequest.getParameters())) {
throw new InvalidRequestException(Msg.code(2065) + "No parameters supplied");
@ -115,6 +113,10 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato
instance.setStatus(StatusEnum.QUEUED);
instance.setParameters(theStartRequest.getParameters());
if (jobDefinition.isGatedExecution()) {
instance.setCurrentGatedStepId(firstStepId);
}
String instanceId = myJobPersistence.storeNewInstance(instance);
BatchWorkChunk batchWorkChunk = new BatchWorkChunk(jobDefinitionId, jobDefinitionVersion, firstStepId, instanceId, 0, null);
@ -132,11 +134,7 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato
Validator validator = myValidatorFactory.getValidator();
PT parameters = theStartRequest.getParameters(theJobDefinition.getParametersType());
Set<ConstraintViolation<IModelJson>> constraintErrors = validator.validate(parameters);
List<String> errorStrings = constraintErrors
.stream()
.map(t -> t.getPropertyPath() + " - " + t.getMessage())
.sorted()
.collect(Collectors.toList());
List<String> errorStrings = constraintErrors.stream().map(t -> t.getPropertyPath() + " - " + t.getMessage()).sorted().collect(Collectors.toList());
// Programmatic Validator
IJobParametersValidator<PT> parametersValidator = theJobDefinition.getParametersValidator();
@ -147,13 +145,7 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato
}
if (!errorStrings.isEmpty()) {
String message = "Failed to validate parameters for job of type " +
theJobDefinition.getJobDefinitionId() +
": " +
errorStrings
.stream()
.map(t -> "\n * " + t)
.collect(Collectors.joining());
String message = "Failed to validate parameters for job of type " + theJobDefinition.getJobDefinitionId() + ": " + errorStrings.stream().map(t -> "\n * " + t).collect(Collectors.joining());
throw new InvalidRequestException(Msg.code(2039) + message);
}
@ -161,19 +153,12 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato
@Override
public JobInstance getInstance(String theInstanceId) {
return myJobPersistence
.fetchInstance(theInstanceId)
.map(t -> massageInstanceForUserAccess(t))
.orElseThrow(() -> new ResourceNotFoundException(Msg.code(2040) + "Unknown instance ID: " + UrlUtil.escapeUrlParam(theInstanceId)));
return myJobPersistence.fetchInstance(theInstanceId).map(t -> massageInstanceForUserAccess(t)).orElseThrow(() -> new ResourceNotFoundException(Msg.code(2040) + "Unknown instance ID: " + UrlUtil.escapeUrlParam(theInstanceId)));
}
@Override
public List<JobInstance> getInstances(int thePageSize, int thePageIndex) {
return myJobPersistence
.fetchInstances(thePageSize, thePageIndex)
.stream()
.map(t -> massageInstanceForUserAccess(t))
.collect(Collectors.toList());
return myJobPersistence.fetchInstances(thePageSize, thePageIndex).stream().map(t -> massageInstanceForUserAccess(t)).collect(Collectors.toList());
}
@Override
@ -195,7 +180,7 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato
return retVal;
}
private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> boolean executeStep(@Nonnull WorkChunk theWorkChunk, String jobDefinitionId, String targetStepId, Class<IT> theInputType, PT parameters, IJobStepWorker<PT, IT, OT> worker, BaseDataSink<OT> dataSink) {
private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> boolean executeStep(@Nonnull WorkChunk theWorkChunk, String theJobDefinitionId, String theTargetStepId, Class<IT> theInputType, PT theParameters, IJobStepWorker<PT, IT, OT> theWorker, BaseDataSink<OT> theDataSink) {
IT data = null;
if (!theInputType.equals(VoidModel.class)) {
data = theWorkChunk.getData(theInputType);
@ -204,21 +189,21 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato
String instanceId = theWorkChunk.getInstanceId();
String chunkId = theWorkChunk.getId();
StepExecutionDetails<PT, IT> stepExecutionDetails = new StepExecutionDetails<>(parameters, data, instanceId, chunkId);
StepExecutionDetails<PT, IT> stepExecutionDetails = new StepExecutionDetails<>(theParameters, data, instanceId, chunkId);
RunOutcome outcome;
try {
outcome = worker.run(stepExecutionDetails, dataSink);
Validate.notNull(outcome, "Step worker returned null: %s", worker.getClass());
outcome = theWorker.run(stepExecutionDetails, theDataSink);
Validate.notNull(outcome, "Step theWorker returned null: %s", theWorker.getClass());
} catch (JobExecutionFailedException e) {
ourLog.error("Unrecoverable failure executing job {} step {}", jobDefinitionId, targetStepId, e);
ourLog.error("Unrecoverable failure executing job {} step {}", theJobDefinitionId, theTargetStepId, e);
myJobPersistence.markWorkChunkAsFailed(chunkId, e.toString());
return false;
} catch (Exception e) {
ourLog.error("Failure executing job {} step {}", jobDefinitionId, targetStepId, e);
ourLog.error("Failure executing job {} step {}", theJobDefinitionId, theTargetStepId, e);
myJobPersistence.markWorkChunkAsErroredAndIncrementErrorCount(chunkId, e.toString());
throw new JobExecutionFailedException(Msg.code(2041) + e.getMessage(), e);
} catch (Throwable t) {
ourLog.error("Unexpected failure executing job {} step {}", jobDefinitionId, targetStepId, t);
ourLog.error("Unexpected failure executing job {} step {}", theJobDefinitionId, theTargetStepId, t);
myJobPersistence.markWorkChunkAsFailed(chunkId, t.toString());
return false;
}
@ -226,7 +211,7 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato
int recordsProcessed = outcome.getRecordsProcessed();
myJobPersistence.markWorkChunkAsCompletedAndClearData(chunkId, recordsProcessed);
int recoveredErrorCount = dataSink.getRecoveredErrorCount();
int recoveredErrorCount = theDataSink.getRecoveredErrorCount();
if (recoveredErrorCount > 0) {
myJobPersistence.incrementWorkChunkErrorCount(chunkId, recoveredErrorCount);
}
@ -296,32 +281,40 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato
return;
}
executeStep(chunk, jobDefinitionId, jobDefinitionVersion, definition, targetStep, nextStep, targetStepId, firstStep, instance, instanceId);
executeStep(chunk, jobDefinitionId, jobDefinitionVersion, definition, targetStep, nextStep, targetStepId, firstStep, instance);
}
@SuppressWarnings("unchecked")
private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> void executeStep(WorkChunk chunk, String jobDefinitionId, int jobDefinitionVersion, JobDefinition<PT> definition, JobDefinitionStep<PT, IT, OT> theStep, JobDefinitionStep<PT, OT, ?> theSubsequentStep, String targetStepId, boolean firstStep, JobInstance instance, String instanceId) {
PT parameters = (PT) instance.getParameters(definition.getParametersType());
private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> void executeStep(WorkChunk theWorkChunk, String theJobDefinitionId, int theJobDefinitionVersion, JobDefinition<PT> theDefinition, JobDefinitionStep<PT, IT, OT> theStep, JobDefinitionStep<PT, OT, ?> theSubsequentStep, String theTargetStepId, boolean theFirstStep, JobInstance theInstance) {
String instanceId = theInstance.getInstanceId();
PT parameters = theInstance.getParameters(theDefinition.getParametersType());
IJobStepWorker<PT, IT, OT> worker = theStep.getJobStepWorker();
BaseDataSink<OT> dataSink;
if (theSubsequentStep != null) {
dataSink = new JobDataSink<>(myBatchJobSender, myJobPersistence, jobDefinitionId, jobDefinitionVersion, theSubsequentStep, instanceId, theStep.getStepId());
boolean finalStep = theSubsequentStep == null;
if (!finalStep) {
dataSink = new JobDataSink<>(myBatchJobSender, myJobPersistence, theJobDefinitionId, theJobDefinitionVersion, theSubsequentStep, instanceId, theStep.getStepId(), theDefinition.isGatedExecution());
} else {
dataSink = (BaseDataSink<OT>) new FinalStepDataSink(jobDefinitionId, instanceId, theStep.getStepId());
dataSink = (BaseDataSink<OT>) new FinalStepDataSink(theJobDefinitionId, instanceId, theStep.getStepId());
}
Class<IT> inputType = theStep.getInputType();
boolean success = executeStep(chunk, jobDefinitionId, targetStepId, inputType, parameters, worker, dataSink);
boolean success = executeStep(theWorkChunk, theJobDefinitionId, theTargetStepId, inputType, parameters, worker, dataSink);
if (!success) {
return;
}
int workChunkCount = dataSink.getWorkChunkCount();
if (firstStep && workChunkCount == 0) {
ourLog.info("First step of job instance {} produced no work chunks, marking as completed", instanceId);
if (theFirstStep && workChunkCount == 0) {
ourLog.info("First step of job theInstance {} produced no work chunks, marking as completed", instanceId);
myJobPersistence.markInstanceAsCompleted(instanceId);
}
if (theDefinition.isGatedExecution() && theFirstStep) {
theInstance.setCurrentGatedStepId(theTargetStepId);
myJobPersistence.updateInstance(theInstance);
}
}
private JobDefinition<?> getDefinitionOrThrowException(String jobDefinitionId, int jobDefinitionVersion) {

View File

@ -36,14 +36,16 @@ class JobDataSink<OT extends IModelJson> extends BaseDataSink<OT> {
private final int myJobDefinitionVersion;
private final JobDefinitionStep<?, ?, ?> myTargetStep;
private final AtomicInteger myChunkCounter = new AtomicInteger(0);
private final boolean myGatedExecution;
JobDataSink(BatchJobSender theBatchJobSender, IJobPersistence theJobPersistence, String theJobDefinitionId, int theJobDefinitionVersion, JobDefinitionStep<?, ?, ?> theTargetStep, String theInstanceId, String theCurrentStepId) {
JobDataSink(BatchJobSender theBatchJobSender, IJobPersistence theJobPersistence, String theJobDefinitionId, int theJobDefinitionVersion, JobDefinitionStep<?, ?, ?> theTargetStep, String theInstanceId, String theCurrentStepId, boolean theGatedExecution) {
super(theInstanceId, theCurrentStepId);
myBatchJobSender = theBatchJobSender;
myJobPersistence = theJobPersistence;
myJobDefinitionId = theJobDefinitionId;
myJobDefinitionVersion = theJobDefinitionVersion;
myTargetStep = theTargetStep;
myGatedExecution = theGatedExecution;
}
@Override
@ -59,9 +61,11 @@ class JobDataSink<OT extends IModelJson> extends BaseDataSink<OT> {
BatchWorkChunk batchWorkChunk = new BatchWorkChunk(jobDefinitionId, jobDefinitionVersion, targetStepId, instanceId, sequence, dataValueString);
String chunkId = myJobPersistence.storeWorkChunk(batchWorkChunk);
if (!myGatedExecution) {
JobWorkNotification workNotification = new JobWorkNotification(jobDefinitionId, jobDefinitionVersion, instanceId, targetStepId, chunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
}
}
@Override
public int getWorkChunkCount() {

View File

@ -20,15 +20,22 @@ package ca.uhn.fhir.batch2.impl;
* #L%
*/
import ca.uhn.fhir.batch2.api.IJobCleanerService;
import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobCompletionDetails;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.DateUtils;
import org.quartz.JobExecutionContext;
@ -36,16 +43,27 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
/**
* This class performs regular polls of the stored jobs in order to
* calculate statistics and delete expired tasks. This class does
* perform maintenance. This includes two major functions.
*
* <p>
* First, we calculate statistics and delete expired tasks. This class does
* the following things:
* <ul>
* <li>For instances that are IN_PROGRESS, calculates throughput and percent complete</li>
@ -55,45 +73,60 @@ import java.util.concurrent.TimeUnit;
* <li>For instances that are IN_PROGRESS with an error message set where no chunks are ERRORED or FAILED, clears the error message in the instance (meaning presumably there was an error but it cleared)</li>
* <li>For instances that are COMPLETE or FAILED and are old, delete them entirely</li>
* </ul>
* </p>
*
* <p>
* Second, we check for any job instances where the job is configured to
* have gated execution. For these instances, we check if the current step
* is complete (all chunks are in COMPLETE status) and trigger the next step.
* </p>
*/
public class JobCleanerServiceImpl extends BaseJobService implements IJobCleanerService {
public class JobMaintenanceServiceImpl extends BaseJobService implements IJobMaintenanceService {
public static final int INSTANCES_PER_PASS = 100;
public static final long PURGE_THRESHOLD = 7L * DateUtils.MILLIS_PER_DAY;
private static final Logger ourLog = LoggerFactory.getLogger(JobCleanerServiceImpl.class);
private static final Logger ourLog = LoggerFactory.getLogger(JobMaintenanceServiceImpl.class);
private final ISchedulerService mySchedulerService;
private final JobDefinitionRegistry myJobDefinitionRegistry;
private final BatchJobSender myBatchJobSender;
/**
* Constructor
*/
public JobCleanerServiceImpl(ISchedulerService theSchedulerService, IJobPersistence theJobPersistence) {
public JobMaintenanceServiceImpl(ISchedulerService theSchedulerService, IJobPersistence theJobPersistence, JobDefinitionRegistry theJobDefinitionRegistry, BatchJobSender theBatchJobSender) {
super(theJobPersistence);
Validate.notNull(theSchedulerService);
Validate.notNull(theJobDefinitionRegistry);
Validate.notNull(theBatchJobSender);
mySchedulerService = theSchedulerService;
myJobDefinitionRegistry = theJobDefinitionRegistry;
myBatchJobSender = theBatchJobSender;
}
@PostConstruct
public void start() {
ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
jobDefinition.setId(JobCleanerScheduledJob.class.getName());
jobDefinition.setJobClass(JobCleanerScheduledJob.class);
jobDefinition.setId(JobMaintenanceScheduledJob.class.getName());
jobDefinition.setJobClass(JobMaintenanceScheduledJob.class);
mySchedulerService.scheduleClusteredJob(DateUtils.MILLIS_PER_MINUTE, jobDefinition);
}
@Override
public void runCleanupPass() {
public void runMaintenancePass() {
// NB: If you add any new logic, update the class javadoc
Set<String> processedInstanceIds = new HashSet<>();
JobChunkProgressAccumulator progressAccumulator = new JobChunkProgressAccumulator();
for (int page = 0; ; page++) {
List<JobInstance> instances = myJobPersistence.fetchInstances(INSTANCES_PER_PASS, page);
for (JobInstance instance : instances) {
if (processedInstanceIds.add(instance.getInstanceId())) {
cleanupInstance(instance);
cleanupInstance(instance, progressAccumulator);
triggerGatedExecutions(instance, progressAccumulator);
}
}
@ -103,13 +136,13 @@ public class JobCleanerServiceImpl extends BaseJobService implements IJobCleaner
}
}
private void cleanupInstance(JobInstance theInstance) {
private void cleanupInstance(JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) {
switch (theInstance.getStatus()) {
case QUEUED:
break;
case IN_PROGRESS:
case ERRORED:
calculateInstanceProgress(theInstance);
calculateInstanceProgress(theInstance, theProgressAccumulator);
break;
case COMPLETED:
case FAILED:
@ -132,7 +165,7 @@ public class JobCleanerServiceImpl extends BaseJobService implements IJobCleaner
}
private void calculateInstanceProgress(JobInstance theInstance) {
private void calculateInstanceProgress(JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) {
int resourcesProcessed = 0;
int incompleteChunkCount = 0;
int completeChunkCount = 0;
@ -146,6 +179,8 @@ public class JobCleanerServiceImpl extends BaseJobService implements IJobCleaner
List<WorkChunk> chunks = myJobPersistence.fetchWorkChunksWithoutData(theInstance.getInstanceId(), INSTANCES_PER_PASS, page);
for (WorkChunk chunk : chunks) {
theProgressAccumulator.addChunk(chunk.getInstanceId(), chunk.getId(), chunk.getTargetStepId(), chunk.getStatus());
errorCountForAllStatuses += chunk.getErrorCount();
if (chunk.getRecordsProcessed() != null) {
@ -201,7 +236,12 @@ public class JobCleanerServiceImpl extends BaseJobService implements IJobCleaner
changedStatus = false;
if (incompleteChunkCount == 0 && erroredChunkCount == 0 && failedChunkCount == 0) {
changedStatus |= updateInstanceStatus(theInstance, StatusEnum.COMPLETED);
boolean completed = updateInstanceStatus(theInstance, StatusEnum.COMPLETED);
if (completed) {
JobDefinition<?> definition = myJobDefinitionRegistry.getJobDefinition(theInstance.getJobDefinitionId(), theInstance.getJobDefinitionVersion()).orElseThrow(() -> new IllegalStateException("Unknown job " + theInstance.getJobDefinitionId() + "/" + theInstance.getJobDefinitionVersion()));
invokeJobCompletionHandler(theInstance, definition);
}
changedStatus |= completed;
}
if (erroredChunkCount > 0) {
changedStatus |= updateInstanceStatus(theInstance, StatusEnum.ERRORED);
@ -246,6 +286,17 @@ public class JobCleanerServiceImpl extends BaseJobService implements IJobCleaner
}
private <PT extends IModelJson> void invokeJobCompletionHandler(JobInstance theInstance, JobDefinition<PT> definition) {
IJobCompletionHandler<PT> completionHandler = definition.getCompletionHandler();
if (completionHandler != null) {
String instanceId = theInstance.getInstanceId();
PT jobParameters = theInstance.getParameters(definition.getParametersType());
JobCompletionDetails<PT> completionDetails = new JobCompletionDetails<>(jobParameters, instanceId);
completionHandler.jobComplete(completionDetails);
}
}
private boolean updateInstanceStatus(JobInstance theInstance, StatusEnum newStatus) {
if (theInstance.getStatus() != newStatus) {
ourLog.info("Marking job instance {} of type {} as {}", theInstance.getInstanceId(), theInstance.getJobDefinitionId(), newStatus);
@ -255,16 +306,109 @@ public class JobCleanerServiceImpl extends BaseJobService implements IJobCleaner
return false;
}
private void triggerGatedExecutions(JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) {
if (!theInstance.isRunning()) {
return;
}
public static class JobCleanerScheduledJob implements HapiJob {
String jobDefinitionId = theInstance.getJobDefinitionId();
int jobDefinitionVersion = theInstance.getJobDefinitionVersion();
String instanceId = theInstance.getInstanceId();
JobDefinition<?> definition = myJobDefinitionRegistry.getJobDefinition(jobDefinitionId, jobDefinitionVersion).orElseThrow(() -> new IllegalStateException("Unknown job definition: " + jobDefinitionId + " " + jobDefinitionVersion));
if (!definition.isGatedExecution()) {
return;
}
String currentStepId = theInstance.getCurrentGatedStepId();
if (isBlank(currentStepId)) {
return;
}
if (definition.isLastStep(currentStepId)) {
return;
}
int incompleteChunks = theProgressAccumulator.countChunksWithStatus(instanceId, currentStepId, StatusEnum.getIncompleteStatuses());
if (incompleteChunks == 0) {
int currentStepIndex = definition.getStepIndex(currentStepId);
String nextStepId = definition.getSteps().get(currentStepIndex + 1).getStepId();
ourLog.info("All processing is complete for gated execution of instance {} step {}. Proceeding to step {}", instanceId, currentStepId, nextStepId);
List<String> chunksForNextStep = theProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, EnumSet.of(StatusEnum.QUEUED));
for (String nextChunkId : chunksForNextStep) {
JobWorkNotification workNotification = new JobWorkNotification(jobDefinitionId, jobDefinitionVersion, instanceId, nextStepId, nextChunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
}
theInstance.setCurrentGatedStepId(nextStepId);
myJobPersistence.updateInstance(theInstance);
}
}
public static class JobMaintenanceScheduledJob implements HapiJob {
@Autowired
private IJobCleanerService myTarget;
private IJobMaintenanceService myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.runCleanupPass();
myTarget.runMaintenancePass();
}
}
/**
* While performing cleanup, the cleanup job loads all of the known
* work chunks to examine their status. This bean collects the counts that
* are found, so that they can be reused for maintenance jobs without
* needing to hit the database a second time.
*/
private static class JobChunkProgressAccumulator {
private final Set<String> myConsumedInstanceAndChunkIds = new HashSet<>();
private final Multimap<String, ChunkStatusCountKey> myInstanceIdToChunkStatuses = ArrayListMultimap.create();
public void addChunk(String theInstanceId, String theChunkId, String theStepId, StatusEnum theStatus) {
// Note: If chunks are being written while we're executing, we may see the same chunk twice. This
// check avoids adding it twice.
if (myConsumedInstanceAndChunkIds.add(theInstanceId + " " + theChunkId)) {
myInstanceIdToChunkStatuses.put(theInstanceId, new ChunkStatusCountKey(theChunkId, theStepId, theStatus));
}
}
public int countChunksWithStatus(String theInstanceId, String theStepId, Set<StatusEnum> theStatuses) {
return getChunkIdsWithStatus(theInstanceId, theStepId, theStatuses).size();
}
public List<String> getChunkIdsWithStatus(String theInstanceId, String theStepId, Set<StatusEnum> theStatuses) {
return getChunkStatuses(theInstanceId).stream().filter(t -> t.myStepId.equals(theStepId)).filter(t -> theStatuses.contains(t.myStatus)).map(t -> t.myChunkId).collect(Collectors.toList());
}
@Nonnull
private Collection<ChunkStatusCountKey> getChunkStatuses(String theInstanceId) {
Collection<ChunkStatusCountKey> chunkStatuses = myInstanceIdToChunkStatuses.get(theInstanceId);
chunkStatuses = defaultIfNull(chunkStatuses, emptyList());
return chunkStatuses;
}
private static class ChunkStatusCountKey {
public final String myChunkId;
public final String myStepId;
public final StatusEnum myStatus;
private ChunkStatusCountKey(String theChunkId, String theStepId, StatusEnum theStatus) {
myChunkId = theChunkId;
myStepId = theStepId;
myStatus = theStatus;
}
}
}
}

View File

@ -22,8 +22,10 @@ package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;

View File

@ -20,6 +20,7 @@ package ca.uhn.fhir.batch2.model;
* #L%
*/
import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
import ca.uhn.fhir.batch2.api.IJobParametersValidator;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.VoidModel;
@ -31,6 +32,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
public class JobDefinition<PT extends IModelJson> {
@ -42,11 +44,14 @@ public class JobDefinition<PT extends IModelJson> {
private final List<JobDefinitionStep<PT, ?, ?>> mySteps;
private final String myJobDescription;
private final IJobParametersValidator<PT> myParametersValidator;
private final boolean myGatedExecution;
private final List<String> myStepIds;
private final IJobCompletionHandler<PT> myCompletionHandler;
/**
* Constructor
*/
private JobDefinition(String theJobDefinitionId, int theJobDefinitionVersion, String theJobDescription, Class<PT> theParametersType, List<JobDefinitionStep<PT, ?, ?>> theSteps, IJobParametersValidator<PT> theParametersValidator) {
private JobDefinition(String theJobDefinitionId, int theJobDefinitionVersion, String theJobDescription, Class<PT> theParametersType, List<JobDefinitionStep<PT, ?, ?>> theSteps, IJobParametersValidator<PT> theParametersValidator, boolean theGatedExecution, IJobCompletionHandler<PT> theCompletionHandler) {
Validate.isTrue(theJobDefinitionId.length() <= ID_MAX_LENGTH, "Maximum ID length is %d", ID_MAX_LENGTH);
Validate.notBlank(theJobDefinitionId, "No job definition ID supplied");
Validate.notBlank(theJobDescription, "No job description supplied");
@ -56,8 +61,16 @@ public class JobDefinition<PT extends IModelJson> {
myJobDefinitionVersion = theJobDefinitionVersion;
myJobDescription = theJobDescription;
mySteps = theSteps;
myStepIds = mySteps.stream().map(t -> t.getStepId()).collect(Collectors.toList());
myParametersType = theParametersType;
myParametersValidator = theParametersValidator;
myGatedExecution = theGatedExecution;
myCompletionHandler = theCompletionHandler;
}
@Nullable
public IJobCompletionHandler<PT> getCompletionHandler() {
return myCompletionHandler;
}
@Nullable
@ -97,8 +110,18 @@ public class JobDefinition<PT extends IModelJson> {
return mySteps;
}
public static Builder<IModelJson, VoidModel> newBuilder() {
return new Builder<>();
public boolean isGatedExecution() {
return myGatedExecution;
}
public int getStepIndex(String theStepId) {
int retVal = myStepIds.indexOf(theStepId);
Validate.isTrue(retVal != -1);
return retVal;
}
public boolean isLastStep(String theStepId) {
return getStepIndex(theStepId) == (myStepIds.size() - 1);
}
public static class Builder<PT extends IModelJson, NIT extends IModelJson> {
@ -111,12 +134,14 @@ public class JobDefinition<PT extends IModelJson> {
private Class<NIT> myNextInputType;
@Nullable
private IJobParametersValidator<PT> myParametersValidator;
private boolean myGatedExecution;
private IJobCompletionHandler<PT> myCompletionHandler;
Builder() {
mySteps = new ArrayList<>();
}
Builder(List<JobDefinitionStep<PT, ?, ?>> theSteps, String theJobDefinitionId, int theJobDefinitionVersion, String theJobDescription, Class<PT> theJobParametersType, Class<NIT> theNextInputType, IJobParametersValidator<PT> theParametersValidator) {
Builder(List<JobDefinitionStep<PT, ?, ?>> theSteps, String theJobDefinitionId, int theJobDefinitionVersion, String theJobDescription, Class<PT> theJobParametersType, Class<NIT> theNextInputType, IJobParametersValidator<PT> theParametersValidator, boolean theGatedExecution, IJobCompletionHandler<PT> theCompletionHandler) {
mySteps = theSteps;
myJobDefinitionId = theJobDefinitionId;
myJobDefinitionVersion = theJobDefinitionVersion;
@ -124,6 +149,8 @@ public class JobDefinition<PT extends IModelJson> {
myJobParametersType = theJobParametersType;
myNextInputType = theNextInputType;
myParametersValidator = theParametersValidator;
myGatedExecution = theGatedExecution;
myCompletionHandler = theCompletionHandler;
}
/**
@ -154,7 +181,7 @@ public class JobDefinition<PT extends IModelJson> {
*/
public <OT extends IModelJson> Builder<PT, OT> addFirstStep(String theStepId, String theStepDescription, Class<OT> theOutputType, IJobStepWorker<PT, VoidModel, OT> theStepWorker) {
mySteps.add(new JobDefinitionStep<>(theStepId, theStepDescription, theStepWorker, VoidModel.class, theOutputType));
return new Builder<>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, theOutputType, myParametersValidator);
return new Builder<>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, theOutputType, myParametersValidator, myGatedExecution, myCompletionHandler);
}
/**
@ -168,7 +195,7 @@ public class JobDefinition<PT extends IModelJson> {
*/
public <OT extends IModelJson> Builder<PT, OT> addIntermediateStep(String theStepId, String theStepDescription, Class<OT> theOutputType, IJobStepWorker<PT, NIT, OT> theStepWorker) {
mySteps.add(new JobDefinitionStep<>(theStepId, theStepDescription, theStepWorker, myNextInputType, theOutputType));
return new Builder<>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, theOutputType, myParametersValidator);
return new Builder<>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, theOutputType, myParametersValidator, myGatedExecution, myCompletionHandler);
}
/**
@ -182,12 +209,12 @@ public class JobDefinition<PT extends IModelJson> {
*/
public Builder<PT, VoidModel> addLastStep(String theStepId, String theStepDescription, IJobStepWorker<PT, NIT, VoidModel> theStepWorker) {
mySteps.add(new JobDefinitionStep<>(theStepId, theStepDescription, theStepWorker, myNextInputType, VoidModel.class));
return new Builder<>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, VoidModel.class, myParametersValidator);
return new Builder<>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, VoidModel.class, myParametersValidator, myGatedExecution, myCompletionHandler);
}
public JobDefinition<PT> build() {
Validate.notNull(myJobParametersType, "No job parameters type was supplied");
return new JobDefinition<>(myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, Collections.unmodifiableList(mySteps), myParametersValidator);
return new JobDefinition<>(myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, Collections.unmodifiableList(mySteps), myParametersValidator, myGatedExecution, myCompletionHandler);
}
public Builder<PT, NIT> setJobDescription(String theJobDescription) {
@ -233,12 +260,58 @@ public class JobDefinition<PT extends IModelJson> {
* @param theParametersValidator The validator (must not be null. Do not call this method at all if you do not want a parameters validator).
*/
@SuppressWarnings("unchecked")
public <NPT extends IModelJson> Builder<PT, NIT> setParametersValidator(@Nonnull IJobParametersValidator<PT> theParametersValidator) {
public Builder<PT, NIT> setParametersValidator(@Nonnull IJobParametersValidator<PT> theParametersValidator) {
Validate.notNull(theParametersValidator, "theParametersValidator must not be null");
Validate.isTrue(myParametersValidator == null, "Can not supply multiple parameters validators. Already have: %s", myParametersValidator);
myParametersValidator = theParametersValidator;
return (Builder<PT, NIT>) this;
return this;
}
/**
* If this is set, the framework will wait for all work chunks to be
* processed for an individual step before moving on to beginning
* processing on the next step. Otherwise, processing on subsequent
* steps may begin as soon as any data has been produced.
* <p>
* This is useful in a few cases:
* <ul>
* <li>
* If there are potential constraint issues, e.g. data being
* written by the third step depends on all data from the
* second step already being written
* </li>
* <li>
* If multiple steps require expensive database queries, it may
* reduce the chances of timeouts to ensure that they are run
* discretely.
* </li>
* </ul>
* </p>
* <p>
* Setting this mode means the job may take longer, since it will
* rely on a polling mechanism to determine that one step is
* complete before beginning any processing for the next step.
* </p>
*/
public Builder<PT, NIT> gatedExecution() {
myGatedExecution = true;
return this;
}
/**
* Supplies an optional callback that will be invoked when the job is complete
*/
public Builder<PT, NIT> completionHandler(IJobCompletionHandler<PT> theCompletionHandler) {
Validate.isTrue(myCompletionHandler == null, "Can not supply multiple completion handlers");
myCompletionHandler = theCompletionHandler;
return this;
}
}
public static Builder<IModelJson, VoidModel> newBuilder() {
return new Builder<>();
}
}

View File

@ -74,13 +74,12 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
@JsonProperty(value = "progress", access = JsonProperty.Access.READ_ONLY)
private double myProgress;
@JsonProperty(value = "currentGatedStepId", access = JsonProperty.Access.READ_ONLY)
private String myCurrentGatedStepId;
@JsonProperty(value = "errorMessage", access = JsonProperty.Access.READ_ONLY)
private String myErrorMessage;
@JsonProperty(value = "errorCount", access = JsonProperty.Access.READ_ONLY)
private int myErrorCount;
@JsonProperty(value = "estimatedCompletion", access = JsonProperty.Access.READ_ONLY)
private String myEstimatedTimeRemaining;
@ -111,6 +110,15 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
setStatus(theJobInstance.getStatus());
setTotalElapsedMillis(theJobInstance.getTotalElapsedMillis());
setWorkChunksPurged(theJobInstance.isWorkChunksPurged());
setCurrentGatedStepId(theJobInstance.getCurrentGatedStepId());
}
public String getCurrentGatedStepId() {
return myCurrentGatedStepId;
}
public void setCurrentGatedStepId(String theCurrentGatedStepId) {
myCurrentGatedStepId = theCurrentGatedStepId;
}
public int getErrorCount() {
@ -258,4 +266,11 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
.append("estimatedTimeRemaining", myEstimatedTimeRemaining)
.toString();
}
/**
* Returns true if the job instance is in {@link StatusEnum#IN_PROGRESS} and is not cancelled
*/
public boolean isRunning() {
return getStatus() == StatusEnum.IN_PROGRESS && !isCancelled();
}
}

View File

@ -20,33 +20,63 @@ package ca.uhn.fhir.batch2.model;
* #L%
*/
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
public enum StatusEnum {
/**
* Task is waiting to execute and should begin with no intervention required.
*/
QUEUED,
QUEUED(true),
/**
* Task is current executing
*/
IN_PROGRESS,
IN_PROGRESS(true),
/**
* Task completed successfully
*/
COMPLETED,
COMPLETED(false),
/**
* Task execution resulted in an error but the error may be transient (or transient status is unknown).
* Retrying may result in success.
*/
ERRORED,
ERRORED(true),
/**
* Task has failed and is known to be unrecoverable. There is no reason to believe that retrying will
* result in a different outcome.
*/
FAILED
FAILED(true);
private final boolean myIncomplete;
private static Set<StatusEnum> ourIncompleteStatuses;
StatusEnum(boolean theIncomplete) {
myIncomplete = theIncomplete;
}
/**
* Statuses that represent a job that has not yet completed. I.e.
* all statuses except {@link #COMPLETED}
*/
public static Set<StatusEnum> getIncompleteStatuses() {
Set<StatusEnum> retVal = ourIncompleteStatuses;
if (retVal == null) {
EnumSet<StatusEnum> set = EnumSet.noneOf(StatusEnum.class);
for (StatusEnum next : values()) {
if (next.myIncomplete) {
set.add(next);
}
}
ourIncompleteStatuses = Collections.unmodifiableSet(set);
retVal = ourIncompleteStatuses;
}
return retVal;
}
}

View File

@ -0,0 +1,74 @@
package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import javax.annotation.Nonnull;
import java.util.function.Consumer;
@ExtendWith(MockitoExtension.class)
public abstract class BaseBatch2Test {
public static final String JOB_DEFINITION_ID = "JOB_DEFINITION_ID";
public static final String PARAM_1_VALUE = "PARAM 1 VALUE";
public static final String PARAM_2_VALUE = "PARAM 2 VALUE";
public static final String PASSWORD_VALUE = "PASSWORD VALUE";
public static final String STEP_1 = "STEP_1";
public static final String STEP_2 = "STEP_2";
public static final String STEP_3 = "STEP_3";
public static final String INSTANCE_ID = "INSTANCE-ID";
public static final String CHUNK_ID = "CHUNK-ID";
public static final String CHUNK_ID_2 = "CHUNK-ID-2";
public static final String DATA_1_VALUE = "data 1 value";
public static final String DATA_2_VALUE = "data 2 value";
public static final String DATA_3_VALUE = "data 3 value";
public static final String DATA_4_VALUE = "data 4 value";
@Mock
protected IJobStepWorker<TestJobParameters, VoidModel, TestJobStep2InputType> myStep1Worker;
@Mock
protected IJobStepWorker<TestJobParameters, TestJobStep2InputType, TestJobStep3InputType> myStep2Worker;
@Mock
protected IJobStepWorker<TestJobParameters, TestJobStep3InputType, VoidModel> myStep3Worker;
@Nonnull
static JobInstance createInstance() {
JobInstance instance = new JobInstance();
instance.setInstanceId(INSTANCE_ID);
instance.setStatus(StatusEnum.IN_PROGRESS);
instance.setJobDefinitionId(JOB_DEFINITION_ID);
instance.setJobDefinitionVersion(1);
instance.setParameters(new TestJobParameters()
.setParam1(PARAM_1_VALUE)
.setParam2(PARAM_2_VALUE)
.setPassword(PASSWORD_VALUE)
);
return instance;
}
@SafeVarargs
final JobDefinition<TestJobParameters> createJobDefinition(Consumer<JobDefinition.Builder<TestJobParameters, ?>>... theModifiers) {
JobDefinition.Builder<TestJobParameters, VoidModel> builder = JobDefinition
.newBuilder()
.setJobDefinitionId(JOB_DEFINITION_ID)
.setJobDescription("This is a job description")
.setJobDefinitionVersion(1)
.setParametersType(TestJobParameters.class)
.addFirstStep(STEP_1, "Step 1", TestJobStep2InputType.class, myStep1Worker)
.addIntermediateStep(STEP_2, "Step 2", TestJobStep3InputType.class, myStep2Worker)
.addLastStep(STEP_3, "Step 3", myStep3Worker);
for (Consumer<JobDefinition.Builder<TestJobParameters, ?>> next : theModifiers) {
next.accept(builder);
}
return builder.build();
}
}

View File

@ -3,7 +3,6 @@ package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobParametersValidator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
@ -35,12 +34,12 @@ import org.springframework.messaging.MessageDeliveryException;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -49,21 +48,8 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.MethodName.class)
public class JobCoordinatorImplTest {
public class JobCoordinatorImplTest extends BaseBatch2Test {
public static final String JOB_DEFINITION_ID = "JOB_DEFINITION_ID";
public static final String PARAM_1_VALUE = "PARAM 1 VALUE";
public static final String PARAM_2_VALUE = "PARAM 2 VALUE";
public static final String PASSWORD_VALUE = "PASSWORD VALUE";
public static final String STEP_1 = "STEP_1";
public static final String STEP_2 = "STEP_2";
public static final String STEP_3 = "STEP_3";
public static final String INSTANCE_ID = "INSTANCE-ID";
public static final String CHUNK_ID = "CHUNK-ID";
public static final String DATA_1_VALUE = "data 1 value";
public static final String DATA_2_VALUE = "data 2 value";
public static final String DATA_3_VALUE = "data 3 value";
public static final String DATA_4_VALUE = "data 4 value";
private static final Logger ourLog = LoggerFactory.getLogger(JobCoordinatorImplTest.class);
private final IChannelReceiver myWorkChannelReceiver = LinkedBlockingChannel.newSynchronous("receiver");
private JobCoordinatorImpl mySvc;
@ -73,12 +59,6 @@ public class JobCoordinatorImplTest {
private IJobPersistence myJobInstancePersister;
@Mock
private JobDefinitionRegistry myJobDefinitionRegistry;
@Mock
private IJobStepWorker<TestJobParameters, VoidModel, TestJobStep2InputType> myStep1Worker;
@Mock
private IJobStepWorker<TestJobParameters, TestJobStep2InputType, TestJobStep3InputType> myStep2Worker;
@Mock
private IJobStepWorker<TestJobParameters, TestJobStep3InputType, VoidModel> myStep3Worker;
@Captor
private ArgumentCaptor<StepExecutionDetails<TestJobParameters, VoidModel>> myStep1ExecutionDetailsCaptor;
@Captor
@ -114,7 +94,7 @@ public class JobCoordinatorImplTest {
// Setup
JobDefinition definition = createJobDefinition();
JobDefinition<?> definition = createJobDefinition();
JobInstance instance = createInstance();
when(myJobDefinitionRegistry.getJobDefinition(eq(JOB_DEFINITION_ID), eq(1))).thenReturn(Optional.of(definition));
@ -160,7 +140,8 @@ public class JobCoordinatorImplTest {
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunkStep1()));
when(myStep1Worker.run(any(), any())).thenAnswer(t -> {
IJobDataSink<TestJobStep2InputType> sink = t.getArgument(1, IJobDataSink.class);
sink.accept(new TestJobStep2InputType("data value 1", "data value 2"));
sink.accept(new TestJobStep2InputType("data value 1a", "data value 2a"));
sink.accept(new TestJobStep2InputType("data value 1b", "data value 2b"));
return new RunOutcome(50);
});
mySvc.start();
@ -178,6 +159,8 @@ public class JobCoordinatorImplTest {
assertEquals(PASSWORD_VALUE, params.getPassword());
verify(myJobInstancePersister, times(1)).markWorkChunkAsCompletedAndClearData(any(), eq(50));
verify(myJobInstancePersister, times(0)).fetchWorkChunksWithoutData(any(), anyInt(), anyInt());
verify(myBatchJobSender, times(2)).sendWorkChannelMessage(any());
}
/**
@ -210,6 +193,41 @@ public class JobCoordinatorImplTest {
verify(myJobInstancePersister, times(1)).markInstanceAsCompleted(eq(INSTANCE_ID));
}
@Test
public void testPerformStep_FirstStep_GatedExecutionMode() {
// Setup
JobDefinition<TestJobParameters> jobDefinition = createJobDefinition(
t -> t.gatedExecution()
);
when(myJobDefinitionRegistry.getJobDefinition(eq(JOB_DEFINITION_ID), eq(1))).thenReturn(Optional.of(jobDefinition));
when(myJobInstancePersister.fetchInstanceAndMarkInProgress(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance()));
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunkStep1()));
when(myStep1Worker.run(any(), any())).thenAnswer(t -> {
IJobDataSink<TestJobStep2InputType> sink = t.getArgument(1, IJobDataSink.class);
sink.accept(new TestJobStep2InputType("data value 1a", "data value 2a"));
sink.accept(new TestJobStep2InputType("data value 1b", "data value 2b"));
return new RunOutcome(50);
});
mySvc.start();
// Execute
myWorkChannelReceiver.send(new JobWorkNotificationJsonMessage(createWorkNotification(STEP_1)));
// Verify
verify(myStep1Worker, times(1)).run(myStep1ExecutionDetailsCaptor.capture(), any());
TestJobParameters params = myStep1ExecutionDetailsCaptor.getValue().getParameters();
assertEquals(PARAM_1_VALUE, params.getParam1());
assertEquals(PARAM_2_VALUE, params.getParam2());
assertEquals(PASSWORD_VALUE, params.getPassword());
verify(myJobInstancePersister, times(1)).markWorkChunkAsCompletedAndClearData(any(), eq(50));
verify(myBatchJobSender, times(0)).sendWorkChannelMessage(any());
}
@Test
public void testPerformStep_SecondStep() {
@ -276,7 +294,7 @@ public class JobCoordinatorImplTest {
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_2, new TestJobStep2InputType(DATA_1_VALUE, DATA_2_VALUE))));
when(myJobDefinitionRegistry.getJobDefinition(eq(JOB_DEFINITION_ID), eq(1))).thenReturn(Optional.of(createJobDefinition()));
when(myJobInstancePersister.fetchInstanceAndMarkInProgress(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance()));
when(myStep2Worker.run(any(), any())).thenAnswer(t->{
when(myStep2Worker.run(any(), any())).thenAnswer(t -> {
IJobDataSink<?> sink = t.getArgument(1, IJobDataSink.class);
sink.recoveredError("Error message 1");
sink.recoveredError("Error message 2");
@ -479,7 +497,7 @@ public class JobCoordinatorImplTest {
}
return null;
};
JobDefinition<?> jobDefinition = createJobDefinition(t->t.setParametersValidator(v));
JobDefinition<?> jobDefinition = createJobDefinition(t -> t.setParametersValidator(v));
when(myJobDefinitionRegistry.getLatestJobDefinition(eq(JOB_DEFINITION_ID))).thenReturn(Optional.of(jobDefinition));
// Execute
@ -504,51 +522,17 @@ public class JobCoordinatorImplTest {
}
}
@SafeVarargs
private JobDefinition<TestJobParameters> createJobDefinition(Consumer<JobDefinition.Builder<TestJobParameters, ?>>... theModifiers) {
JobDefinition.Builder<TestJobParameters, VoidModel> builder = JobDefinition
.newBuilder()
.setJobDefinitionId(JOB_DEFINITION_ID)
.setJobDescription("This is a job description")
.setJobDefinitionVersion(1)
.setParametersType(TestJobParameters.class)
.addFirstStep(STEP_1, "Step 1", TestJobStep2InputType.class, myStep1Worker)
.addIntermediateStep(STEP_2, "Step 2", TestJobStep3InputType.class, myStep2Worker)
.addLastStep(STEP_3, "Step 3", myStep3Worker);
for (Consumer<JobDefinition.Builder<TestJobParameters, ?>> next : theModifiers) {
next.accept(builder);
}
return builder.build();
}
@Nonnull
private JobWorkNotification createWorkNotification(String theStepId) {
JobWorkNotification payload = new JobWorkNotification();
payload.setJobDefinitionId(JOB_DEFINITION_ID);
payload.setJobDefinitionVersion(1);
payload.setInstanceId(INSTANCE_ID);
payload.setChunkId(JobCoordinatorImplTest.CHUNK_ID);
payload.setChunkId(BaseBatch2Test.CHUNK_ID);
payload.setTargetStepId(theStepId);
return payload;
}
@Nonnull
static JobInstance createInstance() {
JobInstance instance = new JobInstance();
instance.setInstanceId(INSTANCE_ID);
instance.setStatus(StatusEnum.IN_PROGRESS);
instance.setJobDefinitionId(JOB_DEFINITION_ID);
instance.setJobDefinitionVersion(1);
instance.setParameters(new TestJobParameters()
.setParam1(PARAM_1_VALUE)
.setParam2(PARAM_2_VALUE)
.setPassword(PASSWORD_VALUE)
);
return instance;
}
@Nonnull
static WorkChunk createWorkChunk(String theTargetStepId, IModelJson theData) {
return new WorkChunk()

View File

@ -88,7 +88,7 @@ class JobDataSinkTest {
// Let's test our first step worker by calling run on it:
when(myJobPersistence.storeWorkChunk(myBatchWorkChunkCaptor.capture())).thenReturn(CHUNK_ID);
StepExecutionDetails<TestJobParameters, VoidModel> details = new StepExecutionDetails<>(new TestJobParameters().setParam1("" + PID_COUNT), null, JOB_INSTANCE_ID, CHUNK_ID);
JobDataSink<Step1Output> sink = new JobDataSink<>(myBatchJobSender, myJobPersistence, JOB_DEF_ID, JOB_DEF_VERSION, job.getSteps().get(1), JOB_INSTANCE_ID, FIRST_STEP_ID);
JobDataSink<Step1Output> sink = new JobDataSink<>(myBatchJobSender, myJobPersistence, JOB_DEF_ID, JOB_DEF_VERSION, job.getSteps().get(1), JOB_INSTANCE_ID, FIRST_STEP_ID, false);
RunOutcome result = firstStepWorker.run(details, sink);

View File

@ -1,9 +1,13 @@
package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobCompletionDetails;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import com.google.common.collect.Lists;
import org.hl7.fhir.r4.model.DateTimeType;
import org.junit.jupiter.api.BeforeEach;
@ -13,14 +17,10 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.messaging.Message;
import java.util.Date;
import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.INSTANCE_ID;
import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.STEP_1;
import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.STEP_2;
import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.STEP_3;
import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.createInstance;
import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.createWorkChunk;
import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.createWorkChunkStep1;
import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.createWorkChunkStep2;
@ -38,35 +38,48 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class JobCleanerServiceImplTest {
public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
@Mock
IJobCompletionHandler<TestJobParameters> myCompletionHandler;
@Mock
private ISchedulerService mySchedulerService;
@Mock
private IJobPersistence myJobPersistence;
private JobCleanerServiceImpl mySvc;
private JobMaintenanceServiceImpl mySvc;
@Captor
private ArgumentCaptor<JobInstance> myInstanceCaptor;
private JobDefinitionRegistry myJobDefinitionRegistry;
@Mock
private IChannelProducer myWorkChannelProducer;
@Captor
private ArgumentCaptor<Message<JobWorkNotification>> myMessageCaptor;
@Captor
private ArgumentCaptor<JobCompletionDetails<TestJobParameters>> myJobCompletionCaptor;
@BeforeEach
public void beforeEach() {
mySvc = new JobCleanerServiceImpl(mySchedulerService, myJobPersistence);
myJobDefinitionRegistry = new JobDefinitionRegistry();
BatchJobSender batchJobSender = new BatchJobSender(myWorkChannelProducer);
mySvc = new JobMaintenanceServiceImpl(mySchedulerService, myJobPersistence, myJobDefinitionRegistry, batchJobSender);
}
@Test
public void testInProgress_CalculateProgress_FirstCompleteButNoOtherStepsYetComplete() {
myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(createInstance()));
when(myJobPersistence.fetchWorkChunksWithoutData(eq(INSTANCE_ID), anyInt(), eq(0))).thenReturn(Lists.newArrayList(
createWorkChunk(STEP_1, null).setStatus(StatusEnum.COMPLETED)
));
mySvc.runCleanupPass();
mySvc.runMaintenancePass();
verify(myJobPersistence, never()).updateInstance(any());
}
@Test
public void testInProgress_CalculateProgress_FirstStepComplete() {
myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(createInstance()));
when(myJobPersistence.fetchWorkChunksWithoutData(eq(INSTANCE_ID), anyInt(), eq(0))).thenReturn(Lists.newArrayList(
createWorkChunkStep1().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")),
@ -77,7 +90,7 @@ public class JobCleanerServiceImplTest {
createWorkChunkStep3().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
));
mySvc.runCleanupPass();
mySvc.runMaintenancePass();
verify(myJobPersistence, times(1)).updateInstance(myInstanceCaptor.capture());
JobInstance instance = myInstanceCaptor.getValue();
@ -87,7 +100,7 @@ public class JobCleanerServiceImplTest {
assertEquals(0.08333333333333333, instance.getCombinedRecordsProcessedPerSecond());
assertNotNull(instance.getStartTime());
assertEquals(parseTime("2022-02-12T14:00:00-04:00"), instance.getStartTime());
assertEquals(null, instance.getEndTime());
assertNull(instance.getEndTime());
assertEquals("00:10:00", instance.getEstimatedTimeRemaining());
verifyNoMoreInteractions(myJobPersistence);
@ -95,6 +108,8 @@ public class JobCleanerServiceImplTest {
@Test
public void testInProgress_CalculateProgress_InstanceHasErrorButNoChunksAreErrored() {
// Setup
myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
JobInstance instance1 = createInstance();
instance1.setErrorMessage("This is an error message");
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance1));
@ -107,8 +122,10 @@ public class JobCleanerServiceImplTest {
createWorkChunkStep3().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
));
mySvc.runCleanupPass();
// Execute
mySvc.runMaintenancePass();
// Verify
verify(myJobPersistence, times(1)).updateInstance(myInstanceCaptor.capture());
JobInstance instance = myInstanceCaptor.getValue();
@ -121,6 +138,30 @@ public class JobCleanerServiceImplTest {
verifyNoMoreInteractions(myJobPersistence);
}
@Test
public void testInProgress_GatedExecution_FirstStepComplete() {
// Setup
myJobDefinitionRegistry.addJobDefinition(createJobDefinition(t -> t.gatedExecution()));
when(myJobPersistence.fetchWorkChunksWithoutData(eq(INSTANCE_ID), eq(100), eq(0))).thenReturn(Lists.newArrayList(
createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID),
createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID_2)
));
JobInstance instance1 = createInstance();
instance1.setCurrentGatedStepId(STEP_1);
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance1));
// Execute
mySvc.runMaintenancePass();
// Verify
verify(myWorkChannelProducer, times(2)).send(myMessageCaptor.capture());
JobWorkNotification payload0 = myMessageCaptor.getAllValues().get(0).getPayload();
assertEquals(STEP_2, payload0.getTargetStepId());
assertEquals(CHUNK_ID, payload0.getChunkId());
JobWorkNotification payload1 = myMessageCaptor.getAllValues().get(1).getPayload();
assertEquals(STEP_2, payload1.getTargetStepId());
assertEquals(CHUNK_ID_2, payload1.getChunkId());
}
@Test
public void testFailed_PurgeOldInstance() {
@ -129,7 +170,7 @@ public class JobCleanerServiceImplTest {
instance.setEndTime(parseTime("2001-01-01T12:12:12Z"));
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
mySvc.runCleanupPass();
mySvc.runMaintenancePass();
verify(myJobPersistence, times(1)).deleteInstanceAndChunks(eq(INSTANCE_ID));
verifyNoMoreInteractions(myJobPersistence);
@ -137,6 +178,9 @@ public class JobCleanerServiceImplTest {
@Test
public void testInProgress_CalculateProgress_AllStepsComplete() {
// Setup
myJobDefinitionRegistry.addJobDefinition(createJobDefinition(t -> t.completionHandler(myCompletionHandler)));
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(createInstance()));
when(myJobPersistence.fetchWorkChunksWithoutData(eq(INSTANCE_ID), anyInt(), eq(0))).thenReturn(Lists.newArrayList(
createWorkChunkStep1().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:01:00-04:00")).setRecordsProcessed(25),
@ -147,7 +191,11 @@ public class JobCleanerServiceImplTest {
createWorkChunkStep3().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
));
mySvc.runCleanupPass();
// Execute
mySvc.runMaintenancePass();
// Verify
verify(myJobPersistence, times(2)).updateInstance(myInstanceCaptor.capture());
JobInstance instance = myInstanceCaptor.getAllValues().get(0);
@ -159,8 +207,12 @@ public class JobCleanerServiceImplTest {
assertEquals(parseTime("2022-02-12T14:10:00-04:00"), instance.getEndTime());
verify(myJobPersistence, times(1)).deleteChunks(eq(INSTANCE_ID));
verify(myCompletionHandler, times(1)).jobComplete(myJobCompletionCaptor.capture());
verifyNoMoreInteractions(myJobPersistence);
assertEquals(INSTANCE_ID, myJobCompletionCaptor.getValue().getInstanceId());
assertEquals(PARAM_1_VALUE, myJobCompletionCaptor.getValue().getParameters().getParam1());
}
@Test
@ -175,7 +227,7 @@ public class JobCleanerServiceImplTest {
createWorkChunkStep3().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
));
mySvc.runCleanupPass();
mySvc.runMaintenancePass();
verify(myJobPersistence, times(2)).updateInstance(myInstanceCaptor.capture());
JobInstance instance = myInstanceCaptor.getAllValues().get(0);