Make WorkChunk handling transactional at state transitions. (#4621)

- use a separate enum for the states - chunks have different transitions than instances.
- use transactional update events for work-chunk state transitions
- introduce spec-test to define behaviour of batch2 storage
- replace synchronized facade with simpler ProxyUtil handler.
- change job cancellation to db update query
This commit is contained in:
michaelabuckley 2023-03-12 19:19:21 -04:00 committed by Tadgh
parent e3a1c4eb1c
commit 428fb14f0b
38 changed files with 1594 additions and 792 deletions

View File

@ -0,0 +1,47 @@
package ca.uhn.fhir.util;
import org.apache.commons.lang3.Validate;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
public class ProxyUtil {
private ProxyUtil() {}
/**
* Wrap theInstance in a Proxy that synchronizes every method.
*
* @param theClass the target interface
* @param theInstance the instance to wrap
* @return a Proxy implementing theClass interface that syncronizes every call on theInstance
* @param <T> the interface type
*/
public static <T> T synchronizedProxy(Class<T> theClass, T theInstance) {
Validate.isTrue(theClass.isInterface(), "%s is not an interface", theClass);
InvocationHandler handler = new SynchronizedHandler(theInstance);
Object object = Proxy.newProxyInstance(theClass.getClassLoader(), new Class<?>[] { theClass }, handler);
return theClass.cast(object);
}
/**
* Simple handler that first synchronizes on the delegate
*/
static class SynchronizedHandler implements InvocationHandler {
private final Object theDelegate;
SynchronizedHandler(Object theDelegate) {
this.theDelegate = theDelegate;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
synchronized (theDelegate) {
return method.invoke(theDelegate, args);
}
}
}
}

View File

@ -0,0 +1,4 @@
---
type: change
issue: 4621
title: "Batch2 work-chunk processing now aligns transaction boundaries with event transitions."

View File

@ -0,0 +1,68 @@
```mermaid
---
title: Batch2 Job Instance state transitions
---
stateDiagram-v2
[*] --> QUEUED : on db create and queued on kakfa
QUEUED --> IN_PROGRESS : on any work-chunk received by worker
%% and (see ca.uhn.fhir.batch2.progress.InstanceProgress.getNewStatus())
state first_step_finished <<choice>>
IN_PROGRESS --> first_step_finished : When 1st step finishes
first_step_finished --> COMPLETED: if no chunks produced
first_step_finished --> IN_PROGRESS: chunks produced
IN_PROGRESS --> in_progress_poll : on poll \n(count acomplete/failed/errored chunks)
in_progress_poll --> COMPLETED : 0 failures, errored, or incomplete\n AND at least 1 chunk complete
in_progress_poll --> ERRORED : no failed but errored chunks
in_progress_poll --> FINALIZE : none failed, gated execution\n last step\n queue REDUCER chunk
in_progress_poll --> IN_PROGRESS : still work to do
%% ERRORED is just like IN_PROGRESS, but it is a one-way trip from IN_PROGRESS to ERRORED.
%% FIXME We could probably delete/merge this state with IS_PROCESS, and use the error count in the UI.
note left of ERRORED
Parallel to IS_PROCESS
end note
state in_progress_poll <<choice>>
state error_progress_poll <<choice>>
ERRORED --> error_progress_poll : on poll \n(count acomplete/failed/errored chunks)
error_progress_poll --> FAILED : any failed chunks
error_progress_poll --> ERRORED : no failed but errored chunks
error_progress_poll --> FINALIZE : none failed, gated execution\n last step\n queue REDUCER chunk
error_progress_poll --> COMPLETED : 0 failures, errored, or incomplete AND at least 1 chunk complete
state do_report <<choice>>
FINALIZE --> do_reduction: poll util worker marks REDUCER chunk yes or no.
do_reduction --> COMPLETED : success
do_reduction --> FAILED : fail
in_progress_poll --> FAILED : any failed chunks
```
```mermaid
---
title: Batch2 Job Work Chunk state transitions
---
stateDiagram-v2
state QUEUED
state on_receive <<choice>>
state IN_PROGRESS
state ERROR
state execute <<choice>>
state FAILED
state COMPLETED
direction LR
[*] --> QUEUED : on store
%% worker processing states
QUEUED --> on_receive : on receive by worker
on_receive --> IN_PROGRESS : start execution
IN_PROGRESS --> execute: execute
execute --> ERROR : on re-triable error
execute --> COMPLETED : success\n maybe trigger instance first_step_finished
execute --> FAILED : on unrecoverable \n or too many errors
%% temporary error state until retry
ERROR --> on_receive : exception rollback\n triggers redelivery
%% terminal states
COMPLETED --> [*]
FAILED --> [*]
```

View File

@ -22,17 +22,19 @@ package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.config.BaseBatch2Config;
import ca.uhn.fhir.batch2.coordinator.SynchronizedJobPersistenceWrapper;
import ca.uhn.fhir.jpa.bulk.export.job.BulkExportJobConfig;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.system.HapiSystemProperties;
import ca.uhn.fhir.util.ProxyUtil;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.transaction.PlatformTransactionManager;
import javax.persistence.EntityManager;
@Configuration
@Import({
BulkExportJobConfig.class
@ -40,18 +42,18 @@ import org.springframework.transaction.PlatformTransactionManager;
public class JpaBatch2Config extends BaseBatch2Config {
@Bean
public IJobPersistence batch2JobInstancePersister(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, PlatformTransactionManager theTransactionManager) {
return new JpaJobPersistenceImpl(theJobInstanceRepository, theWorkChunkRepository, theTransactionManager);
public IJobPersistence batch2JobInstancePersister(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, PlatformTransactionManager theTransactionManager, EntityManager theEntityManager) {
return new JpaJobPersistenceImpl(theJobInstanceRepository, theWorkChunkRepository, theTransactionManager, theEntityManager);
}
@Primary
@Bean
public IJobPersistence batch2JobInstancePersisterWrapper(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, PlatformTransactionManager theTransactionManager) {
IJobPersistence retVal = batch2JobInstancePersister(theJobInstanceRepository, theWorkChunkRepository, theTransactionManager);
public IJobPersistence batch2JobInstancePersisterWrapper(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, PlatformTransactionManager theTransactionManager, EntityManager theEntityManager) {
IJobPersistence retVal = batch2JobInstancePersister(theJobInstanceRepository, theWorkChunkRepository, theTransactionManager, theEntityManager);
// Avoid H2 synchronization issues caused by
// https://github.com/h2database/h2database/issues/1808
if (HapiSystemProperties.isUnitTestModeEnabled()) {
retVal = new SynchronizedJobPersistenceWrapper(retVal);
retVal = ProxyUtil.synchronizedProxy(IJobPersistence.class, retVal);
}
return retVal;
}

View File

@ -25,9 +25,11 @@ import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.MarkWorkChunkAsErrorRequest;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
@ -51,6 +53,9 @@ import org.springframework.transaction.support.TransactionSynchronizationManager
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
@ -67,19 +72,22 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
public class JpaJobPersistenceImpl implements IJobPersistence {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
public static final String CREATE_TIME = "myCreateTime";
private final IBatch2JobInstanceRepository myJobInstanceRepository;
private final IBatch2WorkChunkRepository myWorkChunkRepository;
private final TransactionTemplate myTxTemplate;
private final EntityManager myEntityManager;
/**
* Constructor
*/
public JpaJobPersistenceImpl(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, PlatformTransactionManager theTransactionManager) {
public JpaJobPersistenceImpl(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, PlatformTransactionManager theTransactionManager, EntityManager theEntityManager) {
Validate.notNull(theJobInstanceRepository);
Validate.notNull(theWorkChunkRepository);
myJobInstanceRepository = theJobInstanceRepository;
myWorkChunkRepository = theWorkChunkRepository;
myEntityManager = theEntityManager;
// TODO: JA replace with HapiTransactionManager in megascale ticket
myTxTemplate = new TransactionTemplate(theTransactionManager);
@ -99,7 +107,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
entity.setSerializedData(theBatchWorkChunk.serializedData);
entity.setCreateTime(new Date());
entity.setStartTime(new Date());
entity.setStatus(StatusEnum.QUEUED);
entity.setStatus(WorkChunkStatusEnum.QUEUED);
myWorkChunkRepository.save(entity);
return entity.getId();
}
@ -107,7 +115,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
@Transactional(propagation = Propagation.REQUIRED)
public Optional<WorkChunk> fetchWorkChunkSetStartTimeAndMarkInProgress(String theChunkId) {
int rowsModified = myWorkChunkRepository.updateChunkStatusForStart(theChunkId, new Date(), StatusEnum.IN_PROGRESS, List.of(StatusEnum.QUEUED, StatusEnum.ERRORED, StatusEnum.IN_PROGRESS));
int rowsModified = myWorkChunkRepository.updateChunkStatusForStart(theChunkId, new Date(), WorkChunkStatusEnum.IN_PROGRESS, List.of(WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.ERRORED, WorkChunkStatusEnum.IN_PROGRESS));
if (rowsModified == 0) {
ourLog.info("Attempting to start chunk {} but it was already started.", theChunkId);
return Optional.empty();
@ -147,14 +155,14 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) {
PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, "myCreateTime");
PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
return toInstanceList(myJobInstanceRepository.fetchInstancesByJobDefinitionIdAndStatus(theJobDefinitionId, theRequestedStatuses, pageRequest));
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public List<JobInstance> fetchInstancesByJobDefinitionId(String theJobDefinitionId, int thePageSize, int thePageIndex) {
PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, "myCreateTime");
PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
return toInstanceList(myJobInstanceRepository.findInstancesByJobDefinitionId(theJobDefinitionId, pageRequest));
}
@ -215,14 +223,14 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Transactional(propagation = Propagation.REQUIRES_NEW)
public List<JobInstance> fetchInstances(int thePageSize, int thePageIndex) {
// default sort is myCreateTime Asc
PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, "myCreateTime");
return myJobInstanceRepository.findAll(pageRequest).stream().map(t -> toInstance(t)).collect(Collectors.toList());
PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
return myJobInstanceRepository.findAll(pageRequest).stream().map(this::toInstance).collect(Collectors.toList());
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex) {
PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.DESC, "myCreateTime");
PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.DESC, CREATE_TIME);
return myJobInstanceRepository.findAll(pageRequest).stream().map(this::toInstance).collect(Collectors.toList());
}
@ -236,29 +244,44 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void markWorkChunkAsErroredAndIncrementErrorCount(String theChunkId, String theErrorMessage) {
String errorMessage = truncateErrorMessage(theErrorMessage);
myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(theChunkId, new Date(), errorMessage, StatusEnum.ERRORED);
public WorkChunkStatusEnum workChunkErrorEvent(WorkChunkErrorEvent theParameters) {
String chunkId = theParameters.getChunkId();
String errorMessage = truncateErrorMessage(theParameters.getErrorMsg());
int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED);
Validate.isTrue(changeCount>0, "changed chunk matching %s", chunkId);
Query query = myEntityManager.createQuery(
"update Batch2WorkChunkEntity " +
"set myStatus = :failed " +
",myErrorMessage = CONCAT('Too many errors: ', myErrorCount, '. Last error msg was ', myErrorMessage) " +
"where myId = :chunkId and myErrorCount > :maxCount");
query.setParameter("chunkId", chunkId);
query.setParameter("failed", WorkChunkStatusEnum.FAILED);
query.setParameter("maxCount", theParameters.getMaxRetries());
int failChangeCount = query.executeUpdate();
if (failChangeCount > 0) {
return WorkChunkStatusEnum.FAILED;
} else {
return WorkChunkStatusEnum.ERRORED;
}
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public Optional<WorkChunk> markWorkChunkAsErroredAndIncrementErrorCount(MarkWorkChunkAsErrorRequest theParameters) {
markWorkChunkAsErroredAndIncrementErrorCount(theParameters.getChunkId(), theParameters.getErrorMsg());
Optional<Batch2WorkChunkEntity> op = myWorkChunkRepository.findById(theParameters.getChunkId());
return op.map(c -> toChunk(c, theParameters.isIncludeData()));
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Transactional
public void markWorkChunkAsFailed(String theChunkId, String theErrorMessage) {
ourLog.info("Marking chunk {} as failed with message: {}", theChunkId, theErrorMessage);
String errorMessage = truncateErrorMessage(theErrorMessage);
myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(theChunkId, new Date(), errorMessage, StatusEnum.FAILED);
myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(theChunkId, new Date(), errorMessage, WorkChunkStatusEnum.FAILED);
}
@Nonnull
@Override
@Transactional
public void workChunkCompletionEvent(WorkChunkCompletionEvent theEvent) {
myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(theEvent.getChunkId(), new Date(), theEvent.getRecordsProcessed(), theEvent.getRecoveredErrorCount(), WorkChunkStatusEnum.COMPLETED);
}
@Nullable
private static String truncateErrorMessage(String theErrorMessage) {
String errorMessage;
if (theErrorMessage != null && theErrorMessage.length() > ERROR_MSG_MAX_LENGTH) {
@ -271,15 +294,8 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void markWorkChunkAsCompletedAndClearData(String theInstanceId, String theChunkId, int theRecordsProcessed) {
StatusEnum newStatus = StatusEnum.COMPLETED;
ourLog.debug("Marking chunk {} for instance {} to status {}", theChunkId, theInstanceId, newStatus);
myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(theChunkId, new Date(), theRecordsProcessed, newStatus);
}
@Override
public void markWorkChunksWithStatusAndWipeData(String theInstanceId, List<String> theChunkIds, StatusEnum theStatus, String theErrorMessage) {
@Transactional
public void markWorkChunksWithStatusAndWipeData(String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMessage) {
assert TransactionSynchronizationManager.isActualTransactionActive();
ourLog.debug("Marking all chunks for instance {} to status {}", theInstanceId, theStatus);
@ -290,12 +306,6 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
}
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void incrementWorkChunkErrorCount(String theChunkId, int theIncrementBy) {
myWorkChunkRepository.incrementWorkChunkErrorCount(theChunkId, theIncrementBy);
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId) {
@ -306,9 +316,9 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
if (instance.get().getStatus().isEnded()) {
return false;
}
List<StatusEnum> statusesForStep = myWorkChunkRepository.getDistinctStatusesForStep(theInstanceId, theCurrentStepId);
List<WorkChunkStatusEnum> statusesForStep = myWorkChunkRepository.getDistinctStatusesForStep(theInstanceId, theCurrentStepId);
ourLog.debug("Checking whether gated job can advanced to next step. [instanceId={}, currentStepId={}, statusesForStep={}]", theInstanceId, theCurrentStepId, statusesForStep);
return statusesForStep.stream().noneMatch(StatusEnum::isIncomplete) && statusesForStep.stream().anyMatch(status -> status == StatusEnum.COMPLETED);
return statusesForStep.stream().noneMatch(WorkChunkStatusEnum::isIncomplete) && statusesForStep.stream().anyMatch(status -> status == WorkChunkStatusEnum.COMPLETED);
}
/**
@ -343,7 +353,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
}
@Override
public List<String> fetchallchunkidsforstepWithStatus(String theInstanceId, String theStepId, StatusEnum theStatusEnum) {
public List<String> fetchAllChunkIdsForStepWithStatus(String theInstanceId, String theStepId, WorkChunkStatusEnum theStatusEnum) {
return myTxTemplate.execute(tx -> myWorkChunkRepository.fetchAllChunkIdsForStepWithStatus(theInstanceId, theStepId, theStatusEnum));
}
@ -352,15 +362,6 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
myJobInstanceRepository.updateInstanceUpdateTime(theInstanceId, new Date());
}
private void fetchChunksForStep(String theInstanceId, String theStepId, int thePageSize, int thePageIndex, Consumer<WorkChunk> theConsumer) {
myTxTemplate.executeWithoutResult(tx -> {
List<Batch2WorkChunkEntity> chunks = myWorkChunkRepository.fetchChunksForStep(PageRequest.of(thePageIndex, thePageSize), theInstanceId, theStepId);
for (Batch2WorkChunkEntity chunk : chunks) {
theConsumer.accept(toChunk(chunk, true));
}
});
}
/**
* Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
@ -371,20 +372,10 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
}
/**
* Deprecated, use {@link ca.uhn.fhir.jpa.batch2.JpaJobPersistenceImpl#fetchAllWorkChunksForStepStream(String, String)}
* Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
*/
@Override
@Deprecated
public Iterator<WorkChunk> fetchAllWorkChunksForStepIterator(String theInstanceId, String theStepId) {
return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> fetchChunksForStep(theInstanceId, theStepId, theBatchSize, thePageIndex, theConsumer));
}
@Override
@Transactional(propagation = Propagation.MANDATORY)
public Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) {
return myWorkChunkRepository.fetchChunksForStep(theInstanceId, theStepId).map((entity) -> toChunk(entity, true));
return myWorkChunkRepository.fetchChunksForStep(theInstanceId, theStepId).map(entity -> toChunk(entity, true));
}
/**
@ -469,4 +460,17 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
}
}
}
@Override
public void processCancelRequests() {
Query query = myEntityManager.createQuery(
"UPDATE Batch2JobInstanceEntity b " +
"set myStatus = ca.uhn.fhir.batch2.model.StatusEnum.CANCELLED " +
"where myCancelled = true " +
"AND myStatus IN (:states)");
query.setParameter("states", StatusEnum.CANCELLED.getPriorStates());
query.executeUpdate();
}
}

View File

@ -20,7 +20,7 @@ package ca.uhn.fhir.jpa.dao.data;
* #L%
*/
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
@ -28,6 +28,7 @@ import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.stream.Stream;
@ -49,7 +50,7 @@ public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChun
List<Batch2WorkChunkEntity> fetchChunksNoData(Pageable thePageRequest, @Param("instanceId") String theInstanceId);
@Query("SELECT DISTINCT e.myStatus from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId")
List<StatusEnum> getDistinctStatusesForStep(@Param("instanceId") String theInstanceId, @Param("stepId") String theStepId);
List<WorkChunkStatusEnum> getDistinctStatusesForStep(@Param("instanceId") String theInstanceId, @Param("stepId") String theStepId);
/**
* Deprecated, use {@link ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository#fetchChunksForStep(String, String)}
@ -62,32 +63,28 @@ public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChun
Stream<Batch2WorkChunkEntity> fetchChunksForStep(@Param("instanceId") String theInstanceId, @Param("targetStepId") String theTargetStepId);
@Modifying
@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, e.myRecordsProcessed = :rp, e.mySerializedData = null WHERE e.myId = :id")
void updateChunkStatusAndClearDataForEndSuccess(@Param("id") String theChunkId, @Param("et") Date theEndTime, @Param("rp") int theRecordsProcessed, @Param("status") StatusEnum theInProgress);
@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, " +
"e.myRecordsProcessed = :rp, e.myErrorCount = e.myErrorCount + :errorRetries, e.mySerializedData = null " +
"WHERE e.myId = :id")
void updateChunkStatusAndClearDataForEndSuccess(@Param("id") String theChunkId, @Param("et") Date theEndTime,
@Param("rp") int theRecordsProcessed, @Param("errorRetries") int theErrorRetries, @Param("status") WorkChunkStatusEnum theInProgress);
@Modifying
@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, e.mySerializedData = null, e.myErrorMessage = :em WHERE e.myId IN(:ids)")
void updateAllChunksForInstanceStatusClearDataAndSetError(@Param("ids") List<String> theChunkIds, @Param("et") Date theEndTime, @Param("status") StatusEnum theInProgress, @Param("em") String theError);
void updateAllChunksForInstanceStatusClearDataAndSetError(@Param("ids") List<String> theChunkIds, @Param("et") Date theEndTime, @Param("status") WorkChunkStatusEnum theInProgress, @Param("em") String theError);
@Modifying
@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, e.myErrorMessage = :em, e.myErrorCount = e.myErrorCount + 1 WHERE e.myId = :id")
void updateChunkStatusAndIncrementErrorCountForEndError(@Param("id") String theChunkId, @Param("et") Date theEndTime, @Param("em") String theErrorMessage, @Param("status") StatusEnum theInProgress);
int updateChunkStatusAndIncrementErrorCountForEndError(@Param("id") String theChunkId, @Param("et") Date theEndTime, @Param("em") String theErrorMessage, @Param("status") WorkChunkStatusEnum theInProgress);
@Modifying
@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myStartTime = :st WHERE e.myId = :id AND e.myStatus IN :startStatuses")
int updateChunkStatusForStart(@Param("id") String theChunkId, @Param("st") Date theStartedTime, @Param("status") StatusEnum theInProgress, @Param("startStatuses") List<StatusEnum> theStartStatuses);
int updateChunkStatusForStart(@Param("id") String theChunkId, @Param("st") Date theStartedTime, @Param("status") WorkChunkStatusEnum theInProgress, @Param("startStatuses") Collection<WorkChunkStatusEnum> theStartStatuses);
@Modifying
@Query("DELETE FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId")
void deleteAllForInstance(@Param("instanceId") String theInstanceId);
@Modifying
@Query("UPDATE Batch2WorkChunkEntity e SET e.myErrorCount = e.myErrorCount + :by WHERE e.myId = :id")
void incrementWorkChunkErrorCount(@Param("id") String theChunkId, @Param("by") int theIncrementBy);
@Query("SELECT e.myId from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId")
List<String> fetchAllChunkIdsForStep(@Param("instanceId") String theInstanceId, @Param("stepId") String theStepId);
@Query("SELECT e.myId from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId AND e.myStatus = :status")
List<String> fetchAllChunkIdsForStepWithStatus(@Param("instanceId")String theInstanceId, @Param("stepId")String theStepId, @Param("status")StatusEnum theStatus);
List<String> fetchAllChunkIdsForStepWithStatus(@Param("instanceId")String theInstanceId, @Param("stepId")String theStepId, @Param("status") WorkChunkStatusEnum theStatus);
}

View File

@ -20,7 +20,7 @@ package ca.uhn.fhir.jpa.entity;
* #L%
*/
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
@ -87,7 +87,7 @@ public class Batch2WorkChunkEntity implements Serializable {
private String mySerializedData;
@Column(name = "STAT", length = STATUS_MAX_LENGTH, nullable = false)
@Enumerated(EnumType.STRING)
private StatusEnum myStatus;
private WorkChunkStatusEnum myStatus;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "INSTANCE_ID", insertable = false, updatable = false, foreignKey = @ForeignKey(name = "FK_BT2WC_INSTANCE"))
private Batch2JobInstanceEntity myInstance;
@ -228,11 +228,11 @@ public class Batch2WorkChunkEntity implements Serializable {
mySerializedData = theSerializedData;
}
public StatusEnum getStatus() {
public WorkChunkStatusEnum getStatus() {
return myStatus;
}
public void setStatus(StatusEnum theStatus) {
public void setStatus(WorkChunkStatusEnum theStatus) {
myStatus = theStatus;
}

View File

@ -33,7 +33,6 @@ import ca.uhn.test.concurrency.PointcutLatch;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@ -56,6 +55,7 @@ import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@ -106,13 +106,9 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
// create a job
// step 1
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> first = (step, sink) -> {
return RunOutcome.SUCCESS;
};
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> first = (step, sink) -> RunOutcome.SUCCESS;
// final step
ILastJobStepWorker<TestJobParameters, FirstStepOutput> last = (step, sink) -> {
return RunOutcome.SUCCESS;
};
ILastJobStepWorker<TestJobParameters, FirstStepOutput> last = (step, sink) -> RunOutcome.SUCCESS;
// job definition
String jobId = new Exception().getStackTrace()[0].getMethodName();
JobDefinition<? extends IModelJson> jd = JobDefinition.newBuilder()
@ -280,7 +276,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
};
// step 3
IReductionStepWorker<TestJobParameters, SecondStepOutput, ReductionStepOutput> last = new IReductionStepWorker<TestJobParameters, SecondStepOutput, ReductionStepOutput>() {
IReductionStepWorker<TestJobParameters, SecondStepOutput, ReductionStepOutput> last = new IReductionStepWorker<>() {
private final ArrayList<SecondStepOutput> myOutput = new ArrayList<>();
@ -288,6 +284,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
private final AtomicInteger mySecondGate = new AtomicInteger();
@Nonnull
@Override
public ChunkOutcome consume(ChunkExecutionDetails<TestJobParameters, SecondStepOutput> theChunkDetails) {
myOutput.add(theChunkDetails.getData());
@ -493,7 +490,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
assertEquals(MAX_CHUNK_ERROR_COUNT + 1, counter.get());
assertTrue(instance.getStatus() == StatusEnum.FAILED);
assertSame(StatusEnum.FAILED, instance.getStatus());
}
@Nonnull

View File

@ -39,6 +39,22 @@ import java.util.List;
import static ca.uhn.fhir.batch2.config.BaseBatch2Config.CHANNEL_NAME;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* The on-enter actions are defined in
* {@link ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater#handleStatusChange}
* {@link ca.uhn.fhir.batch2.progress.InstanceProgress#updateStatus(JobInstance)}
* {@link JobInstanceProcessor#cleanupInstance()}
* For chunks:
* {@link ca.uhn.fhir.jpa.batch2.JpaJobPersistenceImpl#storeWorkChunk}
* {@link JpaJobPersistenceImpl#fetchWorkChunkSetStartTimeAndMarkInProgress(String)}
* Chunk execution {@link ca.uhn.fhir.batch2.coordinator.StepExecutor#executeStep}
* wipmb figure this out
state transition triggers.
on-enter actions
on-exit actions
activities in state
*/
@TestPropertySource(properties = {
UnregisterScheduledProcessor.SCHEDULING_DISABLED_EQUALS_FALSE
})

View File

@ -5,17 +5,22 @@ import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;
import ca.uhn.fhir.batch2.jobs.imprt.NdJsonFileJson;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.MarkWorkChunkAsErrorRequest;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.util.JobInstanceUtil;
import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest;
import com.google.common.collect.Iterators;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.params.ParameterizedTest;
@ -23,6 +28,7 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.transaction.PlatformTransactionManager;
import javax.annotation.Nonnull;
import java.time.LocalDateTime;
@ -221,7 +227,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@ParameterizedTest
@MethodSource("provideStatuses")
public void testStartChunkOnlyWorksOnValidChunks(StatusEnum theStatus, boolean theShouldBeStartedByConsumer) {
public void testStartChunkOnlyWorksOnValidChunks(WorkChunkStatusEnum theStatus, boolean theShouldBeStartedByConsumer) {
// Setup
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
@ -290,6 +296,24 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertEquals(instanceId, foundInstances.get(0).getInstanceId());
}
/**
* Test bodies are defined in {@link AbstractIJobPersistenceSpecificationTest}.
* The nested test suite runs those tests here in a JPA context.
*/
@Nested
class Batch2SpecTest extends AbstractIJobPersistenceSpecificationTest {
@Override
protected PlatformTransactionManager getTxManager() {
return JpaJobPersistenceImplTest.this.getTxManager();
}
@Override
protected WorkChunk freshFetchWorkChunk(String chunkId) {
return JpaJobPersistenceImplTest.this.freshFetchWorkChunk(chunkId);
}
}
@Test
public void testFetchChunks() {
JobInstance instance = createInstance();
@ -359,76 +383,6 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertNull(chunk.getData());
}
@Test
void testStoreAndFetchChunksForInstance_NoData() {
//wipmb here
// given
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String queuedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, "some data");
String erroredId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 1, "some more data");
String completedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 2, "some more data");
mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(erroredId);
MarkWorkChunkAsErrorRequest parameters = new MarkWorkChunkAsErrorRequest();
parameters.setChunkId(erroredId);
parameters.setErrorMsg("Our error message");
mySvc.markWorkChunkAsErroredAndIncrementErrorCount(parameters);
mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(completedId);
mySvc.markWorkChunkAsCompletedAndClearData(instanceId, completedId, 11);
// when
Iterator<WorkChunk> workChunks = mySvc.fetchAllWorkChunksIterator(instanceId, false);
// then
ArrayList<WorkChunk> chunks = new ArrayList<>();
Iterators.addAll(chunks, workChunks);
assertEquals(3, chunks.size());
{
WorkChunk workChunk = chunks.get(0);
assertNull(workChunk.getData(), "we skip the data");
assertEquals(queuedId, workChunk.getId());
assertEquals(JOB_DEFINITION_ID, workChunk.getJobDefinitionId());
assertEquals(JOB_DEF_VER, workChunk.getJobDefinitionVersion());
assertEquals(instanceId, workChunk.getInstanceId());
assertEquals(TARGET_STEP_ID, workChunk.getTargetStepId());
assertEquals(0, workChunk.getSequence());
assertEquals(StatusEnum.QUEUED, workChunk.getStatus());
assertNotNull(workChunk.getCreateTime());
assertNotNull(workChunk.getStartTime());
assertNotNull(workChunk.getUpdateTime());
assertNull(workChunk.getEndTime());
assertNull(workChunk.getErrorMessage());
assertEquals(0, workChunk.getErrorCount());
assertEquals(null, workChunk.getRecordsProcessed());
}
{
WorkChunk workChunk1 = chunks.get(1);
assertEquals(StatusEnum.ERRORED, workChunk1.getStatus());
assertEquals("Our error message", workChunk1.getErrorMessage());
assertEquals(1, workChunk1.getErrorCount());
assertEquals(null, workChunk1.getRecordsProcessed());
assertNotNull(workChunk1.getEndTime());
}
{
WorkChunk workChunk2 = chunks.get(2);
assertEquals(StatusEnum.COMPLETED, workChunk2.getStatus());
assertNotNull(workChunk2.getEndTime());
assertEquals(11, workChunk2.getRecordsProcessed());
assertNull(workChunk2.getErrorMessage());
assertEquals(0, workChunk2.getErrorCount());
}
}
@Test
public void testStoreAndFetchWorkChunk_WithData() {
JobInstance instance = createInstance();
@ -436,16 +390,16 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
assertNotNull(id);
runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(IllegalArgumentException::new);
assertEquals(36, chunk.getInstanceId().length());
assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId());
assertEquals(JOB_DEF_VER, chunk.getJobDefinitionVersion());
assertEquals(StatusEnum.IN_PROGRESS, chunk.getStatus());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
assertEquals(CHUNK_DATA, chunk.getData());
runInTransaction(() -> assertEquals(StatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
}
@Test
@ -455,26 +409,26 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(StatusEnum.IN_PROGRESS, chunk.getStatus());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
assertNotNull(chunk.getCreateTime());
assertNotNull(chunk.getStartTime());
assertNull(chunk.getEndTime());
assertNull(chunk.getRecordsProcessed());
assertNotNull(chunk.getData());
runInTransaction(() -> assertEquals(StatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
sleepUntilTimeChanges();
mySvc.markWorkChunkAsCompletedAndClearData(INSTANCE_ID, chunkId, 50);
mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(chunkId, 50, 0));
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(StatusEnum.COMPLETED, entity.getStatus());
assertEquals(WorkChunkStatusEnum.COMPLETED, entity.getStatus());
assertEquals(50, entity.getRecordsProcessed());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
@ -485,34 +439,13 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
});
}
@Test
public void testIncrementWorkChunkErrorCount() {
// Setup
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
// Execute
mySvc.incrementWorkChunkErrorCount(chunkId, 2);
mySvc.incrementWorkChunkErrorCount(chunkId, 3);
// Verify
List<WorkChunk> chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 100, 0);
assertEquals(1, chunks.size());
assertEquals(5, chunks.get(0).getErrorCount());
}
@Test
public void testGatedAdvancementByStatus() {
// Setup
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
mySvc.markWorkChunkAsCompletedAndClearData(INSTANCE_ID, chunkId, 0);
mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(chunkId, 0, 0));
boolean canAdvance = mySvc.canAdvanceInstanceToNextStep(instanceId, STEP_CHUNK_ID);
assertTrue(canAdvance);
@ -524,7 +457,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertFalse(canAdvance);
//Toggle it to complete
mySvc.markWorkChunkAsCompletedAndClearData(INSTANCE_ID, newChunkId, 0);
mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(newChunkId, 50, 0));
canAdvance = mySvc.canAdvanceInstanceToNextStep(instanceId, STEP_CHUNK_ID);
assertTrue(canAdvance);
@ -535,7 +468,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertFalse(canAdvance);
//Toggle IN_PROGRESS to complete
mySvc.markWorkChunkAsCompletedAndClearData(INSTANCE_ID, newerChunkId, 0);
mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(newerChunkId, 50, 0));
canAdvance = mySvc.canAdvanceInstanceToNextStep(instanceId, STEP_CHUNK_ID);
assertTrue(canAdvance);
}
@ -547,21 +480,21 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(StatusEnum.IN_PROGRESS, chunk.getStatus());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
MarkWorkChunkAsErrorRequest request = new MarkWorkChunkAsErrorRequest().setChunkId(chunkId).setErrorMsg("This is an error message");
mySvc.markWorkChunkAsErroredAndIncrementErrorCount(request);
WorkChunkErrorEvent request = new WorkChunkErrorEvent(chunkId).setErrorMsg("This is an error message");
mySvc.workChunkErrorEvent(request);
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(StatusEnum.ERRORED, entity.getStatus());
assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus());
assertEquals("This is an error message", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
@ -573,11 +506,11 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
// Mark errored again
MarkWorkChunkAsErrorRequest request2 = new MarkWorkChunkAsErrorRequest().setChunkId(chunkId).setErrorMsg("This is an error message 2");
mySvc.markWorkChunkAsErroredAndIncrementErrorCount(request2);
WorkChunkErrorEvent request2 = new WorkChunkErrorEvent(chunkId).setErrorMsg("This is an error message 2");
mySvc.workChunkErrorEvent(request2);
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(StatusEnum.ERRORED, entity.getStatus());
assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus());
assertEquals("This is an error message 2", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
@ -599,20 +532,20 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(StatusEnum.IN_PROGRESS, chunk.getStatus());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
mySvc.markWorkChunkAsFailed(chunkId, "This is an error message");
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(StatusEnum.FAILED, entity.getStatus());
assertEquals(WorkChunkStatusEnum.FAILED, entity.getStatus());
assertEquals("This is an error message", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
@ -688,17 +621,26 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
chunkIds.add(id);
}
runInTransaction(() -> mySvc.markWorkChunksWithStatusAndWipeData(instance.getInstanceId(), chunkIds, StatusEnum.COMPLETED, null));
runInTransaction(() -> mySvc.markWorkChunksWithStatusAndWipeData(instance.getInstanceId(), chunkIds, WorkChunkStatusEnum.COMPLETED, null));
Iterator<WorkChunk> reducedChunks = mySvc.fetchAllWorkChunksIterator(instanceId, true);
while (reducedChunks.hasNext()) {
WorkChunk reducedChunk = reducedChunks.next();
assertTrue(chunkIds.contains(reducedChunk.getId()));
assertEquals(StatusEnum.COMPLETED, reducedChunk.getStatus());
assertEquals(WorkChunkStatusEnum.COMPLETED, reducedChunk.getStatus());
}
}
private WorkChunk freshFetchWorkChunk(String chunkId) {
return runInTransaction(() -> {
return myWorkChunkRepository.findById(chunkId)
.map(e-> JobInstanceUtil.fromEntityToWorkChunk(e, true))
.orElseThrow(IllegalArgumentException::new);
});
}
@Nonnull
private JobInstance createInstance() {
JobInstance instance = new JobInstance();
@ -740,11 +682,11 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
*/
public static List<Arguments> provideStatuses() {
return List.of(
Arguments.of(StatusEnum.QUEUED, true),
Arguments.of(StatusEnum.IN_PROGRESS, true),
Arguments.of(StatusEnum.ERRORED, true),
Arguments.of(StatusEnum.FAILED, false),
Arguments.of(StatusEnum.COMPLETED, false)
Arguments.of(WorkChunkStatusEnum.QUEUED, true),
Arguments.of(WorkChunkStatusEnum.IN_PROGRESS, true),
Arguments.of(WorkChunkStatusEnum.ERRORED, true),
Arguments.of(WorkChunkStatusEnum.FAILED, false),
Arguments.of(WorkChunkStatusEnum.COMPLETED, false)
);
}
}

View File

@ -5,6 +5,7 @@ import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
import ca.uhn.fhir.jpa.api.svc.IBatch2JobRunner;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test;
import ca.uhn.fhir.jpa.test.config.TestR4Config;
import ca.uhn.fhir.jpa.util.BulkExportUtils;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
@ -80,7 +81,7 @@ public class BulkDataErrorAbuseTest extends BaseResourceProviderR4Test {
* run with the build and runs 100 jobs.
*/
@Test
@Disabled
@Disabled("for manual debugging")
public void testNonStopAbuseBatch2BulkExportStressTest() throws InterruptedException, ExecutionException {
duAbuseTest(Integer.MAX_VALUE);
}
@ -121,7 +122,8 @@ public class BulkDataErrorAbuseTest extends BaseResourceProviderR4Test {
options.setOutputFormat(Constants.CT_FHIR_NDJSON);
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
ExecutorService executorService = new ThreadPoolExecutor(10, 10,
int workerCount = TestR4Config.ourMaxThreads - 1; // apply a little connection hunger, but not starvation.
ExecutorService executorService = new ThreadPoolExecutor(workerCount, workerCount,
0L, TimeUnit.MILLISECONDS,
workQueue);

View File

@ -26,6 +26,7 @@ import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.test.utilities.ITestDataBuilder;
import ca.uhn.fhir.util.BundleBuilder;
import org.hamcrest.Matchers;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.AfterEach;
@ -105,7 +106,7 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil
private void setupRetryFailures() {
myWorkChannel.addInterceptor(new ExecutorChannelInterceptor() {
@Override
public void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler, Exception ex) {
public void afterMessageHandled(@Nonnull Message<?> message, @Nonnull MessageChannel channel, @Nonnull MessageHandler handler, Exception ex) {
if (ex != null) {
ourLog.info("Work channel received exception {}", ex.getMessage());
channel.send(message);
@ -145,8 +146,7 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil
failed.add(StatusEnum.ERRORED);
assertTrue(failed.contains(instance.getStatus()), instance.getStatus() + " is the actual status");
String errorMsg = instance.getErrorMessage();
assertTrue(errorMsg.contains("Too many errors"), errorMsg);
assertTrue(errorMsg.contains("Too many errors"), MyFailAfterThreeCreatesInterceptor.ERROR_MESSAGE);
assertThat(errorMsg, Matchers.containsString("Too many errors"));
} finally {
myWorkChannel.clearInterceptorsForUnitTest();
}
@ -173,7 +173,7 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil
assertNotNull(instance);
assertEquals(StatusEnum.COMPLETED, instance.getStatus());
IBundleProvider searchResults = myPatientDao.search(SearchParameterMap.newSynchronous());
IBundleProvider searchResults = myPatientDao.search(SearchParameterMap.newSynchronous(), mySrd);
assertEquals(transactionsPerFile * fileCount, searchResults.sizeOrThrowNpe());
}
@ -250,7 +250,7 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil
}
@Interceptor
public class MyFailAfterThreeCreatesInterceptor {
public static class MyFailAfterThreeCreatesInterceptor {
public static final String ERROR_MESSAGE = "This is an error message";

View File

@ -92,6 +92,7 @@ public class TestR4Config {
ourMaxThreads = 100;
}
}
ourLog.warn("ourMaxThreads={}", ourMaxThreads);
}
private final Deque<Exception> myLastStackTrace = new LinkedList<>();

View File

@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.5-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<artifactId>hapi-fhir-storage-batch2-test-utilities</artifactId>
<name>HAPI FHIR JPA Server - Batch2 specification tests</name>
<description>Batch2 is a framework for managing and executing long running "batch" jobs</description>
<dependencies>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-storage-batch2</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-test-utilities</artifactId>
<version>${project.version}</version>
</dependency>
<!-- we export these since our exported tests use junit apis -->
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,562 @@
package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.StopWatch;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Specification tests for batch2 storage and event system.
* These tests are abstract, and do not depend on JPA.
* Test setups should use the public batch2 api to create scenarios.
*/
public abstract class AbstractIJobPersistenceSpecificationTest {
public static final String JOB_DEFINITION_ID = "definition-id";
public static final String TARGET_STEP_ID = "step-id";
public static final String DEF_CHUNK_ID = "definition-chunkId";
public static final String STEP_CHUNK_ID = "step-chunkId";
public static final int JOB_DEF_VER = 1;
public static final int SEQUENCE_NUMBER = 1;
public static final String CHUNK_DATA = "{\"key\":\"value\"}";
public static final String ERROR_MESSAGE_A = "This is an error message: A";
public static final String ERROR_MESSAGE_B = "This is a different error message: B";
public static final String ERROR_MESSAGE_C = "This is a different error message: C";
@Autowired
private IJobPersistence mySvc;
@Nested
class WorkChunkStorage {
@Test
public void testStoreAndFetchWorkChunk_NoData() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null);
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(IllegalArgumentException::new);
assertNull(chunk.getData());
}
@Test
public void testStoreAndFetchWorkChunk_WithData() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
assertNotNull(id);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(id).getStatus()));
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(IllegalArgumentException::new);
assertEquals(36, chunk.getInstanceId().length());
assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId());
assertEquals(JOB_DEF_VER, chunk.getJobDefinitionVersion());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
assertEquals(CHUNK_DATA, chunk.getData());
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, freshFetchWorkChunk(id).getStatus()));
}
/**
* Should match the diagram in batch2_states.md
* @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md
*/
@Nested
class StateTransitions {
private String myInstanceId;
private String myChunkId;
@BeforeEach
void setUp() {
JobInstance jobInstance = createInstance();
myInstanceId = mySvc.storeNewInstance(jobInstance);
}
private String createChunk() {
return storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, myInstanceId, 0, CHUNK_DATA);
}
@Test
public void chunkCreation_isQueued() {
myChunkId = createChunk();
WorkChunk fetchedWorkChunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.QUEUED, fetchedWorkChunk.getStatus(), "New chunks are QUEUED");
}
@Test
public void chunkReceived_queuedToInProgress() {
myChunkId = createChunk();
// the worker has received the chunk, and marks it started.
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
assertEquals(CHUNK_DATA, chunk.getData());
// verify the db was updated too
WorkChunk fetchedWorkChunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, fetchedWorkChunk.getStatus());
}
@Nested
class InProgressActions {
@BeforeEach
void setUp() {
// setup - the worker has received the chunk, and has marked it IN_PROGRESS.
myChunkId = createChunk();
mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId);
}
@Test
public void processingOk_inProgressToSuccess_clearsDataSavesRecordCount() {
// execution ok
mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(myChunkId, 3, 0));
// verify the db was updated
var workChunkEntity = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.COMPLETED, workChunkEntity.getStatus());
assertNull(workChunkEntity.getData());
assertEquals(3, workChunkEntity.getRecordsProcessed());
assertNull(workChunkEntity.getErrorMessage());
assertEquals(0, workChunkEntity.getErrorCount());
}
@Test
public void processingRetryableError_inProgressToError_bumpsCountRecordsMessage() {
// execution had a retryable error
mySvc.workChunkErrorEvent(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_A));
// verify the db was updated
var workChunkEntity = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, workChunkEntity.getStatus());
assertEquals(ERROR_MESSAGE_A, workChunkEntity.getErrorMessage());
assertEquals(1, workChunkEntity.getErrorCount());
}
@Test
public void processingFailure_inProgressToFailed() {
// execution had a failure
mySvc.markWorkChunkAsFailed(myChunkId, "some error");
// verify the db was updated
var workChunkEntity = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.FAILED, workChunkEntity.getStatus());
assertEquals("some error", workChunkEntity.getErrorMessage());
}
}
@Nested
class ErrorActions {
public static final String FIRST_ERROR_MESSAGE = ERROR_MESSAGE_A;
@BeforeEach
void setUp() {
// setup - the worker has received the chunk, and has marked it IN_PROGRESS.
myChunkId = createChunk();
mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId);
// execution had a retryable error
mySvc.workChunkErrorEvent(new WorkChunkErrorEvent(myChunkId, FIRST_ERROR_MESSAGE));
}
/**
* The consumer will retry after a retryable error is thrown
*/
@Test
void errorRetry_errorToInProgress() {
// when consumer restarts chunk
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId).orElseThrow(IllegalArgumentException::new);
// then
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
// verify the db state, error message, and error count
var workChunkEntity = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, workChunkEntity.getStatus());
assertEquals(FIRST_ERROR_MESSAGE, workChunkEntity.getErrorMessage(), "Original error message kept");
assertEquals(1, workChunkEntity.getErrorCount(), "error count kept");
}
@Test
void errorRetry_repeatError_increasesErrorCount() {
// setup - the consumer is re-trying, and marks it IN_PROGRESS
mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId);
// when another error happens
mySvc.workChunkErrorEvent(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B));
// verify the state, new message, and error count
var workChunkEntity = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, workChunkEntity.getStatus());
assertEquals(ERROR_MESSAGE_B, workChunkEntity.getErrorMessage(), "new error message");
assertEquals(2, workChunkEntity.getErrorCount(), "error count inc");
}
@Test
void errorThenRetryAndComplete_addsErrorCounts() {
// setup - the consumer is re-trying, and marks it IN_PROGRESS
mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId);
// then it completes ok.
mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(myChunkId, 3, 1));
// verify the state, new message, and error count
var workChunkEntity = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.COMPLETED, workChunkEntity.getStatus());
assertEquals(FIRST_ERROR_MESSAGE, workChunkEntity.getErrorMessage(), "Error message kept.");
assertEquals(2, workChunkEntity.getErrorCount(), "error combined with earlier error");
}
@Test
void errorRetry_maxErrors_movesToFailed() {
// we start with 1 error already
// 2nd try
mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId);
mySvc.workChunkErrorEvent(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B));
var chunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, chunk.getStatus());
assertEquals(2, chunk.getErrorCount());
// 3rd try
mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId);
mySvc.workChunkErrorEvent(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B));
chunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, chunk.getStatus());
assertEquals(3, chunk.getErrorCount());
// 4th try
mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId);
mySvc.workChunkErrorEvent(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_C));
chunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.FAILED, chunk.getStatus());
assertEquals(4, chunk.getErrorCount());
assertThat("Error message contains last error", chunk.getErrorMessage(), containsString(ERROR_MESSAGE_C));
assertThat("Error message contains error count and complaint", chunk.getErrorMessage(), containsString("many errors: 4"));
}
}
}
@Test
public void testFetchChunks() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
List<String> ids = new ArrayList<>();
for (int i = 0; i < 10; i++) {
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, i, CHUNK_DATA);
ids.add(id);
}
List<WorkChunk> chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 0);
assertNull(chunks.get(0).getData());
assertNull(chunks.get(1).getData());
assertNull(chunks.get(2).getData());
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
contains(ids.get(0), ids.get(1), ids.get(2)));
chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 1);
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
contains(ids.get(3), ids.get(4), ids.get(5)));
chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 2);
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
contains(ids.get(6), ids.get(7), ids.get(8)));
chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 3);
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
contains(ids.get(9)));
chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 4);
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
empty());
}
@Test
public void testMarkChunkAsCompleted_Success() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(chunkId).getStatus()));
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
assertNotNull(chunk.getCreateTime());
assertNotNull(chunk.getStartTime());
assertNull(chunk.getEndTime());
assertNull(chunk.getRecordsProcessed());
assertNotNull(chunk.getData());
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, freshFetchWorkChunk(chunkId).getStatus()));
sleepUntilTimeChanges();
runInTransaction(() -> mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(chunkId, 50, 0)));
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.COMPLETED, entity.getStatus());
assertEquals(50, entity.getRecordsProcessed());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
assertNotNull(entity.getEndTime());
assertNull(entity.getData());
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
}
@Test
public void testMarkChunkAsCompleted_Error() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(chunkId).getStatus()));
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
WorkChunkErrorEvent request = new WorkChunkErrorEvent(chunkId, ERROR_MESSAGE_A);
mySvc.workChunkErrorEvent(request);
runInTransaction(() -> {
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus());
assertEquals(ERROR_MESSAGE_A, entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
assertNotNull(entity.getEndTime());
assertEquals(1, entity.getErrorCount());
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
});
// Mark errored again
WorkChunkErrorEvent request2 = new WorkChunkErrorEvent(chunkId, "This is an error message 2");
mySvc.workChunkErrorEvent(request2);
runInTransaction(() -> {
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus());
assertEquals("This is an error message 2", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
assertNotNull(entity.getEndTime());
assertEquals(2, entity.getErrorCount());
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
});
List<WorkChunk> chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 100, 0);
assertEquals(1, chunks.size());
assertEquals(2, chunks.get(0).getErrorCount());
}
@Test
public void testMarkChunkAsCompleted_Fail() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(chunkId).getStatus()));
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
mySvc.markWorkChunkAsFailed(chunkId, "This is an error message");
runInTransaction(() -> {
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.FAILED, entity.getStatus());
assertEquals("This is an error message", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
assertNotNull(entity.getEndTime());
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
});
}
@Test
public void markWorkChunksWithStatusAndWipeData_marksMultipleChunksWithStatus_asExpected() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
ArrayList<String> chunkIds = new ArrayList<>();
for (int i = 0; i < 10; i++) {
BatchWorkChunk chunk = new BatchWorkChunk(
"defId",
1,
"stepId",
instanceId,
0,
"{}"
);
String id = mySvc.storeWorkChunk(chunk);
chunkIds.add(id);
}
runInTransaction(() -> mySvc.markWorkChunksWithStatusAndWipeData(instance.getInstanceId(), chunkIds, WorkChunkStatusEnum.COMPLETED, null));
Iterator<WorkChunk> reducedChunks = mySvc.fetchAllWorkChunksIterator(instanceId, true);
while (reducedChunks.hasNext()) {
WorkChunk reducedChunk = reducedChunks.next();
assertTrue(chunkIds.contains(reducedChunk.getId()));
assertEquals(WorkChunkStatusEnum.COMPLETED, reducedChunk.getStatus());
}
}
}
@Nested
class InstanceStateTransitions {
@ParameterizedTest
@EnumSource(StatusEnum.class)
void cancelRequest_cancelsJob_whenNotFinalState(StatusEnum theState) {
// given
JobInstance cancelledInstance = createInstance();
cancelledInstance.setStatus(theState);
String instanceId1 = mySvc.storeNewInstance(cancelledInstance);
mySvc.cancelInstance(instanceId1);
JobInstance normalInstance = createInstance();
normalInstance.setStatus(theState);
String instanceId2 = mySvc.storeNewInstance(normalInstance);
// when
runInTransaction(()-> mySvc.processCancelRequests());
// then
JobInstance freshInstance1 = mySvc.fetchInstance(instanceId1).orElseThrow();
if (theState.isCancellable()) {
assertEquals(StatusEnum.CANCELLED, freshInstance1.getStatus(), "cancel request processed");
} else {
assertEquals(theState, freshInstance1.getStatus(), "cancel request ignored - state unchanged");
}
JobInstance freshInstance2 = mySvc.fetchInstance(instanceId2).orElseThrow();
assertEquals(theState, freshInstance2.getStatus(), "cancel request ignored - cancelled not set");
}
}
@Nonnull
private JobInstance createInstance() {
JobInstance instance = new JobInstance();
instance.setJobDefinitionId(JOB_DEFINITION_ID);
instance.setStatus(StatusEnum.QUEUED);
instance.setJobDefinitionVersion(JOB_DEF_VER);
instance.setParameters(CHUNK_DATA);
instance.setReport("TEST");
return instance;
}
private String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) {
BatchWorkChunk batchWorkChunk = new BatchWorkChunk(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData);
return mySvc.storeWorkChunk(batchWorkChunk);
}
protected abstract PlatformTransactionManager getTxManager();
protected abstract WorkChunk freshFetchWorkChunk(String chunkId);
public TransactionTemplate newTxTemplate() {
TransactionTemplate retVal = new TransactionTemplate(getTxManager());
retVal.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
retVal.afterPropertiesSet();
return retVal;
}
public void runInTransaction(Runnable theRunnable) {
newTxTemplate().execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(@Nonnull TransactionStatus theStatus) {
theRunnable.run();
}
});
}
public <T> T runInTransaction(Callable<T> theRunnable) {
return newTxTemplate().execute(t -> {
try {
return theRunnable.call();
} catch (Exception theE) {
throw new InternalErrorException(theE);
}
});
}
/**
* Sleep until at least 1 ms has elapsed
*/
public void sleepUntilTimeChanges() {
StopWatch sw = new StopWatch();
await().until(() -> sw.getMillis() > 0);
}
}

