Use java event names for work chunk transitions. (#4658)

Change work-chunk creation to be first event in state machine.
This commit is contained in:
michaelabuckley 2023-03-21 13:53:16 -04:00 committed by GitHub
parent ece6661120
commit 4b3405a456
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 207 additions and 128 deletions

View File

@ -48,10 +48,10 @@ stateDiagram-v2
state FAILED
state COMPLETED
direction LR
[*] --> QUEUED : on store
[*] --> QUEUED : on create
%% worker processing states
QUEUED --> on_receive : on receive by worker
QUEUED --> on_receive : on dequeue by worker
on_receive --> IN_PROGRESS : start execution
IN_PROGRESS --> execute: execute

View File

@ -21,13 +21,13 @@ package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
@ -90,7 +90,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
@Transactional(propagation = Propagation.REQUIRED)
public String storeWorkChunk(BatchWorkChunk theBatchWorkChunk) {
public String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) {
Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity();
entity.setId(UUID.randomUUID().toString());
entity.setSequence(theBatchWorkChunk.sequence);
@ -108,7 +108,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
@Transactional(propagation = Propagation.REQUIRED)
public Optional<WorkChunk> fetchWorkChunkSetStartTimeAndMarkInProgress(String theChunkId) {
public Optional<WorkChunk> onWorkChunkDequeue(String theChunkId) {
int rowsModified = myWorkChunkRepository.updateChunkStatusForStart(theChunkId, new Date(), WorkChunkStatusEnum.IN_PROGRESS, List.of(WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.ERRORED, WorkChunkStatusEnum.IN_PROGRESS));
if (rowsModified == 0) {
ourLog.info("Attempting to start chunk {} but it was already started.", theChunkId);
@ -239,7 +239,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public WorkChunkStatusEnum workChunkErrorEvent(WorkChunkErrorEvent theParameters) {
public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) {
String chunkId = theParameters.getChunkId();
String errorMessage = truncateErrorMessage(theParameters.getErrorMsg());
int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED);
@ -264,7 +264,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
@Transactional
public void markWorkChunkAsFailed(String theChunkId, String theErrorMessage) {
public void onWorkChunkFailed(String theChunkId, String theErrorMessage) {
ourLog.info("Marking chunk {} as failed with message: {}", theChunkId, theErrorMessage);
String errorMessage = truncateErrorMessage(theErrorMessage);
myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(theChunkId, new Date(), errorMessage, WorkChunkStatusEnum.FAILED);
@ -272,7 +272,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
@Transactional
public void workChunkCompletionEvent(WorkChunkCompletionEvent theEvent) {
public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) {
myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(theEvent.getChunkId(), new Date(), theEvent.getRecordsProcessed(), theEvent.getRecoveredErrorCount(), WorkChunkStatusEnum.COMPLETED);
}

View File

@ -16,8 +16,6 @@ import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Meta;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;

View File

@ -46,8 +46,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
* {@link JobInstanceProcessor#cleanupInstance()}
* For chunks:
* {@link ca.uhn.fhir.jpa.batch2.JpaJobPersistenceImpl#storeWorkChunk}
* {@link JpaJobPersistenceImpl#fetchWorkChunkSetStartTimeAndMarkInProgress(String)}
* {@link ca.uhn.fhir.jpa.batch2.JpaJobPersistenceImpl#onWorkChunkCreate}
* {@link JpaJobPersistenceImpl#onWorkChunkDequeue(String)}
* Chunk execution {@link ca.uhn.fhir.batch2.coordinator.StepExecutor#executeStep}
 * TODO (wipmb): figure out the state transition triggers.

View File

@ -2,12 +2,12 @@ package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;
import ca.uhn.fhir.batch2.jobs.imprt.NdJsonFileJson;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
@ -94,8 +94,8 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
}
private String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) {
BatchWorkChunk batchWorkChunk = new BatchWorkChunk(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData);
return mySvc.storeWorkChunk(batchWorkChunk);
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData);
return mySvc.onWorkChunkCreate(batchWorkChunk);
}
@Test
@ -231,15 +231,15 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
BatchWorkChunk batchWorkChunk = new BatchWorkChunk(JOB_DEFINITION_ID, JOB_DEF_VER, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
String chunkId = mySvc.storeWorkChunk(batchWorkChunk);
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(JOB_DEFINITION_ID, JOB_DEF_VER, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
String chunkId = mySvc.onWorkChunkCreate(batchWorkChunk);
Optional<Batch2WorkChunkEntity> byId = myWorkChunkRepository.findById(chunkId);
Batch2WorkChunkEntity entity = byId.get();
entity.setStatus(theStatus);
myWorkChunkRepository.save(entity);
// Execute
Optional<WorkChunk> workChunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId);
Optional<WorkChunk> workChunk = mySvc.onWorkChunkDequeue(chunkId);
// Verify
boolean chunkStarted = workChunk.isPresent();
@ -378,7 +378,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null);
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(IllegalArgumentException::new);
WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
assertNull(chunk.getData());
}
@ -391,7 +391,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertNotNull(id);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(IllegalArgumentException::new);
WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
assertEquals(36, chunk.getInstanceId().length());
assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId());
assertEquals(JOB_DEF_VER, chunk.getJobDefinitionVersion());
@ -412,7 +412,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new);
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
assertNotNull(chunk.getCreateTime());
@ -424,7 +424,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
sleepUntilTimeChanges();
mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(chunkId, 50, 0));
mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 50, 0));
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(WorkChunkStatusEnum.COMPLETED, entity.getStatus());
@ -444,7 +444,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(chunkId, 0, 0));
mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 0, 0));
boolean canAdvance = mySvc.canAdvanceInstanceToNextStep(instanceId, STEP_CHUNK_ID);
assertTrue(canAdvance);
@ -456,18 +456,18 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertFalse(canAdvance);
//Toggle it to complete
mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(newChunkId, 50, 0));
mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(newChunkId, 50, 0));
canAdvance = mySvc.canAdvanceInstanceToNextStep(instanceId, STEP_CHUNK_ID);
assertTrue(canAdvance);
//Create a new chunk and set it in progress.
String newerChunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(newerChunkId);
mySvc.onWorkChunkDequeue(newerChunkId);
canAdvance = mySvc.canAdvanceInstanceToNextStep(instanceId, STEP_CHUNK_ID);
assertFalse(canAdvance);
//Toggle IN_PROGRESS to complete
mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(newerChunkId, 50, 0));
mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(newerChunkId, 50, 0));
canAdvance = mySvc.canAdvanceInstanceToNextStep(instanceId, STEP_CHUNK_ID);
assertTrue(canAdvance);
}
@ -483,14 +483,14 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new);
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
WorkChunkErrorEvent request = new WorkChunkErrorEvent(chunkId).setErrorMsg("This is an error message");
mySvc.workChunkErrorEvent(request);
mySvc.onWorkChunkError(request);
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus());
@ -506,7 +506,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
// Mark errored again
WorkChunkErrorEvent request2 = new WorkChunkErrorEvent(chunkId).setErrorMsg("This is an error message 2");
mySvc.workChunkErrorEvent(request2);
mySvc.onWorkChunkError(request2);
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus());
@ -535,13 +535,13 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new);
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
mySvc.markWorkChunkAsFailed(chunkId, "This is an error message");
mySvc.onWorkChunkFailed(chunkId, "This is an error message");
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(WorkChunkStatusEnum.FAILED, entity.getStatus());
@ -608,7 +608,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String instanceId = mySvc.storeNewInstance(instance);
ArrayList<String> chunkIds = new ArrayList<>();
for (int i = 0; i < 10; i++) {
BatchWorkChunk chunk = new BatchWorkChunk(
WorkChunkCreateEvent chunk = new WorkChunkCreateEvent(
"defId",
1,
"stepId",
@ -616,7 +616,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
0,
"{}"
);
String id = mySvc.storeWorkChunk(chunk);
String id = mySvc.onWorkChunkCreate(chunk);
chunkIds.add(id);
}

View File

@ -127,7 +127,6 @@ public class Batch2JobHelper {
return true;
}
myJobMaintenanceService.runMaintenancePass();
Thread.sleep(1000);
return hasStatus(theBatchJobId, theExpectedStatuses);
}

