This commit is contained in:
leif stawnyczy 2024-03-05 14:36:28 -05:00
parent 370d2c16b1
commit 56f5697149
16 changed files with 287 additions and 51 deletions

View File

@ -47,6 +47,8 @@ stateDiagram-v2
title: Batch2 Job Work Chunk state transitions
---
stateDiagram-v2
[*]:
state READY
state QUEUED
state on_receive <<choice>>
state IN_PROGRESS
@ -55,10 +57,11 @@ stateDiagram-v2
state FAILED
state COMPLETED
direction LR
[*] --> QUEUED : on create
[*] --> READY : on create - normal or step
READY --> QUEUED : placed on kafka (maint.)
%% worker processing states
QUEUED --> on_receive : on deque by worker
QUEUED --> on_receive : on deque by worker
on_receive --> IN_PROGRESS : start execution
IN_PROGRESS --> execute: execute

View File

@ -120,7 +120,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
entity.setSerializedData(theBatchWorkChunk.serializedData);
entity.setCreateTime(new Date());
entity.setStartTime(new Date());
entity.setStatus(WorkChunkStatusEnum.QUEUED);
entity.setStatus(WorkChunkStatusEnum.READY);
ourLog.debug("Create work chunk {}/{}/{}", entity.getInstanceId(), entity.getId(), entity.getTargetStepId());
ourLog.trace(
"Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData());
@ -137,6 +137,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
List.of(WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.ERRORED, WorkChunkStatusEnum.IN_PROGRESS);
int rowsModified = myWorkChunkRepository.updateChunkStatusForStart(
theChunkId, new Date(), WorkChunkStatusEnum.IN_PROGRESS, priorStates);
if (rowsModified == 0) {
ourLog.info("Attempting to start chunk {} but it was already started.", theChunkId);
return Optional.empty();
@ -288,6 +289,12 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
.collect(Collectors.toList()));
}
@Override
public int enqueueWorkChunkForProcessing(String theChunkId) {
return myWorkChunkRepository.updateChunkStatus(theChunkId,
WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.READY);
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex) {
@ -458,6 +465,22 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
.map(this::toChunk);
}
@Override
public Stream<WorkChunk> fetchAllWorkChunksForJobInStates(String theInstanceId, Set<WorkChunkStatusEnum> theWorkChunkStatuses) {
return myWorkChunkRepository.fetchChunksForJobInStates(theInstanceId, theWorkChunkStatuses)
.map(this::toChunk);
}
@Override
public List<WorkChunk> getAllWorkChunksForJob(String theInstanceId) {
return myTransactionService.withSystemRequest()
.withPropagation(Propagation.REQUIRES_NEW)
.execute(() -> {
return myWorkChunkRepository.fetchChunks(Pageable.ofSize(100), theInstanceId)
.stream().map(this::toChunk).collect(Collectors.toList());
});
}
@Override
public boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier) {
Batch2JobInstanceEntity instanceEntity =

View File

@ -63,6 +63,12 @@ public interface IBatch2WorkChunkRepository
Stream<Batch2WorkChunkEntity> fetchChunksForStep(
@Param("instanceId") String theInstanceId, @Param("targetStepId") String theTargetStepId);
@Query(
"SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId AND e.myStatus IN :states"
)
Stream<Batch2WorkChunkEntity> fetchChunksForJobInStates(
@Param("instanceId") String theInstanceId, @Param("states") Collection<WorkChunkStatusEnum> theStates);
@Modifying
@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, "
+ "e.myRecordsProcessed = :rp, e.myErrorCount = e.myErrorCount + :errorRetries, e.mySerializedData = null, "
@ -102,6 +108,16 @@ public interface IBatch2WorkChunkRepository
@Param("status") WorkChunkStatusEnum theInProgress,
@Param("startStatuses") Collection<WorkChunkStatusEnum> theStartStatuses);
@Modifying
@Query(
"UPDATE Batch2WorkChunkEntity e SET e.myStatus = :newStatus WHERE e.myId = :id AND e.myStatus = :oldStatus"
)
int updateChunkStatus(
@Param("id") String theChunkId,
@Param("newStatus") WorkChunkStatusEnum theNewStatus,
@Param("oldStatus") WorkChunkStatusEnum theOldStatus
);
@Modifying
@Query("DELETE FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId")
int deleteAllForInstance(@Param("instanceId") String theInstanceId);