View File

@ -20,10 +20,8 @@ package ca.uhn.fhir.batch2.api;
* #L%
*/
import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.MarkWorkChunkAsErrorRequest;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
@ -40,27 +38,11 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
public interface IJobPersistence {
/**
* Stores a chunk of work for later retrieval. This method should be atomic and should only
* return when the chunk has been successfully stored in the database.
* <p>
* Chunk should be stored with a status of {@link ca.uhn.fhir.batch2.model.StatusEnum#QUEUED}
/**
*
* @param theBatchWorkChunk the batch work chunk to be stored
* @return a globally unique identifier for this chunk. This should be a sequentially generated ID, a UUID, or something like that which is guaranteed to never overlap across jobs or instances.
* Some of this is tested in {@link ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest}
*/
String storeWorkChunk(BatchWorkChunk theBatchWorkChunk);
/**
* Fetches a chunk of work from storage, and update the stored status to {@link StatusEnum#IN_PROGRESS}.
* This will only fetch chunks which are currently QUEUED or ERRORRED.
*
* @param theChunkId The ID, as returned by {@link #storeWorkChunk(BatchWorkChunk theBatchWorkChunk)}
* @return The chunk of work
*/
Optional<WorkChunk> fetchWorkChunkSetStartTimeAndMarkInProgress(String theChunkId);
public interface IJobPersistence extends IWorkChunkPersistence {
/**
@ -83,7 +65,6 @@ public interface IJobPersistence {
/**
* Fetches any existing jobs matching provided request parameters
* @return
*/
List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int theStart, int theBatchSize);
@ -101,10 +82,6 @@ public interface IJobPersistence {
/**
* Fetch all job instances for a given job definition id
* @param theJobDefinitionId
* @param theCount
* @param theStart
* @return
*/
List<JobInstance> fetchInstancesByJobDefinitionId(String theJobDefinitionId, int theCount, int theStart);
@ -117,62 +94,6 @@ public interface IJobPersistence {
return Page.empty();
}
/**
* Marks a given chunk as having errored (i.e. may be recoverable)
*
* @param theChunkId The chunk ID
*/
@Deprecated
void markWorkChunkAsErroredAndIncrementErrorCount(String theChunkId, String theErrorMessage);
/**
* Marks a given chunk as having errored (ie, may be recoverable)
*
* Returns the work chunk.
*
* NB: For backwards compatibility reasons, it could be an empty optional, but
* this doesn't mean it has no workchunk (just implementers are not updated)
*
* @param theParameters - the parameters for marking the workchunk with error
* @return - workchunk optional, if available.
*/
default Optional<WorkChunk> markWorkChunkAsErroredAndIncrementErrorCount(MarkWorkChunkAsErrorRequest theParameters) {
// old method - please override me
markWorkChunkAsErroredAndIncrementErrorCount(theParameters.getChunkId(), theParameters.getErrorMsg());
return Optional.empty(); // returning empty so as not to break implementers
}
/**
* Marks a given chunk as having failed (i.e. probably not recoverable)
*
* @param theChunkId The chunk ID
*/
void markWorkChunkAsFailed(String theChunkId, String theErrorMessage);
/**
* Marks a given chunk as having finished
*
* @param theChunkId The chunk ID
* @param theRecordsProcessed The number of records completed during chunk processing
*/
void markWorkChunkAsCompletedAndClearData(String theInstanceId, String theChunkId, int theRecordsProcessed);
/**
* Marks all work chunks with the provided status and erases the data
* @param theInstanceId - the instance id
* @param theChunkIds - the ids of work chunks being reduced to single chunk
* @param theStatus - the status to mark
* @param theErrorMsg - error message (if status warrants it)
*/
void markWorkChunksWithStatusAndWipeData(String theInstanceId, List<String> theChunkIds, StatusEnum theStatus, String theErrorMsg);
/**
* Increments the work chunk error count by the given amount
*
* @param theChunkId The chunk ID
* @param theIncrementBy The number to increment the error count by
*/
void incrementWorkChunkErrorCount(String theChunkId, int theIncrementBy);
@Transactional(propagation = Propagation.REQUIRES_NEW)
boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId);
@ -197,20 +118,7 @@ public interface IJobPersistence {
Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData);
/**
* Deprecated, use {@link ca.uhn.fhir.batch2.api.IJobPersistence#fetchAllWorkChunksForStepStream(String, String)}
* Fetch all chunks with data for a given instance for a given step id
* @param theInstanceId
* @param theStepId
* @return - an iterator for fetching work chunks
*/
@Deprecated
Iterator<WorkChunk> fetchAllWorkChunksForStepIterator(String theInstanceId, String theStepId);
/**
* Fetch all chunks with data for a given instance for a given step id
* @param theInstanceId
* @param theStepId
* @return - a stream for fetching work chunks
*/
Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId);
@ -256,7 +164,9 @@ public interface IJobPersistence {
*/
JobOperationResultJson cancelInstance(String theInstanceId);
List<String> fetchallchunkidsforstepWithStatus(String theInstanceId, String theStepId, StatusEnum theStatusEnum);
void updateInstanceUpdateTime(String theInstanceId);
@Transactional
void processCancelRequests();
}