View File

@ -21,11 +21,11 @@ package ca.uhn.hapi.fhir.batch2.test;
*/
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
@ -92,7 +92,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null);
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(IllegalArgumentException::new);
WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
assertNull(chunk.getData());
}
@ -105,7 +105,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
assertNotNull(id);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(id).getStatus()));
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(IllegalArgumentException::new);
WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
assertEquals(36, chunk.getInstanceId().length());
assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId());
assertEquals(JOB_DEF_VER, chunk.getJobDefinitionVersion());
@ -151,7 +151,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
myChunkId = createChunk();
// the worker has received the chunk, and marks it started.
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId).orElseThrow(IllegalArgumentException::new);
WorkChunk chunk = mySvc.onWorkChunkDequeue(myChunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
assertEquals(CHUNK_DATA, chunk.getData());
@ -167,14 +167,14 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
void setUp() {
// setup - the worker has received the chunk, and has marked it IN_PROGRESS.
myChunkId = createChunk();
mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId);
mySvc.onWorkChunkDequeue(myChunkId);
}
@Test
public void processingOk_inProgressToSuccess_clearsDataSavesRecordCount() {
// execution ok
mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(myChunkId, 3, 0));
mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(myChunkId, 3, 0));
// verify the db was updated
var workChunkEntity = freshFetchWorkChunk(myChunkId);
@ -189,7 +189,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
public void processingRetryableError_inProgressToError_bumpsCountRecordsMessage() {
// execution had a retryable error
mySvc.workChunkErrorEvent(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_A));
mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_A));
// verify the db was updated
var workChunkEntity = freshFetchWorkChunk(myChunkId);
@ -202,7 +202,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
public void processingFailure_inProgressToFailed() {
// execution had a failure
mySvc.markWorkChunkAsFailed(myChunkId, "some error");
mySvc.onWorkChunkFailed(myChunkId, "some error");
// verify the db was updated
var workChunkEntity = freshFetchWorkChunk(myChunkId);
@ -218,9 +218,9 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
void setUp() {
// setup - the worker has received the chunk, and has marked it IN_PROGRESS.
myChunkId = createChunk();
mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId);
mySvc.onWorkChunkDequeue(myChunkId);
// execution had a retryable error
mySvc.workChunkErrorEvent(new WorkChunkErrorEvent(myChunkId, FIRST_ERROR_MESSAGE));
mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, FIRST_ERROR_MESSAGE));
}
/**
@ -230,7 +230,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
void errorRetry_errorToInProgress() {
// when consumer restarts chunk
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId).orElseThrow(IllegalArgumentException::new);
WorkChunk chunk = mySvc.onWorkChunkDequeue(myChunkId).orElseThrow(IllegalArgumentException::new);
// then
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
@ -245,11 +245,11 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
@Test
void errorRetry_repeatError_increasesErrorCount() {
// setup - the consumer is re-trying, and marks it IN_PROGRESS
mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId);
mySvc.onWorkChunkDequeue(myChunkId);
// when another error happens
mySvc.workChunkErrorEvent(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B));
mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B));
// verify the state, new message, and error count
@ -262,10 +262,10 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
@Test
void errorThenRetryAndComplete_addsErrorCounts() {
// setup - the consumer is re-trying, and marks it IN_PROGRESS
mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId);
mySvc.onWorkChunkDequeue(myChunkId);
// then it completes ok.
mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(myChunkId, 3, 1));
mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(myChunkId, 3, 1));
// verify the state, new message, and error count
var workChunkEntity = freshFetchWorkChunk(myChunkId);
@ -279,22 +279,22 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
// we start with 1 error already
// 2nd try
mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId);
mySvc.workChunkErrorEvent(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B));
mySvc.onWorkChunkDequeue(myChunkId);
mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B));
var chunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, chunk.getStatus());
assertEquals(2, chunk.getErrorCount());
// 3rd try
mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId);
mySvc.workChunkErrorEvent(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B));
mySvc.onWorkChunkDequeue(myChunkId);
mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B));
chunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, chunk.getStatus());
assertEquals(3, chunk.getErrorCount());
// 4th try
mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId);
mySvc.workChunkErrorEvent(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_C));
mySvc.onWorkChunkDequeue(myChunkId);
mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_C));
chunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.FAILED, chunk.getStatus());
assertEquals(4, chunk.getErrorCount());
@ -351,7 +351,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new);
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
assertNotNull(chunk.getCreateTime());
@ -363,7 +363,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
sleepUntilTimeChanges();
runInTransaction(() -> mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(chunkId, 50, 0)));
runInTransaction(() -> mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 50, 0)));
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.COMPLETED, entity.getStatus());
@ -388,14 +388,14 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new);
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
WorkChunkErrorEvent request = new WorkChunkErrorEvent(chunkId, ERROR_MESSAGE_A);
mySvc.workChunkErrorEvent(request);
mySvc.onWorkChunkError(request);
runInTransaction(() -> {
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus());
@ -411,7 +411,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
// Mark errored again
WorkChunkErrorEvent request2 = new WorkChunkErrorEvent(chunkId, "This is an error message 2");
mySvc.workChunkErrorEvent(request2);
mySvc.onWorkChunkError(request2);
runInTransaction(() -> {
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus());
@ -440,13 +440,13 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new);
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
mySvc.markWorkChunkAsFailed(chunkId, "This is an error message");
mySvc.onWorkChunkFailed(chunkId, "This is an error message");
runInTransaction(() -> {
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.FAILED, entity.getStatus());
@ -465,7 +465,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
String instanceId = mySvc.storeNewInstance(instance);
ArrayList<String> chunkIds = new ArrayList<>();
for (int i = 0; i < 10; i++) {
BatchWorkChunk chunk = new BatchWorkChunk(
WorkChunkCreateEvent chunk = new WorkChunkCreateEvent(
"defId",
1,
"stepId",
@ -473,7 +473,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
0,
"{}"
);
String id = mySvc.storeWorkChunk(chunk);
String id = mySvc.onWorkChunkCreate(chunk);
chunkIds.add(id);
}
@ -534,8 +534,8 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
}
private String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) {
BatchWorkChunk batchWorkChunk = new BatchWorkChunk(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData);
return mySvc.storeWorkChunk(batchWorkChunk);
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData);
return mySvc.onWorkChunkCreate(batchWorkChunk);
}

View File

@ -27,8 +27,6 @@ import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.i18n.Msg;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.Iterator;

View File

@ -22,6 +22,7 @@ package ca.uhn.fhir.batch2.api;
import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
@ -42,7 +43,6 @@ public interface IWorkChunkPersistence {
// WorkChunk calls
//////////////////////////////////
/**
* Stores a chunk of work for later retrieval.
* The first state event, as the chunk is created.
@ -53,42 +53,90 @@ public interface IWorkChunkPersistence {
* @param theBatchWorkChunk the batch work chunk to be stored
* @return a globally unique identifier for this chunk.
*/
String storeWorkChunk(BatchWorkChunk theBatchWorkChunk);
default String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) {
// back-compat for one minor version
return storeWorkChunk(theBatchWorkChunk);
}
// wipmb for deletion
@Deprecated(since="6.5.6")
default String storeWorkChunk(BatchWorkChunk theBatchWorkChunk) {
// dead code in 6.5.7
return null;
}
/**
* Fetches a chunk of work from storage, and update the stored status to {@link WorkChunkStatusEnum#IN_PROGRESS}.
* On arrival at a worker.
* The second state event, as the worker starts processing.
This will only fetch chunks which are currently QUEUED or ERRORED.
Transition to {@link WorkChunkStatusEnum#IN_PROGRESS} unless the chunk is not in a QUEUED or ERRORED state.
*
* @param theChunkId The ID from {@link #storeWorkChunk(BatchWorkChunk theBatchWorkChunk)}
* @return The WorkChunk or empty if no chunk with that id exists in the QUEUED or ERRORRED states
* @param theChunkId The ID from {@link #onWorkChunkCreate(BatchWorkChunk theBatchWorkChunk)}
* @return The WorkChunk or empty if no chunk exists, or not in a runnable state (QUEUED or ERRORED)
*/
Optional<WorkChunk> fetchWorkChunkSetStartTimeAndMarkInProgress(String theChunkId);
default Optional<WorkChunk> onWorkChunkDequeue(String theChunkId) {
// back-compat for one minor version
return fetchWorkChunkSetStartTimeAndMarkInProgress(theChunkId);
}
// wipmb for deletion
@Deprecated(since="6.5.6")
default Optional<WorkChunk> fetchWorkChunkSetStartTimeAndMarkInProgress(String theChunkId) {
// dead code in 6.5.7
return null;
}
/**
* Marks a given chunk as having errored (ie, may be recoverable)
* <p>
* Returns the work chunk.
* A retryable error.
* Transition to {@link WorkChunkStatusEnum#ERRORED} unless max-retries passed, then
* transition to {@link WorkChunkStatusEnum#FAILED}.
*
* @param theParameters - the parameters for marking the workchunk with error
* @return - workchunk optional, if available.
* @param theParameters - the error message and max retry count.
* @return - the new status - ERRORED or FAILED, depending on retry count
*/
WorkChunkStatusEnum workChunkErrorEvent(WorkChunkErrorEvent theParameters);
default WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) {
// back-compat for one minor version
return workChunkErrorEvent(theParameters);
}
// wipmb for deletion
@Deprecated(since="6.5.6")
default WorkChunkStatusEnum workChunkErrorEvent(WorkChunkErrorEvent theParameters) {
// dead code in 6.5.7
return null;
}
/**
* Marks a given chunk as having failed (i.e. probably not recoverable)
* An unrecoverable error.
* Transition to {@link WorkChunkStatusEnum#FAILED}
*
* @param theChunkId The chunk ID
*/
void markWorkChunkAsFailed(String theChunkId, String theErrorMessage);
default void onWorkChunkFailed(String theChunkId, String theErrorMessage) {
// back-compat for one minor version
markWorkChunkAsFailed(theChunkId, theErrorMessage);
}
// wipmb for deletion
@Deprecated(since="6.5.6")
default void markWorkChunkAsFailed(String theChunkId, String theErrorMessage) {
// dead code in 6.5.7
}
/**
* Report success and complete the chunk.
* Transition to {@link WorkChunkStatusEnum#COMPLETED}
*
* @param theEvent with record and error count
*/
void workChunkCompletionEvent(WorkChunkCompletionEvent theEvent);
default void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) {
// back-compat for one minor version
workChunkCompletionEvent(theEvent);
}
// wipmb for deletion
@Deprecated(since="6.5.6")
default void workChunkCompletionEvent(WorkChunkCompletionEvent theEvent) {
// dead code in 6.5.7
}
/**
* Marks all work chunks with the provided status and erases the data

View File

@ -20,12 +20,16 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
/**
* wipmb delete, and push down to WorkChunkCreateEvent
*/
public class BatchWorkChunk {
public final String jobDefinitionId;
@ -54,11 +58,11 @@ public class BatchWorkChunk {
serializedData = theSerializedData;
}
public static BatchWorkChunk firstChunk(JobDefinition<?> theJobDefinition, String theInstanceId) {
public static WorkChunkCreateEvent firstChunk(JobDefinition<?> theJobDefinition, String theInstanceId) {
String firstStepId = theJobDefinition.getFirstStepId();
String jobDefinitionId = theJobDefinition.getJobDefinitionId();
int jobDefinitionVersion = theJobDefinition.getJobDefinitionVersion();
return new BatchWorkChunk(jobDefinitionId, jobDefinitionVersion, firstStepId, theInstanceId, 0, null);
return new WorkChunkCreateEvent(jobDefinitionId, jobDefinitionVersion, firstStepId, theInstanceId, 0, null);
}
@Override

View File

@ -30,6 +30,7 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
@ -126,8 +127,8 @@ public class JobCoordinatorImpl implements IJobCoordinator {
ourLog.info("Stored new {} job {} with status {}", jobDefinition.getJobDefinitionId(), instanceId, instance.getStatus());
ourLog.debug("Job parameters: {}", instance.getParameters());
BatchWorkChunk batchWorkChunk = BatchWorkChunk.firstChunk(jobDefinition, instanceId);
String chunkId = myJobPersistence.storeWorkChunk(batchWorkChunk);
WorkChunkCreateEvent batchWorkChunk = WorkChunkCreateEvent.firstChunk(jobDefinition, instanceId);
String chunkId = myJobPersistence.onWorkChunkCreate(batchWorkChunk);
JobWorkNotification workNotification = JobWorkNotification.firstStepNotification(jobDefinition, instanceId, chunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);

View File

@ -25,6 +25,7 @@ import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkData;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.util.Logs;
@ -67,12 +68,13 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
String instanceId = getInstanceId();
String targetStepId = myTargetStep.getStepId();
// wipmb what is sequence for? It isn't global, so what?
int sequence = myChunkCounter.getAndIncrement();
OT dataValue = theData.getData();
String dataValueString = JsonUtil.serialize(dataValue, false);
BatchWorkChunk batchWorkChunk = new BatchWorkChunk(myJobDefinitionId, myJobDefinitionVersion, targetStepId, instanceId, sequence, dataValueString);
String chunkId = myJobPersistence.storeWorkChunk(batchWorkChunk);
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(myJobDefinitionId, myJobDefinitionVersion, targetStepId, instanceId, sequence, dataValueString);
String chunkId = myJobPersistence.onWorkChunkCreate(batchWorkChunk);
myLastChunkId.set(chunkId);
if (!myGatedExecution) {

View File

@ -314,7 +314,7 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
ourLog.error(msg, e);
theResponseObject.setSuccessful(false);
myJobPersistence.markWorkChunkAsFailed(theChunk.getId(), msg);
myJobPersistence.onWorkChunkFailed(theChunk.getId(), msg);
}
}
}

View File

@ -65,14 +65,14 @@ public class StepExecutor {
chunkId,
e);
if (theStepExecutionDetails.hasAssociatedWorkChunk()) {
myJobPersistence.markWorkChunkAsFailed(chunkId, e.toString());
myJobPersistence.onWorkChunkFailed(chunkId, e.toString());
}
return false;
} catch (Exception e) {
if (theStepExecutionDetails.hasAssociatedWorkChunk()) {
ourLog.error("Failure executing job {} step {}, marking chunk {} as ERRORED", jobDefinitionId, targetStepId, chunkId, e);
WorkChunkErrorEvent parameters = new WorkChunkErrorEvent(chunkId, e.getMessage());
WorkChunkStatusEnum newStatus = myJobPersistence.workChunkErrorEvent(parameters);
WorkChunkStatusEnum newStatus = myJobPersistence.onWorkChunkError(parameters);
if (newStatus == WorkChunkStatusEnum.FAILED) {
return false;
}
@ -83,7 +83,7 @@ public class StepExecutor {
} catch (Throwable t) {
ourLog.error("Unexpected failure executing job {} step {}", jobDefinitionId, targetStepId, t);
if (theStepExecutionDetails.hasAssociatedWorkChunk()) {
myJobPersistence.markWorkChunkAsFailed(chunkId, t.toString());
myJobPersistence.onWorkChunkFailed(chunkId, t.toString());
}
return false;
}
@ -93,7 +93,7 @@ public class StepExecutor {
int recoveredErrorCount = theDataSink.getRecoveredErrorCount();
WorkChunkCompletionEvent event = new WorkChunkCompletionEvent(chunkId, recordsProcessed, recoveredErrorCount);
myJobPersistence.workChunkCompletionEvent(event);
myJobPersistence.onWorkChunkCompletion(event);
}
return true;

View File

@ -77,7 +77,7 @@ class WorkChannelMessageHandler implements MessageHandler {
JobWorkCursor<?, ?, ?> cursor = null;
WorkChunk workChunk = null;
Optional<WorkChunk> chunkOpt = myJobPersistence.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId);
Optional<WorkChunk> chunkOpt = myJobPersistence.onWorkChunkDequeue(chunkId);
if (chunkOpt.isEmpty()) {
ourLog.error("Unable to find chunk with ID {} - Aborting", chunkId);
return;

View File

@ -45,6 +45,7 @@ public class WorkChunk implements IModelJson {
private String myId;
@JsonProperty("sequence")
// wipmb this seems unused.
private int mySequence;
@JsonProperty("status")

View File

@ -0,0 +1,27 @@
package ca.uhn.fhir.batch2.model;

import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
 * The data required for the create transition.
 * Payload for the work-chunk creation event, carrying all the job coordinates
 * (definition id/version, step, instance), the serialized chunk data, and a
 * sequence number within the step.
 * <p>
 * Currently a thin subtype of {@link BatchWorkChunk}; it exists so the
 * state-machine event API has an explicitly named create payload.
 *
 * @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md
 */
public class WorkChunkCreateEvent extends BatchWorkChunk {
	/**
	 * Constructor
	 *
	 * @param theJobDefinitionId      The job definition ID
	 * @param theJobDefinitionVersion The job definition version
	 * @param theTargetStepId         The step ID that will be responsible for consuming this chunk
	 * @param theInstanceId           The instance ID associated with this chunk
	 * @param theSequence             The sequence number of this chunk within its step (assumed zero-based, per the incrementing counter in JobDataSink — semantics under review; see wipmb note there)
	 * @param theSerializedData       The data. This will be in the form of a map where the values may be strings, lists, and other maps (i.e. JSON)
	 */
	public WorkChunkCreateEvent(@Nonnull String theJobDefinitionId, int theJobDefinitionVersion, @Nonnull String theTargetStepId, @Nonnull String theInstanceId, int theSequence, @Nullable String theSerializedData) {
		super(theJobDefinitionId, theJobDefinitionVersion, theTargetStepId, theInstanceId, theSequence, theSerializedData);
	}
}

View File

@ -16,6 +16,7 @@ import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
@ -35,7 +36,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ -139,7 +139,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
assertEquals(PARAM_2_VALUE, params.getParam2());
assertEquals(PASSWORD_VALUE, params.getPassword());
verify(myJobInstancePersister, times(1)).workChunkCompletionEvent(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0));
verify(myJobInstancePersister, times(1)).onWorkChunkCompletion(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0));
verify(myJobInstancePersister, times(0)).fetchWorkChunksWithoutData(any(), anyInt(), anyInt());
verify(myBatchJobSender, times(2)).sendWorkChannelMessage(any());
}
@ -147,7 +147,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
private void setupMocks(JobDefinition<TestJobParameters> theJobDefinition, WorkChunk theWorkChunk) {
mockJobRegistry(theJobDefinition);
when(myJobInstancePersister.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance()));
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(theWorkChunk));
when(myJobInstancePersister.onWorkChunkDequeue(eq(CHUNK_ID))).thenReturn(Optional.of(theWorkChunk));
}
private void mockJobRegistry(JobDefinition<TestJobParameters> theJobDefinition) {
@ -257,7 +257,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
assertEquals(PARAM_2_VALUE, params.getParam2());
assertEquals(PASSWORD_VALUE, params.getPassword());
verify(myJobInstancePersister, times(1)).workChunkCompletionEvent(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0));
verify(myJobInstancePersister, times(1)).onWorkChunkCompletion(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0));
verify(myBatchJobSender, times(0)).sendWorkChannelMessage(any());
}
@ -266,7 +266,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
// Setup
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_2, new TestJobStep2InputType(DATA_1_VALUE, DATA_2_VALUE))));
when(myJobInstancePersister.onWorkChunkDequeue(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_2, new TestJobStep2InputType(DATA_1_VALUE, DATA_2_VALUE))));
doReturn(createJobDefinition()).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1));
when(myJobInstancePersister.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance()));
when(myStep2Worker.run(any(), any())).thenReturn(new RunOutcome(50));
@ -284,7 +284,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
assertEquals(PARAM_2_VALUE, params.getParam2());
assertEquals(PASSWORD_VALUE, params.getPassword());
verify(myJobInstancePersister, times(1)).workChunkCompletionEvent(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0));
verify(myJobInstancePersister, times(1)).onWorkChunkCompletion(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0));
}
@Test
@ -293,7 +293,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
// Setup
AtomicInteger counter = new AtomicInteger();
doReturn(createJobDefinition()).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1));
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_2, new TestJobStep2InputType(DATA_1_VALUE, DATA_2_VALUE))));
when(myJobInstancePersister.onWorkChunkDequeue(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_2, new TestJobStep2InputType(DATA_1_VALUE, DATA_2_VALUE))));
when(myJobInstancePersister.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance()));
when(myStep2Worker.run(any(), any())).thenAnswer(t -> {
if (counter.getAndIncrement() == 0) {
@ -317,12 +317,12 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
assertEquals(PASSWORD_VALUE, params.getPassword());
ArgumentCaptor<WorkChunkErrorEvent> parametersArgumentCaptor = ArgumentCaptor.forClass(WorkChunkErrorEvent.class);
verify(myJobInstancePersister, times(1)).workChunkErrorEvent(parametersArgumentCaptor.capture());
verify(myJobInstancePersister, times(1)).onWorkChunkError(parametersArgumentCaptor.capture());
WorkChunkErrorEvent capturedParams = parametersArgumentCaptor.getValue();
assertEquals(CHUNK_ID, capturedParams.getChunkId());
assertEquals("This is an error message", capturedParams.getErrorMsg());
verify(myJobInstancePersister, times(1)).workChunkCompletionEvent(new WorkChunkCompletionEvent(CHUNK_ID, 0, 0));
verify(myJobInstancePersister, times(1)).onWorkChunkCompletion(new WorkChunkCompletionEvent(CHUNK_ID, 0, 0));
}
@ -331,7 +331,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
// Setup
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_2, new TestJobStep2InputType(DATA_1_VALUE, DATA_2_VALUE))));
when(myJobInstancePersister.onWorkChunkDequeue(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_2, new TestJobStep2InputType(DATA_1_VALUE, DATA_2_VALUE))));
doReturn(createJobDefinition()).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1));
when(myJobInstancePersister.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance()));
when(myStep2Worker.run(any(), any())).thenAnswer(t -> {
@ -354,7 +354,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
assertEquals(PARAM_2_VALUE, params.getParam2());
assertEquals(PASSWORD_VALUE, params.getPassword());
verify(myJobInstancePersister, times(1)).workChunkCompletionEvent(eq(new WorkChunkCompletionEvent(CHUNK_ID, 50, 2)));
verify(myJobInstancePersister, times(1)).onWorkChunkCompletion(eq(new WorkChunkCompletionEvent(CHUNK_ID, 50, 2)));
}
@Test
@ -362,7 +362,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
// Setup
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunkStep3()));
when(myJobInstancePersister.onWorkChunkDequeue(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunkStep3()));
doReturn(createJobDefinition()).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1));
when(myJobInstancePersister.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance()));
when(myStep3Worker.run(any(), any())).thenReturn(new RunOutcome(50));
@ -380,7 +380,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
assertEquals(PARAM_2_VALUE, params.getParam2());
assertEquals(PASSWORD_VALUE, params.getPassword());
verify(myJobInstancePersister, times(1)).workChunkCompletionEvent(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0));
verify(myJobInstancePersister, times(1)).onWorkChunkCompletion(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0));
}
@SuppressWarnings("unchecked")
@ -389,7 +389,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
// Setup
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_3, new TestJobStep3InputType().setData3(DATA_3_VALUE).setData4(DATA_4_VALUE))));
when(myJobInstancePersister.onWorkChunkDequeue(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_3, new TestJobStep3InputType().setData3(DATA_3_VALUE).setData4(DATA_4_VALUE))));
doReturn(createJobDefinition()).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1));
when(myJobInstancePersister.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance()));
when(myStep3Worker.run(any(), any())).thenAnswer(t -> {
@ -406,7 +406,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
// Verify
verify(myStep3Worker, times(1)).run(myStep3ExecutionDetailsCaptor.capture(), any());
verify(myJobInstancePersister, times(1)).markWorkChunkAsFailed(eq(CHUNK_ID), any());
verify(myJobInstancePersister, times(1)).onWorkChunkFailed(eq(CHUNK_ID), any());
}
@Test
@ -416,7 +416,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
String exceptionMessage = "badbadnotgood";
when(myJobDefinitionRegistry.getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1))).thenThrow(new JobExecutionFailedException(exceptionMessage));
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunkStep2()));
when(myJobInstancePersister.onWorkChunkDequeue(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunkStep2()));
mySvc.start();
// Execute
@ -442,7 +442,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
// Setup
when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.empty());
when(myJobInstancePersister.onWorkChunkDequeue(eq(CHUNK_ID))).thenReturn(Optional.empty());
mySvc.start();
// Execute
@ -516,8 +516,8 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
assertEquals(1, myJobWorkNotificationCaptor.getAllValues().get(0).getJobDefinitionVersion());
assertEquals(STEP_1, myJobWorkNotificationCaptor.getAllValues().get(0).getTargetStepId());
BatchWorkChunk expectedWorkChunk = new BatchWorkChunk(JOB_DEFINITION_ID, 1, STEP_1, INSTANCE_ID, 0, null);
verify(myJobInstancePersister, times(1)).storeWorkChunk(eq(expectedWorkChunk));
WorkChunkCreateEvent expectedWorkChunk = new WorkChunkCreateEvent(JOB_DEFINITION_ID, 1, STEP_1, INSTANCE_ID, 0, null);
verify(myJobInstancePersister, times(1)).onWorkChunkCreate(eq(expectedWorkChunk));
verifyNoMoreInteractions(myJobInstancePersister);
verifyNoMoreInteractions(myStep1Worker);

