updated batch 2 framework with READY state

This commit is contained in:
leif stawnyczy 2024-03-06 15:25:25 -05:00
parent 56f5697149
commit d7633a80a4
13 changed files with 286 additions and 150 deletions

View File

@ -0,0 +1,12 @@
---
type: add
issue: 5745
title: "Added another work chunk state to Batch2 jobs: READY.
This work chunk state will be the initial state work chunks
are created. After which, they will be picked up by a maintenance
pass and have their states updated to QUEUED before sending them to
the queue.
The exception is for ReductionStep chunks (because reduction steps
are not read off of the queue, but executed by the maintenance job
inline.
"

View File

@ -20,28 +20,43 @@ After a job has been defined, *instances* of that job can be submitted for batch
The Batch Job Coordinator will then store two records in the database:
- Job Instance with status QUEUED: that is the parent record for all data concerning this job
- Batch Work Chunk with status QUEUED: this describes the first "chunk" of work required for this job. The first Batch Work Chunk contains no data.
- Batch Work Chunk with status READY: this describes the first "chunk" of work required for this job. The first Batch Work Chunk contains no data.
Lastly the Batch Job Coordinator publishes a message to the Batch Notification Message Channel (named `batch2-work-notification`) to inform worker threads that this first chunk of work is now ready for processing.
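
For orientation, a job is started roughly as in the integration tests later in this diff (assume an injected `IJobCoordinator myJobCoordinator`; the definition id and parameters object are placeholders):

```java
// assumes an injected IJobCoordinator myJobCoordinator
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId("my-job-definition"); // placeholder definition id
request.setParameters(new TestJobParameters());  // placeholder parameters object

Batch2JobStartResponse startResponse =
		myJobCoordinator.startInstance(new SystemRequestDetails(), request);

// With this change, the instance and its first work chunk now sit in the
// database with the chunk in READY state; the next maintenance pass moves it
// to QUEUED and publishes the work notification.
String instanceId = startResponse.getInstanceId();
```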
### The Maintenance Job
### Job Processing - First Step
A Scheduled Job runs every so often (default once a minute), and does the following for each Job Instance in the database:
HAPI-FHIR Batch Jobs run based on job notification messages. The process is kicked off by the first chunk of work. When this notification message arrives, the message handler makes a single call to the first step defined in the job definition, passing in the job parameters as input.
1. Moves all `READY` work chunks into the `QUEUED` state and publishes a message to the Batch Notification Message Channel to inform worker threads that a work chunk is now ready for processing. \*
1. Calculates job progress (% of work chunks in `COMPLETED` status). If the job is finished, purges any leftover work chunks still in the database.
1. Cleans up any complete, failed, or cancelled jobs that need to be removed.
1. Moves any gated jobs onto their next step.
1. If the final step of a gated job is a reduction step, a reduction step execution will be triggered.
The handler then does the following:
1. Change the work chunk status from QUEUED to IN_PROGRESS
2. Change the Job Instance status from QUEUED to IN_PROGRESS
3. If the Job Instance is cancelled, change the status to CANCELLED and abort processing.
4. The first step of the job definition is executed with the job parameters
5. This step creates new work chunks. For each work chunk it creates, it json serializes the work chunk data, stores it in the database, and publishes a new message to the Batch Notification Message Channel to notify worker threads that there are new work chunks waiting to be processed.
6. If the step succeeded, the work chunk status is changed from IN_PROGRESS to COMPLETED, and the data it contained is deleted.
7. If the step failed, the work chunk status is changed from IN_PROGRESS to either ERRORED or FAILED depending on the severity of the error.
\* An exception is for the final reduction step, where work chunks are not published to the Batch Notification Message Channel,
but instead processed inline.
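
A condensed sketch of that enqueue-and-notify pass, mirroring the `JobInstanceProcessor#enqueueReadyChunks` changes later in this diff; the `ChunkStore` and `Notifier` interfaces here are simplified stand-ins, not the real HAPI-FHIR types:

```java
// Illustrative stand-ins only; the real code uses IJobPersistence and BatchJobSender.
interface ChunkStore {
	java.util.stream.Stream<String> fetchReadyChunkIds(String theInstanceId);

	int enqueueWorkChunkForProcessing(String theChunkId); // READY -> QUEUED; returns rows updated
}

interface Notifier {
	void sendWorkChannelMessage(String theInstanceId, String theChunkId);
}

final class EnqueueReadyChunksPass {
	private final ChunkStore myStore;
	private final Notifier myNotifier;

	EnqueueReadyChunksPass(ChunkStore theStore, Notifier theNotifier) {
		myStore = theStore;
		myNotifier = theNotifier;
	}

	void run(String theInstanceId, boolean theIsReductionStep) {
		myStore.fetchReadyChunkIds(theInstanceId).forEach(chunkId -> {
			if (myStore.enqueueWorkChunkForProcessing(chunkId) != 1) {
				// the chunk is likely already gone; skip it and let a later pass decide
				return;
			}
			if (theIsReductionStep) {
				// reduction chunks are executed inline by the maintenance job,
				// so they are moved to QUEUED but never published to the channel
				return;
			}
			myNotifier.sendWorkChannelMessage(theInstanceId, chunkId);
		});
	}
}
```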
### Job Processing - Middle steps
### Batch Notification Message Handler
Middle Steps in the job definition are executed in the same way, except instead of only using the Job Parameters as input, they use both the Job Parameters and the Work Chunk data produced from the previous step.
HAPI-FHIR Batch Jobs run based on job notification messages sent on the Batch Notification Message Channel (named `batch2-work-notification`).
### Job Processing - Final Step
When a notification message arrives, the handler does the following:
1. Change the work chunk status from `QUEUED` to `IN_PROGRESS`
1. Change the Job Instance status from `QUEUED` to `IN_PROGRESS`
1. If the Job Instance is cancelled, change the status to `CANCELLED` and abort processing
1. If the step creates new work chunks, each work chunk will be created in the `READY` state and will be handled in the next maintenance job pass.
1. If the step succeeds, the work chunk status is changed from `IN_PROGRESS` to `COMPLETED`, and the data it contained is deleted.
1. If the step fails, the work chunk status is changed from `IN_PROGRESS` to either `ERRORED` or `FAILED`, depending on the severity of the error.
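
The same flow as an illustrative outline, with hypothetical helper methods standing in for the real persistence and worker calls:

```java
// Hypothetical helpers; names here are for illustration, not the HAPI-FHIR API.
abstract class WorkNotificationHandlerSketch {
	abstract void markChunkInProgress(String theChunkId);             // QUEUED -> IN_PROGRESS
	abstract void markInstanceInProgress(String theInstanceId);       // QUEUED -> IN_PROGRESS
	abstract boolean isCancelled(String theInstanceId);
	abstract void cancelInstance(String theInstanceId);               // -> CANCELLED
	abstract void executeStep(String theChunkId);                     // new chunks are created in READY
	abstract void completeChunk(String theChunkId);                   // -> COMPLETED; chunk data deleted
	abstract void failChunk(String theChunkId, boolean theRetryable); // -> ERRORED or FAILED
	abstract boolean isRetryable(RuntimeException theError);

	void onWorkNotification(String theInstanceId, String theChunkId) {
		markChunkInProgress(theChunkId);
		markInstanceInProgress(theInstanceId);
		if (isCancelled(theInstanceId)) {
			cancelInstance(theInstanceId);
			return;
		}
		try {
			executeStep(theChunkId);
			completeChunk(theChunkId);
		} catch (RuntimeException e) {
			failChunk(theChunkId, isRetryable(e));
		}
	}
}
```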
### First Step
The first step in a job definition is executed with just the job parameters.
### Middle steps
Middle steps in the job definition are executed using both the initial Job Parameters and the Work Chunk data produced by the previous step.
### Final Step
The final step operates the same way as the middle steps, except it does not produce any new work chunks.
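
To make the three step shapes concrete, here is an illustrative set of signatures. These names are not the HAPI-FHIR API (which routes inputs through `StepExecutionDetails` and a data sink); only the input/output relationship is the same:

```java
import java.util.List;

// Illustrative signatures only.
interface FirstStepSketch<PT, OT> {
	List<OT> run(PT theJobParameters); // job parameters only; each output becomes a READY chunk
}

interface MiddleStepSketch<PT, IT, OT> {
	List<OT> run(PT theJobParameters, IT thePreviousChunkData); // parameters + previous step's chunk data
}

interface FinalStepSketch<PT, IT> {
	void run(PT theJobParameters, IT thePreviousChunkData); // produces no new work chunks
}
```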

View File

@ -64,7 +64,9 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
@ -392,15 +394,12 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId) {
Optional<Batch2JobInstanceEntity> instance = myJobInstanceRepository.findById(theInstanceId);
if (instance.isEmpty()) {
return false;
}
if (instance.get().getStatus().isEnded()) {
if (getRunningJob(theInstanceId) == null) {
return false;
}
Set<WorkChunkStatusEnum> statusesForStep =
myWorkChunkRepository.getDistinctStatusesForStep(theInstanceId, theCurrentStepId);
getDistinctWorkChunkStatesForJobAndStep(theInstanceId, theCurrentStepId);
ourLog.debug(
"Checking whether gated job can advanced to next step. [instanceId={}, currentStepId={}, statusesForStep={}]",
@ -410,6 +409,25 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
return statusesForStep.isEmpty() || statusesForStep.equals(Set.of(WorkChunkStatusEnum.COMPLETED));
}
@Override
public Set<WorkChunkStatusEnum> getDistinctWorkChunkStatesForJobAndStep(String theInstanceId, String theCurrentStepId) {
if (getRunningJob(theInstanceId) == null) {
return Collections.unmodifiableSet(new HashSet<>());
}
return myWorkChunkRepository.getDistinctStatusesForStep(theInstanceId, theCurrentStepId);
}
private Batch2JobInstanceEntity getRunningJob(String theInstanceId) {
Optional<Batch2JobInstanceEntity> instance = myJobInstanceRepository.findById(theInstanceId);
if (instance.isEmpty()) {
return null;
}
if (instance.get().getStatus().isEnded()) {
return null;
}
return instance.get();
}
private void fetchChunks(
String theInstanceId,
boolean theIncludeData,
@ -471,16 +489,6 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
.map(this::toChunk);
}
@Override
public List<WorkChunk> getAllWorkChunksForJob(String theInstanceId) {
return myTransactionService.withSystemRequest()
.withPropagation(Propagation.REQUIRES_NEW)
.execute(() -> {
return myWorkChunkRepository.fetchChunks(Pageable.ofSize(100), theInstanceId)
.stream().map(this::toChunk).collect(Collectors.toList());
});
}
@Override
public boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier) {
Batch2JobInstanceEntity instanceEntity =

View File

@ -197,7 +197,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
myBatch2JobHelper.awaitWorkChunksQueued(startResponse.getInstanceId());
myBatch2JobHelper.runMaintenancePass();
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.awaitJobCompletion(startResponse.getInstanceId());
@ -222,12 +222,10 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myFirstStepLatch.setExpectedCount(1);
myLastStepLatch.setExpectedCount(1);
String batchJobId = myJobCoordinator.startInstance(new SystemRequestDetails(), request).getInstanceId();
myBatch2JobHelper.awaitWorkChunksQueued(batchJobId);
myBatch2JobHelper.runMaintenancePass();
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.awaitWorkChunksQueued(batchJobId);
myBatch2JobHelper.assertFastTracking(batchJobId);
// Since there was only one chunk, the job should proceed without requiring a maintenance pass
myBatch2JobHelper.awaitJobCompletion(batchJobId);
myLastStepLatch.awaitExpected();
@ -297,7 +295,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
String instanceId = startResponse.getInstanceId();
myBatch2JobHelper.awaitWorkChunksQueued(instanceId);
myBatch2JobHelper.runMaintenancePass();
myFirstStepLatch.awaitExpected();
assertNotNull(instanceId);
@ -388,7 +386,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
String instanceId = startResponse.getInstanceId();
myBatch2JobHelper.awaitWorkChunksQueued(instanceId);
myBatch2JobHelper.runMaintenancePass();
myFirstStepLatch.awaitExpected();
assertNotNull(instanceId);
@ -447,7 +445,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
String instanceId = startResponse.getInstanceId();
myBatch2JobHelper.awaitWorkChunksQueued(instanceId);
myBatch2JobHelper.runMaintenancePass();
myFirstStepLatch.awaitExpected();
myLastStepLatch.setExpectedCount(2);
@ -503,7 +501,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
String instanceId = startResponse.getInstanceId();
myBatch2JobHelper.awaitWorkChunksQueued(instanceId);
myBatch2JobHelper.runMaintenancePass();
myFirstStepLatch.awaitExpected();
// validate

View File

@ -24,8 +24,6 @@ import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
@ -37,7 +35,6 @@ import org.thymeleaf.util.ArrayUtils;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -89,26 +86,6 @@ public class Batch2JobHelper {
return awaitJobHasStatusWithoutMaintenancePass(theInstanceId, 10, theExpectedStatus);
}
public JobInstance awaitWorkChunksQueued(String theInstanceId) {
return awaitWorkChunksQueued(theInstanceId, 10);
}
public JobInstance awaitWorkChunksQueued(String theInstanceId, int theSecondsToWait) {
await()
.atMost(theSecondsToWait, TimeUnit.SECONDS)
.pollDelay(100, TimeUnit.MILLISECONDS)
.until(() -> {
runMaintenancePass();
// if none in READY, we can assume they're queued
List<WorkChunk> workChunks = myJobPersistence.getAllWorkChunksForJob(theInstanceId);
return workChunks.stream().noneMatch(c -> c.getStatus() == WorkChunkStatusEnum.READY);
});
runMaintenancePass();
return myJobCoordinator.getInstance(theInstanceId);
}
public JobInstance awaitJobHasStatus(String theInstanceId, int theSecondsToWait, StatusEnum... theExpectedStatus) {
assert !TransactionSynchronizationManager.isActualTransactionActive();

View File

@ -6,6 +6,11 @@ import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
/**
* A fast scheduler to use for Batch2 job Integration Tests.
* This scheduler will run every 200ms (instead of the default 1min)
* so that our ITs can complete in a sane amount of time.
*/
@Configuration
public class Batch2FastSchedulerConfig {
@Autowired

View File

@ -119,6 +119,13 @@ public interface IJobPersistence extends IWorkChunkPersistence {
// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId);
/**
* Returns the set of all distinct work chunk states for the specified job
* instance id and step id.
*/
@Transactional
Set<WorkChunkStatusEnum> getDistinctWorkChunkStatesForJobAndStep(String theInstanceId, String theCurrentStepId);
/**
* Fetch all chunks for a given instance.
*
@ -145,15 +152,6 @@ public interface IJobPersistence extends IWorkChunkPersistence {
@Transactional
Stream<WorkChunk> fetchAllWorkChunksForJobInStates(String theInstanceId, Set<WorkChunkStatusEnum> theWorkChunkStatuses);
/**
* This method is only useful for testing.
* @param theInstanceId
* @param theStatuses
* @return
*/
@VisibleForTesting
List<WorkChunk> getAllWorkChunksForJob(String theInstanceId);
/**
* Callback to update a JobInstance within a locked transaction.
* Return true from the callback if the record write should continue, or false if

View File

@ -143,45 +143,18 @@ public class JobCoordinatorImpl implements IJobCoordinator {
myJobParameterJsonValidator.validateJobParameters(theRequestDetails, theStartRequest, jobDefinition);
// we only create the first chunk and the job here
// JobMaintenanceServiceImpl.doMaintenancePass will handle the rest
IJobPersistence.CreateResult instanceAndFirstChunk = myTransactionService
.withSystemRequestOnDefaultPartition()
.withPropagation(Propagation.REQUIRES_NEW)
.execute(() -> myJobPersistence.onCreateWithFirstChunk(jobDefinition, theStartRequest.getParameters()));
// JobWorkNotification workNotification = JobWorkNotification.firstStepNotification(
// jobDefinition, instanceAndFirstChunk.jobInstanceId, instanceAndFirstChunk.workChunkId);
// sendBatchJobWorkNotificationAfterCommit(workNotification);
Batch2JobStartResponse response = new Batch2JobStartResponse();
response.setInstanceId(instanceAndFirstChunk.jobInstanceId);
return response;
}
/**
* In order to make sure that the data is actually in the DB when JobWorkNotification is handled,
* this method registers a transaction synchronization that sends JobWorkNotification to Job WorkChannel
* if and when the current database transaction is successfully committed.
* If the transaction is rolled back, the JobWorkNotification will not be sent to the job WorkChannel.
*/
private void sendBatchJobWorkNotificationAfterCommit(final JobWorkNotification theJobWorkNotification) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public int getOrder() {
return 0;
}
@Override
public void afterCommit() {
myBatchJobSender.sendWorkChannelMessage(theJobWorkNotification);
}
});
} else {
myBatchJobSender.sendWorkChannelMessage(theJobWorkNotification);
}
}
/**
* Cache will be used if an identical job is QUEUED or IN_PROGRESS. Otherwise a new one will kick off.
*/

View File

@ -24,7 +24,6 @@ import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkData;
import ca.uhn.fhir.i18n.Msg;
@ -50,7 +49,6 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
private final JobDefinitionStep<PT, OT, ?> myTargetStep;
private final AtomicInteger myChunkCounter = new AtomicInteger(0);
private final AtomicReference<String> myLastChunkId = new AtomicReference<>();
private final boolean myGatedExecution;
private final IHapiTransactionService myHapiTransactionService;
JobDataSink(
@ -66,7 +64,6 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
myJobDefinitionId = theDefinition.getJobDefinitionId();
myJobDefinitionVersion = theDefinition.getJobDefinitionVersion();
myTargetStep = theJobWorkCursor.nextStep;
myGatedExecution = theDefinition.isGatedExecution();
myHapiTransactionService = theHapiTransactionService;
}
@ -79,6 +76,9 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
OT dataValue = theData.getData();
String dataValueString = JsonUtil.serialize(dataValue, false);
// once finished, create workchunks in READY state
// the JobMaintenanceServiceImpl will transition these to
// QUEUED when necessary
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(
myJobDefinitionId, myJobDefinitionVersion, targetStepId, instanceId, sequence, dataValueString);
String chunkId = myHapiTransactionService
@ -87,12 +87,6 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
.execute(() -> myJobPersistence.onWorkChunkCreate(batchWorkChunk));
myLastChunkId.set(chunkId);
// if (!myGatedExecution) {
// JobWorkNotification workNotification = new JobWorkNotification(
// myJobDefinitionId, myJobDefinitionVersion, instanceId, targetStepId, chunkId);
// myBatchJobSender.sendWorkChannelMessage(workNotification);
// }
}
@Override

View File

@ -32,9 +32,11 @@ import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
import jakarta.persistence.EntityManager;
@ -103,11 +105,10 @@ public class JobInstanceProcessor {
theInstance = myJobPersistence.fetchInstance(myInstanceId).orElseThrow();
}
String instanceId = theInstance.getInstanceId();
JobDefinition<? extends IModelJson> jobDefinition =
myJobDefinitionegistry.getJobDefinitionOrThrowException(theInstance);
enqueueReadyChunks(instanceId, jobDefinition);
enqueueReadyChunks(theInstance, jobDefinition);
cleanupInstance(theInstance);
triggerGatedExecutions(theInstance, jobDefinition);
@ -204,8 +205,8 @@ public class JobInstanceProcessor {
String instanceId = theInstance.getInstanceId();
String currentStepId = jobWorkCursor.getCurrentStepId();
boolean shouldAdvance = myJobPersistence.canAdvanceInstanceToNextStep(instanceId, currentStepId);
if (shouldAdvance) {
boolean canAdvance = myJobPersistence.canAdvanceInstanceToNextStep(instanceId, currentStepId);
if (canAdvance) {
String nextStepId = jobWorkCursor.nextStep.getStepId();
ourLog.info(
"All processing is complete for gated execution of instance {} step {}. Proceeding to step {}",
@ -232,59 +233,89 @@ public class JobInstanceProcessor {
/**
* Chunks are initially created in READY state.
* We will move READY chunks to QUEUE'd and sends them to the queue/topic (kafka)
* We will move READY chunks to QUEUE'd and send them to the queue/topic (kafka)
* for processing.
*
* We could block chunks from being moved from READY to QUEUE'd here for gated steps,
* but currently, progress is calculated by looking at completed chunks only;
* we'd need a new GATE_WAITING state to move chunks to in order to prevent jobs
* from completing prematurely.
*/
private void enqueueReadyChunks(String theJobInstanceId, JobDefinition<?> theJobDefinition) {
private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition<?> theJobDefinition) {
// we need a transaction to access the stream of workchunks
// because workchunks are created in READY state, there's an unknown
// number of them
// number of them (and so we could be reading many from the db)
getTxBuilder()
.withPropagation(Propagation.REQUIRES_NEW)
.execute(() -> {
Stream<WorkChunk> readyChunks = myJobPersistence.fetchAllWorkChunksForJobInStates(theJobInstanceId, Set.of(WorkChunkStatusEnum.READY));
Stream<WorkChunk> readyChunks = myJobPersistence.fetchAllWorkChunksForJobInStates(theJobInstance.getInstanceId(),
Set.of(WorkChunkStatusEnum.READY));
// for each chunk id
// in transaction....
// -move to QUEUE'd
// -sent to topic
// commit
readyChunks.forEach(chunk -> {
/*
* For each chunk id
* * Move to QUEUE'd
* * Send to kafka topic
* * Send to topic
* * flush changes
* * commit
*/
getTxBuilder().execute(() -> {
updateChunk(chunk, theJobInstanceId, theJobDefinition);
// flush this transaction
myEntityManager.flush();
myEntityManager.unwrap(Session.class)
.doWork(Connection::commit);
if (updateChunkAndSendToQueue(chunk, theJobInstance, theJobDefinition)) {
// flush this transaction
myEntityManager.flush();
myEntityManager.unwrap(Session.class)
.doWork(Connection::commit);
}
});
});
});
}
private void updateChunk(WorkChunk theChunk, String theInstanceId, JobDefinition<?> theJobDefinition) {
/**
* Updates the Work Chunk and sends it to the queue.
*
* Because ReductionSteps are done inline by the maintenance pass,
* those will not be sent to the queue (but they will still have their
* status updated from READY -> QUEUED).
*
* Returns true if the chunk was transitioned to QUEUED (and, for
* non-reduction steps, sent to the queue); false if the transition failed.
*/
private boolean updateChunkAndSendToQueue(WorkChunk theChunk, JobInstance theInstance, JobDefinition<?> theJobDefinition) {
String chunkId = theChunk.getId();
int updated = myJobPersistence.enqueueWorkChunkForProcessing(chunkId);
if (updated == 1) {
JobWorkCursor<?, ?, ?> jobWorkCursor =
JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, theChunk.getTargetStepId());
if (theJobDefinition.isGatedExecution() && jobWorkCursor.isFinalStep() && jobWorkCursor.isReductionStep()) {
// reduction steps are processed by
// ReductionStepExecutorServiceImpl,
// which does not read chunks off the queue, but instead reads all
// the "QUEUE'd" chunks and processes them inline
return true;
}
// send to the queue
// we use current step id because it has not been moved to the next step (yet)
JobWorkNotification workNotification = new JobWorkNotification(
theJobDefinition.getJobDefinitionId(), theJobDefinition.getJobDefinitionVersion(),
theInstanceId, theChunk.getTargetStepId(), chunkId);
theInstance.getInstanceId(), theChunk.getTargetStepId(), chunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
return true;
} else {
// TODO - throw?
// means the work chunk is likely already gone...
// we'll log and skip it. If it's still in the DB, the next pass
// will pick it up. Otherwise, it's no longer important
ourLog.error("Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; skipping work chunk.",
theInstance.getInstanceId(), theChunk.getId());
// nothing changed, nothing to commit
return false;
}
}
IHapiTransactionService.IExecutionBuilder getTxBuilder() {
private IHapiTransactionService.IExecutionBuilder getTxBuilder() {
return myTransactionService.withSystemRequest()
.withRequestPartitionId(RequestPartitionId.allPartitions());
}

View File

@ -57,6 +57,7 @@ public class JobInstanceProgressCalculator {
StopWatch stopWatch = new StopWatch();
ourLog.trace("calculating progress: {}", theInstanceId);
// calculate progress based on the number of work chunks in COMPLETED state
InstanceProgress instanceProgress = calculateInstanceProgress(theInstanceId);
myJobPersistence.updateInstance(theInstanceId, currentInstance -> {

View File

@ -26,6 +26,7 @@ import ca.uhn.fhir.jpa.dao.tx.NonTransactionalHapiTransactionService;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.AfterEach;
@ -73,7 +74,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
private JobDefinitionRegistry myJobDefinitionRegistry;
@Mock
private IJobMaintenanceService myJobMaintenanceService;
private IHapiTransactionService myTransactionService = new NonTransactionalHapiTransactionService();
private final IHapiTransactionService myTransactionService = new NonTransactionalHapiTransactionService();
@Captor
private ArgumentCaptor<StepExecutionDetails<TestJobParameters, VoidModel>> myStep1ExecutionDetailsCaptor;
@Captor
@ -144,7 +145,6 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
assertEquals(PASSWORD_VALUE, params.getPassword());
verify(myJobInstancePersister, times(1)).onWorkChunkCompletion(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0));
verify(myBatchJobSender, times(2)).sendWorkChannelMessage(any());
}
private void setupMocks(JobDefinition<TestJobParameters> theJobDefinition, WorkChunk theWorkChunk) {
@ -467,7 +467,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId(JOB_DEFINITION_ID);
startRequest.setParameters(new TestJobParameters().setParam1(PARAM_1_VALUE).setParam2(PARAM_2_VALUE).setPassword(PASSWORD_VALUE));
mySvc.startInstance(startRequest);
mySvc.startInstance(new SystemRequestDetails(), startRequest);
// Verify
@ -476,12 +476,6 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
assertSame(jobDefinition, myJobDefinitionCaptor.getValue());
assertEquals(startRequest.getParameters(), myParametersJsonCaptor.getValue());
verify(myBatchJobSender, times(1)).sendWorkChannelMessage(myJobWorkNotificationCaptor.capture());
assertEquals(CHUNK_ID, myJobWorkNotificationCaptor.getAllValues().get(0).getChunkId());
assertEquals(JOB_DEFINITION_ID, myJobWorkNotificationCaptor.getAllValues().get(0).getJobDefinitionId());
assertEquals(1, myJobWorkNotificationCaptor.getAllValues().get(0).getJobDefinitionVersion());
assertEquals(STEP_1, myJobWorkNotificationCaptor.getAllValues().get(0).getTargetStepId());
verifyNoMoreInteractions(myJobInstancePersister);
verifyNoMoreInteractions(myStep1Worker);
verifyNoMoreInteractions(myStep2Worker);

View File

@ -17,6 +17,7 @@ import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
@ -26,6 +27,7 @@ import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import com.google.common.collect.Lists;
import jakarta.persistence.EntityManager;
import org.hibernate.Session;
import org.hl7.fhir.r4.model.DateTimeType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -38,12 +40,14 @@ import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.transaction.support.TransactionCallback;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@ -51,6 +55,8 @@ import java.util.concurrent.Future;
import java.util.stream.Collectors;
import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep1;
import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep2;
import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep3;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -60,7 +66,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -70,6 +79,21 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
private static class TestHapiTransactionService extends HapiTransactionService {
@Override
protected <T> T doExecute(ExecutionBuilder theExecutionBuilder, TransactionCallback<T> theCallback) {
return overrideExecute(theCallback);
}
/**
* Override method for testing purposes (if needed)
*/
public <T> T overrideExecute(TransactionCallback<T> theCallback) {
return null;
}
}
@RegisterExtension
LogbackCaptureTestExtension myLogCapture = new LogbackCaptureTestExtension((Logger) LoggerFactory.getLogger("ca.uhn.fhir.log.batch_troubleshooting"), Level.WARN);
@Mock
@ -88,8 +112,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
private IChannelProducer myWorkChannelProducer;
@Mock
private EntityManager myEntityManager;
@Mock
IHapiTransactionService myTransactionService;
@Spy
private IHapiTransactionService myTransactionService = new TestHapiTransactionService();
@Captor
private ArgumentCaptor<Message<JobWorkNotification>> myMessageCaptor;
@Captor
@ -161,7 +185,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:02-04:00")),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:03-04:00")),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25),
JobCoordinatorImplTest.createWorkChunkStep3().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
createWorkChunkStep3().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
JobInstance instance = createInstance();
@ -202,7 +226,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:02-04:00")).setErrorCount(2),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:03-04:00")).setErrorCount(2),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25),
JobCoordinatorImplTest.createWorkChunkStep3().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
createWorkChunkStep3().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
JobInstance instance = createInstance();
@ -289,7 +313,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:01-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:02-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:03-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25),JobCoordinatorImplTest.createWorkChunkStep3().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25), createWorkChunkStep3().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition(t -> t.completionHandler(myCompletionHandler)));
@ -330,7 +354,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.FAILED).setStartTime(parseTime("2022-02-12T14:00:02-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25).setErrorMessage("This is an error message"),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:03-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25),
JobCoordinatorImplTest.createWorkChunkStep3().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
createWorkChunkStep3().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
@ -343,7 +367,6 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
mySvc.runMaintenancePass();
assertEquals(0.8333333333333334, instance.getProgress());
assertEquals(StatusEnum.FAILED, instance.getStatus());
assertEquals("This is an error message", instance.getErrorMessage());
@ -358,6 +381,114 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
verifyNoMoreInteractions(myJobPersistence);
}
private void runEnqueueReadyChunksTest(List<WorkChunk> theChunks, JobDefinition<TestJobParameters> theJobDefinition) {
myJobDefinitionRegistry.addJobDefinition(theJobDefinition);
JobInstance instance = createInstance();
instance.setJobDefinitionId(theJobDefinition.getJobDefinitionId());
Session sessionContract = mock(Session.class);
// mocks
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(instance));
when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenAnswer(t -> theChunks.stream());
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean()))
.thenAnswer(t -> theChunks.stream().map(c -> c.setStatus(WorkChunkStatusEnum.QUEUED)).toList().iterator());
// lenient, because we are using this test setup in 3 different tests;
// one of which will never call this
lenient().when(myEntityManager.unwrap(eq(Session.class)))
.thenReturn(sessionContract);
// we just need it to fire, so we'll fire it manually
when(((TestHapiTransactionService) myTransactionService).overrideExecute(any()))
.thenAnswer(args -> {
TransactionCallback<?> callback = args.getArgument(0);
callback.doInTransaction(null);
return null;
});
// test
mySvc.runMaintenancePass();
}
@Test
public void testMaintenancePass_withREADYworkChunksForReductionSteps_movedToQueueButNotPublished() {
// setup
List<WorkChunk> chunks = List.of(
createWorkChunkStep3().setStatus(WorkChunkStatusEnum.READY),
createWorkChunkStep3().setStatus(WorkChunkStatusEnum.READY)
);
// when
when(myJobPersistence.enqueueWorkChunkForProcessing(anyString()))
.thenReturn(1);
// test
runEnqueueReadyChunksTest(chunks, createJobDefinitionWithReduction());
// verify
// saved, but not sent to the queue
verify(myEntityManager, times(2)).flush();
verify(myJobPersistence, times(2)).enqueueWorkChunkForProcessing(anyString());
verify(myWorkChannelProducer, never()).send(any());
}
@Test
public void testMaintenancePass_withREADYworkChunksForNonReductionStep_movedToQUEUEDandPublished() {
// setup
List<WorkChunk> chunks = List.of(
createWorkChunkStep2().setStatus(WorkChunkStatusEnum.READY),
createWorkChunkStep2().setStatus(WorkChunkStatusEnum.READY)
);
// when
when(myJobPersistence.enqueueWorkChunkForProcessing(anyString()))
.thenReturn(1);
// test
runEnqueueReadyChunksTest(chunks, createJobDefinition());
// verify
verify(myEntityManager, times(2)).flush();
verify(myJobPersistence, times(2)).enqueueWorkChunkForProcessing(anyString());
verify(myWorkChannelProducer, times(2)).send(myMessageCaptor.capture());
List<Message<JobWorkNotification>> sentMessages = myMessageCaptor.getAllValues();
for (Message<JobWorkNotification> msg : sentMessages) {
JobWorkNotification payload = msg.getPayload();
assertEquals(STEP_2, payload.getTargetStepId());
assertEquals(CHUNK_ID, payload.getChunkId());
}
}
@Test
public void testMaintenancePass_whenUpdateFails_skipsWorkChunkAndLogs() {
// setup
List<WorkChunk> chunks = List.of(
createWorkChunkStep3().setStatus(WorkChunkStatusEnum.READY),
createWorkChunkStep3().setStatus(WorkChunkStatusEnum.READY)
);
myLogCapture.setUp(Level.ERROR);
// when
when(myJobPersistence.enqueueWorkChunkForProcessing(anyString()))
.thenReturn(0); // fails to update/not found
// test
runEnqueueReadyChunksTest(chunks, createJobDefinitionWithReduction());
// verify
verify(myJobPersistence, times(2)).enqueueWorkChunkForProcessing(anyString());
verify(myEntityManager, never()).flush();
verify(myWorkChannelProducer, never()).send(any());
List<ILoggingEvent> events = myLogCapture.getLogEvents();
assertEquals(2, events.size());
for (ILoggingEvent evt : events) {
assertTrue(evt.getMessage().contains("skipping work chunk"));
}
}
@Test
void triggerMaintenancePass_noneInProgress_runsMaintenance() {
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Collections.emptyList());
@ -416,7 +547,6 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
assertTrue(result2.get());
}
private static Date parseTime(String theDate) {
return new DateTimeType(theDate).getValue();
}