View File

@ -0,0 +1,127 @@
package ca.uhn.fhir.batch2.api;
import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import org.springframework.transaction.annotation.Transactional;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
/**
* Work Chunk api, implementing the WorkChunk state machine.
* Test specification is in {@link ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest}
* @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md
*/
public interface IWorkChunkPersistence {
//////////////////////////////////
// WorkChunk calls
//////////////////////////////////
/**
* Stores a chunk of work for later retrieval.
* The first state event, as the chunk is created.
* This method should be atomic and should only
* return when the chunk has been successfully stored in the database.
* Chunk should be stored with a status of {@link WorkChunkStatusEnum#QUEUED}
*
* @param theBatchWorkChunk the batch work chunk to be stored
* @return a globally unique identifier for this chunk.
*/
@Transactional
String storeWorkChunk(BatchWorkChunk theBatchWorkChunk);
/**
* Fetches a chunk of work from storage, and update the stored status to {@link WorkChunkStatusEnum#IN_PROGRESS}.
* The second state event, as the worker starts processing.
* This will only fetch chunks which are currently QUEUED or ERRORRED.
*
* @param theChunkId The ID from {@link #storeWorkChunk(BatchWorkChunk theBatchWorkChunk)}
* @return The WorkChunk or empty if no chunk with that id exists in the QUEUED or ERRORRED states
*/
@Transactional
Optional<WorkChunk> fetchWorkChunkSetStartTimeAndMarkInProgress(String theChunkId);
/**
* Marks a given chunk as having errored (ie, may be recoverable)
*
* Returns the work chunk.
*
* @param theParameters - the parameters for marking the workchunk with error
* @return - workchunk optional, if available.
*/
@Transactional
WorkChunkStatusEnum workChunkErrorEvent(WorkChunkErrorEvent theParameters);
/**
* Marks a given chunk as having failed (i.e. probably not recoverable)
*
* @param theChunkId The chunk ID
*/
@Transactional
void markWorkChunkAsFailed(String theChunkId, String theErrorMessage);
/**
* Report success and complete the chunk.
* @param theEvent with record and error count
*/
@Transactional
void workChunkCompletionEvent(WorkChunkCompletionEvent theEvent);
/**
* Marks all work chunks with the provided status and erases the data
* @param theInstanceId - the instance id
* @param theChunkIds - the ids of work chunks being reduced to single chunk
* @param theStatus - the status to mark
* @param theErrorMsg - error message (if status warrants it)
*/
@Transactional
void markWorkChunksWithStatusAndWipeData(String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMsg);
/**
* Fetches all chunks for a given instance, without loading the data
*
* @param theInstanceId The instance ID
* @param thePageSize The page size
* @param thePageIndex The page index
*/
List<WorkChunk> fetchWorkChunksWithoutData(String theInstanceId, int thePageSize, int thePageIndex);
/**
* Fetch all chunks for a given instance.
* @param theInstanceId - instance id
* @param theWithData - whether or not to include the data
* @return - an iterator for fetching work chunks
*/
Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData);
/**
* Fetch all chunks with data for a given instance for a given step id
* @return - a stream for fetching work chunks
*/
Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId);
/**
* Fetch chunk ids for starting a gated step.
*
* @param theInstanceId the job
* @param theStepId the step that is starting
* @return the WorkChunk ids
*/
List<String> fetchAllChunkIdsForStepWithStatus(String theInstanceId, String theStepId, WorkChunkStatusEnum theStatusEnum);
}