View File

@ -27,6 +27,7 @@ import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
import ca.uhn.fhir.jpa.test.config.Batch2FastSchedulerConfig;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.util.JsonUtil;
@ -43,6 +44,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Sort;
import org.springframework.test.context.ContextConfiguration;
import java.util.ArrayList;
import java.util.Iterator;
@ -60,6 +62,9 @@ import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@ContextConfiguration(classes = {
Batch2FastSchedulerConfig.class
})
public class Batch2CoordinatorIT extends BaseJpaR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(Batch2CoordinatorIT.class);
@ -192,6 +197,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
myBatch2JobHelper.awaitWorkChunksQueued(startResponse.getInstanceId());
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.awaitJobCompletion(startResponse.getInstanceId());
@ -216,8 +222,9 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myFirstStepLatch.setExpectedCount(1);
myLastStepLatch.setExpectedCount(1);
String batchJobId = myJobCoordinator.startInstance(new SystemRequestDetails(), request).getInstanceId();
myBatch2JobHelper.awaitWorkChunksQueued(batchJobId);
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.awaitWorkChunksQueued(batchJobId);
myBatch2JobHelper.assertFastTracking(batchJobId);
// Since there was only one chunk, the job should proceed without requiring a maintenance pass
@ -290,6 +297,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
String instanceId = startResponse.getInstanceId();
myBatch2JobHelper.awaitWorkChunksQueued(instanceId);
myFirstStepLatch.awaitExpected();
assertNotNull(instanceId);
@ -379,8 +387,8 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
JobInstanceStartRequest request = buildRequest(jobId);
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
String instanceId = startResponse.getInstanceId();
myBatch2JobHelper.awaitWorkChunksQueued(instanceId);
myFirstStepLatch.awaitExpected();
assertNotNull(instanceId);
@ -439,6 +447,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
String instanceId = startResponse.getInstanceId();
myBatch2JobHelper.awaitWorkChunksQueued(instanceId);
myFirstStepLatch.awaitExpected();
myLastStepLatch.setExpectedCount(2);
@ -494,6 +503,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
String instanceId = startResponse.getInstanceId();
myBatch2JobHelper.awaitWorkChunksQueued(instanceId);
myFirstStepLatch.awaitExpected();
// validate

View File

@ -13,15 +13,19 @@ import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
import ca.uhn.fhir.jpa.test.config.Batch2FastSchedulerConfig;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.test.utilities.UnregisterScheduledProcessor;
import ca.uhn.test.concurrency.PointcutLatch;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nonnull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -31,8 +35,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import jakarta.annotation.Nonnull;
import jakarta.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
@ -53,7 +55,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@TestPropertySource(properties = {
UnregisterScheduledProcessor.SCHEDULING_DISABLED_EQUALS_FALSE
})
@ContextConfiguration(classes = {Batch2JobMaintenanceIT.SpringConfig.class})
@ContextConfiguration(classes = {Batch2FastSchedulerConfig.class})
public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(Batch2JobMaintenanceIT.class);
@ -122,7 +124,8 @@ public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
myFirstStepLatch.setExpectedCount(1);
myLastStepLatch.setExpectedCount(1);
String batchJobId = myJobCoordinator.startInstance(request).getInstanceId();
String batchJobId = myJobCoordinator.startInstance(new SystemRequestDetails(), request).getInstanceId();
myBatch2JobHelper.awaitJobHasStatus(batchJobId, StatusEnum.QUEUED);
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.assertFastTracking(batchJobId);
@ -251,14 +254,4 @@ public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
myResult = theResult;
}
}
static class SpringConfig {
@Autowired
IJobMaintenanceService myJobMaintenanceService;
@PostConstruct
void fastScheduler() {
((JobMaintenanceServiceImpl)myJobMaintenanceService).setScheduledJobFrequencyMillis(200);
}
}
}

