Prevent chunk from returning to in-progress unless it is errorred, in… (#4417)

* Prevent chunk from returning to in-progress unless it is errorred, in-progress, or queued

* changelog

* Update logger

* Add new test

* Add test
This commit is contained in:
Tadgh 2023-01-09 19:36:06 -08:00 committed by GitHub
parent 84d83f6230
commit 65bf8d47ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 61 additions and 8 deletions

View File

@ -0,0 +1,6 @@
---
type: fix
issue: 4417
jira: SMILE-5405
title: "Fixed a bug with batch2 which could cause previously completed chunks to be set back to in-progress. This could cause a batch job to never complete.
Now, a safeguard to ensure a job can never return to in-progress once it has completed or failed."

View File

@ -29,6 +29,7 @@ import ca.uhn.fhir.batch2.model.MarkWorkChunkAsErrorRequest;
import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest; import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository; import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository; import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity; import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
@ -63,7 +64,7 @@ import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isBlank;
public class JpaJobPersistenceImpl implements IJobPersistence { public class JpaJobPersistenceImpl implements IJobPersistence {
private static final Logger ourLog = LoggerFactory.getLogger(JpaJobPersistenceImpl.class); private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
private final IBatch2JobInstanceRepository myJobInstanceRepository; private final IBatch2JobInstanceRepository myJobInstanceRepository;
private final IBatch2WorkChunkRepository myWorkChunkRepository; private final IBatch2WorkChunkRepository myWorkChunkRepository;
@ -104,9 +105,14 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override @Override
@Transactional(propagation = Propagation.REQUIRED) @Transactional(propagation = Propagation.REQUIRED)
public Optional<WorkChunk> fetchWorkChunkSetStartTimeAndMarkInProgress(String theChunkId) { public Optional<WorkChunk> fetchWorkChunkSetStartTimeAndMarkInProgress(String theChunkId) {
myWorkChunkRepository.updateChunkStatusForStart(theChunkId, new Date(), StatusEnum.IN_PROGRESS); int rowsModified = myWorkChunkRepository.updateChunkStatusForStart(theChunkId, new Date(), StatusEnum.IN_PROGRESS, List.of(StatusEnum.QUEUED, StatusEnum.ERRORED, StatusEnum.IN_PROGRESS));
Optional<Batch2WorkChunkEntity> chunk = myWorkChunkRepository.findById(theChunkId); if (rowsModified == 0) {
return chunk.map(t -> toChunk(t, true)); ourLog.info("Attempting to start chunk {} but it was already started.", theChunkId);
return Optional.empty();
} else {
Optional<Batch2WorkChunkEntity> chunk = myWorkChunkRepository.findById(theChunkId);
return chunk.map(t -> toChunk(t, true));
}
} }
@Override @Override

View File

@ -55,8 +55,8 @@ public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChun
void updateChunkStatusAndIncrementErrorCountForEndError(@Param("id") String theChunkId, @Param("et") Date theEndTime, @Param("em") String theErrorMessage, @Param("status") StatusEnum theInProgress); void updateChunkStatusAndIncrementErrorCountForEndError(@Param("id") String theChunkId, @Param("et") Date theEndTime, @Param("em") String theErrorMessage, @Param("status") StatusEnum theInProgress);
@Modifying @Modifying
@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myStartTime = :st WHERE e.myId = :id") @Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myStartTime = :st WHERE e.myId = :id AND e.myStatus IN :startStatuses")
void updateChunkStatusForStart(@Param("id") String theChunkId, @Param("st") Date theStartedTime, @Param("status") StatusEnum theInProgress); int updateChunkStatusForStart(@Param("id") String theChunkId, @Param("st") Date theStartedTime, @Param("status") StatusEnum theInProgress, @Param("startStatuses") List<StatusEnum> theStartStatuses);
@Modifying @Modifying
@Query("DELETE FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId") @Query("DELETE FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId")

View File

@ -17,6 +17,9 @@ import ca.uhn.fhir.util.JsonUtil;
import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder; import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
@ -25,6 +28,7 @@ import java.util.Date;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -129,6 +133,41 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
}); });
} }
/**
* Returns a set of statuses, and whether they should be successfully picked up and started by a consumer.
* @return
*/
public static List<Arguments> provideStatuses() {
return List.of(
Arguments.of(StatusEnum.QUEUED, true),
Arguments.of(StatusEnum.IN_PROGRESS, true),
Arguments.of(StatusEnum.ERRORED, true),
Arguments.of(StatusEnum.FAILED, false),
Arguments.of(StatusEnum.COMPLETED, false)
);
}
@ParameterizedTest
@MethodSource("provideStatuses")
public void testStartChunkOnlyWorksOnValidChunks(StatusEnum theStatus, boolean theShouldBeStartedByConsumer) {
// Setup
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
BatchWorkChunk batchWorkChunk = new BatchWorkChunk(JOB_DEFINITION_ID, JOB_DEF_VER, TARGET_STEP_ID, instanceId,0, CHUNK_DATA);
String chunkId = mySvc.storeWorkChunk(batchWorkChunk);
Optional<Batch2WorkChunkEntity> byId = myWorkChunkRepository.findById(chunkId);
Batch2WorkChunkEntity entity = byId.get();
entity.setStatus(theStatus);
myWorkChunkRepository.save(entity);
// Execute
Optional<WorkChunk> workChunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId);
// Verify
boolean chunkStarted = workChunk.isPresent();
assertEquals(chunkStarted, theShouldBeStartedByConsumer);
}
@Test @Test
public void testCancelInstance() { public void testCancelInstance() {
JobInstance instance = createInstance(); JobInstance instance = createInstance();

View File

@ -50,14 +50,15 @@ public interface IJobPersistence {
String storeWorkChunk(BatchWorkChunk theBatchWorkChunk); String storeWorkChunk(BatchWorkChunk theBatchWorkChunk);
/** /**
* Fetches a chunk of work from storage, and update the stored status * Fetches a chunk of work from storage, and update the stored status to {@link StatusEnum#IN_PROGRESS}.
* to {@link ca.uhn.fhir.batch2.model.StatusEnum#IN_PROGRESS} * This will only fetch chunks which are currently QUEUED or ERRORRED.
* *
* @param theChunkId The ID, as returned by {@link #storeWorkChunk(BatchWorkChunk theBatchWorkChunk)} * @param theChunkId The ID, as returned by {@link #storeWorkChunk(BatchWorkChunk theBatchWorkChunk)}
* @return The chunk of work * @return The chunk of work
*/ */
Optional<WorkChunk> fetchWorkChunkSetStartTimeAndMarkInProgress(String theChunkId); Optional<WorkChunk> fetchWorkChunkSetStartTimeAndMarkInProgress(String theChunkId);
/** /**
* Store a new job instance. This will be called when a new job instance is being kicked off. * Store a new job instance. This will be called when a new job instance is being kicked off.
* *

View File

@ -392,6 +392,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
verify(myJobPersistence, times(1)).fetchInstances(anyInt(), eq(0)); verify(myJobPersistence, times(1)).fetchInstances(anyInt(), eq(0));
} }
@Test @Test
void triggerMaintenancePass_twoSimultaneousRequests_onlyCallOnce() throws InterruptedException, ExecutionException { void triggerMaintenancePass_twoSimultaneousRequests_onlyCallOnce() throws InterruptedException, ExecutionException {
CountDownLatch simulatedMaintenancePasslatch = new CountDownLatch(1); CountDownLatch simulatedMaintenancePasslatch = new CountDownLatch(1);