View File

@ -31,6 +31,7 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
@ -216,7 +217,7 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
// complete the steps without making a new work chunk
myJobPersistence.markWorkChunksWithStatusAndWipeData(instance.getInstanceId(),
response.getSuccessfulChunkIds(),
StatusEnum.COMPLETED,
WorkChunkStatusEnum.COMPLETED,
null // error message - none
);
}
@ -225,7 +226,7 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
// mark any failed chunks as failed for aborting
myJobPersistence.markWorkChunksWithStatusAndWipeData(instance.getInstanceId(),
response.getFailedChunksIds(),
StatusEnum.FAILED,
WorkChunkStatusEnum.FAILED,
"JOB ABORTED");
}
return null;

View File

@ -26,18 +26,15 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.JobStepFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.model.MarkWorkChunkAsErrorRequest;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import java.util.Optional;
import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
public class StepExecutor {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
private final IJobPersistence myJobPersistence;
@ -75,23 +72,11 @@ public class StepExecutor {
} catch (Exception e) {
if (theStepExecutionDetails.hasAssociatedWorkChunk()) {
ourLog.error("Failure executing job {} step {}, marking chunk {} as ERRORED", jobDefinitionId, targetStepId, chunkId, e);
MarkWorkChunkAsErrorRequest parameters = new MarkWorkChunkAsErrorRequest();
parameters.setChunkId(chunkId);
parameters.setErrorMsg(e.getMessage());
Optional<WorkChunk> updatedOp = myJobPersistence.markWorkChunkAsErroredAndIncrementErrorCount(parameters);
if (updatedOp.isPresent()) {
WorkChunk chunk = updatedOp.get();
// see comments on MAX_CHUNK_ERROR_COUNT
if (chunk.getErrorCount() > MAX_CHUNK_ERROR_COUNT) {
String errorMsg = "Too many errors: "
+ chunk.getErrorCount()
+ ". Last error msg was "
+ e.getMessage();
myJobPersistence.markWorkChunkAsFailed(chunkId, errorMsg);
WorkChunkErrorEvent parameters = new WorkChunkErrorEvent(chunkId, e.getMessage());
WorkChunkStatusEnum newStatus = myJobPersistence.workChunkErrorEvent(parameters);
if (newStatus == WorkChunkStatusEnum.FAILED) {
return false;
}
}
} else {
ourLog.error("Failure executing job {} step {}, no associated work chunk", jobDefinitionId, targetStepId, e);
}
@ -108,10 +93,8 @@ public class StepExecutor {
int recordsProcessed = outcome.getRecordsProcessed();
int recoveredErrorCount = theDataSink.getRecoveredErrorCount();
myJobPersistence.markWorkChunkAsCompletedAndClearData(theStepExecutionDetails.getInstance().getInstanceId(), chunkId, recordsProcessed);
if (recoveredErrorCount > 0) {
myJobPersistence.incrementWorkChunkErrorCount(chunkId, recoveredErrorCount);
}
WorkChunkCompletionEvent event = new WorkChunkCompletionEvent(chunkId, recordsProcessed, recoveredErrorCount);
myJobPersistence.workChunkCompletionEvent(event);
}
return true;

View File

@ -1,201 +0,0 @@
package ca.uhn.fhir.batch2.coordinator;
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.MarkWorkChunkAsErrorRequest;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
public class SynchronizedJobPersistenceWrapper implements IJobPersistence {
private final IJobPersistence myWrap;
/**
* Constructor
*/
public SynchronizedJobPersistenceWrapper(IJobPersistence theJobPersistence) {
myWrap = theJobPersistence;
}
@Override
public synchronized String storeWorkChunk(BatchWorkChunk theBatchWorkChunk) {
return myWrap.storeWorkChunk(theBatchWorkChunk);
}
@Override
public synchronized Optional<WorkChunk> fetchWorkChunkSetStartTimeAndMarkInProgress(String theChunkId) {
return myWrap.fetchWorkChunkSetStartTimeAndMarkInProgress(theChunkId);
}
@Override
public synchronized String storeNewInstance(JobInstance theInstance) {
return myWrap.storeNewInstance(theInstance);
}
@Override
public synchronized Optional<JobInstance> fetchInstance(String theInstanceId) {
return myWrap.fetchInstance(theInstanceId);
}
@Override
public List<JobInstance> fetchInstances(String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) {
return myWrap.fetchInstances(theJobDefinitionId, theStatuses, theCutoff, thePageable);
}
@Override
public synchronized List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int theStart, int theBatchSize) {
return myWrap.fetchInstances(theRequest, theStart, theBatchSize);
}
@Override
public synchronized List<JobInstance> fetchInstances(int thePageSize, int thePageIndex) {
return myWrap.fetchInstances(thePageSize, thePageIndex);
}
@Override
public List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex) {
return myWrap.fetchRecentInstances(thePageSize, thePageIndex);
}
@Override
public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) {
return myWrap.fetchInstancesByJobDefinitionIdAndStatus(theJobDefinitionId, theRequestedStatuses, thePageSize, thePageIndex);
}
@Override
public List<JobInstance> fetchInstancesByJobDefinitionId(String theJobDefinitionId, int theCount, int theStart) {
return myWrap.fetchInstancesByJobDefinitionId(theJobDefinitionId, theCount, theStart);
}
@Override
public Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest) {
return myWrap.fetchJobInstances(theRequest);
}
@Override
public synchronized void markWorkChunkAsErroredAndIncrementErrorCount(String theChunkId, String theErrorMessage) {
myWrap.markWorkChunkAsErroredAndIncrementErrorCount(theChunkId, theErrorMessage);
}
@Override
public Optional<WorkChunk> markWorkChunkAsErroredAndIncrementErrorCount(MarkWorkChunkAsErrorRequest theParameters) {
return myWrap.markWorkChunkAsErroredAndIncrementErrorCount(theParameters);
}
@Override
public synchronized void markWorkChunkAsFailed(String theChunkId, String theErrorMessage) {
myWrap.markWorkChunkAsFailed(theChunkId, theErrorMessage);
}
@Override
public synchronized void markWorkChunkAsCompletedAndClearData(String theInstanceId, String theChunkId, int theRecordsProcessed) {
myWrap.markWorkChunkAsCompletedAndClearData(theInstanceId, theChunkId, theRecordsProcessed);
}
@Override
public void markWorkChunksWithStatusAndWipeData(String theInstanceId, List<String> theChunkIds, StatusEnum theStatus, String theErrorMsg) {
myWrap.markWorkChunksWithStatusAndWipeData(theInstanceId, theChunkIds, theStatus, theErrorMsg);
}
@Override
public void incrementWorkChunkErrorCount(String theChunkId, int theIncrementBy) {
myWrap.incrementWorkChunkErrorCount(theChunkId, theIncrementBy);
}
@Override
public boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId) {
return myWrap.canAdvanceInstanceToNextStep(theInstanceId, theCurrentStepId);
}
@Override
public synchronized List<WorkChunk> fetchWorkChunksWithoutData(String theInstanceId, int thePageSize, int thePageIndex) {
return myWrap.fetchWorkChunksWithoutData(theInstanceId, thePageSize, thePageIndex);
}
@Override
public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
return myWrap.fetchAllWorkChunksIterator(theInstanceId, theWithData);
}
@Override
public Iterator<WorkChunk> fetchAllWorkChunksForStepIterator(String theInstanceId, String theStepId) {
return myWrap.fetchAllWorkChunksForStepIterator(theInstanceId, theStepId);
}
@Override
public Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) {
return myWrap.fetchAllWorkChunksForStepStream(theInstanceId, theStepId);
}
@Override
public synchronized boolean updateInstance(JobInstance theInstance) {
return myWrap.updateInstance(theInstance);
}
@Override
public synchronized void deleteInstanceAndChunks(String theInstanceId) {
myWrap.deleteInstanceAndChunks(theInstanceId);
}
@Override
public synchronized void deleteChunksAndMarkInstanceAsChunksPurged(String theInstanceId) {
myWrap.deleteChunksAndMarkInstanceAsChunksPurged(theInstanceId);
}
@Override
public synchronized boolean markInstanceAsCompleted(String theInstanceId) {
return myWrap.markInstanceAsCompleted(theInstanceId);
}
@Override
public boolean markInstanceAsStatus(String theInstance, StatusEnum theStatusEnum) {
return myWrap.markInstanceAsStatus(theInstance, theStatusEnum);
}
@Override
public JobOperationResultJson cancelInstance(String theInstanceId) {
return myWrap.cancelInstance(theInstanceId);
}
@Override
public List<String> fetchallchunkidsforstepWithStatus(String theInstanceId, String theStepId, StatusEnum theStatusEnum) {
return myWrap.fetchallchunkidsforstepWithStatus(theInstanceId, theStepId, theStatusEnum);
}
@Override
public synchronized void updateInstanceUpdateTime(String theInstanceId) {
myWrap.updateInstanceUpdateTime(theInstanceId);
}
}