View File

@ -24,6 +24,8 @@ import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
@ -35,6 +37,7 @@ import org.thymeleaf.util.ArrayUtils;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -86,6 +89,26 @@ public class Batch2JobHelper {
return awaitJobawaitJobHasStatusWithoutMaintenancePass(theInstanceId, 10, theExpectedStatus);
}
public JobInstance awaitWorkChunksQueued(String theInstanceId) {
return awaitWorkChunksQueued(theInstanceId, 10);
}
public JobInstance awaitWorkChunksQueued(String theInstanceId, int theSecondsToWait) {
await()
.atMost(theSecondsToWait, TimeUnit.SECONDS)
.pollDelay(100, TimeUnit.MILLISECONDS)
.until(() -> {
runMaintenancePass();
// if none in READY, we can assume they're queued
List<WorkChunk> workChunks = myJobPersistence.getAllWorkChunksForJob(theInstanceId);
return workChunks.stream().noneMatch(c -> c.getStatus() == WorkChunkStatusEnum.READY);
});
runMaintenancePass();
return myJobCoordinator.getInstance(theInstanceId);
}
public JobInstance awaitJobHasStatus(String theInstanceId, int theSecondsToWait, StatusEnum... theExpectedStatus) {
assert !TransactionSynchronizationManager.isActualTransactionActive();
@ -175,7 +198,11 @@ public class Batch2JobHelper {
}
public void awaitGatedStepId(String theExpectedGatedStepId, String theInstanceId) {
await().until(() -> theExpectedGatedStepId.equals(myJobCoordinator.getInstance(theInstanceId).getCurrentGatedStepId()));
await().until(() -> {
String currentGatedStepId = myJobCoordinator.getInstance(theInstanceId).getCurrentGatedStepId();
System.out.println("YYYYY " + currentGatedStepId);
return theExpectedGatedStepId.equals(currentGatedStepId);
});
}
public long getCombinedRecordsProcessed(String theInstanceId) {

View File

@ -0,0 +1,18 @@
package ca.uhn.fhir.jpa.test.config;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@Configuration
public class Batch2FastSchedulerConfig {
@Autowired
IJobMaintenanceService myJobMaintenanceService;
@PostConstruct
void fastScheduler() {
((JobMaintenanceServiceImpl)myJobMaintenanceService).setScheduledJobFrequencyMillis(200);
}
}

View File

@ -534,9 +534,18 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
// when
runInTransaction(()-> new JobInstanceProcessor(mySvc, null, instanceId1, new JobChunkProgressAccumulator(), null, jobDefinitionRegistry)
.process());
runInTransaction(()-> {
new JobInstanceProcessor(
mySvc,
null,
instanceId1,
new JobChunkProgressAccumulator(),
null,
jobDefinitionRegistry,
null, // entity manager
null // transaction manager
).process();
});
// then
JobInstance freshInstance1 = mySvc.fetchInstance(instanceId1).orElseThrow();

View File

@ -25,7 +25,9 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.slf4j.Logger;
@ -86,6 +88,8 @@ public interface IJobPersistence extends IWorkChunkPersistence {
// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
List<JobInstance> fetchInstances(int thePageSize, int thePageIndex);
int enqueueWorkChunkForProcessing(String theChunkId);
/**
* Fetch instances ordered by myCreateTime DESC
*/
@ -131,6 +135,25 @@ public interface IJobPersistence extends IWorkChunkPersistence {
*/
Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId);
/**
* Fetch all WorkChunks for job with instance id theInstanceId that are in
* theWorkChunkStatuses.
* @param theInstanceId the instance id of the job
* @param theWorkChunkStatuses the statuses of interest
* @return a stream of work chunks
*/
@Transactional
Stream<WorkChunk> fetchAllWorkChunksForJobInStates(String theInstanceId, Set<WorkChunkStatusEnum> theWorkChunkStatuses);
/**
* This method is only useful for testing.
* @param theInstanceId
* @param theStatuses
* @return
*/
@VisibleForTesting
List<WorkChunk> getAllWorkChunksForJob(String theInstanceId);
/**
* Callback to update a JobInstance within a locked transaction.
* Return true from the callback if the record write should continue, or false if

View File

@ -38,6 +38,7 @@ import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import jakarta.persistence.EntityManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -103,15 +104,21 @@ public abstract class BaseBatch2Config {
JpaStorageSettings theStorageSettings,
BatchJobSender theBatchJobSender,
WorkChunkProcessor theExecutor,
IReductionStepExecutorService theReductionStepExecutorService) {
IReductionStepExecutorService theReductionStepExecutorService,
IHapiTransactionService theTransactionService,
EntityManager theEntityManager
) {
return new JobMaintenanceServiceImpl(
theSchedulerService,
myPersistence,
theStorageSettings,
theJobDefinitionRegistry,
theBatchJobSender,
theExecutor,
theReductionStepExecutorService);
theSchedulerService,
myPersistence,
theStorageSettings,
theJobDefinitionRegistry,
theBatchJobSender,
theExecutor,
theReductionStepExecutorService,
theTransactionService,
theEntityManager
);
}
@Bean

View File

@ -148,9 +148,10 @@ public class JobCoordinatorImpl implements IJobCoordinator {
.withPropagation(Propagation.REQUIRES_NEW)
.execute(() -> myJobPersistence.onCreateWithFirstChunk(jobDefinition, theStartRequest.getParameters()));
JobWorkNotification workNotification = JobWorkNotification.firstStepNotification(
jobDefinition, instanceAndFirstChunk.jobInstanceId, instanceAndFirstChunk.workChunkId);
sendBatchJobWorkNotificationAfterCommit(workNotification);
// JobWorkNotification workNotification = JobWorkNotification.firstStepNotification(
// jobDefinition, instanceAndFirstChunk.jobInstanceId, instanceAndFirstChunk.workChunkId);
// sendBatchJobWorkNotificationAfterCommit(workNotification);
Batch2JobStartResponse response = new Batch2JobStartResponse();
response.setInstanceId(instanceAndFirstChunk.jobInstanceId);

View File

@ -88,11 +88,11 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
myLastChunkId.set(chunkId);
if (!myGatedExecution) {
JobWorkNotification workNotification = new JobWorkNotification(
myJobDefinitionId, myJobDefinitionVersion, instanceId, targetStepId, chunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
}
// if (!myGatedExecution) {
// JobWorkNotification workNotification = new JobWorkNotification(
// myJobDefinitionId, myJobDefinitionVersion, instanceId, targetStepId, chunkId);
// myBatchJobSender.sendWorkChannelMessage(workNotification);
// }
}
@Override

View File

@ -28,16 +28,25 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
import jakarta.persistence.EntityManager;
import org.apache.commons.lang3.time.DateUtils;
import org.hibernate.Session;
import org.slf4j.Logger;
import org.springframework.transaction.annotation.Propagation;
import java.sql.Connection;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
public class JobInstanceProcessor {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@ -52,13 +61,19 @@ public class JobInstanceProcessor {
private final String myInstanceId;
private final JobDefinitionRegistry myJobDefinitionegistry;
private final IHapiTransactionService myTransactionService;
private final EntityManager myEntityManager;
public JobInstanceProcessor(
IJobPersistence theJobPersistence,
BatchJobSender theBatchJobSender,
String theInstanceId,
JobChunkProgressAccumulator theProgressAccumulator,
IReductionStepExecutorService theReductionStepExecutorService,
JobDefinitionRegistry theJobDefinitionRegistry) {
JobDefinitionRegistry theJobDefinitionRegistry,
EntityManager theEntityManager,
IHapiTransactionService theTransactionService
) {
myJobPersistence = theJobPersistence;
myBatchJobSender = theBatchJobSender;
myInstanceId = theInstanceId;
@ -68,6 +83,9 @@ public class JobInstanceProcessor {
myJobInstanceProgressCalculator =
new JobInstanceProgressCalculator(theJobPersistence, theProgressAccumulator, theJobDefinitionRegistry);
myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry);
myEntityManager = theEntityManager;
myTransactionService = theTransactionService;
}
public void process() {
@ -84,8 +102,14 @@ public class JobInstanceProcessor {
// reload after update
theInstance = myJobPersistence.fetchInstance(myInstanceId).orElseThrow();
}
String instanceId = theInstance.getInstanceId();
JobDefinition<? extends IModelJson> jobDefinition =
myJobDefinitionegistry.getJobDefinitionOrThrowException(theInstance);
enqueueReadyChunks(instanceId, jobDefinition);
cleanupInstance(theInstance);
triggerGatedExecutions(theInstance);
triggerGatedExecutions(theInstance, jobDefinition);
ourLog.debug("Finished job processing: {} - {}", myInstanceId, stopWatch);
}
@ -156,7 +180,7 @@ public class JobInstanceProcessor {
return false;
}
private void triggerGatedExecutions(JobInstance theInstance) {
private void triggerGatedExecutions(JobInstance theInstance, JobDefinition<?> theJobDefinition) {
if (!theInstance.isRunning()) {
ourLog.debug(
"JobInstance {} is not in a \"running\" state. Status {}",
@ -169,10 +193,8 @@ public class JobInstanceProcessor {
return;
}
JobDefinition<? extends IModelJson> jobDefinition =
myJobDefinitionegistry.getJobDefinitionOrThrowException(theInstance);
JobWorkCursor<?, ?, ?> jobWorkCursor =
JobWorkCursor.fromJobDefinitionAndRequestedStepId(jobDefinition, theInstance.getCurrentGatedStepId());
JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, theInstance.getCurrentGatedStepId());
// final step
if (jobWorkCursor.isFinalStep() && !jobWorkCursor.isReductionStep()) {
@ -193,7 +215,7 @@ public class JobInstanceProcessor {
if (jobWorkCursor.nextStep.isReductionStep()) {
JobWorkCursor<?, ?, ?> nextJobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
jobDefinition, jobWorkCursor.nextStep.getStepId());
jobWorkCursor.getJobDefinition(), jobWorkCursor.nextStep.getStepId());
myReductionStepExecutorService.triggerReductionStep(instanceId, nextJobWorkCursor);
} else {
// otherwise, continue processing as expected
@ -204,10 +226,69 @@ public class JobInstanceProcessor {
"Not ready to advance gated execution of instance {} from step {} to {}.",
instanceId,
currentStepId,
jobWorkCursor.nextStep.getStepId());
jobWorkCursor.nextStep.getStepId());
}
}
/**
* Chunks are initially created in READY state.
* We will move READY chunks to QUEUE'd and sends them to the queue/topic (kafka)
*/
private void enqueueReadyChunks(String theJobInstanceId, JobDefinition<?> theJobDefinition) {
// we need a transaction to access the stream of workchunks
// because workchunks are created in READY state, there's an unknown
// number of them
getTxBuilder()
.withPropagation(Propagation.REQUIRES_NEW)
.execute(() -> {
Stream<WorkChunk> readyChunks = myJobPersistence.fetchAllWorkChunksForJobInStates(theJobInstanceId, Set.of(WorkChunkStatusEnum.READY));
// for each chunk id
// in transaction....
// -move to QUEUE'd
// -sent to topic
// commit
readyChunks.forEach(chunk -> {
/*
* For each chunk id
* * Move to QUEUE'd
* * Send to kafka topic
* * flush changes
* * commit
*/
getTxBuilder().execute(() -> {
updateChunk(chunk, theJobInstanceId, theJobDefinition);
// flush this transaction
myEntityManager.flush();
myEntityManager.unwrap(Session.class)
.doWork(Connection::commit);
});
});
});
}
private void updateChunk(WorkChunk theChunk, String theInstanceId, JobDefinition<?> theJobDefinition) {
String chunkId = theChunk.getId();
int updated = myJobPersistence.enqueueWorkChunkForProcessing(chunkId);
if (updated == 1) {
// send to the queue
// we use current step id because it has not been moved to the next step (yet)
JobWorkNotification workNotification = new JobWorkNotification(
theJobDefinition.getJobDefinitionId(), theJobDefinition.getJobDefinitionVersion(),
theInstanceId, theChunk.getTargetStepId(), chunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
} else {
// TODO - throw?
}
}
IHapiTransactionService.IExecutionBuilder getTxBuilder() {
return myTransactionService.withSystemRequest()
.withRequestPartitionId(RequestPartitionId.allPartitions());
}
private void processChunksForNextSteps(JobInstance theInstance, String nextStepId) {
String instanceId = theInstance.getInstanceId();
List<String> queuedChunksForNextStep =

View File

@ -27,7 +27,9 @@ import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
@ -35,6 +37,7 @@ import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.util.Logs;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import jakarta.persistence.EntityManager;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.DateUtils;
import org.quartz.JobExecutionContext;
@ -89,6 +92,8 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
private final JobDefinitionRegistry myJobDefinitionRegistry;
private final BatchJobSender myBatchJobSender;
private final WorkChunkProcessor myJobExecutorSvc;
private final IHapiTransactionService myTransactionService;
private final EntityManager myEntityManager;
private final Semaphore myRunMaintenanceSemaphore = new Semaphore(1);
@ -107,7 +112,10 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
@Nonnull JobDefinitionRegistry theJobDefinitionRegistry,
@Nonnull BatchJobSender theBatchJobSender,
@Nonnull WorkChunkProcessor theExecutor,
@Nonnull IReductionStepExecutorService theReductionStepExecutorService) {
@Nonnull IReductionStepExecutorService theReductionStepExecutorService,
IHapiTransactionService theTransactionService,
EntityManager theEntityManager
) {
myStorageSettings = theStorageSettings;
myReductionStepExecutorService = theReductionStepExecutorService;
Validate.notNull(theSchedulerService);
@ -120,6 +128,8 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
myJobDefinitionRegistry = theJobDefinitionRegistry;
myBatchJobSender = theBatchJobSender;
myJobExecutorSvc = theExecutor;
myTransactionService = theTransactionService;
myEntityManager = theEntityManager;
}
@Override
@ -158,6 +168,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
private boolean runMaintenanceDirectlyWithTimeout() {
if (getQueueLength() > 0) {
System.out.println("THREDS WAITING RETURN FALSE FOR FAST TRACKING");
ourLog.debug(
"There are already {} threads waiting to run a maintenance pass. Ignoring request.",
getQueueLength());
@ -232,7 +243,10 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
instanceId,
progressAccumulator,
myReductionStepExecutorService,
myJobDefinitionRegistry);
myJobDefinitionRegistry,
myEntityManager,
myTransactionService
);
ourLog.debug(
"Triggering maintenance process for instance {} in status {}",
instanceId,

View File

@ -32,6 +32,7 @@ import java.util.Set;
*/
public enum WorkChunkStatusEnum {
// wipmb For 6.8 Add WAITING for gated, and READY for in db, but not yet sent to channel.
READY,
QUEUED,
IN_PROGRESS,
ERRORED,
@ -58,6 +59,8 @@ public enum WorkChunkStatusEnum {
public Set<WorkChunkStatusEnum> getNextStates() {
switch (this) {
case READY:
return EnumSet.of(QUEUED);
case QUEUED:
return EnumSet.of(IN_PROGRESS);
case IN_PROGRESS:

View File

@ -17,14 +17,15 @@ import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.util.Logs;
import ca.uhn.test.util.LogbackCaptureTestExtension;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import com.google.common.collect.Lists;
import jakarta.persistence.EntityManager;
import org.hl7.fhir.r4.model.DateTimeType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -85,6 +86,10 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
private JobDefinitionRegistry myJobDefinitionRegistry;
@Mock
private IChannelProducer myWorkChannelProducer;
@Mock
private EntityManager myEntityManager;
@Mock
IHapiTransactionService myTransactionService;
@Captor
private ArgumentCaptor<Message<JobWorkNotification>> myMessageCaptor;
@Captor
@ -102,7 +107,10 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
myJobDefinitionRegistry,
batchJobSender,
myJobExecutorSvc,
myReductionStepExecutorService);
myReductionStepExecutorService,
myTransactionService,
myEntityManager
);
myStorageSettings.setJobFastTrackingEnabled(true);
}