Batch2 cleanup (#4779)

Post-review batch2 cleanup.

Review fixes and cleanup from #4647

Deleted methods in IJobPersistence that were unused in production paths, and replaced their usage in tests.
Lots of docs.
Replaced the copy-on-write pattern in JobDefinitionRegistry with a simple ConcurrentHashMap.
Fixed bad mappings from the job ERRORED state to external APIs. People think ERRORED is a failed state. ¯\_(ツ)_/¯
Added some spec tests for chunk purging.
Deprecated ERRORED, for deletion in 6.8.
Lots of plans for 6.8; too risky for 6.6.
michaelabuckley 2023-04-28 15:44:30 -04:00 committed by GitHub
parent e2e5ff6bb8
commit 81854baa02
29 changed files with 280 additions and 285 deletions


@ -4,7 +4,7 @@
title: Batch2 Job Instance state transitions
---
stateDiagram-v2
[*] --> QUEUED : on db create and queued on kakfa
[*] --> QUEUED : on db create and first chunk queued on kafka
QUEUED --> IN_PROGRESS : on any work-chunk received by worker
%% and (see ca.uhn.fhir.batch2.progress.InstanceProgress.getNewStatus())
state first_step_finished <<choice>>
@ -16,6 +16,7 @@ stateDiagram-v2
in_progress_poll --> ERRORED : no failed but errored chunks
in_progress_poll --> FINALIZE : none failed, gated execution\n last step\n queue REDUCER chunk
in_progress_poll --> IN_PROGRESS : still work to do
in_progress_poll --> CANCELLED : user requested cancel.
%% ERRORED is just like IN_PROGRESS, but it is a one-way trip from IN_PROGRESS to ERRORED.
%% FIXME We could probably delete/merge this state with IN_PROGRESS, and use the error count in the UI.
note left of ERRORED
@ -28,11 +29,17 @@ stateDiagram-v2
error_progress_poll --> ERRORED : no failed but errored chunks
error_progress_poll --> FINALIZE : none failed, gated execution\n last step\n queue REDUCER chunk
error_progress_poll --> COMPLETED : 0 failures, errored, or incomplete AND at least 1 chunk complete
error_progress_poll --> CANCELLED : user requested cancel.
state do_reduction <<choice>>
FINALIZE --> do_reduction: poll until worker marks REDUCER chunk yes or no.
do_reduction --> COMPLETED : success
do_reduction --> FAILED : fail
in_progress_poll --> FAILED : any failed chunks
%% terminal states
COMPLETED --> [*]
FAILED --> [*]
CANCELLED --> [*]
```
```mermaid
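
Read as code, the instance diagram above is just a transition table. Below is a minimal, self-contained Java sketch (not the real StatusEnum; the transition sets are approximated from the diagram):

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical mirror of the instance state machine drawn above.
enum InstanceState {
    QUEUED, IN_PROGRESS, ERRORED, FINALIZE, COMPLETED, FAILED, CANCELLED;

    // Legal targets per state; the three terminal states map to the empty set.
    private static final Map<InstanceState, Set<InstanceState>> LEGAL = new EnumMap<>(InstanceState.class);
    static {
        // the diagram only shows QUEUED -> IN_PROGRESS
        LEGAL.put(QUEUED, EnumSet.of(IN_PROGRESS));
        LEGAL.put(IN_PROGRESS, EnumSet.of(IN_PROGRESS, ERRORED, FINALIZE, COMPLETED, FAILED, CANCELLED));
        // one-way trip: ERRORED never returns to IN_PROGRESS (see the note in the diagram)
        LEGAL.put(ERRORED, EnumSet.of(ERRORED, FINALIZE, COMPLETED, FAILED, CANCELLED));
        LEGAL.put(FINALIZE, EnumSet.of(COMPLETED, FAILED));
        LEGAL.put(COMPLETED, EnumSet.noneOf(InstanceState.class));
        LEGAL.put(FAILED, EnumSet.noneOf(InstanceState.class));
        LEGAL.put(CANCELLED, EnumSet.noneOf(InstanceState.class));
    }

    boolean canTransitionTo(InstanceState theNext) {
        return LEGAL.get(this).contains(theNext);
    }
}
```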


@ -53,7 +53,6 @@ import javax.annotation.Nullable;
import javax.persistence.EntityManager;
import javax.persistence.LockModeType;
import javax.persistence.Query;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
@ -320,16 +319,6 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
return statusesForStep.isEmpty() || statusesForStep.equals(Set.of(WorkChunkStatusEnum.COMPLETED));
}
/**
* Note: Not @Transactional because {@link #fetchChunks(String, boolean, int, int, Consumer)} starts a transaction
*/
@Override
public List<WorkChunk> fetchWorkChunksWithoutData(String theInstanceId, int thePageSize, int thePageIndex) {
ArrayList<WorkChunk> chunks = new ArrayList<>();
fetchChunks(theInstanceId, false, thePageSize, thePageIndex, chunks::add);
return chunks;
}
private void fetchChunks(String theInstanceId, boolean theIncludeData, int thePageSize, int thePageIndex, Consumer<WorkChunk> theConsumer) {
myTransactionService
.withSystemRequest()
@ -375,7 +364,6 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
}
@Override
@Transactional
public boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier) {
Batch2JobInstanceEntity instanceEntity = myEntityManager.find(Batch2JobInstanceEntity.class, theInstanceId, LockModeType.PESSIMISTIC_WRITE);
if (null == instanceEntity) {
@ -408,29 +396,15 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void deleteChunksAndMarkInstanceAsChunksPurged(String theInstanceId) {
ourLog.info("Deleting all chunks for instance ID: {}", theInstanceId);
myJobInstanceRepository.updateWorkChunksPurgedTrue(theInstanceId);
myWorkChunkRepository.deleteAllForInstance(theInstanceId);
int updateCount = myJobInstanceRepository.updateWorkChunksPurgedTrue(theInstanceId);
int deleteCount = myWorkChunkRepository.deleteAllForInstance(theInstanceId);
ourLog.debug("Purged {} chunks, and updated {} instance.", deleteCount, updateCount);
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public boolean markInstanceAsCompleted(String theInstanceId) {
int recordsChanged = myJobInstanceRepository.updateInstanceStatus(theInstanceId, StatusEnum.COMPLETED);
return recordsChanged > 0;
}
@Override
public boolean markInstanceAsStatus(String theInstance, StatusEnum theStatusEnum) {
int recordsChanged = myTransactionService
.withSystemRequest()
.execute(()->myJobInstanceRepository.updateInstanceStatus(theInstance, theStatusEnum));
return recordsChanged > 0;
}
@Override
public boolean markInstanceAsStatusWhenStatusIn(String theInstance, StatusEnum theStatusEnum, Set<StatusEnum> thePriorStates) {
int recordsChanged = myJobInstanceRepository.updateInstanceStatus(theInstance, theStatusEnum);
ourLog.debug("Update job {} to status {} if in status {}: {}", theInstance, theStatusEnum, thePriorStates, recordsChanged>0);
public boolean markInstanceAsStatusWhenStatusIn(String theInstanceId, StatusEnum theStatusEnum, Set<StatusEnum> thePriorStates) {
int recordsChanged = myJobInstanceRepository.updateInstanceStatusIfIn(theInstanceId, theStatusEnum, thePriorStates);
ourLog.debug("Update job {} to status {} if in status {}: {}", theInstanceId, theStatusEnum, thePriorStates, recordsChanged>0);
return recordsChanged > 0;
}
@ -440,7 +414,8 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
int recordsChanged = myJobInstanceRepository.updateInstanceCancelled(theInstanceId, true);
String operationString = "Cancel job instance " + theInstanceId;
// TODO MB this is much too detailed to be down here - this should be up at the api layer. Replace with simple enum.
// wipmb For 6.8 - This is too detailed to be down here - this should be up at the api layer.
// Replace with boolean result or ResourceNotFound exception. Build the message up at the ui.
String messagePrefix = "Job instance <" + theInstanceId + ">";
if (recordsChanged > 0) {
return JobOperationResultJson.newSuccess(operationString, messagePrefix + " successfully cancelled.");
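
The switch from `updateInstanceStatus` to `updateInstanceStatusIfIn` above turns the status write into a database compare-and-set: the UPDATE only matches rows whose current status is in the expected prior states, so of several racing maintenance passes exactly one sees `recordsChanged > 0`. A self-contained, in-memory analogue of that idiom (names hypothetical):

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// In-memory analogue of
//   UPDATE ... SET status = :new WHERE id = :id AND status IN (:prior_states)
class GuardedStatus<S> {
    private final AtomicReference<S> myStatus;

    GuardedStatus(S theInitial) {
        myStatus = new AtomicReference<>(theInitial);
    }

    /** @return true iff the transition happened, i.e. the SQL would report rowCount == 1 */
    boolean setIfIn(S theNewStatus, Set<S> thePriorStates) {
        S current = myStatus.get();
        while (thePriorStates.contains(current)) {
            if (myStatus.compareAndSet(current, theNewStatus)) {
                return true; // we won the race
            }
            current = myStatus.get(); // someone else moved it; re-check the guard
        }
        return false; // already outside the prior states - another caller won
    }
}
```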


@ -33,10 +33,6 @@ import java.util.Set;
public interface IBatch2JobInstanceRepository extends JpaRepository<Batch2JobInstanceEntity, String>, IHapiFhirJpaRepository {
@Modifying
@Query("UPDATE Batch2JobInstanceEntity e SET e.myStatus = :status WHERE e.myId = :id and e.myStatus <> :status")
int updateInstanceStatus(@Param("id") String theInstanceId, @Param("status") StatusEnum theStatus);
@Modifying
@Query("UPDATE Batch2JobInstanceEntity e SET e.myStatus = :status WHERE e.myId = :id and e.myStatus IN ( :prior_states )")
int updateInstanceStatusIfIn(@Param("id") String theInstanceId, @Param("status") StatusEnum theNewState, @Param("prior_states") Set<StatusEnum> thePriorStates);


@ -78,7 +78,7 @@ public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChun
@Modifying
@Query("DELETE FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId")
void deleteAllForInstance(@Param("instanceId") String theInstanceId);
int deleteAllForInstance(@Param("instanceId") String theInstanceId);
@Query("SELECT e.myId from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId AND e.myStatus = :status")
List<String> fetchAllChunkIdsForStepWithStatus(@Param("instanceId")String theInstanceId, @Param("stepId")String theStepId, @Param("status") WorkChunkStatusEnum theStatus);


@ -105,7 +105,7 @@ public class Batch2WorkChunkEntity implements Serializable {
}
/**
* Projection constructor for no-date path.
* Projection constructor for no-data path.
*/
public Batch2WorkChunkEntity(String theId, int theSequence, String theJobDefinitionId, int theJobDefinitionVersion,
String theInstanceId, String theTargetStepId, WorkChunkStatusEnum theStatus,
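
For context, the "no-data path" above is the standard JPA constructor-expression projection: a SELECT NEW clause invokes a constructor whose parameter list matches the selected columns, so the large serialized-data column is never read. A generic sketch with hypothetical entity and field names (not the actual HAPI query):

```java
// Hypothetical projection target; the constructor matches the SELECT list below.
public class ChunkSummary {
    public final String id;
    public final String stepId;
    public final String status;

    public ChunkSummary(String theId, String theStepId, String theStatus) {
        id = theId;
        stepId = theStepId;
        status = theStatus;
    }
}

// JPQL constructor expression - the data column is simply not selected:
//   SELECT new com.example.ChunkSummary(c.id, c.stepId, c.status)
//   FROM WorkChunkEntity c WHERE c.instanceId = :instanceId
```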


@ -18,7 +18,7 @@ import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.data.domain.Pageable;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Date;
@ -40,6 +40,7 @@ class JpaJobPersistenceImplTest {
IBatch2JobInstanceRepository myJobInstanceRepository;
@Mock
IBatch2WorkChunkRepository myWorkChunkRepository;
@SuppressWarnings("unused") // injected into mySvc
@Spy
IHapiTransactionService myTxManager = new NonTransactionalHapiTransactionService();
@InjectMocks
@ -87,21 +88,6 @@ class JpaJobPersistenceImplTest {
assertEquals("Job instance <test-instance-id> was already cancelled. Nothing to do.", result.getMessage());
}
@Test
public void markInstanceAsCompleted_withInstanceId_updatesToCompleted() {
// setup
String jobId = "jobid";
// test
mySvc.markInstanceAsCompleted(jobId);
// verify
ArgumentCaptor<StatusEnum> statusCaptor = ArgumentCaptor.forClass(StatusEnum.class);
verify(myJobInstanceRepository)
.updateInstanceStatus(eq(jobId), statusCaptor.capture());
assertEquals(StatusEnum.COMPLETED, statusCaptor.getValue());
}
@Test
public void deleteChunks_withInstanceId_callsChunkRepoDelete() {
// setup
@ -194,8 +180,8 @@ class JpaJobPersistenceImplTest {
private Batch2JobInstanceEntity createBatch2JobInstanceEntity() {
Batch2JobInstanceEntity entity = new Batch2JobInstanceEntity();
entity.setId("id");
entity.setStartTime(Date.from(LocalDate.of(2000, 1, 2).atStartOfDay().toInstant(ZoneOffset.UTC)));
entity.setEndTime(Date.from(LocalDate.of(2000, 2, 3).atStartOfDay().toInstant(ZoneOffset.UTC)));
entity.setStartTime(Date.from(LocalDateTime.of(2000, 1, 2, 0, 0).toInstant(ZoneOffset.UTC)));
entity.setEndTime(Date.from(LocalDateTime.of(2000, 2, 3, 0, 0).toInstant(ZoneOffset.UTC)));
entity.setStatus(StatusEnum.COMPLETED);
entity.setCancelled(true);
entity.setFastTracking(true);


@ -17,6 +17,7 @@ import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Nested;
@ -44,8 +45,6 @@ import java.util.Set;
import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -99,28 +98,6 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
return mySvc.onWorkChunkCreate(batchWorkChunk);
}
@Test
public void testDeleteChunks() {
// Setup
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
for (int i = 0; i < 10; i++) {
storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, i, CHUNK_DATA);
}
// Execute
mySvc.deleteChunksAndMarkInstanceAsChunksPurged(instanceId);
// Verify
runInTransaction(() -> {
assertEquals(1, myJobInstanceRepository.findAll().size());
assertEquals(0, myWorkChunkRepository.findAll().size());
});
}
@Test
public void testStoreAndFetchInstance() {
JobInstance instance = createInstance();
@ -308,41 +285,6 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
}
}
@Test
public void testFetchChunks() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
List<String> ids = new ArrayList<>();
for (int i = 0; i < 10; i++) {
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, i, CHUNK_DATA);
ids.add(id);
}
List<WorkChunk> chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 0);
assertNull(chunks.get(0).getData());
assertNull(chunks.get(1).getData());
assertNull(chunks.get(2).getData());
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
contains(ids.get(0), ids.get(1), ids.get(2)));
chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 1);
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
contains(ids.get(3), ids.get(4), ids.get(5)));
chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 2);
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
contains(ids.get(6), ids.get(7), ids.get(8)));
chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 3);
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
contains(ids.get(9)));
chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 4);
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
empty());
}
@Test
public void testUpdateTime() {
// Setup
@ -581,7 +523,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
});
List<WorkChunk> chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 100, 0);
List<WorkChunk> chunks = ImmutableList.copyOf(mySvc.fetchAllWorkChunksIterator(instanceId, true));
assertEquals(1, chunks.size());
assertEquals(2, chunks.get(0).getErrorCount());
}
@ -616,19 +558,6 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
});
}
@Test
public void testMarkInstanceAsCompleted() {
String instanceId = mySvc.storeNewInstance(createInstance());
assertTrue(mySvc.markInstanceAsCompleted(instanceId));
assertFalse(mySvc.markInstanceAsCompleted(instanceId));
runInTransaction(() -> {
Batch2JobInstanceEntity entity = myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalArgumentException::new);
assertEquals(StatusEnum.COMPLETED, entity.getStatus());
});
}
@Test
public void markWorkChunksWithStatusAndWipeData_marksMultipleChunksWithStatus_asExpected() {
JobInstance instance = createInstance();
@ -661,7 +590,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
private WorkChunk freshFetchWorkChunk(String chunkId) {
return runInTransaction(() ->
myWorkChunkRepository.findById(chunkId)
.map(e-> JobInstanceUtil.fromEntityToWorkChunk(e))
.map(JobInstanceUtil::fromEntityToWorkChunk)
.orElseThrow(IllegalArgumentException::new));
}


@ -43,13 +43,13 @@ public class BulkExportUtil {
return BulkExportJobStatusEnum.SUBMITTED;
case COMPLETED :
return BulkExportJobStatusEnum.COMPLETE;
case ERRORED:
case IN_PROGRESS:
return BulkExportJobStatusEnum.BUILDING;
default:
ourLog.warn("Unrecognized status {}; treating as FAILED/CANCELLED/ERRORED", status.name());
case FAILED:
case CANCELLED:
case ERRORED:
return BulkExportJobStatusEnum.ERROR;
}
}


@ -239,6 +239,7 @@ public class BulkDataImportProvider {
streamOperationOutcomeResponse(response, msg, "information");
break;
}
case ERRORED:
case IN_PROGRESS: {
response.setStatus(Constants.STATUS_HTTP_202_ACCEPTED);
String msg = "Job was created at " + renderTime(instance.getCreateTime()) +
@ -258,8 +259,7 @@ public class BulkDataImportProvider {
streamOperationOutcomeResponse(response, msg, "information");
break;
}
case FAILED:
case ERRORED: {
case FAILED: {
response.setStatus(Constants.STATUS_HTTP_500_INTERNAL_ERROR);
String msg = "Job is in " + instance.getStatus() + " state with " +
instance.getErrorCount() + " error count. Last error: " + instance.getErrorMessage();


@ -315,9 +315,9 @@ public class BulkDataImportProviderTest {
}
@Test
public void testPollForStatus_ERROR() throws IOException {
public void testPollForStatus_FAILED() throws IOException {
JobInstance jobInfo = new JobInstance()
.setStatus(StatusEnum.ERRORED)
.setStatus(StatusEnum.FAILED)
.setErrorMessage("It failed.")
.setErrorCount(123)
.setCreateTime(parseDate("2022-01-01T12:00:00-04:00"))
@ -336,7 +336,7 @@ public class BulkDataImportProviderTest {
assertEquals("Server Error", response.getStatusLine().getReasonPhrase());
String responseContent = IOUtils.toString(response.getEntity().getContent(), Charsets.UTF_8);
ourLog.info("Response content: {}", responseContent);
assertThat(responseContent, containsString("\"diagnostics\": \"Job is in ERRORED state with 123 error count. Last error: It failed.\""));
assertThat(responseContent, containsString("\"diagnostics\": \"Job is in FAILED state with 123 error count. Last error: It failed.\""));
}
}


@ -38,6 +38,7 @@ import ca.uhn.fhir.util.StopWatch;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep2InputType;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep3InputType;
import com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
@ -57,21 +58,20 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.theInstance;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
* Specification tests for batch2 storage and event system.
@ -317,42 +317,6 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
}
}
@Test
public void testFetchChunks() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
List<String> ids = new ArrayList<>();
for (int i = 0; i < 10; i++) {
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, i, CHUNK_DATA);
ids.add(id);
}
List<WorkChunk> chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 0);
assertNull(chunks.get(0).getData());
assertNull(chunks.get(1).getData());
assertNull(chunks.get(2).getData());
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
contains(ids.get(0), ids.get(1), ids.get(2)));
chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 1);
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
contains(ids.get(3), ids.get(4), ids.get(5)));
chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 2);
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
contains(ids.get(6), ids.get(7), ids.get(8)));
chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 3);
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
contains(ids.get(9)));
chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 4);
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
empty());
}
@Test
public void testMarkChunkAsCompleted_Success() {
JobInstance instance = createInstance();
@ -437,7 +401,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
});
List<WorkChunk> chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 100, 0);
List<WorkChunk> chunks = ImmutableList.copyOf(mySvc.fetchAllWorkChunksIterator(instanceId, true));
assertEquals(1, chunks.size());
assertEquals(2, chunks.get(0).getErrorCount());
}
@ -588,6 +552,29 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
}
}
@Test
void testDeleteChunksAndMarkInstanceAsChunksPurged_doesWhatItSays() {
// given
JobDefinition<?> jd = withJobDefinition();
IJobPersistence.CreateResult createResult = newTxTemplate().execute(status->
mySvc.onCreateWithFirstChunk(jd, "{}"));
String instanceId = createResult.jobInstanceId;
for (int i = 0; i < 10; i++) {
storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, i, CHUNK_DATA);
}
JobInstance readback = freshFetchJobInstance(instanceId);
assertFalse(readback.isWorkChunksPurged());
assertTrue(mySvc.fetchAllWorkChunksIterator(instanceId, true).hasNext(), "has chunk");
// when
mySvc.deleteChunksAndMarkInstanceAsChunksPurged(instanceId);
// then
readback = freshFetchJobInstance(instanceId);
assertTrue(readback.isWorkChunksPurged(), "purged set");
assertFalse(mySvc.fetchAllWorkChunksIterator(instanceId, true).hasNext(), "chunks gone");
}
@Test
void testInstanceUpdate_modifierApplied() {
// given


@ -26,7 +26,7 @@ import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.i18n.Msg;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,8 +49,9 @@ import java.util.stream.Stream;
* Some of this is tested in {@link ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest}
* This is a transactional interface, but we have pushed the declaration of calls that have
* {@code @Transactional(propagation = Propagation.REQUIRES_NEW)} down to the implementations since we have a synchronized
* wrapper that was double-createing the NEW transaction.
* wrapper that was double-creating the NEW transaction.
*/
// wipmb For 6.8 - regularize the tx boundary. Probably make them all MANDATORY
public interface IJobPersistence extends IWorkChunkPersistence {
Logger ourLog = LoggerFactory.getLogger(IJobPersistence.class);
@ -70,30 +71,34 @@ public interface IJobPersistence extends IWorkChunkPersistence {
*/
Optional<JobInstance> fetchInstance(String theInstanceId);
default List<JobInstance> fetchInstances(String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) {
throw new UnsupportedOperationException(Msg.code(2271) + "Unsupported operation in this implementation");
}
// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
List<JobInstance> fetchInstances(String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable);
/**
* Fetches any existing jobs matching provided request parameters
*/
// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int theStart, int theBatchSize);
/**
* Fetch all instances
*/
// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
List<JobInstance> fetchInstances(int thePageSize, int thePageIndex);
/**
* Fetch instances ordered by myCreateTime DESC
*/
// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex);
// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex);
/**
* Fetch all job instances for a given job definition id
*/
// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
List<JobInstance> fetchInstancesByJobDefinitionId(String theJobDefinitionId, int theCount, int theStart);
/**
@ -101,11 +106,11 @@ public interface IJobPersistence extends IWorkChunkPersistence {
* @param theRequest - the job fetch request
* @return - a page of job instances
*/
default Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest) {
return Page.empty();
}
// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest);
// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId);
/**
@ -116,9 +121,12 @@ public interface IJobPersistence extends IWorkChunkPersistence {
* @param thePageSize The page size
* @param thePageIndex The page index
*/
List<WorkChunk> fetchWorkChunksWithoutData(String theInstanceId, int thePageSize, int thePageIndex);
default List<WorkChunk> fetchWorkChunksWithoutData(String theInstanceId, int thePageSize, int thePageIndex) {
// for back-compat
// wipmb delete after merge.
Validate.isTrue(false, "Dead path");
return null;
}
/**
* Fetch all chunks for a given instance.
@ -126,22 +134,30 @@ public interface IJobPersistence extends IWorkChunkPersistence {
* @param theWithData - whether or not to include the data
* @return - an iterator for fetching work chunks
*/
Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData);
default Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
// for back-compat
// wipmb delete after merge.
Validate.isTrue(false, "Dead path");
return null;
}
/**
* Fetch all chunks with data for a given instance for a given step id - read-only.
*
* @return - a stream for fetching work chunks
*/
@Transactional(propagation = Propagation.MANDATORY, readOnly = true)
Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId);
default Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) {
// for back-compat
// wipmb delete after merge.
Validate.isTrue(false, "Dead path");
return null;
}
/**
* Callback to update a JobInstance within a locked transaction.
* Return true from the callback if the record write should continue, or false if
* the change should be discarded.
*/
@FunctionalInterface
interface JobInstanceUpdateCallback {
/**
* Modify theInstance within a write-lock transaction.
@ -152,15 +168,18 @@ public interface IJobPersistence extends IWorkChunkPersistence {
}
/**
* Goofy hack for now to create a tx boundary.
* Brute-force hack for now to create a tx boundary - takes a write-lock on the instance
while theModifier runs.
* Keep the callback short to keep the lock-time short.
* If the status is changing, use {@link ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater}
* instead to ensure state-change callbacks are invoked properly.
* inside theModifier to ensure state-change callbacks are invoked properly.
*
* @param theInstanceId the id of the instance to modify
* @param theModifier a hook to modify the instance - return true to finish the record write
* @return true if the instance was modified
*/
// todo mb consider changing callers to actual objects we can unit test.
// wipmb For 6.8 - consider changing callers to actual objects we can unit test
@Transactional
boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier);
/**
@ -168,6 +187,7 @@ public interface IJobPersistence extends IWorkChunkPersistence {
*
* @param theInstanceId The instance ID
*/
// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
void deleteInstanceAndChunks(String theInstanceId);
/**
@ -175,6 +195,7 @@ public interface IJobPersistence extends IWorkChunkPersistence {
*
* @param theInstanceId The instance ID
*/
// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
void deleteChunksAndMarkInstanceAsChunksPurged(String theInstanceId);
/**
@ -183,9 +204,20 @@ public interface IJobPersistence extends IWorkChunkPersistence {
* @param theInstanceId The instance ID
* @return true if the instance status changed
*/
boolean markInstanceAsCompleted(String theInstanceId);
default boolean markInstanceAsCompleted(String theInstanceId) {
// for back-compat
// wipmb delete after merge.
Validate.isTrue(false, "Dead path");
return false;
}
default boolean markInstanceAsStatus(String theInstance, StatusEnum theStatusEnum) {
// wipmb delete after merge.
// for back-compat
Validate.isTrue(false, "Dead path");
return false;
}
boolean markInstanceAsStatus(String theInstance, StatusEnum theStatusEnum);
@Transactional(propagation = Propagation.MANDATORY)
boolean markInstanceAsStatusWhenStatusIn(String theInstance, StatusEnum theStatusEnum, Set<StatusEnum> thePriorStates);
@ -195,8 +227,10 @@ public interface IJobPersistence extends IWorkChunkPersistence {
*
* @param theInstanceId The instance ID
*/
// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
JobOperationResultJson cancelInstance(String theInstanceId);
@Transactional(propagation = Propagation.MANDATORY)
void updateInstanceUpdateTime(String theInstanceId);
@ -207,8 +241,6 @@ public interface IJobPersistence extends IWorkChunkPersistence {
*
* @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md
*/
///////
// job events
class CreateResult {
public final String jobInstanceId;
@ -228,7 +260,18 @@ public interface IJobPersistence extends IWorkChunkPersistence {
}
}
/**
* Create the job, and its first chunk.
*
* We create the chunk atomically with the job so that we never have a state with
* zero unfinished chunks left until the job is complete. Makes the maintenance run simpler.
*
* @param theJobDefinition what kind of job
* @param theParameters params for the job
* @return the ids of the instance and first chunk
*/
@Nonnull
@Transactional(propagation = Propagation.MANDATORY)
default CreateResult onCreateWithFirstChunk(JobDefinition<?> theJobDefinition, String theParameters) {
JobInstance instance = JobInstance.fromJobDefinition(theJobDefinition);
instance.setParameters(theParameters);
@ -249,6 +292,7 @@ public interface IJobPersistence extends IWorkChunkPersistence {
* Ignore other prior states.
* @return did the transition happen
*/
@Transactional(propagation = Propagation.MANDATORY)
default boolean onChunkDequeued(String theJobInstanceId) {
return markInstanceAsStatusWhenStatusIn(theJobInstanceId, StatusEnum.IN_PROGRESS, Collections.singleton(StatusEnum.QUEUED));
}
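
As a usage sketch of the `updateInstance` contract documented above (the guard shown is illustrative; `isFinished()` and `setWorkChunksPurged()` appear elsewhere in this commit):

```java
// The lambda runs while the instance row is write-locked, so keep it short.
// Returning false discards the write and releases the lock.
boolean purged = myJobPersistence.updateInstance(instanceId, instance -> {
    if (!instance.isFinished()) {
        return false; // not done yet - discard the change
    }
    instance.setWorkChunksPurged(true);
    return true; // persist the change
});
```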


@ -34,7 +34,13 @@ import java.util.stream.Stream;
/**
* Work Chunk api, implementing the WorkChunk state machine.
* Test specification is in {@link ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest}
* Test specification is in {@link ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest}.
* Note on transaction boundaries: these are messy - some methods expect an existing transaction and are
* marked with {@code @Transactional(propagation = Propagation.MANDATORY)}, some will create a tx as needed
* and are marked {@code @Transactional(propagation = Propagation.REQUIRED)}, and some run in a NEW transaction
* and are not marked on the interface, but on the implementors instead. We had a bug where interface
* methods marked {@code @Transactional(propagation = Propagation.REQUIRES_NEW)} were starting two (2!)
* transactions because of our synchronized wrapper.
*
* @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md
*/
@ -76,6 +82,7 @@ public interface IWorkChunkPersistence {
* @param theParameters - the error message and max retry count.
* @return - the new status - ERRORED or FAILED, depending on retry count
*/
// on impl - @Transactional(propagation = Propagation.REQUIRES_NEW)
WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters);
/**
@ -108,23 +115,12 @@ public interface IWorkChunkPersistence {
@Transactional(propagation = Propagation.MANDATORY)
void markWorkChunksWithStatusAndWipeData(String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMsg);
/**
* Fetches all chunks for a given instance, without loading the data
*
* @param theInstanceId The instance ID
* @param thePageSize The page size
* @param thePageIndex The page index
*/
List<WorkChunk> fetchWorkChunksWithoutData(String theInstanceId, int thePageSize, int thePageIndex);
/**
* Fetch all chunks for a given instance.
*
* @param theInstanceId - instance id
* @param theWithData - whether or not to include the data
* @param theWithData - whether to include the data - not needed for stats collection
* @return - an iterator for fetching work chunks
* wipmb replace with a stream and a consumer in 6.8
*/
Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData);
@ -134,6 +130,7 @@ public interface IWorkChunkPersistence {
*
* @return - a stream for fetching work chunks
*/
@Transactional(propagation = Propagation.MANDATORY, readOnly = true)
Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId);
/**
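
To make the transaction-boundary note at the top of this interface concrete, here is a small sketch (hypothetical interface, real Spring annotations) of the three flavors it describes:

```java
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

// Hypothetical interface illustrating the three tx flavors described above.
interface TxBoundaryExamples {

    // MANDATORY: the caller must already hold a transaction; Spring throws if none is active.
    @Transactional(propagation = Propagation.MANDATORY)
    void mustJoinExistingTx();

    // REQUIRED: joins the caller's transaction, or starts one if none exists.
    @Transactional(propagation = Propagation.REQUIRED)
    void joinOrCreateTx();

    // REQUIRES_NEW: always suspends any caller tx and starts a fresh one.
    // Declared on implementations only - annotating the interface behind a
    // synchronized wrapper was starting two transactions (the bug noted above).
    void newTxDeclaredOnImplementations();
}
```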


@ -176,6 +176,7 @@ public class JobCoordinatorImpl implements IJobCoordinator {
return myJobQuerySvc.fetchAllInstances(theFetchRequest);
}
// wipmb For 6.8 - Clarify this interface. We currently return a JobOperationResultJson, and don't throw ResourceNotFoundException
@Override
public JobOperationResultJson cancelInstance(String theInstanceId) throws ResourceNotFoundException {
return myJobPersistence.cancelInstance(theInstanceId);


@ -33,7 +33,6 @@ import org.slf4j.Logger;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -41,13 +40,13 @@ import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
public class JobDefinitionRegistry {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
// TODO MB is this safe? Can ue use ConcurrentHashMap instead?
private volatile Map<String, NavigableMap<Integer, JobDefinition<?>>> myJobs = new HashMap<>();
private final Map<String, NavigableMap<Integer, JobDefinition<?>>> myJobDefinitions = new ConcurrentHashMap<>();
/**
* Add a job definition only if it is not registered
@ -78,8 +77,7 @@ public class JobDefinitionRegistry {
}
}
Map<String, NavigableMap<Integer, JobDefinition<?>>> newJobsMap = cloneJobsMap();
NavigableMap<Integer, JobDefinition<?>> versionMap = newJobsMap.computeIfAbsent(jobDefinitionId, t -> new TreeMap<>());
NavigableMap<Integer, JobDefinition<?>> versionMap = myJobDefinitions.computeIfAbsent(jobDefinitionId, t -> new TreeMap<>());
if (versionMap.containsKey(theDefinition.getJobDefinitionVersion())) {
if (versionMap.get(theDefinition.getJobDefinitionVersion()) == theDefinition) {
ourLog.warn("job[{}] version: {} already registered. Not registering again.", jobDefinitionId, theDefinition.getJobDefinitionVersion());
@ -88,37 +86,23 @@ public class JobDefinitionRegistry {
throw new ConfigurationException(Msg.code(2047) + "Multiple definitions for job[" + jobDefinitionId + "] version: " + theDefinition.getJobDefinitionVersion());
}
versionMap.put(theDefinition.getJobDefinitionVersion(), theDefinition);
myJobs = newJobsMap;
}
public synchronized void removeJobDefinition(@Nonnull String theDefinitionId, int theVersion) {
Validate.notBlank(theDefinitionId);
Validate.isTrue(theVersion >= 1);
Map<String, NavigableMap<Integer, JobDefinition<?>>> newJobsMap = cloneJobsMap();
NavigableMap<Integer, JobDefinition<?>> versionMap = newJobsMap.get(theDefinitionId);
NavigableMap<Integer, JobDefinition<?>> versionMap = myJobDefinitions.get(theDefinitionId);
if (versionMap != null) {
versionMap.remove(theVersion);
if (versionMap.isEmpty()) {
newJobsMap.remove(theDefinitionId);
myJobDefinitions.remove(theDefinitionId);
}
}
myJobs = newJobsMap;
}
@Nonnull
private Map<String, NavigableMap<Integer, JobDefinition<?>>> cloneJobsMap() {
Map<String, NavigableMap<Integer, JobDefinition<?>>> newJobsMap = new HashMap<>();
for (Map.Entry<String, NavigableMap<Integer, JobDefinition<?>>> nextEntry : myJobs.entrySet()) {
newJobsMap.put(nextEntry.getKey(), new TreeMap<>(nextEntry.getValue()));
}
return newJobsMap;
}
public Optional<JobDefinition<?>> getLatestJobDefinition(@Nonnull String theJobDefinitionId) {
NavigableMap<Integer, JobDefinition<?>> versionMap = myJobs.get(theJobDefinitionId);
NavigableMap<Integer, JobDefinition<?>> versionMap = myJobDefinitions.get(theJobDefinitionId);
if (versionMap == null || versionMap.isEmpty()) {
return Optional.empty();
}
@ -126,7 +110,7 @@ public class JobDefinitionRegistry {
}
public Optional<JobDefinition<?>> getJobDefinition(@Nonnull String theJobDefinitionId, int theJobDefinitionVersion) {
NavigableMap<Integer, JobDefinition<?>> versionMap = myJobs.get(theJobDefinitionId);
NavigableMap<Integer, JobDefinition<?>> versionMap = myJobDefinitions.get(theJobDefinitionId);
if (versionMap == null || versionMap.isEmpty()) {
return Optional.empty();
}
@ -155,14 +139,14 @@ public class JobDefinitionRegistry {
* @return a list of Job Definition Ids in alphabetical order
*/
public List<String> getJobDefinitionIds() {
return myJobs.keySet()
return myJobDefinitions.keySet()
.stream()
.sorted()
.collect(Collectors.toList());
}
public boolean isEmpty() {
return myJobs.isEmpty();
return myJobDefinitions.isEmpty();
}
@SuppressWarnings("unchecked")
@ -171,6 +155,6 @@ public class JobDefinitionRegistry {
}
public Collection<Integer> getJobDefinitionVersions(String theDefinitionId) {
return myJobs.getOrDefault(theDefinitionId, ImmutableSortedMap.of()).keySet();
return myJobDefinitions.getOrDefault(theDefinitionId, ImmutableSortedMap.of()).keySet();
}
}
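
The registry change above drops the clone-and-swap (copy-on-write) maps in favour of a single `ConcurrentHashMap` with `computeIfAbsent`. A stripped-down sketch of the resulting shape (simplified generic type, no validation):

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

// Simplified registry shape after the change: one concurrent map, no snapshot
// cloning. Writers stay synchronized; readers are lock-free.
class MiniJobDefinitionRegistry<D> {
    private final Map<String, NavigableMap<Integer, D>> myJobDefinitions = new ConcurrentHashMap<>();

    synchronized void addJobDefinition(String theId, int theVersion, D theDefinition) {
        myJobDefinitions
            .computeIfAbsent(theId, id -> new TreeMap<>())
            .put(theVersion, theDefinition);
    }

    Optional<D> getLatestJobDefinition(String theId) {
        NavigableMap<Integer, D> versions = myJobDefinitions.get(theId);
        return (versions == null || versions.isEmpty())
            ? Optional.empty()
            : Optional.of(versions.lastEntry().getValue());
    }
}
```

Note the inner `TreeMap` is still guarded only by the synchronized writers; as in the real class, concurrent readers rely on registration effectively finishing at startup.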


@ -86,6 +86,7 @@ public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT ex
});
}
// This flag could be stale, but checking for fast-track is a safe operation.
if (myInstance.isFastTracking()) {
handleFastTracking(stepExecutorOutput.getDataSink());
}
@ -94,6 +95,8 @@ public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT ex
private void handleFastTracking(BaseDataSink<PT, IT, OT> theDataSink) {
if (theDataSink.getWorkChunkCount() <= 1) {
ourLog.debug("Gated job {} step {} produced exactly one chunk: Triggering a maintenance pass.", myDefinition.getJobDefinitionId(), myCursor.currentStep.getStepId());
// wipmb 6.8 either delete fast-tracking, or narrow this call to just this instance and step
// This runs full maintenance for EVERY job as each chunk completes in a fast-tracked job. That's a LOT of work.
boolean success = myJobMaintenanceService.triggerMaintenancePass();
if (!success) {
myJobPersistence.updateInstance(myInstance.getInstanceId(), instance-> {


@ -28,7 +28,6 @@ import ca.uhn.fhir.batch2.model.ChunkOutcome;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
@ -52,6 +51,7 @@ import org.springframework.transaction.annotation.Propagation;
import javax.annotation.Nonnull;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Timer;
@ -63,6 +63,10 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import static ca.uhn.fhir.batch2.model.StatusEnum.ERRORED;
import static ca.uhn.fhir.batch2.model.StatusEnum.FINALIZE;
import static ca.uhn.fhir.batch2.model.StatusEnum.IN_PROGRESS;
public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorService, IHasScheduledJobs {
public static final String SCHEDULED_JOB_ID = ReductionStepExecutorScheduledJob.class.getName();
private static final Logger ourLog = LoggerFactory.getLogger(ReductionStepExecutorServiceImpl.class);
@ -154,6 +158,8 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
JobDefinitionStep<PT, IT, OT> step = theJobWorkCursor.getCurrentStep();
// wipmb For 6.8 - this runs four tx. That's at least 2 too many
// combine the fetch and the case statement. Use optional for the boolean.
JobInstance instance = executeInTransactionWithSynchronization(() ->
myJobPersistence.fetchInstance(theInstanceId).orElseThrow(() -> new InternalErrorException("Unknown instance: " + theInstanceId)));
@ -162,7 +168,9 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
case IN_PROGRESS:
case ERRORED:
// this will take a write lock on the JobInstance, preventing duplicates.
if (myJobPersistence.markInstanceAsStatus(instance.getInstanceId(), StatusEnum.FINALIZE)) {
boolean changed = executeInTransactionWithSynchronization(() ->
myJobPersistence.markInstanceAsStatusWhenStatusIn(instance.getInstanceId(), FINALIZE, EnumSet.of(IN_PROGRESS, ERRORED)));
if (changed) {
ourLog.info("Job instance {} has been set to FINALIZE state - Beginning reducer step", instance.getInstanceId());
shouldProceed = true;
}
@ -178,7 +186,7 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
if (!shouldProceed) {
ourLog.warn(
"JobInstance[{}] should not be finalized at this time. In memory status is {}. Reduction step will not rerun!"
+ " This could be a long running reduction job resulting in the processed msg not being acknowledge,"
+ " This could be a long running reduction job resulting in the processed msg not being acknowledged,"
+ " or the result of a failed process or server restarting.",
instance.getInstanceId(),
instance.getStatus()
@ -189,7 +197,7 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
PT parameters = instance.getParameters(theJobWorkCursor.getJobDefinition().getParametersType());
IReductionStepWorker<PT, IT, OT> reductionStepWorker = (IReductionStepWorker<PT, IT, OT>) step.getJobStepWorker();
instance.setStatus(StatusEnum.FINALIZE);
instance.setStatus(FINALIZE);
boolean defaultSuccessValue = true;
ReductionStepChunkProcessingResponse response = new ReductionStepChunkProcessingResponse(defaultSuccessValue);


@ -194,7 +194,7 @@ class WorkChannelMessageHandler implements MessageHandler {
ourLog.info("Received work notification for {}", workNotification);
// There are three paths through this code:
// 1. Normal execution. We validate, load, update statuses, all in a tx. Then we proces the chunk.
// 1. Normal execution. We validate, load, update statuses, all in a tx. Then we process the chunk.
// 2. Discard chunk. If some validation fails (e.g. no chunk with that id), we log and discard the chunk.
// Probably a db rollback, with a stale queue.
// 3. Fail and retry. If we throw an exception out of here, Spring will put the queue message back, and redeliver later.
@ -238,7 +238,7 @@ class WorkChannelMessageHandler implements MessageHandler {
if (setupProcessing.isEmpty()) {
// If any setup failed, roll back the chunk and instance status changes.
ourLog.debug("WorkChunk setup tx rollback");
ourLog.debug("WorkChunk setup failed - rollback tx");
theTransactionStatus.setRollbackOnly();
}
// else COMMIT the work.
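
The three paths in the comment above map onto the transaction outcome. A sketch (hypothetical handler, real Spring `TransactionStatus` API) of how each path is signalled:

```java
import java.util.Optional;
import org.springframework.transaction.TransactionStatus;

class WorkNotificationSketch {

    void handle(Optional<String> theSetupProcessing, TransactionStatus theTransactionStatus) {
        if (theSetupProcessing.isEmpty()) {
            // Path 2: validation failed (e.g. no chunk with that id, probably a
            // stale queue) - roll back the status changes and drop the message.
            theTransactionStatus.setRollbackOnly();
            return;
        }
        // Path 1: normal execution - the setup tx commits, then the chunk is processed.
        // Path 3: anything thrown from here propagates; Spring puts the queue
        // message back and it is redelivered later.
        process(theSetupProcessing.get());
    }

    private void process(String theChunkId) {
        // ... chunk work happens here ...
    }
}
```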


@ -133,17 +133,18 @@ public class JobInstanceProcessor {
break;
case CANCELLED:
purgeExpiredInstance(theInstance);
//wipmb For 6.8 - Are we deliberately not purging chunks for cancelled jobs? This is a very complicated way to say that.
return;
}
if (theInstance.isFinished() && !theInstance.isWorkChunksPurged()) {
myJobPersistence.deleteChunksAndMarkInstanceAsChunksPurged(theInstance.getInstanceId());
// update final statistics.
// wipmb For 6.8 - do we need to run stats again? If the status changed to finished, then we just ran them above.
InstanceProgress progress = myJobInstanceProgressCalculator.calculateInstanceProgress(theInstance.getInstanceId());
myJobPersistence.updateInstance(theInstance.getInstanceId(), instance->{
progress.updateInstance(instance);
instance.setWorkChunksPurged(true);
return true;
});
}
@ -222,6 +223,7 @@ public class JobInstanceProcessor {
});
if (!changed) {
// we collided with another maintenance job.
ourLog.warn("Skipping gate advance to {} for instance {} - already advanced.", nextStepId, instanceId);
return;
}


@ -395,10 +395,10 @@ public class JobInstance implements IModelJson, IJobInstance {
switch (getStatus()) {
case IN_PROGRESS:
case ERRORED:
case FINALIZE:
return true;
case COMPLETED:
case ERRORED:
case QUEUED:
case FAILED:
case CANCELLED:


@ -33,6 +33,8 @@ import java.util.Set;
/**
* Status of a Batch2 Job Instance.
* The initial state is QUEUED.
* The terminal states are COMPLETED, CANCELLED, or FAILED.
*/
public enum StatusEnum {
@ -57,9 +59,13 @@ public enum StatusEnum {
COMPLETED(false, true, false),
/**
* Task execution resulted in an error but the error may be transient (or transient status is unknown).
* Retrying may result in success.
* Chunk execution resulted in an error but the error may be transient (or transient status is unknown).
* The job may still complete successfully.
* @deprecated this is basically a synonym for IN_PROGRESS - display should use the presence of an error message on the instance
* to indicate that there has been a transient error.
*/
@Deprecated(since = "6.6")
// wipmb For 6.8 - remove all inbound transitions, and allow transition back to IN_PROGRESS. use message in ui to show danger status
ERRORED(true, false, true),
/**
@ -69,7 +75,7 @@ public enum StatusEnum {
FAILED(true, true, false),
/**
* Task has been cancelled.
* Task has been cancelled by the user.
*/
CANCELLED(true, true, false);
@ -183,10 +189,8 @@ public enum StatusEnum {
canTransition = true;
break;
case IN_PROGRESS:
canTransition = theNewStatus != QUEUED;
break;
case ERRORED:
canTransition = theNewStatus == FAILED || theNewStatus == COMPLETED || theNewStatus == CANCELLED || theNewStatus == ERRORED;
canTransition = theNewStatus != QUEUED;
break;
case CANCELLED:
// terminal state cannot transition
@ -206,7 +210,8 @@ public enum StatusEnum {
}
if (!canTransition) {
ourLog.trace("Tried to execute an illegal state transition. [origStatus={}, newStatus={}]", theOrigStatus, theNewStatus);
// we have a bug?
ourLog.warn("Tried to execute an illegal state transition. [origStatus={}, newStatus={}]", theOrigStatus, theNewStatus);
}
return canTransition;
}


@ -25,10 +25,13 @@ import java.util.Set;
/**
* States for the {@link WorkChunk} state machine.
* The initial state is QUEUED.
* The terminal states are FAILED, COMPLETED.
*
* @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md
*/
public enum WorkChunkStatusEnum {
// TODO MB: missing a state - WAITING for gated. it would simplify stats - not in this MR - later
// wipmb For 6.8 Add WAITING for gated, and READY for in db, but not yet sent to channel.
QUEUED, IN_PROGRESS, ERRORED, FAILED, COMPLETED;
private static final EnumMap<WorkChunkStatusEnum, Set<WorkChunkStatusEnum>> ourPriorStates;
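
`ourPriorStates` is the inverse of the transition table: for each state, the set of states that may immediately precede it. A self-contained sketch of computing that inversion (simplified enum, not the real one):

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

class PriorStatesSketch {
    enum ChunkState { QUEUED, IN_PROGRESS, ERRORED, FAILED, COMPLETED }

    // Invert a successor table ("state -> legal next states") into a
    // predecessor table ("state -> states that may precede it").
    static Map<ChunkState, Set<ChunkState>> invert(Map<ChunkState, Set<ChunkState>> theNextStates) {
        Map<ChunkState, Set<ChunkState>> priors = new EnumMap<>(ChunkState.class);
        for (ChunkState state : ChunkState.values()) {
            priors.put(state, EnumSet.noneOf(ChunkState.class));
        }
        theNextStates.forEach((from, targets) ->
            targets.forEach(to -> priors.get(to).add(from)));
        return priors;
    }
}
```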


@ -21,6 +21,8 @@
/**
* Our distributed batch processing library.
*
* <p>
*
* A running job corresponds to a {@link ca.uhn.fhir.batch2.model.JobInstance}.
* Jobs are modeled as a sequence of steps, operating on {@link ca.uhn.fhir.batch2.model.WorkChunk}s
containing json data. The first step is special -- its input chunk is empty, and the data is assumed to be the job parameters.
@ -28,12 +30,82 @@
* Each step defines the input chunk type, the output chunk type, and a procedure that receives the input and emits 0 or more outputs.
* We have a special kind of final step called a reducer, which corresponds to the stream Collector concept.
*
* </p><p>
*
* Job instances and work chunks are stored in the database. Work is distributed to workers via queues.
* The queue message is just the ids of the chunk (chunk id, step id, instance id, job definition id, etc.).
* The worker receives the notification from Spring Messaging ({@link ca.uhn.fhir.batch2.coordinator.WorkChannelMessageHandler#handleMessage}),
fetches the data from the store and processes it using the handler defined for the step.
* The worker updates chunk state as appropriate. It may also update the job instance state.
*
* </p><p>
*
* Once a minute, Quartz runs the {@link ca.uhn.fhir.batch2.api.IJobMaintenanceService#runMaintenancePass() maintenance pass}.
* This loop inspects every job, and dispatches to {@link ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor#process() the JobInstanceProcessor}.
* The JobInstanceProcessor counts the outstanding chunks for a job, and uses these statistics to fire the working state transitions (below).
*
* </p><p>
*
Job and chunk processing follow the state machines described in {@link hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md}
* Chunks have a simple {@link ca.uhn.fhir.batch2.model.WorkChunkStatusEnum state system} with states
* QUEUED, IN_PROGRESS, ERRORED, FAILED, COMPLETED.
* The initial state is QUEUED, and the final states are FAILED, and COMPLETED:
*
* <ul>
* <li> Chunks are created QUEUED (NB - should be READY or WAITING) and notification is posted to the channel for non-gated steps.</li>
* <li>
* Workers receive a notification and advance QUEUED->IN_PROGRESS.
* {@link ca.uhn.fhir.batch2.api.IWorkChunkPersistence#onWorkChunkDequeue(String)}
* </li>
* <li>
* On normal execution, the chunk advances IN_PROGRESS->COMPLETED {@link ca.uhn.fhir.batch2.api.IWorkChunkPersistence#onWorkChunkCompletion} </li>
* <li> On a retryable error, IN_PROGRESS->ERRORED with an error message and the chunk is put back on the queue. {@link ca.uhn.fhir.batch2.api.IWorkChunkPersistence#onWorkChunkError} </li>
* <li> On a hard failure, or too many errors, IN_PROGRESS->FAILED with the error message. {@link ca.uhn.fhir.batch2.api.IWorkChunkPersistence#onWorkChunkFailed} </li>
* </ul>
*
* </p><p>
*
* Jobs have a state machine with {@link ca.uhn.fhir.batch2.model.StatusEnum states}:
* QUEUED, IN_PROGRESS, ERRORED, FINALIZE, COMPLETED, CANCELLED, and FAILED.
* ERRORED is a near synonym for IN_PROGRESS and indicates that a chunk has hit a transient error during this job.
* Hard failures move to final state FAILED.
* The initial state is QUEUED, and the terminal states are COMPLETED, CANCELLED, and FAILED.
* Most transitions happen during the maintenance run, but some are triggered by the worker.
*
* <ul>
* <li> Jobs are created in QUEUED state, along with their first chunk. The chunk is also sent to the channel.
* {@link ca.uhn.fhir.batch2.coordinator.JobCoordinatorImpl#startInstance}
* </li>
* <li> When workers dequeue a chunk, they trigger a QUEUED->IN_PROGRESS transition to report status.
* {@link ca.uhn.fhir.batch2.coordinator.WorkChannelMessageHandler.MessageProcess#updateAndValidateJobStatus}
* </li>
* <li> As a special case, if the first chunk produces no children, the job advances IN_PROGRESS->COMPLETED
* {@link ca.uhn.fhir.batch2.coordinator.JobStepExecutor#executeStep()}
* </li>
* <li> Other transitions happen during maintenance runs. If a job is running, and the user has requested cancellation,
* the job transitions (IN_PROGRESS or ERRORED) -> CANCELLED.
* </li>
* <li> Then the processor looks at chunk statuses. If any chunks are FAILED, then the job moves
* (IN_PROGRESS or ERRORED) -> FAILED. {@link ca.uhn.fhir.batch2.progress.InstanceProgress#calculateNewStatus}
* </li>
* <li> If any chunk is currently in {@link ca.uhn.fhir.batch2.model.WorkChunkStatusEnum#ERRORED ERRORED} state,
* the job progresses IN_PROGRESS->ERRORED, and the error message is copied over.
* </li>
* <li> If all chunks are COMPLETED, then the job moves (IN_PROGRESS or ERRORED) -> COMPLETED.
* </li>
* <li> Gated jobs that have a reducer step will transition (IN_PROGRESS or ERRORED) -> FINALIZE when
* starting the reduction step
* {@link ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor#triggerGatedExecutions}
* </li>
* </ul>
*
* Design gaps:
* <ul>
* <li> If the maintenance job is killed while sending notifications about
* a gated step advance, remaining chunks will never be notified. A CREATED state before QUEUED would catch this.
* a gated step advance, remaining chunks will never be notified. A READY state before QUEUED would catch this.
* A WAITING state for gated chunks will simplify that handling.
* </li>
* <li> A running reduction step will not restart if the server is killed. </li>
* </ul>
*/
package ca.uhn.fhir.batch2;
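
The maintenance pass the javadoc describes - tally the chunks, then derive the instance transition - reduces to a few lines. A simplified sketch (not the actual InstanceProgress logic):

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

class MaintenanceSketch {
    enum ChunkState { QUEUED, IN_PROGRESS, ERRORED, FAILED, COMPLETED }
    enum JobState { IN_PROGRESS, ERRORED, FAILED, COMPLETED }

    // Count chunks by status and pick the new instance status per the rules above.
    static JobState deriveStatus(List<ChunkState> theChunks) {
        Map<ChunkState, Integer> counts = new EnumMap<>(ChunkState.class);
        theChunks.forEach(c -> counts.merge(c, 1, Integer::sum));

        if (counts.getOrDefault(ChunkState.FAILED, 0) > 0) {
            return JobState.FAILED; // any hard failure fails the whole job
        }
        int incomplete = counts.getOrDefault(ChunkState.QUEUED, 0)
            + counts.getOrDefault(ChunkState.IN_PROGRESS, 0)
            + counts.getOrDefault(ChunkState.ERRORED, 0);
        if (incomplete == 0) {
            return JobState.COMPLETED; // every chunk finished cleanly
        }
        return counts.getOrDefault(ChunkState.ERRORED, 0) > 0
            ? JobState.ERRORED   // transient errors seen; the job is still progressing
            : JobState.IN_PROGRESS;
    }
}
```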


@ -165,10 +165,6 @@ public class InstanceProgress {
}
}
public boolean changed() {
return (myIncompleteChunkCount + myCompleteChunkCount + myErroredChunkCount + myErrorCountForAllStatuses) > 0;
}
@Override
public String toString() {
ToStringBuilder builder = new ToStringBuilder(this)


@ -52,15 +52,12 @@ public class JobInstanceProgressCalculator {
myJobPersistence.updateInstance(theInstanceId, currentInstance->{
instanceProgress.updateInstance(currentInstance);
if (instanceProgress.changed() || currentInstance.getStatus() == StatusEnum.IN_PROGRESS) {
if (currentInstance.getCombinedRecordsProcessed() > 0) {
ourLog.info("Job {} of type {} has status {} - {} records processed ({}/sec) - ETA: {}", currentInstance.getInstanceId(), currentInstance.getJobDefinitionId(), currentInstance.getStatus(), currentInstance.getCombinedRecordsProcessed(), currentInstance.getCombinedRecordsProcessedPerSecond(), currentInstance.getEstimatedTimeRemaining());
} else {
ourLog.info("Job {} of type {} has status {} - {} records processed", currentInstance.getInstanceId(), currentInstance.getJobDefinitionId(), currentInstance.getStatus(), currentInstance.getCombinedRecordsProcessed());
}
ourLog.debug(instanceProgress.toString());
}
if (instanceProgress.hasNewStatus()) {
myJobInstanceStatusUpdater.updateInstanceStatus(currentInstance, instanceProgress.getNewStatus());
@ -84,6 +81,7 @@ public class JobInstanceProgressCalculator {
instanceProgress.addChunk(next);
}
// wipmb separate status update from stats collection in 6.8
instanceProgress.calculateNewStatus();
return instanceProgress;
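
The records/sec and ETA figures in the log line above are plain arithmetic over the elapsed time; roughly (names illustrative, not the actual accessors' implementation):

```java
// Illustrative only: throughput = processed / elapsed; ETA = remaining / throughput.
class ProgressMathSketch {
    static String describe(int theProcessed, int theExpected, long theElapsedMillis) {
        double perSecond = theElapsedMillis > 0 ? theProcessed * 1000.0 / theElapsedMillis : 0.0;
        long etaMillis = perSecond > 0 ? (long) ((theExpected - theProcessed) * 1000.0 / perSecond) : -1;
        return String.format("%.1f records/sec, ETA %d ms", perSecond, etaMillis);
    }
}
```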


@ -71,8 +71,8 @@ public class JobInstanceStatusUpdater {
case CANCELLED:
invokeCompletionHandler(theJobInstance, definition, definition.getErrorHandler());
break;
case ERRORED:
case QUEUED:
case ERRORED:
case IN_PROGRESS:
case FINALIZE:
default:


@ -144,7 +144,6 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
assertEquals(PASSWORD_VALUE, params.getPassword());
verify(myJobInstancePersister, times(1)).onWorkChunkCompletion(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0));
verify(myJobInstancePersister, times(0)).fetchWorkChunksWithoutData(any(), anyInt(), anyInt());
verify(myBatchJobSender, times(2)).sendWorkChannelMessage(any());
}


@ -26,6 +26,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
@ -37,6 +38,8 @@ import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessorTest.StepOutputDa
import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessorTest.TestJobParameters;
import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessorTest.createWorkChunk;
import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessorTest.getTestJobInstance;
import static ca.uhn.fhir.batch2.model.StatusEnum.ERRORED;
import static ca.uhn.fhir.batch2.model.StatusEnum.IN_PROGRESS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -64,7 +67,7 @@ public class ReductionStepExecutorServiceImplTest {
// @Mock
// private JobDefinitionStep<TestJobParameters, StepInputData, StepOutputData> myCurrentStep;
private ReductionStepExecutorServiceImpl mySvc;
private JobDefinitionRegistry myJobDefinitionRegistry = new JobDefinitionRegistry();
private final JobDefinitionRegistry myJobDefinitionRegistry = new JobDefinitionRegistry();
@BeforeEach
public void before() {
@ -89,7 +92,7 @@ public class ReductionStepExecutorServiceImplTest {
when(workCursor.getCurrentStep()).thenReturn((JobDefinitionStep<TestJobParameters, StepInputData, StepOutputData>) createJobDefinition().getSteps().get(1));
when(workCursor.getJobDefinition()).thenReturn(createJobDefinition());
when(myJobPersistence.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(jobInstance));
when(myJobPersistence.markInstanceAsStatus(eq(INSTANCE_ID), eq(StatusEnum.FINALIZE))).thenReturn(true);
when(myJobPersistence.markInstanceAsStatusWhenStatusIn(INSTANCE_ID, StatusEnum.FINALIZE, EnumSet.of(StatusEnum.IN_PROGRESS, StatusEnum.ERRORED))).thenReturn(true);
when(myJobPersistence.fetchAllWorkChunksForStepStream(eq(INSTANCE_ID), eq(REDUCTION_STEP_ID)))
.thenReturn(chunks.stream());
when(myReductionStepWorker.consume(any(ChunkExecutionDetails.class)))
@ -142,7 +145,7 @@ public class ReductionStepExecutorServiceImplTest {
when(workCursor.getCurrentStep()).thenReturn((JobDefinitionStep<TestJobParameters, StepInputData, StepOutputData>) createJobDefinition().getSteps().get(1));
when(workCursor.getJobDefinition()).thenReturn(createJobDefinition());
when(myJobPersistence.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(jobInstance));
when(myJobPersistence.markInstanceAsStatus(eq(INSTANCE_ID), eq(StatusEnum.FINALIZE))).thenReturn(true);
when(myJobPersistence.markInstanceAsStatusWhenStatusIn(INSTANCE_ID, StatusEnum.FINALIZE, EnumSet.of(IN_PROGRESS, ERRORED))).thenReturn(true);
when(myJobPersistence.fetchAllWorkChunksForStepStream(eq(INSTANCE_ID), eq(REDUCTION_STEP_ID)))
.thenReturn(chunks.stream());
when(myReductionStepWorker.consume(any(ChunkExecutionDetails.class)))
@ -193,7 +196,7 @@ public class ReductionStepExecutorServiceImplTest {
when(workCursor.getJobDefinition()).thenReturn(createJobDefinition());
when(myJobPersistence.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(jobInstance));
when(myJobPersistence.fetchAllWorkChunksForStepStream(eq(INSTANCE_ID), eq(REDUCTION_STEP_ID))).thenReturn(chunks.stream());
when(myJobPersistence.markInstanceAsStatus(eq(INSTANCE_ID), eq(StatusEnum.FINALIZE))).thenReturn(true);
when(myJobPersistence.markInstanceAsStatusWhenStatusIn(INSTANCE_ID, StatusEnum.FINALIZE, EnumSet.of(StatusEnum.IN_PROGRESS, StatusEnum.ERRORED))).thenReturn(true);
doThrow(new RuntimeException("This is an error")).when(myReductionStepWorker).consume(any(ChunkExecutionDetails.class));
// test


@ -52,7 +52,7 @@ class StatusEnumTest {
"CANCELLED, FAILED, false",
"ERRORED, QUEUED, false",
"ERRORED, IN_PROGRESS, false",
"ERRORED, IN_PROGRESS, true",
"ERRORED, COMPLETED, true",
"ERRORED, CANCELLED, true",
"ERRORED, ERRORED, true",