View File

@ -21,8 +21,8 @@ package ca.uhn.fhir.batch2.maintenance;
*/
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.util.Logs;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
@ -38,7 +38,6 @@ import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
import static org.slf4j.LoggerFactory.getLogger;
/**
* While performing cleanup, the cleanup job loads all of the known
@ -52,7 +51,7 @@ public class JobChunkProgressAccumulator {
private final Set<String> myConsumedInstanceAndChunkIds = new HashSet<>();
private final Multimap<String, ChunkStatusCountValue> myInstanceIdToChunkStatuses = ArrayListMultimap.create();
int countChunksWithStatus(String theInstanceId, String theStepId, StatusEnum... theStatuses) {
int countChunksWithStatus(String theInstanceId, String theStepId, WorkChunkStatusEnum... theStatuses) {
return getChunkIdsWithStatus(theInstanceId, theStepId, theStatuses).size();
}
@ -60,7 +59,7 @@ public class JobChunkProgressAccumulator {
return myInstanceIdToChunkStatuses.get(theInstanceId).stream().filter(chunkCount -> chunkCount.myStepId.equals(theStepId)).collect(Collectors.toList()).size();
}
public List<String> getChunkIdsWithStatus(String theInstanceId, String theStepId, StatusEnum... theStatuses) {
public List<String> getChunkIdsWithStatus(String theInstanceId, String theStepId, WorkChunkStatusEnum... theStatuses) {
return getChunkStatuses(theInstanceId).stream()
.filter(t -> t.myStepId.equals(theStepId))
.filter(t -> ArrayUtils.contains(theStatuses, t.myStatus))
@ -89,9 +88,9 @@ public class JobChunkProgressAccumulator {
private static class ChunkStatusCountValue {
public final String myChunkId;
public final String myStepId;
public final StatusEnum myStatus;
public final WorkChunkStatusEnum myStatus;
private ChunkStatusCountValue(String theChunkId, String theStepId, StatusEnum theStatus) {
private ChunkStatusCountValue(String theChunkId, String theStepId, WorkChunkStatusEnum theStatus) {
myChunkId = theChunkId;
myStepId = theStepId;
myStatus = theStatus;

View File

@ -28,7 +28,7 @@ import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.model.api.IModelJson;
@ -84,6 +84,7 @@ public class JobInstanceProcessor {
ourLog.debug("Finished job processing: {} - {}", myInstanceId, stopWatch);
}
// wipmb should we delete this? Or reduce it to an instance event?
private void handleCancellation(JobInstance theInstance) {
if (theInstance.isPendingCancellationRequest()) {
theInstance.setErrorMessage(buildCancelledMessage(theInstance));
@ -147,7 +148,7 @@ public class JobInstanceProcessor {
private void triggerGatedExecutions(JobInstance theInstance) {
if (!theInstance.isRunning()) {
ourLog.debug("JobInstance {} is not in a \"running\" state. Status {}",
theInstance.getInstanceId(), theInstance.getStatus().name());
theInstance.getInstanceId(), theInstance.getStatus());
return;
}
@ -186,12 +187,12 @@ public class JobInstanceProcessor {
private void processChunksForNextSteps(JobInstance theInstance, String nextStepId) {
String instanceId = theInstance.getInstanceId();
List<String> queuedChunksForNextStep = myProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, StatusEnum.QUEUED);
List<String> queuedChunksForNextStep = myProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.QUEUED);
int totalChunksForNextStep = myProgressAccumulator.getTotalChunkCountForInstanceAndStep(instanceId, nextStepId);
if (totalChunksForNextStep != queuedChunksForNextStep.size()) {
ourLog.debug("Total ProgressAccumulator QUEUED chunk count does not match QUEUED chunk size! [instanceId={}, stepId={}, totalChunks={}, queuedChunks={}]", instanceId, nextStepId, totalChunksForNextStep, queuedChunksForNextStep.size());
}
List<String> chunksToSubmit = myJobPersistence.fetchallchunkidsforstepWithStatus(instanceId, nextStepId, StatusEnum.QUEUED);
List<String> chunksToSubmit = myJobPersistence.fetchAllChunkIdsForStepWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.QUEUED);
for (String nextChunkId : chunksToSubmit) {
JobWorkNotification workNotification = new JobWorkNotification(theInstance, nextStepId, nextChunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);

View File

@ -0,0 +1,38 @@
package ca.uhn.fhir.batch2.model;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
/**
* Payloads for WorkChunk state transitions.
* Some events in the state-machine update the chunk metadata (e.g. error message).
* This class provides a base-class for those event objects.
* @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md
*/
public abstract class BaseWorkChunkEvent {
protected final String myChunkId;
protected BaseWorkChunkEvent(String theChunkId) {
myChunkId = theChunkId;
}
public String getChunkId() {
return myChunkId;
}
@Override
public boolean equals(Object theO) {
if (this == theO) return true;
if (theO == null || getClass() != theO.getClass()) return false;
BaseWorkChunkEvent that = (BaseWorkChunkEvent) theO;
return new EqualsBuilder().append(myChunkId, that.myChunkId).isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37).append(myChunkId).toHashCode();
}
}