View File

@ -13,6 +13,7 @@ import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil;
import com.fasterxml.jackson.annotation.JsonProperty;
@ -52,7 +53,7 @@ class JobDataSinkTest {
@Captor
private ArgumentCaptor<JobWorkNotification> myJobWorkNotificationCaptor;
@Captor
private ArgumentCaptor<BatchWorkChunk> myBatchWorkChunkCaptor;
private ArgumentCaptor<WorkChunkCreateEvent> myBatchWorkChunkCaptor;
@Test
public void test_sink_accept() {
@ -93,7 +94,7 @@ class JobDataSinkTest {
// execute
// Let's test our first step worker by calling run on it:
when(myJobPersistence.storeWorkChunk(myBatchWorkChunkCaptor.capture())).thenReturn(CHUNK_ID);
when(myJobPersistence.onWorkChunkCreate(myBatchWorkChunkCaptor.capture())).thenReturn(CHUNK_ID);
JobInstance instance = JobInstance.fromInstanceId(JOB_INSTANCE_ID);
StepExecutionDetails<TestJobParameters, VoidModel> details = new StepExecutionDetails<>(new TestJobParameters().setParam1("" + PID_COUNT), null, instance, CHUNK_ID);
JobWorkCursor<TestJobParameters, VoidModel, Step1Output> cursor = new JobWorkCursor<>(job, true, firstStep, lastStep);

View File

@ -204,7 +204,7 @@ public class ReductionStepExecutorServiceImplTest {
ArgumentCaptor<String> chunkIdCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> errorCaptor = ArgumentCaptor.forClass(String.class);
verify(myJobPersistence, times(chunkIds.size()))
.markWorkChunkAsFailed(chunkIdCaptor.capture(), errorCaptor.capture());
.onWorkChunkFailed(chunkIdCaptor.capture(), errorCaptor.capture());
List<String> chunkIdsCaptured = chunkIdCaptor.getAllValues();
List<String> errorsCaptured = errorCaptor.getAllValues();
for (int i = 0; i < chunkIds.size(); i++) {

View File

@ -150,12 +150,12 @@ public class WorkChunkProcessorTest {
// verify
assertTrue(result.isSuccessful());
verify(myJobPersistence)
.workChunkCompletionEvent(any(WorkChunkCompletionEvent.class));
.onWorkChunkCompletion(any(WorkChunkCompletionEvent.class));
assertTrue(myDataSink.myActualDataSink instanceof JobDataSink);
if (theRecoveredErrorsForDataSink > 0) {
verify(myJobPersistence)
.workChunkCompletionEvent(any(WorkChunkCompletionEvent.class));
.onWorkChunkCompletion(any(WorkChunkCompletionEvent.class));
//.workChunkErrorEvent(anyString(new WorkChunkErrorEvent(chunk.getId(), theRecoveredErrorsForDataSink)));
}
@ -207,7 +207,7 @@ public class WorkChunkProcessorTest {
runExceptionThrowingTest(new JobExecutionFailedException("Failure"));
verify(myJobPersistence)
.markWorkChunkAsFailed(anyString(), anyString());
.onWorkChunkFailed(anyString(), anyString());
}
@Test
@ -239,7 +239,7 @@ public class WorkChunkProcessorTest {
// when
when(myNonReductionStep.run(any(), any()))
.thenThrow(new RuntimeException(errorMsg));
when(myJobPersistence.workChunkErrorEvent(any(WorkChunkErrorEvent.class)))
when(myJobPersistence.onWorkChunkError(any(WorkChunkErrorEvent.class)))
.thenAnswer((p) -> {
WorkChunk ec = new WorkChunk();
ec.setId(chunk.getId());
@ -314,17 +314,17 @@ public class WorkChunkProcessorTest {
private void verifyNoErrors(int theRecoveredErrorCount) {
if (theRecoveredErrorCount == 0) {
verify(myJobPersistence, never())
.workChunkErrorEvent(any());
.onWorkChunkError(any());
}
verify(myJobPersistence, never())
.markWorkChunkAsFailed(anyString(), anyString());
.onWorkChunkFailed(anyString(), anyString());
verify(myJobPersistence, never())
.workChunkErrorEvent(any(WorkChunkErrorEvent.class));
.onWorkChunkError(any(WorkChunkErrorEvent.class));
}
private void verifyNonReductionStep() {
verify(myJobPersistence, never())
.fetchWorkChunkSetStartTimeAndMarkInProgress(anyString());
.onWorkChunkDequeue(anyString());
verify(myJobPersistence, never())
.markWorkChunksWithStatusAndWipeData(anyString(), anyList(), any(), any());
verify(myJobPersistence, never())