Batch 2 documentation and SRP cleanup (#3616)

* begin writing documentation.  minor refactor.

* document submitting a job

* single responsibility principle

* single responsibility principle

* single responsibility principle

* reduce method parameter count

* reduce method parameter count

* fix test mocks

* reduce method arguments

* update documentation

* add tests
resolve warnings

* resolve warnings

* moar test

* moar test

* Add @Nonnull and @Nullable annotation to construction parameters

* pre-review cleanup

* pre-review cleanup

* review feedback

* review feedback.  added FIXME

* started writing IT with FIXMES

* started writing IT with FIXMES

* added batch 2 state transition integration tests

* FIXME

* improve readability of test

* improve readability of test

* fix race condition

* pre-merge cleanup
Ken Stevens 2022-05-17 17:25:43 -04:00 committed by GitHub
parent 54f578c8b1
commit 27024067b0
36 changed files with 1251 additions and 397 deletions

View File

@@ -83,6 +83,8 @@ page.server_jpa_partitioning.partition_interceptor_examples=Partition Intercepto
page.server_jpa_partitioning.partitioning_management_operations=Partitioning Management Operations
page.server_jpa_partitioning.enabling_in_hapi_fhir=Enabling Partitioning in HAPI FHIR
section.server_jpa_batch.title=JPA Server: Batch Processing
page.server_jpa_batch.introduction=Batch Introduction
section.interceptors.title=Interceptors
page.interceptors.interceptors=Interceptors Overview

File diff suppressed because one or more lines are too long

[Image added: job-definition.svg (14 KiB)]

View File

@@ -0,0 +1,54 @@
# HAPI-FHIR Batch Processing
## Introduction
HAPI-FHIR 5.1.0 introduced support for batch processing using the Spring Batch framework. However, since its introduction, we have discovered that Spring Batch jobs do not recover well from ungraceful server shutdowns, which are increasingly common as implementers move to containerized deployment technologies such as Kubernetes.
HAPI-FHIR 6.0.0 has begun the process of replacing Spring Batch with a custom batch framework called "batch2". This new "batch2" framework is designed to scale well across multiple processes sharing the same message broker and, most importantly, to recover robustly from unexpected server restarts.
## Design
### Definition
A HAPI-FHIR batch job definition consists of a job name, version, parameter JSON input type, and a chain of job steps. Each step in the chain declares the JSON output type it produces, which becomes the input type for the following step. The final step in the chain does not declare an output type, as the final step typically performs the actual work of the job, e.g. reindexing resources or exporting data to disk.
<img src="/hapi-fhir/docs/images/job-definition.svg" alt="Job definition"/>
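For illustration, here is a condensed two-step definition built with the fluent `JobDefinition` builder, adapted from the `Batch2CoordinatorIT` test added in this PR. The `MyJobParameters` and `MyChunkData` types are illustrative placeholders; any parameters or chunk data type must implement `IModelJson`:

```java
static class MyJobParameters implements IModelJson {}
static class MyChunkData implements IModelJson {}

// Illustrative step workers: the first step produces work chunks, the last consumes them.
IJobStepWorker<MyJobParameters, VoidModel, MyChunkData> firstStep = (details, sink) -> {
	sink.accept(new MyChunkData()); // each accept() becomes a work chunk for the next step
	return RunOutcome.SUCCESS;
};
IJobStepWorker<MyJobParameters, MyChunkData, VoidModel> lastStep = (details, sink) -> RunOutcome.SUCCESS;

JobDefinition<? extends IModelJson> definition = JobDefinition.newBuilder()
	.setJobDefinitionId("my-job")               // job name
	.setJobDefinitionVersion(1)                 // version
	.setJobDescription("An example batch2 job")
	.setParametersType(MyJobParameters.class)   // parameter JSON input type
	.addFirstStep("generate-chunks", "Generate work chunks", MyChunkData.class, firstStep)
	.addLastStep("process-chunks", "Process work chunks", lastStep) // declares no output type
	.build();
```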
### Submitting a Job
After a job has been defined, *instances* of that job can be submitted for batch processing by populating a `JobInstanceStartRequest` with the job name and job parameters JSON, and then submitting that request to the Batch Job Coordinator.
The Batch Job Coordinator will then store two records in the database:
- Job Instance with status QUEUED: this is the parent record for all data concerning this job
- Batch Work Chunk with status QUEUED: this describes the first "chunk" of work required for this job. The first Batch Work Chunk contains no data.
Lastly, the Batch Job Coordinator publishes a message to the Batch Notification Message Channel (named `batch2-work-notification`) to inform worker threads that this first chunk of work is now ready for processing.
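A minimal submission sketch, again following `Batch2CoordinatorIT` and reusing `definition` from the sketch above; `myJobDefinitionRegistry` and `myJobCoordinator` are assumed to be injected `JobDefinitionRegistry` and `IJobCoordinator` beans:

```java
// The definition must be registered before instances of it can be started.
myJobDefinitionRegistry.addJobDefinition(definition);

JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId("my-job");
request.setParameters(new MyJobParameters()); // serialized to JSON and validated on submission

// Returns the ID of the newly QUEUED Job Instance.
String instanceId = myJobCoordinator.startInstance(request);
```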
### Job Processing - First Step
HAPI-FHIR Batch Jobs run based on job notification messages. The process is kicked off by the first chunk of work: when its notification message arrives, the message handler makes a single call to the first step defined in the job definition, passing in the job parameters as input.
The handler then does the following:
1. Changes the work chunk status from QUEUED to IN_PROGRESS.
2. Changes the Job Instance status from QUEUED to IN_PROGRESS.
3. If the Job Instance is cancelled, changes the status to COMPLETED and aborts processing.
4. Executes the first step of the job definition, with the job parameters as input.
5. As the first step runs, it may create new work chunks; each one is JSON-serialized, stored in the database, and announced with a new message on the Batch Notification Message Channel so that worker threads know new work chunks are waiting to be processed.
6. If the step succeeded, changes the work chunk status from IN_PROGRESS to COMPLETED and deletes the data it contained.
7. If the step failed, changes the work chunk status from IN_PROGRESS to either ERRORED or FAILED, depending on the severity of the error.
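The outcome of a step therefore depends on what its worker returns or throws. A hedged sketch of these semantics, based on the failure tests in `Batch2CoordinatorIT` and the `JobStepFailedException` javadoc in this PR:

```java
IJobStepWorker<MyJobParameters, VoidModel, MyChunkData> worker = (details, sink) -> {
	boolean unrecoverable = false; // illustrative condition
	if (unrecoverable) {
		// Marks the work chunk FAILED; the maintenance service will then fail the instance.
		throw new JobExecutionFailedException("Unrecoverable error");
	}
	// Any other exception marks the chunk ERRORED and is rethrown as a
	// JobStepFailedException, leaving the notification message on the channel
	// so that delivery is retried.
	sink.accept(new MyChunkData());
	return RunOutcome.SUCCESS; // the chunk moves IN_PROGRESS -> COMPLETED and its data is deleted
};
```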
### Job Processing - Middle Steps
Middle steps in the job definition are executed the same way, except that instead of using only the Job Parameters as input, they use both the Job Parameters and the Work Chunk data produced by the previous step.
### Job Processing - Final Step
The final step operates the same way as the middle steps, except it does not produce any new work chunks.
### Gated Execution
If a Job Definition is set to use Gated Execution, then all work chunks for one step must be COMPLETED before any work chunks for the next step may begin.
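On the definition builder this is a single call, as used by the integration tests in this PR; the sketch below reuses the illustrative workers from the earlier example:

```java
JobDefinition<? extends IModelJson> gatedDefinition = JobDefinition.newBuilder()
	.setJobDefinitionId("my-gated-job")
	.setJobDefinitionVersion(1)
	.setJobDescription("An example gated job")
	.setParametersType(MyJobParameters.class)
	.gatedExecution() // work chunks advance one step at a time
	.addFirstStep("generate-chunks", "Generate work chunks", MyChunkData.class, firstStep)
	.addLastStep("process-chunks", "Process work chunks", lastStep)
	.build();
```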
### Job Instance Completion
A Batch Job Maintenance Service runs every minute, monitoring the status of all Job Instances and transitioning each one to COMPLETED, ERRORED, or FAILED according to the statuses of its outstanding work chunks. If a job instance is still IN_PROGRESS, this maintenance service also estimates the time remaining to complete the job.
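Tests do not need to wait a minute for this pass: the `Batch2JobHelper` added in this PR drives it manually, using Awaitility's `await()` and Hamcrest's `equalTo`:

```java
public void awaitJobCompletion(String theId) {
	await().until(() -> {
		myJobMaintenanceService.runMaintenancePass();
		return myJobCoordinator.getInstance(theId).getStatus();
	}, equalTo(StatusEnum.COMPLETED));
}
```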

View File

@@ -31,9 +31,13 @@ public interface IBatch2JobInstanceRepository extends JpaRepository<Batch2JobIns
@Modifying
@Query("UPDATE Batch2JobInstanceEntity e SET e.myStatus = :status WHERE e.myId = :id")
void updateInstanceStatus(@Param("id") String theInstanceId, @Param("status") StatusEnum theInProgress);
void updateInstanceStatus(@Param("id") String theInstanceId, @Param("status") StatusEnum theStatus);
@Modifying
@Query("UPDATE Batch2JobInstanceEntity e SET e.myCancelled = :cancelled WHERE e.myId = :id")
void updateInstanceCancelled(@Param("id") String theInstanceId, @Param("cancelled") boolean theCancelled);
@Modifying
@Query("UPDATE Batch2JobInstanceEntity e SET e.myCurrentGatedStepId = :currentGatedStepId WHERE e.myId = :id")
void updateInstanceCurrentGatedStepId(@Param("id") String theInstanceId, @Param("currentGatedStepId") String theCurrentGatedStepId);
}

View File

@@ -120,7 +120,6 @@ public class SubscriptionChannelRegistry {
String channelName = theActiveSubscription.getChannelName();
ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId(), channelName);
boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId());
ChannelRetryConfiguration retryConfig = theActiveSubscription.getRetryConfigurationParameters();
if (!removed) {
ourLog.warn("Failed to remove subscription {} from channel {}", theActiveSubscription.getId(), channelName);

View File

@@ -20,8 +20,8 @@ package ca.uhn.fhir.jpa.test;
* #L%
*/
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import org.hamcrest.Matchers;
@@ -33,24 +33,37 @@ import static org.hamcrest.Matchers.equalTo;
public class Batch2JobHelper {
@Autowired
private IJobMaintenanceService myJobCleanerService;
private IJobMaintenanceService myJobMaintenanceService;
@Autowired
private IJobCoordinator myJobCoordinator;
public void awaitJobCompletion(String theId) {
await().until(() -> {
myJobCleanerService.runMaintenancePass();
myJobMaintenanceService.runMaintenancePass();
return myJobCoordinator.getInstance(theId).getStatus();
}, equalTo(StatusEnum.COMPLETED));
}
public JobInstance awaitJobFailure(String theId) {
await().until(() -> {
myJobCleanerService.runMaintenancePass();
myJobMaintenanceService.runMaintenancePass();
return myJobCoordinator.getInstance(theId).getStatus();
}, Matchers.anyOf(equalTo(StatusEnum.ERRORED),equalTo(StatusEnum.FAILED)));
}, Matchers.anyOf(equalTo(StatusEnum.ERRORED), equalTo(StatusEnum.FAILED)));
return myJobCoordinator.getInstance(theId);
}
public void awaitJobCancelled(String theId) {
await().until(() -> {
myJobMaintenanceService.runMaintenancePass();
return myJobCoordinator.getInstance(theId).getStatus();
}, equalTo(StatusEnum.CANCELLED));
}
public void awaitJobInProgress(String theId) {
await().until(() -> {
myJobMaintenanceService.runMaintenancePass();
return myJobCoordinator.getInstance(theId).getStatus();
}, equalTo(StatusEnum.IN_PROGRESS));
}
}

View File

@@ -0,0 +1,179 @@
package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.impl.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.test.concurrency.PointcutLatch;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import static org.junit.jupiter.api.Assertions.fail;
public class Batch2CoordinatorIT extends BaseJpaR4Test {
public static final int TEST_JOB_VERSION = 1;
@Autowired
JobDefinitionRegistry myJobDefinitionRegistry;
@Autowired
IJobCoordinator myJobCoordinator;
@Autowired
Batch2JobHelper myBatch2JobHelper;
private final PointcutLatch myFirstStepLatch = new PointcutLatch("First Step");
private final PointcutLatch myLastStepLatch = new PointcutLatch("Last Step");
private RunOutcome callLatch(PointcutLatch theLatch, StepExecutionDetails<?, ?> theStep) {
theLatch.call(theStep);
return RunOutcome.SUCCESS;
}
@Test
public void testFirstStepNoSink() throws InterruptedException {
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> firstStep = (step, sink) -> callLatch(myFirstStepLatch, step);
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> fail();
String jobId = "test-job-1";
JobDefinition<? extends IModelJson> definition = buildJobDefinition(jobId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
JobInstanceStartRequest request = buildRequest(jobId);
myFirstStepLatch.setExpectedCount(1);
String instanceId = myJobCoordinator.startInstance(request);
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.awaitJobCompletion(instanceId);
}
@Test
public void testFirstStepToSecondStep() throws InterruptedException {
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> firstStep = (step, sink) -> {
sink.accept(new FirstStepOutput());
callLatch(myFirstStepLatch, step);
return RunOutcome.SUCCESS;
};
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> callLatch(myLastStepLatch, step);
String jobId = "test-job-2";
JobDefinition<? extends IModelJson> definition = buildJobDefinition(jobId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
JobInstanceStartRequest request = buildRequest(jobId);
myFirstStepLatch.setExpectedCount(1);
String instanceId = myJobCoordinator.startInstance(request);
myFirstStepLatch.awaitExpected();
myLastStepLatch.setExpectedCount(1);
myBatch2JobHelper.awaitJobCompletion(instanceId);
myLastStepLatch.awaitExpected();
}
@Test
public void JobExecutionFailedException_CausesInstanceFailure() {
// setup
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> firstStep = (step, sink) -> {
throw new JobExecutionFailedException("Expected Test Exception");
};
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> fail();
String jobId = "test-job-3";
JobDefinition<? extends IModelJson> definition = buildJobDefinition(jobId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
JobInstanceStartRequest request = buildRequest(jobId);
// execute
String instanceId = myJobCoordinator.startInstance(request);
// validate
myBatch2JobHelper.awaitJobFailure(instanceId);
}
@Test
public void testUnknownException_KeepsInProgress_CanCancelManually() throws InterruptedException {
// setup
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> firstStep = (step, sink) -> {
callLatch(myFirstStepLatch, step);
throw new RuntimeException("Expected Test Exception");
};
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> fail();
String jobId = "test-job-4";
JobDefinition<? extends IModelJson> definition = buildJobDefinition(jobId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
JobInstanceStartRequest request = buildRequest(jobId);
// execute
myFirstStepLatch.setExpectedCount(1);
String instanceId = myJobCoordinator.startInstance(request);
myFirstStepLatch.awaitExpected();
// validate
myBatch2JobHelper.awaitJobInProgress(instanceId);
// execute
myJobCoordinator.cancelInstance(instanceId);
// validate
myBatch2JobHelper.awaitJobCancelled(instanceId);
}
@Nonnull
private JobInstanceStartRequest buildRequest(String jobId) {
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(jobId);
TestJobParameters parameters = new TestJobParameters();
request.setParameters(parameters);
return request;
}
@Nonnull
private JobDefinition<? extends IModelJson> buildJobDefinition(String theJobId, IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep, IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> theLastStep) {
return JobDefinition.newBuilder()
.setJobDefinitionId(theJobId)
.setJobDescription("test job")
.setJobDefinitionVersion(TEST_JOB_VERSION)
.setParametersType(TestJobParameters.class)
.gatedExecution()
.addFirstStep(
"first-step",
"Test first step",
FirstStepOutput.class,
theFirstStep
)
.addLastStep(
"last-step",
"Test last step",
theLastStep
)
.build();
}
static class TestJobParameters implements IModelJson {
TestJobParameters() {
}
}
static class FirstStepOutput implements IModelJson {
FirstStepOutput() {
}
}
}

View File

@@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.impl;
package ca.uhn.fhir.batch2.api;
/*-
* #%L
@@ -20,21 +20,18 @@ package ca.uhn.fhir.batch2.impl;
* #L%
*/
import ca.uhn.fhir.batch2.api.IJobPersistence;
import org.apache.commons.lang3.Validate;
import javax.annotation.Nonnull;
public abstract class BaseJobService {
protected final IJobPersistence myJobPersistence;
/**
* When a step throws an Exception that is not a {@link JobExecutionFailedException},
* the Smile Batch 2 framework will rethrow the exception as a {@link JobStepFailedException}.
* This will cause the job notification message to remain on the channel and the system will try executing the send
* operation repeatedly until it succeeds.
*/
public class JobStepFailedException extends RuntimeException {
/**
* Constructor
*
* @param theJobPersistence The persistence service
*/
public BaseJobService(@Nonnull IJobPersistence theJobPersistence) {
Validate.notNull(theJobPersistence);
myJobPersistence = theJobPersistence;
public JobStepFailedException(String theMessage, Throwable theCause) {
super(theMessage, theCause);
}
}

View File

@@ -21,20 +21,22 @@ package ca.uhn.fhir.batch2.impl;
*/
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.model.api.IModelJson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class BaseDataSink<OT extends IModelJson> implements IJobDataSink<OT> {
abstract class BaseDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> implements IJobDataSink<OT> {
private static final Logger ourLog = LoggerFactory.getLogger(BaseDataSink.class);
private final String myInstanceId;
private final String myCurrentStepId;
private final JobWorkCursor<PT,IT,OT> myJobWorkCursor;
private int myRecoveredErrorCount;
protected BaseDataSink(String theInstanceId, String theCurrentStepId) {
protected BaseDataSink(String theInstanceId, JobWorkCursor<PT,IT,OT> theJobWorkCursor) {
myInstanceId = theInstanceId;
myCurrentStepId = theCurrentStepId;
myJobWorkCursor = theJobWorkCursor;
}
public String getInstanceId() {
@@ -43,7 +45,7 @@ abstract class BaseDataSink<OT extends IModelJson> implements IJobDataSink<OT> {
@Override
public void recoveredError(String theMessage) {
ourLog.error("Error during job[{}] step[{}]: {}", myInstanceId, myCurrentStepId, theMessage);
ourLog.error("Error during job[{}] step[{}]: {}", myInstanceId, myJobWorkCursor.getCurrentStepId(), theMessage);
myRecoveredErrorCount++;
}
@@ -52,4 +54,12 @@
}
public abstract int getWorkChunkCount();
public boolean firstStepProducedNothing() {
return myJobWorkCursor.isFirstStep && getWorkChunkCount() == 0;
}
public JobDefinitionStep<PT,IT,OT> getTargetStep() {
return myJobWorkCursor.currentStep;
}
}

View File

@@ -26,11 +26,13 @@ import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
public class BatchJobSender {
private static final Logger ourLog = LoggerFactory.getLogger(BatchJobSender.class);
private final IChannelProducer myWorkChannelProducer;
public BatchJobSender(IChannelProducer theWorkChannelProducer) {
public BatchJobSender(@Nonnull IChannelProducer theWorkChannelProducer) {
myWorkChannelProducer = theWorkChannelProducer;
}

View File

@@ -20,9 +20,13 @@ package ca.uhn.fhir.batch2.impl;
* #L%
*/
import ca.uhn.fhir.batch2.model.JobDefinition;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
public class BatchWorkChunk {
public final String jobDefinitionId;
@@ -42,7 +46,7 @@ public class BatchWorkChunk {
* @param theSerializedData The data. This will be in the form of a map where the values may be strings, lists, and other maps (i.e. JSON)
*/
public BatchWorkChunk(String theJobDefinitionId, int theJobDefinitionVersion, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) {
public BatchWorkChunk(@Nonnull String theJobDefinitionId, int theJobDefinitionVersion, @Nonnull String theTargetStepId, @Nonnull String theInstanceId, int theSequence, @Nullable String theSerializedData) {
jobDefinitionId = theJobDefinitionId;
jobDefinitionVersion = theJobDefinitionVersion;
targetStepId = theTargetStepId;
@@ -51,6 +55,13 @@
serializedData = theSerializedData;
}
public static BatchWorkChunk firstChunk(JobDefinition<?> theJobDefinition, String theInstanceId) {
String firstStepId = theJobDefinition.getFirstStepId();
String jobDefinitionId = theJobDefinition.getJobDefinitionId();
int jobDefinitionVersion = theJobDefinition.getJobDefinitionVersion();
return new BatchWorkChunk(jobDefinitionId, jobDefinitionVersion, firstStepId, theInstanceId, 0, null);
}
@Override
public boolean equals(Object theO) {
if (this == theO) return true;

View File

@@ -22,12 +22,16 @@ package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.WorkChunkData;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class FinalStepDataSink extends BaseDataSink<VoidModel> {
import javax.annotation.Nonnull;
class FinalStepDataSink<PT extends IModelJson, IT extends IModelJson> extends BaseDataSink<PT,IT,VoidModel> {
private static final Logger ourLog = LoggerFactory.getLogger(FinalStepDataSink.class);
private final String myJobDefinitionId;
@@ -35,8 +39,8 @@ class FinalStepDataSink extends BaseDataSink<VoidModel> {
/**
* Constructor
*/
FinalStepDataSink(String theJobDefinitionId, String theInstanceId, String theCurrentStepId) {
super(theInstanceId, theCurrentStepId);
FinalStepDataSink(@Nonnull String theJobDefinitionId, @Nonnull String theInstanceId, @Nonnull JobWorkCursor<PT,IT,VoidModel> theJobWorkCursor) {
super(theInstanceId, theJobWorkCursor);
myJobDefinitionId = theJobDefinitionId;
}

View File

@@ -21,76 +21,50 @@ package ca.uhn.fhir.batch2.impl;
*/
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobParametersValidator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.model.api.annotation.PasswordField;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.fhir.util.UrlUtil;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.Validator;
import javax.validation.ValidatorFactory;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinator {
public class JobCoordinatorImpl implements IJobCoordinator {
private static final Logger ourLog = LoggerFactory.getLogger(JobCoordinatorImpl.class);
private final IJobPersistence myJobPersistence;
private final BatchJobSender myBatchJobSender;
private final IChannelReceiver myWorkChannelReceiver;
private final JobDefinitionRegistry myJobDefinitionRegistry;
private final MessageHandler myReceiverHandler = new WorkChannelMessageHandler();
private final ValidatorFactory myValidatorFactory = Validation.buildDefaultValidatorFactory();
@Autowired
private ISchedulerService mySchedulerService;
private final MessageHandler myReceiverHandler;
private final JobQuerySvc myJobQuerySvc;
private final JobParameterJsonValidator myJobParameterJsonValidator;
/**
* Constructor
*/
public JobCoordinatorImpl(@Nonnull BatchJobSender theBatchJobSender, @Nonnull IChannelReceiver theWorkChannelReceiver, @Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinitionRegistry theJobDefinitionRegistry) {
super(theJobPersistence);
Validate.notNull(theJobPersistence);
myJobPersistence = theJobPersistence;
myBatchJobSender = theBatchJobSender;
myWorkChannelReceiver = theWorkChannelReceiver;
myJobDefinitionRegistry = theJobDefinitionRegistry;
myReceiverHandler = new WorkChannelMessageHandler(theJobPersistence, theJobDefinitionRegistry, theBatchJobSender);
myJobQuerySvc = new JobQuerySvc(theJobPersistence, theJobDefinitionRegistry);
myJobParameterJsonValidator = new JobParameterJsonValidator();
}
@Override
@@ -101,70 +75,36 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato
throw new InvalidRequestException(Msg.code(2065) + "No parameters supplied");
}
validateJobParameters(theStartRequest, jobDefinition);
myJobParameterJsonValidator.validateJobParameters(theStartRequest, jobDefinition);
String firstStepId = jobDefinition.getSteps().get(0).getStepId();
String jobDefinitionId = jobDefinition.getJobDefinitionId();
int jobDefinitionVersion = jobDefinition.getJobDefinitionVersion();
JobInstance instance = new JobInstance();
instance.setJobDefinitionId(jobDefinitionId);
instance.setJobDefinitionVersion(jobDefinitionVersion);
instance.setStatus(StatusEnum.QUEUED);
JobInstance instance = JobInstance.fromJobDefinition(jobDefinition);
instance.setParameters(theStartRequest.getParameters());
if (jobDefinition.isGatedExecution()) {
instance.setCurrentGatedStepId(firstStepId);
}
instance.setStatus(StatusEnum.QUEUED);
String instanceId = myJobPersistence.storeNewInstance(instance);
BatchWorkChunk batchWorkChunk = new BatchWorkChunk(jobDefinitionId, jobDefinitionVersion, firstStepId, instanceId, 0, null);
BatchWorkChunk batchWorkChunk = BatchWorkChunk.firstChunk(jobDefinition, instanceId);
String chunkId = myJobPersistence.storeWorkChunk(batchWorkChunk);
JobWorkNotification workNotification = new JobWorkNotification(jobDefinitionId, jobDefinitionVersion, instanceId, firstStepId, chunkId);
JobWorkNotification workNotification = JobWorkNotification.firstStepNotification(jobDefinition, instanceId, chunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
return instanceId;
}
private <PT extends IModelJson> void validateJobParameters(JobInstanceStartRequest theStartRequest, JobDefinition<PT> theJobDefinition) {
// JSR 380
Validator validator = myValidatorFactory.getValidator();
PT parameters = theStartRequest.getParameters(theJobDefinition.getParametersType());
Set<ConstraintViolation<IModelJson>> constraintErrors = validator.validate(parameters);
List<String> errorStrings = constraintErrors.stream().map(t -> t.getPropertyPath() + " - " + t.getMessage()).sorted().collect(Collectors.toList());
// Programmatic Validator
IJobParametersValidator<PT> parametersValidator = theJobDefinition.getParametersValidator();
if (parametersValidator != null) {
List<String> outcome = parametersValidator.validate(parameters);
outcome = defaultIfNull(outcome, Collections.emptyList());
errorStrings.addAll(outcome);
}
if (!errorStrings.isEmpty()) {
String message = "Failed to validate parameters for job of type " + theJobDefinition.getJobDefinitionId() + ": " + errorStrings.stream().map(t -> "\n * " + t).collect(Collectors.joining());
throw new InvalidRequestException(Msg.code(2039) + message);
}
}
@Override
public JobInstance getInstance(String theInstanceId) {
return myJobPersistence.fetchInstance(theInstanceId).map(t -> massageInstanceForUserAccess(t)).orElseThrow(() -> new ResourceNotFoundException(Msg.code(2040) + "Unknown instance ID: " + UrlUtil.escapeUrlParam(theInstanceId)));
return myJobQuerySvc.fetchInstance(theInstanceId);
}
@Override
public List<JobInstance> getInstances(int thePageSize, int thePageIndex) {
return myJobPersistence.fetchInstances(thePageSize, thePageIndex).stream().map(t -> massageInstanceForUserAccess(t)).collect(Collectors.toList());
return myJobQuerySvc.fetchInstances(thePageSize, thePageIndex);
}
@Override
public List<JobInstance> getRecentInstances(int theCount, int theStart) {
return myJobPersistence.fetchRecentInstances(theCount, theStart)
.stream().map(this::massageInstanceForUserAccess).collect(Collectors.toList());
return myJobQuerySvc.fetchRecentInstances(theCount, theStart);
}
@Override
@@ -172,59 +112,6 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato
myJobPersistence.cancelInstance(theInstanceId);
}
private JobInstance massageInstanceForUserAccess(JobInstance theInstance) {
JobInstance retVal = new JobInstance(theInstance);
JobDefinition definition = getDefinitionOrThrowException(theInstance.getJobDefinitionId(), theInstance.getJobDefinitionVersion());
// Serializing the parameters strips any write-only params
IModelJson parameters = retVal.getParameters(definition.getParametersType());
stripPasswordFields(parameters);
String parametersString = JsonUtil.serializeOrInvalidRequest(parameters);
retVal.setParameters(parametersString);
return retVal;
}
private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> boolean executeStep(@Nonnull WorkChunk theWorkChunk, String theJobDefinitionId, String theTargetStepId, Class<IT> theInputType, PT theParameters, IJobStepWorker<PT, IT, OT> theWorker, BaseDataSink<OT> theDataSink) {
IT data = null;
if (!theInputType.equals(VoidModel.class)) {
data = theWorkChunk.getData(theInputType);
}
String instanceId = theWorkChunk.getInstanceId();
String chunkId = theWorkChunk.getId();
StepExecutionDetails<PT, IT> stepExecutionDetails = new StepExecutionDetails<>(theParameters, data, instanceId, chunkId);
RunOutcome outcome;
try {
outcome = theWorker.run(stepExecutionDetails, theDataSink);
Validate.notNull(outcome, "Step theWorker returned null: %s", theWorker.getClass());
} catch (JobExecutionFailedException e) {
ourLog.error("Unrecoverable failure executing job {} step {}", theJobDefinitionId, theTargetStepId, e);
myJobPersistence.markWorkChunkAsFailed(chunkId, e.toString());
return false;
} catch (Exception e) {
ourLog.error("Failure executing job {} step {}", theJobDefinitionId, theTargetStepId, e);
myJobPersistence.markWorkChunkAsErroredAndIncrementErrorCount(chunkId, e.toString());
throw new JobExecutionFailedException(Msg.code(2041) + e.getMessage(), e);
} catch (Throwable t) {
ourLog.error("Unexpected failure executing job {} step {}", theJobDefinitionId, theTargetStepId, t);
myJobPersistence.markWorkChunkAsFailed(chunkId, t.toString());
return false;
}
int recordsProcessed = outcome.getRecordsProcessed();
myJobPersistence.markWorkChunkAsCompletedAndClearData(chunkId, recordsProcessed);
int recoveredErrorCount = theDataSink.getRecoveredErrorCount();
if (recoveredErrorCount > 0) {
myJobPersistence.incrementWorkChunkErrorCount(chunkId, recoveredErrorCount);
}
return true;
}
@PostConstruct
public void start() {
myWorkChannelReceiver.subscribe(myReceiverHandler);
@@ -234,136 +121,4 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato
public void stop() {
myWorkChannelReceiver.unsubscribe(myReceiverHandler);
}
private void handleWorkChannelMessage(JobWorkNotificationJsonMessage theMessage) {
JobWorkNotification payload = theMessage.getPayload();
String chunkId = payload.getChunkId();
Validate.notNull(chunkId);
Optional<WorkChunk> chunkOpt = myJobPersistence.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId);
if (!chunkOpt.isPresent()) {
ourLog.error("Unable to find chunk with ID {} - Aborting", chunkId);
return;
}
WorkChunk chunk = chunkOpt.get();
String jobDefinitionId = payload.getJobDefinitionId();
int jobDefinitionVersion = payload.getJobDefinitionVersion();
JobDefinition definition = getDefinitionOrThrowException(jobDefinitionId, jobDefinitionVersion);
JobDefinitionStep targetStep = null;
JobDefinitionStep nextStep = null;
String targetStepId = payload.getTargetStepId();
boolean firstStep = false;
for (int i = 0; i < definition.getSteps().size(); i++) {
JobDefinitionStep<?, ?, ?> step = (JobDefinitionStep<?, ?, ?>) definition.getSteps().get(i);
if (step.getStepId().equals(targetStepId)) {
targetStep = step;
if (i == 0) {
firstStep = true;
}
if (i < (definition.getSteps().size() - 1)) {
nextStep = (JobDefinitionStep<?, ?, ?>) definition.getSteps().get(i + 1);
}
break;
}
}
if (targetStep == null) {
String msg = "Unknown step[" + targetStepId + "] for job definition ID[" + jobDefinitionId + "] version[" + jobDefinitionVersion + "]";
ourLog.warn(msg);
throw new InternalErrorException(Msg.code(2042) + msg);
}
Validate.isTrue(chunk.getTargetStepId().equals(targetStep.getStepId()), "Chunk %s has target step %s but expected %s", chunkId, chunk.getTargetStepId(), targetStep.getStepId());
Optional<JobInstance> instanceOpt = myJobPersistence.fetchInstanceAndMarkInProgress(payload.getInstanceId());
JobInstance instance = instanceOpt.orElseThrow(() -> new InternalErrorException("Unknown instance: " + payload.getInstanceId()));
String instanceId = instance.getInstanceId();
if (instance.isCancelled()) {
ourLog.info("Skipping chunk {} because job instance is cancelled", chunkId);
myJobPersistence.markInstanceAsCompleted(instanceId);
return;
}
executeStep(chunk, jobDefinitionId, jobDefinitionVersion, definition, targetStep, nextStep, targetStepId, firstStep, instance);
}
@SuppressWarnings("unchecked")
private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> void executeStep(WorkChunk theWorkChunk, String theJobDefinitionId, int theJobDefinitionVersion, JobDefinition<PT> theDefinition, JobDefinitionStep<PT, IT, OT> theStep, JobDefinitionStep<PT, OT, ?> theSubsequentStep, String theTargetStepId, boolean theFirstStep, JobInstance theInstance) {
String instanceId = theInstance.getInstanceId();
PT parameters = theInstance.getParameters(theDefinition.getParametersType());
IJobStepWorker<PT, IT, OT> worker = theStep.getJobStepWorker();
BaseDataSink<OT> dataSink;
boolean finalStep = theSubsequentStep == null;
if (!finalStep) {
dataSink = new JobDataSink<>(myBatchJobSender, myJobPersistence, theJobDefinitionId, theJobDefinitionVersion, theSubsequentStep, instanceId, theStep.getStepId(), theDefinition.isGatedExecution());
} else {
dataSink = (BaseDataSink<OT>) new FinalStepDataSink(theJobDefinitionId, instanceId, theStep.getStepId());
}
Class<IT> inputType = theStep.getInputType();
boolean success = executeStep(theWorkChunk, theJobDefinitionId, theTargetStepId, inputType, parameters, worker, dataSink);
if (!success) {
return;
}
int workChunkCount = dataSink.getWorkChunkCount();
if (theFirstStep && workChunkCount == 0) {
ourLog.info("First step of job theInstance {} produced no work chunks, marking as completed", instanceId);
myJobPersistence.markInstanceAsCompleted(instanceId);
}
if (theDefinition.isGatedExecution() && theFirstStep) {
theInstance.setCurrentGatedStepId(theTargetStepId);
myJobPersistence.updateInstance(theInstance);
}
}
private JobDefinition<?> getDefinitionOrThrowException(String jobDefinitionId, int jobDefinitionVersion) {
Optional<JobDefinition<?>> opt = myJobDefinitionRegistry.getJobDefinition(jobDefinitionId, jobDefinitionVersion);
if (!opt.isPresent()) {
String msg = "Unknown job definition ID[" + jobDefinitionId + "] version[" + jobDefinitionVersion + "]";
ourLog.warn(msg);
throw new InternalErrorException(Msg.code(2043) + msg);
}
return opt.get();
}
/**
* Scans a model object for fields marked as {@link PasswordField}
* and nulls them
*/
private static void stripPasswordFields(@Nonnull Object theParameters) {
Field[] declaredFields = theParameters.getClass().getDeclaredFields();
for (Field nextField : declaredFields) {
JsonProperty propertyAnnotation = nextField.getAnnotation(JsonProperty.class);
if (propertyAnnotation == null) {
continue;
}
nextField.setAccessible(true);
try {
Object nextValue = nextField.get(theParameters);
if (nextField.getAnnotation(PasswordField.class) != null) {
nextField.set(theParameters, null);
} else if (nextValue != null) {
stripPasswordFields(nextValue);
}
} catch (IllegalAccessException e) {
throw new InternalErrorException(Msg.code(2044) + e.getMessage(), e);
}
}
}
private class WorkChannelMessageHandler implements MessageHandler {
@Override
public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException {
handleWorkChannelMessage((JobWorkNotificationJsonMessage) theMessage);
}
}
}

View File

@@ -21,48 +21,49 @@ package ca.uhn.fhir.batch2.impl;
*/
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.WorkChunkData;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil;
import javax.annotation.Nonnull;
import java.util.concurrent.atomic.AtomicInteger;
class JobDataSink<OT extends IModelJson> extends BaseDataSink<OT> {
class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> extends BaseDataSink<PT,IT,OT> {
private final BatchJobSender myBatchJobSender;
private final IJobPersistence myJobPersistence;
private final String myJobDefinitionId;
private final int myJobDefinitionVersion;
private final JobDefinitionStep<?, ?, ?> myTargetStep;
private final JobDefinitionStep<PT, OT, ?> myTargetStep;
private final AtomicInteger myChunkCounter = new AtomicInteger(0);
private final boolean myGatedExecution;
JobDataSink(BatchJobSender theBatchJobSender, IJobPersistence theJobPersistence, String theJobDefinitionId, int theJobDefinitionVersion, JobDefinitionStep<?, ?, ?> theTargetStep, String theInstanceId, String theCurrentStepId, boolean theGatedExecution) {
super(theInstanceId, theCurrentStepId);
JobDataSink(@Nonnull BatchJobSender theBatchJobSender, @Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinition<?> theDefinition, @Nonnull String theInstanceId, @Nonnull JobWorkCursor<PT, IT, OT> theJobWorkCursor) {
super(theInstanceId, theJobWorkCursor);
myBatchJobSender = theBatchJobSender;
myJobPersistence = theJobPersistence;
myJobDefinitionId = theJobDefinitionId;
myJobDefinitionVersion = theJobDefinitionVersion;
myTargetStep = theTargetStep;
myGatedExecution = theGatedExecution;
myJobDefinitionId = theDefinition.getJobDefinitionId();
myJobDefinitionVersion = theDefinition.getJobDefinitionVersion();
myTargetStep = theJobWorkCursor.nextStep;
myGatedExecution = theDefinition.isGatedExecution();
}
@Override
public void accept(WorkChunkData<OT> theData) {
String jobDefinitionId = myJobDefinitionId;
int jobDefinitionVersion = myJobDefinitionVersion;
String instanceId = getInstanceId();
String targetStepId = myTargetStep.getStepId();
int sequence = myChunkCounter.getAndIncrement();
OT dataValue = theData.getData();
String dataValueString = JsonUtil.serialize(dataValue, false);
BatchWorkChunk batchWorkChunk = new BatchWorkChunk(jobDefinitionId, jobDefinitionVersion, targetStepId, instanceId, sequence, dataValueString);
BatchWorkChunk batchWorkChunk = new BatchWorkChunk(myJobDefinitionId, myJobDefinitionVersion, targetStepId, instanceId, sequence, dataValueString);
String chunkId = myJobPersistence.storeWorkChunk(batchWorkChunk);
if (!myGatedExecution) {
JobWorkNotification workNotification = new JobWorkNotification(jobDefinitionId, jobDefinitionVersion, instanceId, targetStepId, chunkId);
JobWorkNotification workNotification = new JobWorkNotification(myJobDefinitionId, myJobDefinitionVersion, instanceId, targetStepId, chunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
}
}

View File

@@ -25,7 +25,10 @@ import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.util.HashMap;
@@ -36,17 +39,18 @@ import java.util.Set;
import java.util.TreeMap;
public class JobDefinitionRegistry {
private static final Logger ourLog = LoggerFactory.getLogger(JobDefinitionRegistry.class);
private final Map<String, TreeMap<Integer, JobDefinition<?>>> myJobs = new HashMap<>();
public <PT extends IModelJson> void addJobDefinition(JobDefinition<PT> theDefinition) {
public <PT extends IModelJson> void addJobDefinition(@Nonnull JobDefinition<PT> theDefinition) {
Validate.notNull(theDefinition);
Validate.notBlank(theDefinition.getJobDefinitionId());
Validate.isTrue(theDefinition.getJobDefinitionVersion() >= 1);
Validate.isTrue(theDefinition.getSteps().size() > 1);
Set<String> stepIds = new HashSet<>();
for (JobDefinitionStep<?,?,?> next : theDefinition.getSteps()) {
for (JobDefinitionStep<?, ?, ?> next : theDefinition.getSteps()) {
if (!stepIds.add(next.getStepId())) {
throw new ConfigurationException(Msg.code(2046) + "Duplicate step[" + next.getStepId() + "] in definition[" + theDefinition.getJobDefinitionId() + "] version: " + theDefinition.getJobDefinitionVersion());
}
@@ -74,4 +78,14 @@
}
return Optional.of(versionMap.get(theJobDefinitionVersion));
}
public JobDefinition<?> getJobDefinitionOrThrowException(String theJobDefinitionId, int theJobDefinitionVersion) {
Optional<JobDefinition<?>> opt = getJobDefinition(theJobDefinitionId, theJobDefinitionVersion);
if (opt.isEmpty()) {
String msg = "Unknown job definition ID[" + theJobDefinitionId + "] version[" + theJobDefinitionVersion + "]";
ourLog.warn(msg);
throw new InternalErrorException(Msg.code(2043) + msg);
}
return opt.get();
}
}

View File

@@ -82,11 +82,12 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
* is complete (all chunks are in COMPLETE status) and trigger the next step.
* </p>
*/
public class JobMaintenanceServiceImpl extends BaseJobService implements IJobMaintenanceService {
public class JobMaintenanceServiceImpl implements IJobMaintenanceService {
public static final int INSTANCES_PER_PASS = 100;
public static final long PURGE_THRESHOLD = 7L * DateUtils.MILLIS_PER_DAY;
private static final Logger ourLog = LoggerFactory.getLogger(JobMaintenanceServiceImpl.class);
private final IJobPersistence myJobPersistence;
private final ISchedulerService mySchedulerService;
private final JobDefinitionRegistry myJobDefinitionRegistry;
private final BatchJobSender myBatchJobSender;
@@ -95,12 +96,13 @@ public class JobMaintenanceServiceImpl extends BaseJobService implements IJobMai
/**
* Constructor
*/
public JobMaintenanceServiceImpl(ISchedulerService theSchedulerService, IJobPersistence theJobPersistence, JobDefinitionRegistry theJobDefinitionRegistry, BatchJobSender theBatchJobSender) {
super(theJobPersistence);
public JobMaintenanceServiceImpl(@Nonnull ISchedulerService theSchedulerService, @Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinitionRegistry theJobDefinitionRegistry, @Nonnull BatchJobSender theBatchJobSender) {
Validate.notNull(theSchedulerService);
Validate.notNull(theJobPersistence);
Validate.notNull(theJobDefinitionRegistry);
Validate.notNull(theBatchJobSender);
myJobPersistence = theJobPersistence;
mySchedulerService = theSchedulerService;
myJobDefinitionRegistry = theJobDefinitionRegistry;
myBatchJobSender = theBatchJobSender;
@@ -255,17 +257,15 @@ public class JobMaintenanceServiceImpl extends BaseJobService implements IJobMai
double percentComplete = (double) (completeChunkCount) / (double) (incompleteChunkCount + completeChunkCount + failedChunkCount + erroredChunkCount);
theInstance.setProgress(percentComplete);
changedStatus = false;
if (incompleteChunkCount == 0 && erroredChunkCount == 0 && failedChunkCount == 0) {
boolean completed = updateInstanceStatus(theInstance, StatusEnum.COMPLETED);
if (completed) {
JobDefinition<?> definition = myJobDefinitionRegistry.getJobDefinition(theInstance.getJobDefinitionId(), theInstance.getJobDefinitionVersion()).orElseThrow(() -> new IllegalStateException("Unknown job " + theInstance.getJobDefinitionId() + "/" + theInstance.getJobDefinitionVersion()));
invokeJobCompletionHandler(theInstance, definition);
}
changedStatus |= completed;
}
if (erroredChunkCount > 0) {
changedStatus |= updateInstanceStatus(theInstance, StatusEnum.ERRORED);
changedStatus = completed;
} else if (erroredChunkCount > 0) {
changedStatus = updateInstanceStatus(theInstance, StatusEnum.ERRORED);
}
if (earliestStartTime != null && latestEndTime != null) {

View File

@@ -0,0 +1,47 @@
package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.IJobParametersValidator;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import javax.annotation.Nonnull;
import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.Validator;
import javax.validation.ValidatorFactory;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
class JobParameterJsonValidator {
private final ValidatorFactory myValidatorFactory = Validation.buildDefaultValidatorFactory();
<PT extends IModelJson> void validateJobParameters(@Nonnull JobInstanceStartRequest theStartRequest, @Nonnull JobDefinition<PT> theJobDefinition) {
// JSR 380
Validator validator = myValidatorFactory.getValidator();
PT parameters = theStartRequest.getParameters(theJobDefinition.getParametersType());
Set<ConstraintViolation<IModelJson>> constraintErrors = validator.validate(parameters);
List<String> errorStrings = constraintErrors.stream().map(t -> t.getPropertyPath() + " - " + t.getMessage()).sorted().collect(Collectors.toList());
// Programmatic Validator
IJobParametersValidator<PT> parametersValidator = theJobDefinition.getParametersValidator();
if (parametersValidator != null) {
List<String> outcome = parametersValidator.validate(parameters);
outcome = defaultIfNull(outcome, Collections.emptyList());
errorStrings.addAll(outcome);
}
if (!errorStrings.isEmpty()) {
String message = "Failed to validate parameters for job of type " + theJobDefinition.getJobDefinitionId() + ": " + errorStrings.stream().map(t -> "\n * " + t).collect(Collectors.joining());
throw new InvalidRequestException(Msg.code(2039) + message);
}
}
}

View File

@@ -0,0 +1,90 @@
package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.model.api.annotation.PasswordField;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.fhir.util.UrlUtil;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nonnull;
import java.lang.reflect.Field;
import java.util.List;
import java.util.stream.Collectors;
/**
* Job Query services intended for end-users querying the status of jobs
*/
class JobQuerySvc {
private final IJobPersistence myJobPersistence;
private final JobDefinitionRegistry myJobDefinitionRegistry;
JobQuerySvc(@Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinitionRegistry theJobDefinitionRegistry) {
myJobPersistence = theJobPersistence;
myJobDefinitionRegistry = theJobDefinitionRegistry;
}
JobInstance fetchInstance(String theInstanceId) {
return myJobPersistence.fetchInstance(theInstanceId)
.map(this::massageInstanceForUserAccess)
.orElseThrow(() -> new ResourceNotFoundException(Msg.code(2040) + "Unknown instance ID: " + UrlUtil.escapeUrlParam(theInstanceId)));
}
List<JobInstance> fetchInstances(int thePageSize, int thePageIndex) {
return myJobPersistence.fetchInstances(thePageSize, thePageIndex).stream()
.map(this::massageInstanceForUserAccess)
.collect(Collectors.toList());
}
public List<JobInstance> fetchRecentInstances(int theCount, int theStart) {
return myJobPersistence.fetchRecentInstances(theCount, theStart)
.stream().map(this::massageInstanceForUserAccess)
.collect(Collectors.toList());
}
private JobInstance massageInstanceForUserAccess(JobInstance theInstance) {
JobInstance retVal = new JobInstance(theInstance);
JobDefinition<?> definition = myJobDefinitionRegistry.getJobDefinitionOrThrowException(theInstance.getJobDefinitionId(), theInstance.getJobDefinitionVersion());
// Serializing the parameters strips any write-only params
IModelJson parameters = retVal.getParameters(definition.getParametersType());
stripPasswordFields(parameters);
String parametersString = JsonUtil.serializeOrInvalidRequest(parameters);
retVal.setParameters(parametersString);
return retVal;
}
/**
* Scans a model object for fields marked as {@link PasswordField}
* and nulls them
*/
private static void stripPasswordFields(@Nonnull Object theParameters) {
Field[] declaredFields = theParameters.getClass().getDeclaredFields();
for (Field nextField : declaredFields) {
JsonProperty propertyAnnotation = nextField.getAnnotation(JsonProperty.class);
if (propertyAnnotation == null) {
continue;
}
nextField.setAccessible(true);
try {
Object nextValue = nextField.get(theParameters);
if (nextField.getAnnotation(PasswordField.class) != null) {
nextField.set(theParameters, null);
} else if (nextValue != null) {
stripPasswordFields(nextValue);
}
} catch (IllegalAccessException e) {
throw new InternalErrorException(Msg.code(2044) + e.getMessage(), e);
}
}
}
}

View File

@@ -0,0 +1,125 @@
package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.JobStepFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.util.Optional;
public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> {
private static final Logger ourLog = LoggerFactory.getLogger(JobStepExecutor.class);
private final IJobPersistence myJobPersistence;
private final BatchJobSender myBatchJobSender;
private final JobDefinition<PT> myDefinition;
private final String myInstanceId;
private final WorkChunk myWorkChunk;
private final JobWorkCursor<PT, IT, OT> myCursor;
private final PT myParameters;
JobStepExecutor(@Nonnull IJobPersistence theJobPersistence, @Nonnull BatchJobSender theBatchJobSender, @Nonnull JobInstance theInstance, @Nonnull WorkChunk theWorkChunk, @Nonnull JobWorkCursor<PT, IT, OT> theCursor) {
myJobPersistence = theJobPersistence;
myBatchJobSender = theBatchJobSender;
myDefinition = theCursor.jobDefinition;
myInstanceId = theInstance.getInstanceId();
myParameters = theInstance.getParameters(myDefinition.getParametersType());
myWorkChunk = theWorkChunk;
myCursor = theCursor;
}
@SuppressWarnings("unchecked")
void executeStep() {
BaseDataSink<PT,IT,OT> dataSink;
if (myCursor.isFinalStep()) {
dataSink = (BaseDataSink<PT, IT, OT>) new FinalStepDataSink<>(myDefinition.getJobDefinitionId(), myInstanceId, myCursor.asFinalCursor());
} else {
dataSink = new JobDataSink<>(myBatchJobSender, myJobPersistence, myDefinition, myInstanceId, myCursor);
}
boolean success = executeStep(myDefinition.getJobDefinitionId(), myWorkChunk, myParameters, dataSink);
if (!success) {
return;
}
if (dataSink.firstStepProducedNothing()) {
ourLog.info("First step of job myInstance {} produced no work chunks, marking as completed", myInstanceId);
myJobPersistence.markInstanceAsCompleted(myInstanceId);
}
if (myDefinition.isGatedExecution() && myCursor.isFirstStep) {
initializeGatedExecution();
}
}
private void initializeGatedExecution() {
Optional<JobInstance> oInstance = myJobPersistence.fetchInstance(myInstanceId);
if (oInstance.isPresent()) {
JobInstance instance = oInstance.get();
instance.setCurrentGatedStepId(myCursor.getCurrentStepId());
myJobPersistence.updateInstance(instance);
}
}
private boolean executeStep(String theJobDefinitionId, @Nonnull WorkChunk theWorkChunk, PT theParameters, BaseDataSink<PT,IT,OT> theDataSink) {
JobDefinitionStep<PT, IT, OT> theTargetStep = theDataSink.getTargetStep();
String targetStepId = theTargetStep.getStepId();
Class<IT> inputType = theTargetStep.getInputType();
IJobStepWorker<PT, IT, OT> worker = theTargetStep.getJobStepWorker();
IT inputData = null;
if (!inputType.equals(VoidModel.class)) {
inputData = theWorkChunk.getData(inputType);
}
String instanceId = theWorkChunk.getInstanceId();
String chunkId = theWorkChunk.getId();
StepExecutionDetails<PT, IT> stepExecutionDetails = new StepExecutionDetails<>(theParameters, inputData, instanceId, chunkId);
RunOutcome outcome;
try {
outcome = worker.run(stepExecutionDetails, theDataSink);
Validate.notNull(outcome, "Step theWorker returned null: %s", worker.getClass());
} catch (JobExecutionFailedException e) {
ourLog.error("Unrecoverable failure executing job {} step {}", theJobDefinitionId, targetStepId, e);
myJobPersistence.markWorkChunkAsFailed(chunkId, e.toString());
return false;
} catch (Exception e) {
ourLog.error("Failure executing job {} step {}", theJobDefinitionId, targetStepId, e);
myJobPersistence.markWorkChunkAsErroredAndIncrementErrorCount(chunkId, e.toString());
throw new JobStepFailedException(Msg.code(2041) + e.getMessage(), e);
} catch (Throwable t) {
ourLog.error("Unexpected failure executing job {} step {}", theJobDefinitionId, targetStepId, t);
myJobPersistence.markWorkChunkAsFailed(chunkId, t.toString());
return false;
}
int recordsProcessed = outcome.getRecordsProcessed();
myJobPersistence.markWorkChunkAsCompletedAndClearData(chunkId, recordsProcessed);
int recoveredErrorCount = theDataSink.getRecoveredErrorCount();
if (recoveredErrorCount > 0) {
myJobPersistence.incrementWorkChunkErrorCount(chunkId, recoveredErrorCount);
}
return true;
}
}

View File

@@ -0,0 +1,23 @@
package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.model.api.IModelJson;
import javax.annotation.Nonnull;
public class JobStepExecutorFactory {
private final IJobPersistence myJobPersistence;
private final BatchJobSender myBatchJobSender;
public JobStepExecutorFactory(@Nonnull IJobPersistence theJobPersistence, @Nonnull BatchJobSender theBatchJobSender) {
myJobPersistence = theJobPersistence;
myBatchJobSender = theBatchJobSender;
}
public <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> JobStepExecutor<PT,IT,OT> newJobStepExecutor(@Nonnull JobInstance theInstance, @Nonnull WorkChunk theWorkChunk, @Nonnull JobWorkCursor<PT, IT, OT> theCursor) {
return new JobStepExecutor<>(myJobPersistence, myBatchJobSender, theInstance, theWorkChunk, theCursor);
}
}

View File

@@ -0,0 +1,80 @@
package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import javax.annotation.Nonnull;
import java.util.Optional;
/**
* This handler receives batch work request messages and performs the batch work requested by the message
*/
class WorkChannelMessageHandler implements MessageHandler {
private static final Logger ourLog = LoggerFactory.getLogger(WorkChannelMessageHandler.class);
private final IJobPersistence myJobPersistence;
private final JobDefinitionRegistry myJobDefinitionRegistry;
private final JobStepExecutorFactory myJobStepExecutorFactory;
WorkChannelMessageHandler(@Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinitionRegistry theJobDefinitionRegistry, @Nonnull BatchJobSender theBatchJobSender) {
myJobPersistence = theJobPersistence;
myJobDefinitionRegistry = theJobDefinitionRegistry;
myJobStepExecutorFactory = new JobStepExecutorFactory(theJobPersistence, theBatchJobSender);
}
@Override
public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException {
handleWorkChannelMessage((JobWorkNotificationJsonMessage) theMessage);
}
private void handleWorkChannelMessage(JobWorkNotificationJsonMessage theMessage) {
JobWorkNotification workNotification = theMessage.getPayload();
String chunkId = workNotification.getChunkId();
Validate.notNull(chunkId);
Optional<WorkChunk> chunkOpt = myJobPersistence.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId);
if (chunkOpt.isEmpty()) {
ourLog.error("Unable to find chunk with ID {} - Aborting", chunkId);
return;
}
WorkChunk workChunk = chunkOpt.get();
JobWorkCursor<?, ?, ?> cursor = buildCursorFromNotification(workNotification);
Validate.isTrue(workChunk.getTargetStepId().equals(cursor.getCurrentStepId()), "Chunk %s has target step %s but expected %s", chunkId, workChunk.getTargetStepId(), cursor.getCurrentStepId());
Optional<JobInstance> instanceOpt = myJobPersistence.fetchInstanceAndMarkInProgress(workNotification.getInstanceId());
JobInstance instance = instanceOpt.orElseThrow(() -> new InternalErrorException("Unknown instance: " + workNotification.getInstanceId()));
String instanceId = instance.getInstanceId();
if (instance.isCancelled()) {
ourLog.info("Skipping chunk {} because job instance is cancelled", chunkId);
myJobPersistence.markInstanceAsCompleted(instanceId);
return;
}
JobStepExecutor<?,?,?> stepExecutor = myJobStepExecutorFactory.newJobStepExecutor(instance, workChunk, cursor);
stepExecutor.executeStep();
}
private JobWorkCursor<?, ?, ?> buildCursorFromNotification(JobWorkNotification workNotification) {
String jobDefinitionId = workNotification.getJobDefinitionId();
int jobDefinitionVersion = workNotification.getJobDefinitionVersion();
JobDefinition<?> definition = myJobDefinitionRegistry.getJobDefinitionOrThrowException(jobDefinitionId, jobDefinitionVersion);
return JobWorkCursor.fromJobDefinitionAndWorkNotification(definition, workNotification);
}
}
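For orientation, here is a wiring sketch (not part of this changeset) showing how the handler might be subscribed to the work-notification channel. The IChannelFactory type and the channel name are assumptions; the factory calls mirror the LinkedBlockingChannelFactoryTest usage below.

package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
class WorkChannelWiringSketch {
	static void subscribe(IChannelFactory theChannelFactory, IJobPersistence thePersistence, JobDefinitionRegistry theRegistry, BatchJobSender theSender) {
		// "batch2-work-notification" is the documented channel name for this framework
		IChannelReceiver receiver = theChannelFactory.getOrCreateReceiver(
			"batch2-work-notification", JobWorkNotificationJsonMessage.class, new ChannelConsumerSettings());
		// WorkChannelMessageHandler is package-private, so this sketch lives in the same package
		receiver.subscribe(new WorkChannelMessageHandler(thePersistence, theRegistry, theSender));
	}
}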

View File

@ -26,6 +26,8 @@ import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.model.api.IModelJson;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@ -35,7 +37,7 @@ import java.util.List;
import java.util.stream.Collectors;
public class JobDefinition<PT extends IModelJson> {
private static final Logger ourLog = LoggerFactory.getLogger(JobDefinition.class);
public static final int ID_MAX_LENGTH = 100;
private final String myJobDefinitionId;
@ -61,7 +63,7 @@ public class JobDefinition<PT extends IModelJson> {
myJobDefinitionVersion = theJobDefinitionVersion;
myJobDescription = theJobDescription;
mySteps = theSteps;
myStepIds = mySteps.stream().map(t -> t.getStepId()).collect(Collectors.toList());
myStepIds = mySteps.stream().map(JobDefinitionStep::getStepId).collect(Collectors.toList());
myParametersType = theParametersType;
myParametersValidator = theParametersValidator;
myGatedExecution = theGatedExecution;
@ -78,6 +80,7 @@ public class JobDefinition<PT extends IModelJson> {
return myParametersValidator;
}
@SuppressWarnings("unused")
public String getJobDescription() {
return myJobDescription;
}
@ -110,6 +113,16 @@ public class JobDefinition<PT extends IModelJson> {
return mySteps;
}
/**
* @return the stepId of the first step
* @throws IndexOutOfBoundsException if there is no first step
*/
public String getFirstStepId() {
JobDefinitionStep<PT, ?, ?> firstStep = mySteps.get(0);
return firstStep.getStepId();
}
public boolean isGatedExecution() {
return myGatedExecution;
}
@ -141,7 +154,7 @@ public class JobDefinition<PT extends IModelJson> {
mySteps = new ArrayList<>();
}
Builder(List<JobDefinitionStep<PT, ?, ?>> theSteps, String theJobDefinitionId, int theJobDefinitionVersion, String theJobDescription, Class<PT> theJobParametersType, Class<NIT> theNextInputType, IJobParametersValidator<PT> theParametersValidator, boolean theGatedExecution, IJobCompletionHandler<PT> theCompletionHandler) {
Builder(List<JobDefinitionStep<PT, ?, ?>> theSteps, String theJobDefinitionId, int theJobDefinitionVersion, String theJobDescription, Class<PT> theJobParametersType, Class<NIT> theNextInputType, @Nullable IJobParametersValidator<PT> theParametersValidator, boolean theGatedExecution, IJobCompletionHandler<PT> theCompletionHandler) {
mySteps = theSteps;
myJobDefinitionId = theJobDefinitionId;
myJobDefinitionVersion = theJobDefinitionVersion;
@ -259,7 +272,6 @@ public class JobDefinition<PT extends IModelJson> {
*
* @param theParametersValidator The validator (must not be null. Do not call this method at all if you do not want a parameters validator).
*/
@SuppressWarnings("unchecked")
public Builder<PT, NIT> setParametersValidator(@Nonnull IJobParametersValidator<PT> theParametersValidator) {
Validate.notNull(theParametersValidator, "theParametersValidator must not be null");
Validate.isTrue(myParametersValidator == null, "Can not supply multiple parameters validators. Already have: %s", myParametersValidator);

View File

@ -34,6 +34,7 @@ public class JobDefinitionStep<PT extends IModelJson, IT extends IModelJson, OT
private final String myStepDescription;
private final IJobStepWorker<PT, IT, OT> myJobStepWorker;
private final Class<IT> myInputType;
private final Class<OT> myOutputType;
public JobDefinitionStep(@Nonnull String theStepId, @Nonnull String theStepDescription, @Nonnull IJobStepWorker<PT, IT, OT> theJobStepWorker, @Nonnull Class<IT> theInputType, @Nonnull Class<OT> theOutputType) {
@ -63,4 +64,8 @@ public class JobDefinitionStep<PT extends IModelJson, IT extends IModelJson, OT
public Class<IT> getInputType() {
return myInputType;
}
public Class<OT> getOutputType() {
return myOutputType;
}
}

View File

@ -113,6 +113,17 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
setCurrentGatedStepId(theJobInstance.getCurrentGatedStepId());
}
public static JobInstance fromJobDefinition(JobDefinition<?> theJobDefinition) {
JobInstance instance = new JobInstance();
instance.setJobDefinitionId(theJobDefinition.getJobDefinitionId());
instance.setJobDefinitionVersion(theJobDefinition.getJobDefinitionVersion());
if (theJobDefinition.isGatedExecution()) {
instance.setCurrentGatedStepId(theJobDefinition.getFirstStepId());
}
return instance;
}
public String getCurrentGatedStepId() {
return myCurrentGatedStepId;
}

View File

@ -0,0 +1,92 @@
package ca.uhn.fhir.batch2.model;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* This immutable object is produced by reconciling a work notification message to its corresponding job definition.
* It holds information required to execute the current step and send data to the next step.
*
* @param <PT> Job Parameter type
* @param <IT> Current step input data type
* @param <OT> Current step output data type and next step input data type
*/
public class JobWorkCursor<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> {
private static final Logger ourLog = LoggerFactory.getLogger(JobWorkCursor.class);
public final JobDefinition<PT> jobDefinition;
public final boolean isFirstStep;
public final JobDefinitionStep<PT, IT, OT> currentStep;
public final JobDefinitionStep<PT, OT, ?> nextStep;
public JobWorkCursor(JobDefinition<PT> theJobDefinition, boolean theIsFirstStep, JobDefinitionStep<PT, IT, OT> theCurrentStep, JobDefinitionStep<PT, OT, ?> theNextStep) {
jobDefinition = theJobDefinition;
isFirstStep = theIsFirstStep;
currentStep = theCurrentStep;
nextStep = theNextStep;
validate();
}
private void validate() {
if (isFirstStep) {
Validate.isTrue(currentStep.getInputType() == VoidModel.class);
}
// Note that if it is not the first step, it can have VoidModel as its input type--not all steps require input from
// the previous step
if (nextStep != null) {
Validate.isTrue(currentStep.getOutputType() == nextStep.getInputType());
}
}
public static <PT extends IModelJson> JobWorkCursor<PT,?,?> fromJobDefinitionAndWorkNotification(JobDefinition<PT> theJobDefinition, JobWorkNotification theWorkNotification) {
String requestedStepId = theWorkNotification.getTargetStepId();
boolean isFirstStep = false;
JobDefinitionStep<PT,?,?> currentStep = null;
JobDefinitionStep<PT,?,?> nextStep = null;
List<JobDefinitionStep<PT, ?, ?>> steps = theJobDefinition.getSteps();
for (int i = 0; i < steps.size(); i++) {
JobDefinitionStep<PT, ?, ?> step = steps.get(i);
if (step.getStepId().equals(requestedStepId)) {
currentStep = step;
if (i == 0) {
isFirstStep = true;
}
if (i < (steps.size() - 1)) {
nextStep = steps.get(i + 1);
}
break;
}
}
if (currentStep == null) {
String msg = "Unknown step[" + requestedStepId + "] for job definition ID[" + theJobDefinition.getJobDefinitionId() + "] version[" + theJobDefinition.getJobDefinitionVersion() + "]";
ourLog.warn(msg);
throw new InternalErrorException(Msg.code(2042) + msg);
}
return new JobWorkCursor<>(theJobDefinition, isFirstStep, currentStep, nextStep);
}
public String getCurrentStepId() {
return currentStep.getStepId();
}
public boolean isFinalStep() {
return nextStep == null;
}
@SuppressWarnings("unchecked")
public JobWorkCursor<PT,IT, VoidModel> asFinalCursor() {
Validate.isTrue(isFinalStep());
return (JobWorkCursor<PT,IT, VoidModel>)this;
}
}
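As a usage sketch (the JobWorkCursorTest below exercises the same resolution path), resolving a cursor from a work notification and narrowing it on the final step looks roughly like this:

package ca.uhn.fhir.batch2.model;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.model.api.IModelJson;
class JobWorkCursorSketch {
	static <PT extends IModelJson> void resolve(JobDefinition<PT> theDefinition, String theTargetStepId) {
		JobWorkNotification notification = new JobWorkNotification();
		notification.setTargetStepId(theTargetStepId);
		JobWorkCursor<PT, ?, ?> cursor = JobWorkCursor.fromJobDefinitionAndWorkNotification(theDefinition, notification);
		if (cursor.isFinalStep()) {
			// final steps have no next step, so the output type collapses to VoidModel
			JobWorkCursor<PT, ?, VoidModel> finalCursor = cursor.asFinalCursor();
		}
	}
}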

View File

@ -53,6 +53,13 @@ public class JobWorkNotification implements IModelJson {
setTargetStepId(theTargetStepId);
}
public static JobWorkNotification firstStepNotification(JobDefinition<?> theJobDefinition, String theInstanceId, String theChunkId) {
String firstStepId = theJobDefinition.getFirstStepId();
String jobDefinitionId = theJobDefinition.getJobDefinitionId();
int jobDefinitionVersion = theJobDefinition.getJobDefinitionVersion();
return new JobWorkNotification(jobDefinitionId, jobDefinitionVersion, theInstanceId, firstStepId, theChunkId);
}
public String getJobDefinitionId() {
return myJobDefinitionId;
}
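Putting the new factory methods together, a coordinator might start a job roughly as follows. This is a sketch only: storeNewInstance is assumed to return the new instance id, and newFirstChunk is a hypothetical stand-in for BatchWorkChunk construction, which this diff does not show.

package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
class JobStartSketch {
	static void startJob(IJobPersistence thePersistence, BatchJobSender theSender, JobDefinition<?> theDefinition) {
		JobInstance instance = JobInstance.fromJobDefinition(theDefinition);
		String instanceId = thePersistence.storeNewInstance(instance); // assumed signature
		String chunkId = thePersistence.storeWorkChunk(newFirstChunk(theDefinition, instanceId));
		theSender.sendWorkChannelMessage(JobWorkNotification.firstStepNotification(theDefinition, instanceId, chunkId));
	}
	private static BatchWorkChunk newFirstChunk(JobDefinition<?> theDefinition, String theInstanceId) {
		// hypothetical helper: the real BatchWorkChunk constructor is not shown in this diff
		throw new UnsupportedOperationException("illustration only");
	}
}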

View File

@ -23,6 +23,8 @@ package ca.uhn.fhir.batch2.model;
import ca.uhn.fhir.rest.server.messaging.json.BaseJsonMessage;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nonnull;
public class JobWorkNotificationJsonMessage extends BaseJsonMessage<JobWorkNotification> {
@JsonProperty("payload")
@ -43,6 +45,7 @@ public class JobWorkNotificationJsonMessage extends BaseJsonMessage<JobWorkNotif
}
@Override
@Nonnull
public JobWorkNotification getPayload() {
return myPayload;
}

View File

@ -24,7 +24,7 @@ import ca.uhn.fhir.model.api.IModelJson;
public class WorkChunkData<OT extends IModelJson> {
private OT myData;
private final OT myData;
/**
* Constructor

View File

@ -53,7 +53,7 @@ public abstract class BaseBatch2Test {
}
@SafeVarargs
final JobDefinition<TestJobParameters> createJobDefinition(Consumer<JobDefinition.Builder<TestJobParameters, ?>>... theModifiers) {
protected final JobDefinition<TestJobParameters> createJobDefinition(Consumer<JobDefinition.Builder<TestJobParameters, ?>>... theModifiers) {
JobDefinition.Builder<TestJobParameters, VoidModel> builder = JobDefinition
.newBuilder()
.setJobDefinitionId(JOB_DEFINITION_ID)

View File

@ -16,6 +16,7 @@ import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.BeforeEach;
@ -27,12 +28,10 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.mockito.stubbing.Answer;
import org.springframework.messaging.MessageDeliveryException;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -41,6 +40,7 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@ -49,8 +49,6 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.MethodName.class)
public class JobCoordinatorImplTest extends BaseBatch2Test {
private static final Logger ourLog = LoggerFactory.getLogger(JobCoordinatorImplTest.class);
private final IChannelReceiver myWorkChannelReceiver = LinkedBlockingChannel.newSynchronous("receiver");
private JobCoordinatorImpl mySvc;
@Mock
@ -89,55 +87,13 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
verify(myJobInstancePersister, times(1)).cancelInstance(eq(INSTANCE_ID));
}
@Test
public void testFetchInstance_PasswordsRedacted() {
// Setup
JobDefinition<?> definition = createJobDefinition();
JobInstance instance = createInstance();
when(myJobDefinitionRegistry.getJobDefinition(eq(JOB_DEFINITION_ID), eq(1))).thenReturn(Optional.of(definition));
when(myJobInstancePersister.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(instance));
// Execute
JobInstance outcome = mySvc.getInstance(INSTANCE_ID);
ourLog.info("Job instance: {}", outcome);
ourLog.info("Parameters: {}", outcome.getParameters());
assertEquals(PARAM_1_VALUE, outcome.getParameters(TestJobParameters.class).getParam1());
assertEquals(PARAM_2_VALUE, outcome.getParameters(TestJobParameters.class).getParam2());
assertEquals(null, outcome.getParameters(TestJobParameters.class).getPassword());
}
@Test
public void testFetchInstances() {
// Setup
when(myJobDefinitionRegistry.getJobDefinition(eq(JOB_DEFINITION_ID), eq(1))).thenReturn(Optional.of(createJobDefinition()));
when(myJobInstancePersister.fetchInstances(eq(100), eq(0))).thenReturn(Lists.newArrayList(createInstance()));
// Execute
List<JobInstance> outcome = mySvc.getInstances(100, 0);
// Verify
assertEquals(1, outcome.size());
}
@SuppressWarnings("unchecked")
@Test
public void testPerformStep_FirstStep() {
// Setup
when(myJobDefinitionRegistry.getJobDefinition(eq(JOB_DEFINITION_ID), eq(1))).thenReturn(Optional.of(createJobDefinition()));
when(myJobInstancePersister.fetchInstanceAndMarkInProgress(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance()));
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunkStep1()));
setupMocks(createJobDefinition(), createWorkChunkStep1());
when(myStep1Worker.run(any(), any())).thenAnswer(t -> {
IJobDataSink<TestJobStep2InputType> sink = t.getArgument(1, IJobDataSink.class);
sink.accept(new TestJobStep2InputType("data value 1a", "data value 2a"));
@ -163,6 +119,12 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
verify(myBatchJobSender, times(2)).sendWorkChannelMessage(any());
}
private void setupMocks(JobDefinition<TestJobParameters> theJobDefinition, WorkChunk theWorkChunk) {
doReturn(theJobDefinition).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1));
when(myJobInstancePersister.fetchInstanceAndMarkInProgress(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance()));
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(theWorkChunk));
}
/**
* If the first step doesn't produce any work chunks, then
* the instance should be marked as complete right away.
@ -172,9 +134,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
// Setup
when(myJobDefinitionRegistry.getJobDefinition(eq(JOB_DEFINITION_ID), eq(1))).thenReturn(Optional.of(createJobDefinition()));
when(myJobInstancePersister.fetchInstanceAndMarkInProgress(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance()));
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunkStep1()));
setupMocks(createJobDefinition(), createWorkChunkStep1());
when(myStep1Worker.run(any(), any())).thenReturn(new RunOutcome(50));
mySvc.start();
@ -198,18 +158,15 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
// Setup
JobDefinition<TestJobParameters> jobDefinition = createJobDefinition(
t -> t.gatedExecution()
);
when(myJobDefinitionRegistry.getJobDefinition(eq(JOB_DEFINITION_ID), eq(1))).thenReturn(Optional.of(jobDefinition));
when(myJobInstancePersister.fetchInstanceAndMarkInProgress(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance()));
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunkStep1()));
when(myStep1Worker.run(any(), any())).thenAnswer(t -> {
IJobDataSink<TestJobStep2InputType> sink = t.getArgument(1, IJobDataSink.class);
JobDefinition<TestJobParameters> jobDefinition = createJobDefinition(JobDefinition.Builder::gatedExecution);
setupMocks(jobDefinition, createWorkChunkStep1());
Answer<RunOutcome> answer = t -> {
IJobDataSink<TestJobStep2InputType> sink = t.getArgument(1);
sink.accept(new TestJobStep2InputType("data value 1a", "data value 2a"));
sink.accept(new TestJobStep2InputType("data value 1b", "data value 2b"));
return new RunOutcome(50);
});
};
when(myStep1Worker.run(any(), any())).thenAnswer(answer);
mySvc.start();
// Execute
@ -234,7 +191,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
// Setup
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_2, new TestJobStep2InputType(DATA_1_VALUE, DATA_2_VALUE))));
when(myJobDefinitionRegistry.getJobDefinition(eq(JOB_DEFINITION_ID), eq(1))).thenReturn(Optional.of(createJobDefinition()));
doReturn(createJobDefinition()).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1));
when(myJobInstancePersister.fetchInstanceAndMarkInProgress(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance()));
when(myStep2Worker.run(any(), any())).thenReturn(new RunOutcome(50));
mySvc.start();
@ -259,7 +216,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
// Setup
when(myJobDefinitionRegistry.getJobDefinition(eq(JOB_DEFINITION_ID), eq(1))).thenReturn(Optional.of(createJobDefinition()));
doReturn(createJobDefinition()).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1));
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_2, new TestJobStep2InputType(DATA_1_VALUE, DATA_2_VALUE))));
when(myJobInstancePersister.fetchInstanceAndMarkInProgress(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance()));
when(myStep2Worker.run(any(), any())).thenThrow(new NullPointerException("This is an error message"));
@ -292,7 +249,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
// Setup
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_2, new TestJobStep2InputType(DATA_1_VALUE, DATA_2_VALUE))));
when(myJobDefinitionRegistry.getJobDefinition(eq(JOB_DEFINITION_ID), eq(1))).thenReturn(Optional.of(createJobDefinition()));
doReturn(createJobDefinition()).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1));
when(myJobInstancePersister.fetchInstanceAndMarkInProgress(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance()));
when(myStep2Worker.run(any(), any())).thenAnswer(t -> {
IJobDataSink<?> sink = t.getArgument(1, IJobDataSink.class);
@ -325,7 +282,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
// Setup
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunkStep3()));
when(myJobDefinitionRegistry.getJobDefinition(eq(JOB_DEFINITION_ID), eq(1))).thenReturn(Optional.of(createJobDefinition()));
doReturn(createJobDefinition()).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1));
when(myJobInstancePersister.fetchInstanceAndMarkInProgress(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance()));
when(myStep3Worker.run(any(), any())).thenReturn(new RunOutcome(50));
mySvc.start();
@ -352,7 +309,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
// Setup
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_3, new TestJobStep3InputType().setData3(DATA_3_VALUE).setData4(DATA_4_VALUE))));
when(myJobDefinitionRegistry.getJobDefinition(eq(JOB_DEFINITION_ID), eq(1))).thenReturn(Optional.of(createJobDefinition()));
doReturn(createJobDefinition()).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1));
when(myJobInstancePersister.fetchInstanceAndMarkInProgress(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance()));
when(myStep3Worker.run(any(), any())).thenAnswer(t -> {
IJobDataSink<VoidModel> sink = t.getArgument(1, IJobDataSink.class);
@ -376,7 +333,8 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
// Setup
when(myJobDefinitionRegistry.getJobDefinition(eq(JOB_DEFINITION_ID), eq(1))).thenReturn(Optional.empty());
String exceptionMessage = "badbadnotgood";
when(myJobDefinitionRegistry.getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1))).thenThrow(new InternalErrorException(exceptionMessage));
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunkStep2()));
mySvc.start();
@ -388,7 +346,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
} catch (MessageDeliveryException e) {
// Verify
assertEquals("HAPI-2043: Unknown job definition ID[JOB_DEFINITION_ID] version[1]", e.getMostSpecificCause().getMessage());
assertEquals(exceptionMessage, e.getMostSpecificCause().getMessage());
}
}

View File

@ -8,6 +8,8 @@ import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil;
@ -54,7 +56,7 @@ class JobDataSinkTest {
public void test_sink_accept() {
// setup
IJobStepWorker<TestJobParameters, VoidModel, Step1Output> firstStepWorker = new IJobStepWorker<TestJobParameters, VoidModel, Step1Output>() {
IJobStepWorker<TestJobParameters, VoidModel, Step1Output> firstStepWorker = new IJobStepWorker<>() {
@NotNull
@Override
public RunOutcome run(@NotNull StepExecutionDetails<TestJobParameters, VoidModel> theStepExecutionDetails, @NotNull IJobDataSink<Step1Output> theDataSink) throws JobExecutionFailedException {
@ -84,11 +86,15 @@ class JobDataSinkTest {
.addLastStep(LAST_STEP_ID, "s2desc", lastStepWorker)
.build();
JobDefinitionStep<TestJobParameters, VoidModel, Step1Output> firstStep = (JobDefinitionStep<TestJobParameters, VoidModel, Step1Output>) job.getSteps().get(0);
JobDefinitionStep<TestJobParameters, Step1Output, VoidModel> lastStep = (JobDefinitionStep<TestJobParameters, Step1Output, VoidModel>) job.getSteps().get(1);
// execute
// Let's test our first step worker by calling run on it:
when(myJobPersistence.storeWorkChunk(myBatchWorkChunkCaptor.capture())).thenReturn(CHUNK_ID);
StepExecutionDetails<TestJobParameters, VoidModel> details = new StepExecutionDetails<>(new TestJobParameters().setParam1("" + PID_COUNT), null, JOB_INSTANCE_ID, CHUNK_ID);
JobDataSink<Step1Output> sink = new JobDataSink<>(myBatchJobSender, myJobPersistence, JOB_DEF_ID, JOB_DEF_VERSION, job.getSteps().get(1), JOB_INSTANCE_ID, FIRST_STEP_ID, false);
JobWorkCursor<TestJobParameters, VoidModel, Step1Output> cursor = new JobWorkCursor<>(job, true, firstStep, lastStep);
JobDataSink<TestJobParameters, VoidModel, Step1Output> sink = new JobDataSink<>(myBatchJobSender, myJobPersistence, job, JOB_INSTANCE_ID, cursor);
RunOutcome result = firstStepWorker.run(details, sink);

View File

@ -4,6 +4,7 @@ import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ -12,7 +13,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
@ExtendWith(MockitoExtension.class)
class JobDefinitionRegistryTest {
@ -51,13 +51,13 @@ class JobDefinitionRegistryTest {
@Test
void testGetLatestJobDefinition() {
assertEquals(2, mySvc.getLatestJobDefinition("A").orElseThrow(() -> new IllegalArgumentException()).getJobDefinitionVersion());
assertEquals(2, mySvc.getLatestJobDefinition("A").orElseThrow(IllegalArgumentException::new).getJobDefinitionVersion());
}
@Test
void testGetJobDefinition() {
assertEquals(1, mySvc.getJobDefinition("A", 1).orElseThrow(() -> new IllegalArgumentException()).getJobDefinitionVersion());
assertEquals(2, mySvc.getJobDefinition("A", 2).orElseThrow(() -> new IllegalArgumentException()).getJobDefinitionVersion());
assertEquals(1, mySvc.getJobDefinition("A", 1).orElseThrow(IllegalArgumentException::new).getJobDefinitionVersion());
assertEquals(2, mySvc.getJobDefinition("A", 2).orElseThrow(IllegalArgumentException::new).getJobDefinitionVersion());
}
@Test
@ -109,4 +109,17 @@ class JobDefinitionRegistryTest {
}
@Test
public void getJobDefinitionOrThrowException() {
String jobDefinitionId = "Ranch Dressing Expert";
int jobDefinitionVersion = 12;
try {
mySvc.getJobDefinitionOrThrowException(jobDefinitionId, jobDefinitionVersion);
fail();
} catch (InternalErrorException e) {
assertEquals("HAPI-2043: Unknown job definition ID[" + jobDefinitionId + "] version[" + jobDefinitionVersion + "]", e.getMessage());
}
}
}

View File

@ -144,7 +144,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
@Test
public void testInProgress_GatedExecution_FirstStepComplete() {
// Setup
myJobDefinitionRegistry.addJobDefinition(createJobDefinition(t -> t.gatedExecution()));
myJobDefinitionRegistry.addJobDefinition(createJobDefinition(JobDefinition.Builder::gatedExecution));
when(myJobPersistence.fetchWorkChunksWithoutData(eq(INSTANCE_ID), eq(100), eq(0))).thenReturn(Lists.newArrayList(
createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID),
createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID_2)

View File

@ -0,0 +1,76 @@
package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
class JobQuerySvcTest extends BaseBatch2Test {
private static final Logger ourLog = LoggerFactory.getLogger(JobQuerySvcTest.class);
@Mock
IJobPersistence myJobPersistence;
@Mock
JobDefinitionRegistry myJobDefinitionRegistry;
JobQuerySvc mySvc;
@BeforeEach
public void beforeEach() {
mySvc = new JobQuerySvc(myJobPersistence, myJobDefinitionRegistry);
}
@Test
public void testFetchInstance_PasswordsRedacted() {
// Setup
JobDefinition<?> definition = createJobDefinition();
JobInstance instance = createInstance();
doReturn(definition).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1));
when(myJobPersistence.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(instance));
// Execute
JobInstance outcome = mySvc.fetchInstance(INSTANCE_ID);
ourLog.info("Job instance: {}", outcome);
ourLog.info("Parameters: {}", outcome.getParameters());
assertEquals(PARAM_1_VALUE, outcome.getParameters(TestJobParameters.class).getParam1());
assertEquals(PARAM_2_VALUE, outcome.getParameters(TestJobParameters.class).getParam2());
assertNull(outcome.getParameters(TestJobParameters.class).getPassword());
}
@Test
public void testFetchInstances() {
// Setup
doReturn(createJobDefinition()).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1));
when(myJobPersistence.fetchInstances(eq(100), eq(0))).thenReturn(Lists.newArrayList(createInstance()));
// Execute
List<JobInstance> outcome = mySvc.fetchInstances(100, 0);
// Verify
assertEquals(1, outcome.size());
}
}

View File

@ -0,0 +1,91 @@
package ca.uhn.fhir.batch2.model;
import ca.uhn.fhir.batch2.impl.BaseBatch2Test;
import ca.uhn.fhir.batch2.impl.TestJobParameters;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.fail;
class JobWorkCursorTest extends BaseBatch2Test {
private JobDefinition<TestJobParameters> myDefinition;
@BeforeEach
public void before() {
myDefinition = createJobDefinition();
}
@Test
public void createCursorStep1() {
// setup
JobWorkNotification workNotification = new JobWorkNotification();
workNotification.setTargetStepId(STEP_1);
// execute
JobWorkCursor<TestJobParameters, ?, ?> cursor = JobWorkCursor.fromJobDefinitionAndWorkNotification(myDefinition, workNotification);
// verify
assertCursor(cursor, true, false, STEP_1, STEP_2);
}
@Test
public void createCursorStep2() {
// setup
JobWorkNotification workNotification = new JobWorkNotification();
workNotification.setTargetStepId(STEP_2);
// execute
JobWorkCursor<TestJobParameters, ?, ?> cursor = JobWorkCursor.fromJobDefinitionAndWorkNotification(myDefinition, workNotification);
// verify
assertCursor(cursor, false, false, STEP_2, STEP_3);
}
@Test
public void createCursorStep3() {
// setup
JobWorkNotification workNotification = new JobWorkNotification();
workNotification.setTargetStepId(STEP_3);
// execute
JobWorkCursor<TestJobParameters, ?, ?> cursor = JobWorkCursor.fromJobDefinitionAndWorkNotification(myDefinition, workNotification);
// verify
assertCursor(cursor, false, true, STEP_3, null);
}
@Test
public void unknownStep() {
// setup
JobWorkNotification workNotification = new JobWorkNotification();
String targetStepId = "Made a searching and fearless moral inventory of ourselves";
workNotification.setTargetStepId(targetStepId);
// execute
try {
JobWorkCursor.fromJobDefinitionAndWorkNotification(myDefinition, workNotification);
// verify
fail();
} catch (InternalErrorException e) {
assertEquals("HAPI-2042: Unknown step[" + targetStepId + "] for job definition ID[JOB_DEFINITION_ID] version[1]", e.getMessage());
}
}
private void assertCursor(JobWorkCursor<TestJobParameters,?,?> theCursor, boolean theExpectedIsFirstStep, boolean theExpectedIsFinalStep, String theExpectedCurrentStep, String theExpectedNextStep) {
assertEquals(theExpectedIsFirstStep, theCursor.isFirstStep);
assertEquals(theExpectedIsFinalStep, theCursor.isFinalStep());
assertEquals(theExpectedCurrentStep, theCursor.currentStep.getStepId());
if (theExpectedNextStep == null) {
assertNull(theCursor.nextStep);
} else {
assertEquals(theExpectedNextStep, theCursor.nextStep.getStepId());
}
assertEquals(myDefinition.getJobDefinitionId(), theCursor.jobDefinition.getJobDefinitionId());
assertEquals(myDefinition.getJobDefinitionVersion(), theCursor.jobDefinition.getJobDefinitionVersion());
}
}

View File

@ -0,0 +1,169 @@
package ca.uhn.fhir.jpa.subscription.channel.impl;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.test.concurrency.PointcutLatch;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
class LinkedBlockingChannelFactoryTest {
private static final Logger ourLog = LoggerFactory.getLogger(LinkedBlockingChannelFactoryTest.class);
private static final String TEST_CHANNEL_NAME = "test-channel-name";
private static final String TEST_PAYLOAD = "payload";
LinkedBlockingChannelFactory myChannelFactory = new LinkedBlockingChannelFactory((name, settings) -> name);
private List<String> myReceivedPayloads;
private PointcutLatch[] myHandlerCanProceedLatch = {
new PointcutLatch("first delivery"),
new PointcutLatch("second delivery")
};
private int myFailureCount = 0;
@BeforeEach
public void before() {
myReceivedPayloads = new ArrayList<>();
}
@Test
void testDeliverOneAtATime() {
// setup
AtomicInteger index = new AtomicInteger();
LinkedBlockingChannel producer = (LinkedBlockingChannel) buildChannels(() -> startProcessingMessage(index.getAndIncrement()));
// execute
prepareToHandleMessage(0);
producer.send(new TestMessage(TEST_PAYLOAD));
producer.send(new TestMessage(TEST_PAYLOAD));
producer.send(new TestMessage(TEST_PAYLOAD));
validateThreeMessagesDelivered(producer);
}
private void validateThreeMessagesDelivered(LinkedBlockingChannel producer) {
// The first send was dequeued but our handler won't deliver it until we unblock it
await().until(() -> producer.getQueueSizeForUnitTest() == 2);
// no messages received yet
assertThat(myReceivedPayloads, hasSize(0));
// Unblock the first latch so message handling is allowed to proceed
finishProcessingMessage(0);
// our queue size should decrement
await().until(() -> producer.getQueueSizeForUnitTest() == 1);
// and we should now have received 1 message
assertThat(myReceivedPayloads, hasSize(1));
assertEquals(TEST_PAYLOAD, myReceivedPayloads.get(0));
// Unblock the second latch so message handling is allowed to proceed
finishProcessingMessage(1);
// our queue size decrements again
await().until(() -> producer.getQueueSizeForUnitTest() == 0);
// and we should now have received 2 messages
assertThat(myReceivedPayloads, hasSize(2));
assertEquals(TEST_PAYLOAD, myReceivedPayloads.get(1));
}
@Test
void testDeliveryResumesAfterFailedMessages() {
// setup
LinkedBlockingChannel producer = (LinkedBlockingChannel) buildChannels(failTwiceThenProceed());
// execute
prepareToHandleMessage(0);
producer.send(new TestMessage(TEST_PAYLOAD)); // fail
producer.send(new TestMessage(TEST_PAYLOAD)); // fail
producer.send(new TestMessage(TEST_PAYLOAD)); // succeed
producer.send(new TestMessage(TEST_PAYLOAD)); // succeed
producer.send(new TestMessage(TEST_PAYLOAD)); // succeed
validateThreeMessagesDelivered(producer);
assertEquals(2, myFailureCount);
}
@NotNull
private Runnable failTwiceThenProceed() {
AtomicInteger counter = new AtomicInteger();
return () -> {
int value = counter.getAndIncrement();
if (value < 2) {
++myFailureCount;
// This exception will be thrown the first two times this method is run
throw new RuntimeException("Expected Exception " + value);
} else {
startProcessingMessage(value - 2);
}
};
}
private void prepareToHandleMessage(int theIndex) {
myHandlerCanProceedLatch[theIndex].setExpectedCount(1);
}
private void startProcessingMessage(int theIndex) {
try {
myHandlerCanProceedLatch[theIndex].awaitExpected();
} catch (InterruptedException e) {
ourLog.warn("interrupted", e);
}
}
private void finishProcessingMessage(int theIndex) {
if (theIndex + 1 < myHandlerCanProceedLatch.length) {
prepareToHandleMessage(theIndex + 1);
}
myHandlerCanProceedLatch[theIndex].call("");
}
private IChannelProducer buildChannels(Runnable theCallback) {
ChannelProducerSettings channelSettings = new ChannelProducerSettings();
channelSettings.setConcurrentConsumers(1);
IChannelProducer producer = myChannelFactory.getOrCreateProducer(TEST_CHANNEL_NAME, TestMessage.class, channelSettings);
IChannelReceiver receiver = myChannelFactory.getOrCreateReceiver(TEST_CHANNEL_NAME, TestMessage.class, new ChannelConsumerSettings());
receiver.subscribe(msg -> {
theCallback.run();
myReceivedPayloads.add((String) msg.getPayload());
});
return producer;
}
static class TestMessage implements Message<String> {
private final String payload;
TestMessage(String thePayload) {
payload = thePayload;
}
@Override
public String getPayload() {
return payload;
}
@Override
public MessageHeaders getHeaders() {
return null;
}
}
}