View File

@ -25,7 +25,6 @@ import ca.uhn.fhir.jpa.util.JsonDateDeserializer;
import ca.uhn.fhir.jpa.util.JsonDateSerializer;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
@ -332,6 +331,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson,
.append("jobDefinitionId", getJobDefinitionId() + "/" + myJobDefinitionVersion)
.append("instanceId", myInstanceId)
.append("status", myStatus)
.append("myCancelled", myCancelled)
.append("createTime", myCreateTime)
.append("startTime", myStartTime)
.append("endTime", myEndTime)
@ -369,7 +369,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson,
case FAILED:
case CANCELLED:
default:
Logs.getBatchTroubleshootingLog().debug("Status {} is considered \"not running\"", getStatus().name());
Logs.getBatchTroubleshootingLog().debug("Status {} is considered \"not running\"", myStatus);
}
return false;
}

View File

@ -1,56 +0,0 @@
package ca.uhn.fhir.batch2.model;
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
public class MarkWorkChunkAsErrorRequest {
private String myChunkId;
private String myErrorMsg;
private boolean myIncludeData;
public String getChunkId() {
return myChunkId;
}
public MarkWorkChunkAsErrorRequest setChunkId(String theChunkId) {
myChunkId = theChunkId;
return this;
}
public String getErrorMsg() {
return myErrorMsg;
}
public MarkWorkChunkAsErrorRequest setErrorMsg(String theErrorMsg) {
myErrorMsg = theErrorMsg;
return this;
}
public boolean isIncludeData() {
return myIncludeData;
}
public MarkWorkChunkAsErrorRequest setIncludeData(boolean theIncludeData) {
myIncludeData = theIncludeData;
return this;
}
}

View File

@ -26,9 +26,13 @@ import org.slf4j.Logger;
import javax.annotation.Nonnull;
import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Set;
/**
* Status of a Batch2 Job Instance.
*/
public enum StatusEnum {
/**
@ -55,7 +59,7 @@ public enum StatusEnum {
* Task execution resulted in an error but the error may be transient (or transient status is unknown).
* Retrying may result in success.
*/
ERRORED(true, true, false),
ERRORED(true, true, true),
/**
* Task has failed and is known to be unrecoverable. There is no reason to believe that retrying will
@ -70,6 +74,32 @@ public enum StatusEnum {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
/** Map from state to Set of legal inbound states */
static final EnumMap<StatusEnum, Set<StatusEnum>> ourFromStates;
/** Map from state to Set of legal outbound states */
static final EnumMap<StatusEnum, Set<StatusEnum>> ourToStates;
static {
// wipmb make immutable.
ourFromStates = new EnumMap<>(StatusEnum.class);
ourToStates = new EnumMap<>(StatusEnum.class);
Set<StatusEnum> cancelableStates = EnumSet.noneOf(StatusEnum.class);
for (StatusEnum nextEnum: StatusEnum.values()) {
ourFromStates.put(nextEnum, EnumSet.noneOf(StatusEnum.class));
ourToStates.put(nextEnum, EnumSet.noneOf(StatusEnum.class));
}
for (StatusEnum nextPriorEnum: StatusEnum.values()) {
for (StatusEnum nextNextEnum: StatusEnum.values()) {
if (isLegalStateTransition(nextPriorEnum, nextNextEnum)) {
ourFromStates.get(nextNextEnum).add(nextPriorEnum);
ourToStates.get(nextPriorEnum).add(nextNextEnum);
}
}
}
}
private final boolean myIncomplete;
private final boolean myEnded;
private final boolean myIsCancellable;
@ -196,4 +226,18 @@ public enum StatusEnum {
public boolean isCancellable() {
return myIsCancellable;
}
/**
* States that may transition to this state.
*/
public Set<StatusEnum> getPriorStates() {
return ourFromStates.get(this);
}
/**
* States this state may transtion to.
*/
public Set<StatusEnum> getNextStates() {
return ourToStates.get(this);
}
}

View File

@ -34,6 +34,12 @@ import java.util.Date;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
/**
* Payload for step processing.
* Implements a state machine on {@link WorkChunkStatusEnum}.
*
* @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md
*/
public class WorkChunk implements IModelJson {
@JsonProperty("id")
@ -43,7 +49,7 @@ public class WorkChunk implements IModelJson {
private int mySequence;
@JsonProperty("status")
private StatusEnum myStatus;
private WorkChunkStatusEnum myStatus;
@JsonProperty("jobDefinitionId")
private String myJobDefinitionId;
@ -131,11 +137,11 @@ public class WorkChunk implements IModelJson {
return this;
}
public StatusEnum getStatus() {
public WorkChunkStatusEnum getStatus() {
return myStatus;
}
public WorkChunk setStatus(StatusEnum theStatus) {
public WorkChunk setStatus(WorkChunkStatusEnum theStatus) {
myStatus = theStatus;
return this;
}

View File

@ -0,0 +1,42 @@
package ca.uhn.fhir.batch2.model;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
/**
* Payload for the work-chunk completion event with the record and error counts.
*/
public class WorkChunkCompletionEvent extends BaseWorkChunkEvent {
int myRecordsProcessed;
int myRecoveredErrorCount;
public WorkChunkCompletionEvent(String theChunkId, int theRecordsProcessed, int theRecoveredErrorCount) {
super(theChunkId);
myRecordsProcessed = theRecordsProcessed;
myRecoveredErrorCount = theRecoveredErrorCount;
}
public int getRecordsProcessed() {
return myRecordsProcessed;
}
public int getRecoveredErrorCount() {
return myRecoveredErrorCount;
}
@Override
public boolean equals(Object theO) {
if (this == theO) return true;
if (theO == null || getClass() != theO.getClass()) return false;
WorkChunkCompletionEvent that = (WorkChunkCompletionEvent) theO;
return new EqualsBuilder().appendSuper(super.equals(theO)).append(myRecordsProcessed, that.myRecordsProcessed).append(myRecoveredErrorCount, that.myRecoveredErrorCount).isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37).appendSuper(super.hashCode()).append(myRecordsProcessed).append(myRecoveredErrorCount).toHashCode();
}
}

View File

@ -0,0 +1,90 @@
package ca.uhn.fhir.batch2.model;
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
/**
* Payload for the work-chunk error event including the error message, and the allowed retry count.
* @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md
*/
public class WorkChunkErrorEvent extends BaseWorkChunkEvent {
private String myErrorMsg;
private int maxRetries = MAX_CHUNK_ERROR_COUNT;
public WorkChunkErrorEvent(String theChunkId) {
super(theChunkId);
}
public WorkChunkErrorEvent(String theChunkId, String theErrorMessage) {
super(theChunkId);
myErrorMsg = theErrorMessage;
}
public String getErrorMsg() {
return myErrorMsg;
}
public WorkChunkErrorEvent setErrorMsg(String theErrorMsg) {
myErrorMsg = theErrorMsg;
return this;
}
public int getMaxRetries() {
return maxRetries;
}
// wipmb - will we ever want this?
public void setMaxRetries(int theMaxRetries) {
maxRetries = theMaxRetries;
}
@Override
public boolean equals(Object theO) {
if (this == theO) return true;
if (theO == null || getClass() != theO.getClass()) return false;
WorkChunkErrorEvent that = (WorkChunkErrorEvent) theO;
return new EqualsBuilder()
.appendSuper(super.equals(theO))
.append(myChunkId, that.myChunkId)
.append(myErrorMsg, that.myErrorMsg)
.append(maxRetries, that.maxRetries)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.appendSuper(super.hashCode())
.append(myChunkId)
.append(myErrorMsg)
.append(maxRetries)
.toHashCode();
}
}

View File

@ -0,0 +1,52 @@
package ca.uhn.fhir.batch2.model;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Set;
/**
* States for the {@link WorkChunk} state machine.
* @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md
*/
public enum WorkChunkStatusEnum {
// TODO: Whis is missing a state - WAITING for gated. it would simplify stats wipmb - not this PR
QUEUED, IN_PROGRESS, ERRORED, FAILED, COMPLETED;
private static final EnumMap<WorkChunkStatusEnum, Set<WorkChunkStatusEnum>> ourPriorStates;
static {
ourPriorStates = new EnumMap<>(WorkChunkStatusEnum.class);
for (WorkChunkStatusEnum nextEnum: WorkChunkStatusEnum.values()) {
ourPriorStates.put(nextEnum, EnumSet.noneOf(WorkChunkStatusEnum.class));
}
for (WorkChunkStatusEnum nextPriorEnum: WorkChunkStatusEnum.values()) {
for (WorkChunkStatusEnum nextEnum: nextPriorEnum.getNextStates()) {
ourPriorStates.get(nextEnum).add(nextPriorEnum);
}
}
}
public boolean isIncomplete() {
return (this != WorkChunkStatusEnum.COMPLETED);
}
public Set<WorkChunkStatusEnum> getNextStates() {
switch (this) {
case QUEUED:
return EnumSet.of(IN_PROGRESS);
case IN_PROGRESS:
return EnumSet.of(IN_PROGRESS, ERRORED, FAILED, COMPLETED);
case ERRORED:
return EnumSet.of(IN_PROGRESS, FAILED, COMPLETED);
// terminal states
case FAILED:
case COMPLETED:
default:
return EnumSet.noneOf(WorkChunkStatusEnum.class);
}
}
public Set<WorkChunkStatusEnum> getPriorStates() {
return ourPriorStates.get(this);
}
}

View File

@ -0,0 +1,10 @@
/**
* Our distributed batch processing library.
*
* WIPMB Plan
* done split status enum
* done move work chunk methods to IWorkChunkPersistence
* wipmb convert work chunk methods to events - requires bump
* wipmb review tx layer - the variety of @Transaction annotations is scary.
*/
package ca.uhn.fhir.batch2;

View File

@ -23,6 +23,7 @@ package ca.uhn.fhir.batch2.progress;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
import org.apache.commons.lang3.builder.ToStringBuilder;
@ -47,7 +48,7 @@ class InstanceProgress {
private Long myLatestEndTime = null;
private String myErrormessage = null;
private StatusEnum myNewStatus = null;
private Map<String, Map<StatusEnum, Integer>> myStepToStatusCountMap = new HashMap<>();
private Map<String, Map<WorkChunkStatusEnum, Integer>> myStepToStatusCountMap = new HashMap<>();
public void addChunk(WorkChunk theChunk) {
myErrorCountForAllStatuses += theChunk.getErrorCount();
@ -60,7 +61,7 @@ class InstanceProgress {
private void updateCompletionStatus(WorkChunk theChunk) {
//Update the status map first.
Map<StatusEnum, Integer> statusToCountMap = myStepToStatusCountMap.getOrDefault(theChunk.getTargetStepId(), new HashMap<>());
Map<WorkChunkStatusEnum, Integer> statusToCountMap = myStepToStatusCountMap.getOrDefault(theChunk.getTargetStepId(), new HashMap<>());
statusToCountMap.put(theChunk.getStatus(), statusToCountMap.getOrDefault(theChunk.getStatus(), 0) + 1);
switch (theChunk.getStatus()) {
@ -81,8 +82,6 @@ class InstanceProgress {
myFailedChunkCount++;
myErrormessage = theChunk.getErrorMessage();
break;
case CANCELLED:
break;
}
ourLog.trace("Chunk has status {} with errored chunk count {}", theChunk.getStatus(), myErroredChunkCount);
}

View File

@ -15,9 +15,11 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.batch2.model.MarkWorkChunkAsErrorRequest;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.NonTransactionalHapiTransactionService;
@ -33,6 +35,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ -139,7 +142,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
assertEquals(PARAM_2_VALUE, params.getParam2());
assertEquals(PASSWORD_VALUE, params.getPassword());
verify(myJobInstancePersister, times(1)).markWorkChunkAsCompletedAndClearData(eq(INSTANCE_ID), any(), eq(50));
verify(myJobInstancePersister, times(1)).workChunkCompletionEvent(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0));
verify(myJobInstancePersister, times(0)).fetchWorkChunksWithoutData(any(), anyInt(), anyInt());
verify(myBatchJobSender, times(2)).sendWorkChannelMessage(any());
}
@ -257,7 +260,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
assertEquals(PARAM_2_VALUE, params.getParam2());
assertEquals(PASSWORD_VALUE, params.getPassword());
verify(myJobInstancePersister, times(1)).markWorkChunkAsCompletedAndClearData(eq(INSTANCE_ID), any(), eq(50));
verify(myJobInstancePersister, times(1)).workChunkCompletionEvent(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0));
verify(myBatchJobSender, times(0)).sendWorkChannelMessage(any());
}
@ -284,7 +287,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
assertEquals(PARAM_2_VALUE, params.getParam2());
assertEquals(PASSWORD_VALUE, params.getPassword());
verify(myJobInstancePersister, times(1)).markWorkChunkAsCompletedAndClearData(eq(INSTANCE_ID), eq(CHUNK_ID), eq(50));
verify(myJobInstancePersister, times(1)).workChunkCompletionEvent(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0));
}
@Test
@ -316,13 +319,13 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
assertEquals(PARAM_2_VALUE, params.getParam2());
assertEquals(PASSWORD_VALUE, params.getPassword());
ArgumentCaptor<MarkWorkChunkAsErrorRequest> parametersArgumentCaptor = ArgumentCaptor.forClass(MarkWorkChunkAsErrorRequest.class);
verify(myJobInstancePersister, times(1)).markWorkChunkAsErroredAndIncrementErrorCount(parametersArgumentCaptor.capture());
MarkWorkChunkAsErrorRequest capturedParams = parametersArgumentCaptor.getValue();
ArgumentCaptor<WorkChunkErrorEvent> parametersArgumentCaptor = ArgumentCaptor.forClass(WorkChunkErrorEvent.class);
verify(myJobInstancePersister, times(1)).workChunkErrorEvent(parametersArgumentCaptor.capture());
WorkChunkErrorEvent capturedParams = parametersArgumentCaptor.getValue();
assertEquals(CHUNK_ID, capturedParams.getChunkId());
assertEquals("This is an error message", capturedParams.getErrorMsg());
verify(myJobInstancePersister, times(1)).markWorkChunkAsCompletedAndClearData(eq(INSTANCE_ID), eq(CHUNK_ID), eq(0));
verify(myJobInstancePersister, times(1)).workChunkCompletionEvent(new WorkChunkCompletionEvent(CHUNK_ID, 0, 0));
}
@ -354,8 +357,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
assertEquals(PARAM_2_VALUE, params.getParam2());
assertEquals(PASSWORD_VALUE, params.getPassword());
verify(myJobInstancePersister, times(1)).incrementWorkChunkErrorCount(eq(CHUNK_ID), eq(2));
verify(myJobInstancePersister, times(1)).markWorkChunkAsCompletedAndClearData(eq(INSTANCE_ID), eq(CHUNK_ID), eq(50));
verify(myJobInstancePersister, times(1)).workChunkCompletionEvent(eq(new WorkChunkCompletionEvent(CHUNK_ID, 50, 2)));
}
@Test
@ -381,7 +383,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
assertEquals(PARAM_2_VALUE, params.getParam2());
assertEquals(PASSWORD_VALUE, params.getPassword());
verify(myJobInstancePersister, times(1)).markWorkChunkAsCompletedAndClearData(eq(INSTANCE_ID), eq(CHUNK_ID), eq(50));
verify(myJobInstancePersister, times(1)).workChunkCompletionEvent(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0));
}
@SuppressWarnings("unchecked")
@ -613,7 +615,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
.setJobDefinitionVersion(1)
.setTargetStepId(theTargetStepId)
.setData(theData)
.setStatus(StatusEnum.IN_PROGRESS)
.setStatus(WorkChunkStatusEnum.IN_PROGRESS)
.setInstanceId(INSTANCE_ID);
}

View File

@ -14,6 +14,7 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.NonTransactionalHapiTransactionService;
import org.junit.jupiter.api.BeforeEach;
@ -101,7 +102,7 @@ public class ReductionStepExecutorServiceImplTest {
// verification
assertFalse(result.isSuccessful());
ArgumentCaptor<List> submittedListIds = ArgumentCaptor.forClass(List.class);
ArgumentCaptor<StatusEnum> statusCaptor = ArgumentCaptor.forClass(StatusEnum.class);
ArgumentCaptor<WorkChunkStatusEnum> statusCaptor = ArgumentCaptor.forClass(WorkChunkStatusEnum.class);
verify(myJobPersistence, times(chunkIds.size()))
.markWorkChunksWithStatusAndWipeData(
eq(INSTANCE_ID),
@ -118,9 +119,9 @@ public class ReductionStepExecutorServiceImplTest {
// assumes the order of which is called first
// successes, then failures
assertEquals(2, statusCaptor.getAllValues().size());
List<StatusEnum> statuses = statusCaptor.getAllValues();
assertEquals(StatusEnum.COMPLETED, statuses.get(0));
assertEquals(StatusEnum.FAILED, statuses.get(1));
List<WorkChunkStatusEnum> statuses = statusCaptor.getAllValues();
assertEquals(WorkChunkStatusEnum.COMPLETED, statuses.get(0));
assertEquals(WorkChunkStatusEnum.FAILED, statuses.get(1));
}
@ -165,7 +166,7 @@ public class ReductionStepExecutorServiceImplTest {
assertTrue(result.isSuccessful());
ArgumentCaptor<List<String>> chunkIdCaptor = ArgumentCaptor.forClass(List.class);
verify(myJobPersistence).markWorkChunksWithStatusAndWipeData(eq(INSTANCE_ID),
chunkIdCaptor.capture(), eq(StatusEnum.COMPLETED), eq(null));
chunkIdCaptor.capture(), eq(WorkChunkStatusEnum.COMPLETED), eq(null));
List<String> capturedIds = chunkIdCaptor.getValue();
assertEquals(chunkIds.size(), capturedIds.size());
for (String chunkId : chunkIds) {

View File

@ -1,6 +1,5 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.ChunkExecutionDetails;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
@ -8,38 +7,30 @@ import ca.uhn.fhir.batch2.api.ILastJobStepWorker;
import ca.uhn.fhir.batch2.api.IReductionStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.JobStepFailedException;
import ca.uhn.fhir.batch2.api.ReductionStepExecutionDetails;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.ChunkOutcome;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionReductionStep;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.MarkWorkChunkAsErrorRequest;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkData;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.NonTransactionalHapiTransactionService;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.transaction.PlatformTransactionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -48,107 +39,40 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@SuppressWarnings({"unchecked", "rawtypes"})
@ExtendWith(MockitoExtension.class)
public class WorkChunkProcessorTest {
private static final Logger ourLog = LoggerFactory.getLogger(WorkChunkProcessorTest.class);
public static final String REDUCTION_STEP_ID = "step last";
static final String INSTANCE_ID = "instanceId";
static final String JOB_DEFINITION_ID = "jobDefId";
public static final String REDUCTION_STEP_ID = "step last";
// static internal use classes
private enum StepType {
REDUCTION,
INTERMEDIATE,
FINAL
}
static class TestJobParameters implements IModelJson { }
static class StepInputData implements IModelJson { }
static class StepOutputData implements IModelJson { }
private static class TestDataSink<OT extends IModelJson> extends BaseDataSink<TestJobParameters, StepInputData, OT> {
private BaseDataSink<?, ?, ?> myActualDataSink;
TestDataSink(JobWorkCursor<TestJobParameters, StepInputData, OT> theWorkCursor) {
super(INSTANCE_ID,
theWorkCursor);
}
public void setDataSink(BaseDataSink<?, ?, ?> theSink) {
myActualDataSink = theSink;
}
@Override
public void accept(WorkChunkData<OT> theData) {
}
@Override
public int getWorkChunkCount() {
return 0;
}
}
// our test class
private class TestWorkChunkProcessor extends WorkChunkProcessor {
public TestWorkChunkProcessor(IJobPersistence thePersistence, BatchJobSender theSender, IHapiTransactionService theHapiTransactionService) {
super(thePersistence, theSender, theHapiTransactionService);
}
@Override
protected <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> BaseDataSink<PT, IT, OT> getDataSink(
JobWorkCursor<PT, IT, OT> theCursor,
JobDefinition<PT> theJobDefinition,
String theInstanceId
) {
// cause we don't want to test the actual DataSink class here!
myDataSink.setDataSink(super.getDataSink(theCursor, theJobDefinition, theInstanceId));
return (BaseDataSink<PT, IT, OT>) myDataSink;
}
}
// general mocks
private TestDataSink myDataSink;
// step worker mocks
private final IJobStepWorker<TestJobParameters, StepInputData, StepOutputData> myNonReductionStep = mock(IJobStepWorker.class);
private final IReductionStepWorker<TestJobParameters, StepInputData, StepOutputData> myReductionStep = mock(IReductionStepWorker.class);
private final ILastJobStepWorker<TestJobParameters, StepInputData> myLastStep = mock(ILastJobStepWorker.class);
private TestDataSink myDataSink;
// class specific mocks
@Mock
private IJobPersistence myJobPersistence;
@Mock
private BatchJobSender myJobSender;
private IHapiTransactionService myMockTransactionManager = new NonTransactionalHapiTransactionService();
// general mocks
private TestWorkChunkProcessor myExecutorSvc;
@BeforeEach
public void init() {
myExecutorSvc = new TestWorkChunkProcessor(myJobPersistence, myJobSender, myMockTransactionManager);
myExecutorSvc = new TestWorkChunkProcessor(myJobPersistence, myJobSender);
}
private <OT extends IModelJson> JobDefinitionStep<TestJobParameters, StepInputData, OT> mockOutWorkCursor(
@ -188,7 +112,6 @@ public class WorkChunkProcessorTest {
return step;
}
@Test
public void doExecution_nonReductionIntermediateStepWithValidInput_executesAsExpected() {
doExecution_nonReductionStep(0);
@ -227,12 +150,13 @@ public class WorkChunkProcessorTest {
// verify
assertTrue(result.isSuccessful());
verify(myJobPersistence)
.markWorkChunkAsCompletedAndClearData(any(), eq(chunk.getId()), anyInt());
.workChunkCompletionEvent(any(WorkChunkCompletionEvent.class));
assertTrue(myDataSink.myActualDataSink instanceof JobDataSink);
if (theRecoveredErrorsForDataSink > 0) {
verify(myJobPersistence)
.incrementWorkChunkErrorCount(anyString(), eq(theRecoveredErrorsForDataSink));
.workChunkCompletionEvent(any(WorkChunkCompletionEvent.class));
//.workChunkErrorEvent(anyString(new WorkChunkErrorEvent(chunk.getId(), theRecoveredErrorsForDataSink)));
}
// nevers
@ -315,13 +239,13 @@ public class WorkChunkProcessorTest {
// when
when(myNonReductionStep.run(any(), any()))
.thenThrow(new RuntimeException(errorMsg));
when(myJobPersistence.markWorkChunkAsErroredAndIncrementErrorCount(any(MarkWorkChunkAsErrorRequest.class)))
when(myJobPersistence.workChunkErrorEvent(any(WorkChunkErrorEvent.class)))
.thenAnswer((p) -> {
WorkChunk ec = new WorkChunk();
ec.setId(chunk.getId());
int count = errorCounter.getAndIncrement();
ec.setErrorCount(count);
return Optional.of(ec);
return count<=WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT?ec.getStatus():WorkChunkStatusEnum.FAILED;
});
// test
@ -341,6 +265,7 @@ public class WorkChunkProcessorTest {
*/
processedOutcomeSuccessfully = output.isSuccessful();
} catch (JobStepFailedException ex) {
ourLog.info("Caught error:", ex);
assertTrue(ex.getMessage().contains(errorMsg));
counter++;
}
@ -349,7 +274,7 @@ public class WorkChunkProcessorTest {
* we check for > MAX_CHUNK_ERROR_COUNT (+1)
* we want it to run one extra time here (+1)
*/
} while (processedOutcomeSuccessfully == null && counter < WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT + 2);
} while (processedOutcomeSuccessfully == null && counter <= WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT + 2);
// verify
assertNotNull(processedOutcomeSuccessfully);
@ -389,12 +314,12 @@ public class WorkChunkProcessorTest {
private void verifyNoErrors(int theRecoveredErrorCount) {
if (theRecoveredErrorCount == 0) {
verify(myJobPersistence, never())
.incrementWorkChunkErrorCount(anyString(), anyInt());
.workChunkErrorEvent(any());
}
verify(myJobPersistence, never())
.markWorkChunkAsFailed(anyString(), anyString());
verify(myJobPersistence, never())
.markWorkChunkAsErroredAndIncrementErrorCount(anyString(), anyString());
.workChunkErrorEvent(any(WorkChunkErrorEvent.class));
}
private void verifyNonReductionStep() {
@ -406,24 +331,6 @@ public class WorkChunkProcessorTest {
.fetchAllWorkChunksForStepStream(anyString(), anyString());
}
static JobInstance getTestJobInstance() {
JobInstance instance = JobInstance.fromInstanceId(INSTANCE_ID);
instance.setParameters(new TestJobParameters());
return instance;
}
static WorkChunk createWorkChunk(String theId) {
WorkChunk chunk = new WorkChunk();
chunk.setInstanceId(INSTANCE_ID);
chunk.setId(theId);
chunk.setStatus(StatusEnum.QUEUED);
chunk.setData(JsonUtil.serialize(
new StepInputData()
));
return chunk;
}
@SuppressWarnings("unchecked")
private JobDefinition<TestJobParameters> createTestJobDefinition(boolean theWithReductionStep) {
JobDefinition<TestJobParameters> def = null;
@ -510,4 +417,80 @@ public class WorkChunkProcessorTest {
return null;
}
private enum StepType {
REDUCTION,
INTERMEDIATE,
FINAL
}
// our test class
private class TestWorkChunkProcessor extends WorkChunkProcessor {
public TestWorkChunkProcessor(IJobPersistence thePersistence, BatchJobSender theSender) {
super(thePersistence, theSender);
}
@Override
protected <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> BaseDataSink<PT, IT, OT> getDataSink(
JobWorkCursor<PT, IT, OT> theCursor,
JobDefinition<PT> theJobDefinition,
String theInstanceId
) {
// cause we don't want to test the actual DataSink class here!
myDataSink.setDataSink(super.getDataSink(theCursor, theJobDefinition, theInstanceId));
return (BaseDataSink<PT, IT, OT>) myDataSink;
}
}
static class TestJobParameters implements IModelJson {
}
static class StepInputData implements IModelJson {
}
static class StepOutputData implements IModelJson {
}
private static class TestDataSink<OT extends IModelJson> extends BaseDataSink<TestJobParameters, StepInputData, OT> {
private BaseDataSink<?, ?, ?> myActualDataSink;
TestDataSink(JobWorkCursor<TestJobParameters, StepInputData, OT> theWorkCursor) {
super(INSTANCE_ID,
theWorkCursor);
}
public void setDataSink(BaseDataSink<?, ?, ?> theSink) {
myActualDataSink = theSink;
}
@Override
public void accept(WorkChunkData<OT> theData) {
}
@Override
public int getWorkChunkCount() {
return 0;
}
}
static JobInstance getTestJobInstance() {
JobInstance instance = JobInstance.fromInstanceId(INSTANCE_ID);
instance.setParameters(new TestJobParameters());
return instance;
}
static WorkChunk createWorkChunk(String theId) {
WorkChunk chunk = new WorkChunk();
chunk.setInstanceId(INSTANCE_ID);
chunk.setId(theId);
chunk.setStatus(WorkChunkStatusEnum.QUEUED);
chunk.setData(JsonUtil.serialize(
new StepInputData()
));
return chunk;
}
}

View File

@ -15,6 +15,7 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
@ -103,7 +104,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
@Test
public void testInProgress_CalculateProgress_FirstCompleteButNoOtherStepsYetComplete() {
List<WorkChunk> chunks = new ArrayList<>();
chunks.add(JobCoordinatorImplTest.createWorkChunk(STEP_1, null).setStatus(StatusEnum.COMPLETED));
chunks.add(JobCoordinatorImplTest.createWorkChunk(STEP_1, null).setStatus(WorkChunkStatusEnum.COMPLETED));
myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(createInstance()));
@ -116,12 +117,12 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
@Test
public void testInProgress_CalculateProgress_FirstStepComplete() {
List<WorkChunk> chunks = Arrays.asList(
createWorkChunkStep1().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:01-04:00")),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:02-04:00")),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:03-04:00")),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25),
JobCoordinatorImplTest.createWorkChunkStep3().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
createWorkChunkStep1().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:01-04:00")),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:02-04:00")),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:03-04:00")),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25),
JobCoordinatorImplTest.createWorkChunkStep3().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
when(myJobPersistence.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance()));
@ -149,12 +150,12 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
public void testInProgress_CalculateProgress_InstanceHasErrorButNoChunksAreErrored() {
// Setup
List<WorkChunk> chunks = Arrays.asList(
createWorkChunkStep1().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:01-04:00")),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:02-04:00")).setErrorCount(2),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:03-04:00")).setErrorCount(2),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25),
JobCoordinatorImplTest.createWorkChunkStep3().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
createWorkChunkStep1().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:01-04:00")),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:02-04:00")).setErrorCount(2),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:03-04:00")).setErrorCount(2),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25),
JobCoordinatorImplTest.createWorkChunkStep3().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
JobInstance instance1 = createInstance();
@ -184,8 +185,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
public void testInProgress_GatedExecution_FirstStepComplete() {
// Setup
List<WorkChunk> chunks = Arrays.asList(
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID_2)
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.QUEUED).setId(CHUNK_ID),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.QUEUED).setId(CHUNK_ID_2)
);
when (myJobPersistence.canAdvanceInstanceToNextStep(any(), any())).thenReturn(true);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition(JobDefinition.Builder::gatedExecution));
@ -193,7 +194,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false)))
.thenReturn(chunks.iterator());
when(myJobPersistence.fetchallchunkidsforstepWithStatus(eq(INSTANCE_ID), eq(STEP_2), eq(StatusEnum.QUEUED)))
when(myJobPersistence.fetchAllChunkIdsForStepWithStatus(eq(INSTANCE_ID), eq(STEP_2), eq(WorkChunkStatusEnum.QUEUED)))
.thenReturn(chunks.stream().map(chunk -> chunk.getId()).collect(Collectors.toList()));
JobInstance instance1 = createInstance();
@ -236,22 +237,22 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
List<WorkChunk> chunks = new ArrayList<>();
chunks.add(
createWorkChunkStep1().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:01:00-04:00")).setRecordsProcessed(25)
createWorkChunkStep1().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:01:00-04:00")).setRecordsProcessed(25)
);
chunks.add(
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:01-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25)
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:01-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25)
);
chunks.add(
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:02-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25)
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:02-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25)
);
chunks.add(
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:03-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25)
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:03-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25)
);
chunks.add(
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
);
chunks.add(
JobCoordinatorImplTest.createWorkChunkStep3().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
JobCoordinatorImplTest.createWorkChunkStep3().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition(t -> t.completionHandler(myCompletionHandler)));
@ -289,22 +290,22 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
public void testInProgress_CalculateProgress_OneStepFailed() {
ArrayList<WorkChunk> chunks = new ArrayList<>();
chunks.add(
createWorkChunkStep1().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:01:00-04:00")).setRecordsProcessed(25)
createWorkChunkStep1().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:01:00-04:00")).setRecordsProcessed(25)
);
chunks.add(
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:01-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25)
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:01-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25)
);
chunks.add(
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.FAILED).setStartTime(parseTime("2022-02-12T14:00:02-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25).setErrorMessage("This is an error message")
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.FAILED).setStartTime(parseTime("2022-02-12T14:00:02-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25).setErrorMessage("This is an error message")
);
chunks.add(
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:03-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25)
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:03-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25)
);
chunks.add(
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
);
chunks.add(
JobCoordinatorImplTest.createWorkChunkStep3().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
JobCoordinatorImplTest.createWorkChunkStep3().setStatus(WorkChunkStatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
@ -339,10 +340,10 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
// Setup
ArrayList<WorkChunk> chunks = new ArrayList<>();
chunks.add(
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID)
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.QUEUED).setId(CHUNK_ID)
);
chunks.add(
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID_2)
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.QUEUED).setId(CHUNK_ID_2)
);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition(JobDefinition.Builder::gatedExecution));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean())).thenAnswer(t->chunks.iterator());
@ -369,10 +370,10 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
// Setup
ArrayList<WorkChunk> chunks = new ArrayList<>();
chunks.add(
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID)
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.QUEUED).setId(CHUNK_ID)
);
chunks.add(
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID_2)
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.QUEUED).setId(CHUNK_ID_2)
);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition(JobDefinition.Builder::gatedExecution));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean())).thenAnswer(t->chunks.iterator());

View File

@ -3,9 +3,12 @@ package ca.uhn.fhir.batch2.model;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.jupiter.api.Assertions.assertEquals;
class StatusEnumTest {
@ -69,6 +72,20 @@ class StatusEnumTest {
})
public void testStateTransition(StatusEnum origStatus, StatusEnum newStatus, boolean expected) {
assertEquals(expected, StatusEnum.isLegalStateTransition(origStatus, newStatus));
if (expected) {
assertThat(StatusEnum.ourFromStates.get(newStatus), hasItem(origStatus));
assertThat(StatusEnum.ourToStates.get(origStatus), hasItem(newStatus));
} else {
assertThat(StatusEnum.ourFromStates.get(newStatus), not(hasItem(origStatus)));
assertThat(StatusEnum.ourToStates.get(origStatus), not(hasItem(newStatus)));
}
}
@ParameterizedTest
@EnumSource(StatusEnum.class)
public void testCancellableStates(StatusEnum theState) {
assertEquals(StatusEnum.ourFromStates.get(StatusEnum.CANCELLED).contains(theState), theState.isCancellable()
|| theState == StatusEnum.CANCELLED); // hack: isLegalStateTransition() always returns true for self-transition
}
@Test

View File

@ -0,0 +1,37 @@
package ca.uhn.fhir.batch2.model;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.util.Arrays;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class WorkChunkStatusEnumTest {
@ParameterizedTest
@EnumSource(WorkChunkStatusEnum.class)
void allStatesExceptCOMPLETEDareIncomplete(WorkChunkStatusEnum theEnum) {
if (theEnum == WorkChunkStatusEnum.COMPLETED) {
assertFalse(theEnum.isIncomplete());
} else {
assertTrue(theEnum.isIncomplete());
}
}
@ParameterizedTest
@EnumSource(WorkChunkStatusEnum.class)
void allowedPriorStates_matchesNextStates(WorkChunkStatusEnum theEnum) {
Arrays.stream(WorkChunkStatusEnum.values()).forEach(nextPrior->{
if (nextPrior.getNextStates().contains(theEnum)) {
assertThat("is prior", theEnum.getPriorStates(), hasItem(nextPrior));
} else {
assertThat("is not prior", theEnum.getPriorStates(), not(hasItem(nextPrior)));
}
});
}
}