fixing up more tests for batch2

This commit is contained in:
leif stawnyczy 2024-03-07 16:42:09 -05:00
parent 7d5794e1fb
commit 4b22697af9
9 changed files with 634 additions and 570 deletions

View File

@ -18,6 +18,7 @@ import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest;
import com.google.common.collect.ImmutableList;
@ -77,6 +78,9 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Autowired
private IBatch2JobInstanceRepository myJobInstanceRepository;
@Autowired
public Batch2JobHelper myBatch2JobHelper;
@Test
public void testDeleteInstance() {
// Setup
@ -323,14 +327,19 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
class Batch2SpecTest extends AbstractIJobPersistenceSpecificationTest {
@Override
protected PlatformTransactionManager getTxManager() {
public PlatformTransactionManager getTxManager() {
return JpaJobPersistenceImplTest.this.getTxManager();
}
@Override
protected WorkChunk freshFetchWorkChunk(String chunkId) {
public WorkChunk freshFetchWorkChunk(String chunkId) {
return JpaJobPersistenceImplTest.this.freshFetchWorkChunk(chunkId);
}
@Override
public void runMaintenancePass() {
myBatch2JobHelper.runMaintenancePass();
}
}
@Test

View File

@ -23,29 +23,18 @@ package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.maintenance.JobChunkProgressAccumulator;
import ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep2InputType;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep3InputType;
import com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import jakarta.annotation.Nonnull;
import org.junit.jupiter.api.AfterEach;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
@ -53,572 +42,35 @@ import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import jakarta.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.theInstance;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
* Specification tests for batch2 storage and event system.
* These tests are abstract, and do not depend on JPA.
* Test setups should use the public batch2 api to create scenarios.
*/
public abstract class AbstractIJobPersistenceSpecificationTest {
private static final Logger ourLog = LoggerFactory.getLogger(AbstractIJobPersistenceSpecificationTest.class);
public static final String JOB_DEFINITION_ID = "definition-id";
public static final String TARGET_STEP_ID = "step-id";
public static final String DEF_CHUNK_ID = "definition-chunkId";
public static final String STEP_CHUNK_ID = "step-chunkId";
public static final int JOB_DEF_VER = 1;
public static final int SEQUENCE_NUMBER = 1;
public static final String CHUNK_DATA = "{\"key\":\"value\"}";
public static final String ERROR_MESSAGE_A = "This is an error message: A";
public static final String ERROR_MESSAGE_B = "This is a different error message: B";
public static final String ERROR_MESSAGE_C = "This is a different error message: C";
public abstract class AbstractIJobPersistenceSpecificationTest implements IInProgressActionsTests, IInstanceStateTransitions, IWorkChunkStateTransitions, IWorkChunkStorageTests, IWorkChunkErrorActionsTests, WorkChunkTestConstants {
@Autowired
private IJobPersistence mySvc;
@Nested
class WorkChunkStorage {
@Autowired
private JobDefinitionRegistry myJobDefinitionRegistry;
@Test
public void testStoreAndFetchWorkChunk_NoData() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
@Autowired
private IHapiTransactionService myTransactionService;
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null);
WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
assertNull(chunk.getData());
public IHapiTransactionService getTransactionManager() {
return myTransactionService;
}
@Test
public void testStoreAndFetchWorkChunk_WithData() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
assertNotNull(id);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(id).getStatus()));
WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
assertEquals(36, chunk.getInstanceId().length());
assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId());
assertEquals(JOB_DEF_VER, chunk.getJobDefinitionVersion());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
assertEquals(CHUNK_DATA, chunk.getData());
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, freshFetchWorkChunk(id).getStatus()));
public IJobPersistence getSvc() {
return mySvc;
}
/**
* Should match the diagram in batch2_states.md
* @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md
*/
@Nested
class StateTransitions {
private String myInstanceId;
private String myChunkId;
@BeforeEach
void setUp() {
JobInstance jobInstance = createInstance();
myInstanceId = mySvc.storeNewInstance(jobInstance);
}
private String createChunk() {
return storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, myInstanceId, 0, CHUNK_DATA);
}
@Test
public void chunkCreation_isQueued() {
myChunkId = createChunk();
WorkChunk fetchedWorkChunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.QUEUED, fetchedWorkChunk.getStatus(), "New chunks are QUEUED");
}
@Test
public void chunkReceived_queuedToInProgress() {
myChunkId = createChunk();
// the worker has received the chunk, and marks it started.
WorkChunk chunk = mySvc.onWorkChunkDequeue(myChunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
assertEquals(CHUNK_DATA, chunk.getData());
// verify the db was updated too
WorkChunk fetchedWorkChunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, fetchedWorkChunk.getStatus());
}
@Nested
class InProgressActions {
@BeforeEach
void setUp() {
// setup - the worker has received the chunk, and has marked it IN_PROGRESS.
myChunkId = createChunk();
mySvc.onWorkChunkDequeue(myChunkId);
}
@Test
public void processingOk_inProgressToSuccess_clearsDataSavesRecordCount() {
// execution ok
mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(myChunkId, 3, 0));
// verify the db was updated
var workChunkEntity = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.COMPLETED, workChunkEntity.getStatus());
assertNull(workChunkEntity.getData());
assertEquals(3, workChunkEntity.getRecordsProcessed());
assertNull(workChunkEntity.getErrorMessage());
assertEquals(0, workChunkEntity.getErrorCount());
}
@Test
public void processingRetryableError_inProgressToError_bumpsCountRecordsMessage() {
// execution had a retryable error
mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_A));
// verify the db was updated
var workChunkEntity = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, workChunkEntity.getStatus());
assertEquals(ERROR_MESSAGE_A, workChunkEntity.getErrorMessage());
assertEquals(1, workChunkEntity.getErrorCount());
}
@Test
public void processingFailure_inProgressToFailed() {
// execution had a failure
mySvc.onWorkChunkFailed(myChunkId, "some error");
// verify the db was updated
var workChunkEntity = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.FAILED, workChunkEntity.getStatus());
assertEquals("some error", workChunkEntity.getErrorMessage());
}
}
@Nested
class ErrorActions {
public static final String FIRST_ERROR_MESSAGE = ERROR_MESSAGE_A;
@BeforeEach
void setUp() {
// setup - the worker has received the chunk, and has marked it IN_PROGRESS.
myChunkId = createChunk();
mySvc.onWorkChunkDequeue(myChunkId);
// execution had a retryable error
mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, FIRST_ERROR_MESSAGE));
}
/**
* The consumer will retry after a retryable error is thrown
*/
@Test
void errorRetry_errorToInProgress() {
// when consumer restarts chunk
WorkChunk chunk = mySvc.onWorkChunkDequeue(myChunkId).orElseThrow(IllegalArgumentException::new);
// then
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
// verify the db state, error message, and error count
var workChunkEntity = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, workChunkEntity.getStatus());
assertEquals(FIRST_ERROR_MESSAGE, workChunkEntity.getErrorMessage(), "Original error message kept");
assertEquals(1, workChunkEntity.getErrorCount(), "error count kept");
}
@Test
void errorRetry_repeatError_increasesErrorCount() {
// setup - the consumer is re-trying, and marks it IN_PROGRESS
mySvc.onWorkChunkDequeue(myChunkId);
// when another error happens
mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B));
// verify the state, new message, and error count
var workChunkEntity = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, workChunkEntity.getStatus());
assertEquals(ERROR_MESSAGE_B, workChunkEntity.getErrorMessage(), "new error message");
assertEquals(2, workChunkEntity.getErrorCount(), "error count inc");
}
@Test
void errorThenRetryAndComplete_addsErrorCounts() {
// setup - the consumer is re-trying, and marks it IN_PROGRESS
mySvc.onWorkChunkDequeue(myChunkId);
// then it completes ok.
mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(myChunkId, 3, 1));
// verify the state, new message, and error count
var workChunkEntity = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.COMPLETED, workChunkEntity.getStatus());
assertEquals(FIRST_ERROR_MESSAGE, workChunkEntity.getErrorMessage(), "Error message kept.");
assertEquals(2, workChunkEntity.getErrorCount(), "error combined with earlier error");
}
@Test
void errorRetry_maxErrors_movesToFailed() {
// we start with 1 error already
// 2nd try
mySvc.onWorkChunkDequeue(myChunkId);
mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B));
var chunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, chunk.getStatus());
assertEquals(2, chunk.getErrorCount());
// 3rd try
mySvc.onWorkChunkDequeue(myChunkId);
mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B));
chunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, chunk.getStatus());
assertEquals(3, chunk.getErrorCount());
// 4th try
mySvc.onWorkChunkDequeue(myChunkId);
mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_C));
chunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.FAILED, chunk.getStatus());
assertEquals(4, chunk.getErrorCount());
assertThat("Error message contains last error", chunk.getErrorMessage(), containsString(ERROR_MESSAGE_C));
assertThat("Error message contains error count and complaint", chunk.getErrorMessage(), containsString("many errors: 4"));
}
}
}
@Test
public void testMarkChunkAsCompleted_Success() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(chunkId).getStatus()));
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
assertNotNull(chunk.getCreateTime());
assertNotNull(chunk.getStartTime());
assertNull(chunk.getEndTime());
assertNull(chunk.getRecordsProcessed());
assertNotNull(chunk.getData());
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, freshFetchWorkChunk(chunkId).getStatus()));
sleepUntilTimeChanges();
runInTransaction(() -> mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 50, 0)));
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.COMPLETED, entity.getStatus());
assertEquals(50, entity.getRecordsProcessed());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
assertNotNull(entity.getEndTime());
assertNull(entity.getData());
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
}
@Test
public void testMarkChunkAsCompleted_Error() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(chunkId).getStatus()));
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
WorkChunkErrorEvent request = new WorkChunkErrorEvent(chunkId, ERROR_MESSAGE_A);
mySvc.onWorkChunkError(request);
runInTransaction(() -> {
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus());
assertEquals(ERROR_MESSAGE_A, entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
assertNotNull(entity.getEndTime());
assertEquals(1, entity.getErrorCount());
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
});
// Mark errored again
WorkChunkErrorEvent request2 = new WorkChunkErrorEvent(chunkId, "This is an error message 2");
mySvc.onWorkChunkError(request2);
runInTransaction(() -> {
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus());
assertEquals("This is an error message 2", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
assertNotNull(entity.getEndTime());
assertEquals(2, entity.getErrorCount());
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
});
List<WorkChunk> chunks = ImmutableList.copyOf(mySvc.fetchAllWorkChunksIterator(instanceId, true));
assertEquals(1, chunks.size());
assertEquals(2, chunks.get(0).getErrorCount());
}
@Test
public void testMarkChunkAsCompleted_Fail() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(chunkId).getStatus()));
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
mySvc.onWorkChunkFailed(chunkId, "This is an error message");
runInTransaction(() -> {
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.FAILED, entity.getStatus());
assertEquals("This is an error message", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
assertNotNull(entity.getEndTime());
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
});
}
@Test
public void markWorkChunksWithStatusAndWipeData_marksMultipleChunksWithStatus_asExpected() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
ArrayList<String> chunkIds = new ArrayList<>();
for (int i = 0; i < 10; i++) {
WorkChunkCreateEvent chunk = new WorkChunkCreateEvent(
"defId",
1,
"stepId",
instanceId,
0,
"{}"
);
String id = mySvc.onWorkChunkCreate(chunk);
chunkIds.add(id);
}
runInTransaction(() -> mySvc.markWorkChunksWithStatusAndWipeData(instance.getInstanceId(), chunkIds, WorkChunkStatusEnum.COMPLETED, null));
Iterator<WorkChunk> reducedChunks = mySvc.fetchAllWorkChunksIterator(instanceId, true);
while (reducedChunks.hasNext()) {
WorkChunk reducedChunk = reducedChunks.next();
assertTrue(chunkIds.contains(reducedChunk.getId()));
assertEquals(WorkChunkStatusEnum.COMPLETED, reducedChunk.getStatus());
}
}
}
/**
* Test
* * @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md
*/
@Nested
class InstanceStateTransitions {
@Test
void createInstance_createsInQueuedWithChunk() {
// given
JobDefinition<?> jd = withJobDefinition();
// when
IJobPersistence.CreateResult createResult =
newTxTemplate().execute(status->
mySvc.onCreateWithFirstChunk(jd, "{}"));
// then
ourLog.info("job and chunk created {}", createResult);
assertNotNull(createResult);
assertThat(createResult.jobInstanceId, not(emptyString()));
assertThat(createResult.workChunkId, not(emptyString()));
JobInstance jobInstance = freshFetchJobInstance(createResult.jobInstanceId);
assertThat(jobInstance.getStatus(), equalTo(StatusEnum.QUEUED));
assertThat(jobInstance.getParameters(), equalTo("{}"));
WorkChunk firstChunk = freshFetchWorkChunk(createResult.workChunkId);
assertThat(firstChunk.getStatus(), equalTo(WorkChunkStatusEnum.QUEUED));
assertNull(firstChunk.getData(), "First chunk data is null - only uses parameters");
}
@Test
void testCreateInstance_firstChunkDequeued_movesToInProgress() {
// given
JobDefinition<?> jd = withJobDefinition();
IJobPersistence.CreateResult createResult = newTxTemplate().execute(status->
mySvc.onCreateWithFirstChunk(jd, "{}"));
assertNotNull(createResult);
// when
newTxTemplate().execute(status -> mySvc.onChunkDequeued(createResult.jobInstanceId));
// then
JobInstance jobInstance = freshFetchJobInstance(createResult.jobInstanceId);
assertThat(jobInstance.getStatus(), equalTo(StatusEnum.IN_PROGRESS));
}
@ParameterizedTest
@EnumSource(StatusEnum.class)
void cancelRequest_cancelsJob_whenNotFinalState(StatusEnum theState) {
// given
JobInstance cancelledInstance = createInstance();
cancelledInstance.setStatus(theState);
String instanceId1 = mySvc.storeNewInstance(cancelledInstance);
mySvc.cancelInstance(instanceId1);
JobInstance normalInstance = createInstance();
normalInstance.setStatus(theState);
String instanceId2 = mySvc.storeNewInstance(normalInstance);
JobDefinitionRegistry jobDefinitionRegistry = new JobDefinitionRegistry();
jobDefinitionRegistry.addJobDefinitionIfNotRegistered(withJobDefinition());
// when
runInTransaction(()-> {
new JobInstanceProcessor(
mySvc,
null,
instanceId1,
new JobChunkProgressAccumulator(),
null,
jobDefinitionRegistry,
null // transaction manager
).process();
});
// then
JobInstance freshInstance1 = mySvc.fetchInstance(instanceId1).orElseThrow();
if (theState.isCancellable()) {
assertEquals(StatusEnum.CANCELLED, freshInstance1.getStatus(), "cancel request processed");
assertThat(freshInstance1.getErrorMessage(), containsString("Job instance cancelled"));
} else {
assertEquals(theState, freshInstance1.getStatus(), "cancel request ignored - state unchanged");
assertNull(freshInstance1.getErrorMessage(), "no error message");
}
JobInstance freshInstance2 = mySvc.fetchInstance(instanceId2).orElseThrow();
assertEquals(theState, freshInstance2.getStatus(), "cancel request ignored - cancelled not set");
}
}
@Test
void testDeleteChunksAndMarkInstanceAsChunksPurged_doesWhatItSays() {
// given
JobDefinition<?> jd = withJobDefinition();
IJobPersistence.CreateResult createResult = newTxTemplate().execute(status->
mySvc.onCreateWithFirstChunk(jd, "{}"));
String instanceId = createResult.jobInstanceId;
for (int i = 0; i < 10; i++) {
storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, i, CHUNK_DATA);
}
JobInstance readback = freshFetchJobInstance(instanceId);
assertFalse(readback.isWorkChunksPurged());
assertTrue(mySvc.fetchAllWorkChunksIterator(instanceId, true).hasNext(), "has chunk");
// when
mySvc.deleteChunksAndMarkInstanceAsChunksPurged(instanceId);
// then
readback = freshFetchJobInstance(instanceId);
assertTrue(readback.isWorkChunksPurged(), "purged set");
assertFalse(mySvc.fetchAllWorkChunksIterator(instanceId, true).hasNext(), "chunks gone");
}
@Test
void testInstanceUpdate_modifierApplied() {
// given
String instanceId = mySvc.storeNewInstance(createInstance());
// when
mySvc.updateInstance(instanceId, instance ->{
instance.setErrorCount(42);
return true;
});
// then
JobInstance jobInstance = freshFetchJobInstance(instanceId);
assertEquals(42, jobInstance.getErrorCount());
}
@Test
void testInstanceUpdate_modifierNotAppliedWhenPredicateReturnsFalse() {
// given
JobInstance instance1 = createInstance();
boolean initialValue = true;
instance1.setFastTracking(initialValue);
String instanceId = mySvc.storeNewInstance(instance1);
// when
mySvc.updateInstance(instanceId, instance ->{
instance.setFastTracking(false);
return false;
});
// then
JobInstance jobInstance = freshFetchJobInstance(instanceId);
assertEquals(initialValue, jobInstance.isFastTracking());
}
private JobDefinition<TestJobParameters> withJobDefinition() {
public JobDefinition<TestJobParameters> withJobDefinition() {
return JobDefinition.newBuilder()
.setJobDefinitionId(JOB_DEFINITION_ID)
.setJobDefinitionVersion(JOB_DEF_VER)
@ -630,11 +82,20 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
.build();
}
@AfterEach
public void after() {
myJobDefinitionRegistry.removeJobDefinition(JOB_DEFINITION_ID, JOB_DEF_VER);
}
@Nonnull
private JobInstance createInstance() {
public JobInstance createInstance() {
JobDefinition<TestJobParameters> jobDefinition = withJobDefinition();
if (myJobDefinitionRegistry.getJobDefinition(JOB_DEFINITION_ID, JOB_DEF_VER).isEmpty()) {
myJobDefinitionRegistry.addJobDefinition(jobDefinition);
}
JobInstance instance = new JobInstance();
instance.setJobDefinitionId(JOB_DEFINITION_ID);
instance.setJobDefinitionId(jobDefinition.getJobDefinitionId());
instance.setStatus(StatusEnum.QUEUED);
instance.setJobDefinitionVersion(JOB_DEF_VER);
instance.setParameters(CHUNK_DATA);
@ -642,15 +103,14 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
return instance;
}
private String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) {
public String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) {
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData);
return mySvc.onWorkChunkCreate(batchWorkChunk);
}
public abstract PlatformTransactionManager getTxManager();
protected abstract PlatformTransactionManager getTxManager();
protected abstract WorkChunk freshFetchWorkChunk(String theChunkId);
protected JobInstance freshFetchJobInstance(String theInstanceId) {
public JobInstance freshFetchJobInstance(String theInstanceId) {
return runInTransaction(() -> mySvc.fetchInstance(theInstanceId).orElseThrow());
}
@ -689,6 +149,14 @@ public abstract class AbstractIJobPersistenceSpecificationTest {
await().until(() -> sw.getMillis() > 0);
}
public String createAndStoreJobInstance() {
JobInstance jobInstance = createInstance();
return mySvc.storeNewInstance(jobInstance);
}
public String createAndDequeueWorkChunk(String theJobInstanceId) {
String chunkId = createChunk(theJobInstanceId);
mySvc.onWorkChunkDequeue(chunkId);
return chunkId;
}
}

View File

@ -0,0 +1,55 @@
package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
public interface IInProgressActionsTests extends IWorkChunkCommon, WorkChunkTestConstants {
@Test
default void processingOk_inProgressToSuccess_clearsDataSavesRecordCount() {
String jobId = createAndStoreJobInstance();
String myChunkId = createAndDequeueWorkChunk(jobId);
// execution ok
getSvc().onWorkChunkCompletion(new WorkChunkCompletionEvent(myChunkId, 3, 0));
// verify the db was updated
var workChunkEntity = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.COMPLETED, workChunkEntity.getStatus());
assertNull(workChunkEntity.getData());
assertEquals(3, workChunkEntity.getRecordsProcessed());
assertNull(workChunkEntity.getErrorMessage());
assertEquals(0, workChunkEntity.getErrorCount());
}
@Test
default void processingRetryableError_inProgressToError_bumpsCountRecordsMessage() {
String jobId = createAndStoreJobInstance();
String myChunkId = createAndDequeueWorkChunk(jobId);
// execution had a retryable error
getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_A));
// verify the db was updated
var workChunkEntity = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, workChunkEntity.getStatus());
assertEquals(ERROR_MESSAGE_A, workChunkEntity.getErrorMessage());
assertEquals(1, workChunkEntity.getErrorCount());
}
@Test
default void processingFailure_inProgressToFailed() {
String jobId = createAndStoreJobInstance();
String myChunkId = createAndDequeueWorkChunk(jobId);
// execution had a failure
getSvc().onWorkChunkFailed(myChunkId, "some error");
// verify the db was updated
var workChunkEntity = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.FAILED, workChunkEntity.getStatus());
assertEquals("some error", workChunkEntity.getErrorMessage());
}
}

View File

@ -0,0 +1,122 @@
package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.maintenance.JobChunkProgressAccumulator;
import ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
public interface IInstanceStateTransitions extends IWorkChunkCommon, WorkChunkTestConstants {
Logger ourLog = LoggerFactory.getLogger(IInstanceStateTransitions.class);
@Test
default void createInstance_createsInQueuedWithChunk() {
// given
JobDefinition<?> jd = withJobDefinition();
// when
IJobPersistence.CreateResult createResult =
newTxTemplate().execute(status->
getSvc().onCreateWithFirstChunk(jd, "{}"));
// then
ourLog.info("job and chunk created {}", createResult);
assertNotNull(createResult);
assertThat(createResult.jobInstanceId, not(emptyString()));
assertThat(createResult.workChunkId, not(emptyString()));
JobInstance jobInstance = freshFetchJobInstance(createResult.jobInstanceId);
assertThat(jobInstance.getStatus(), equalTo(StatusEnum.QUEUED));
assertThat(jobInstance.getParameters(), equalTo("{}"));
WorkChunk firstChunk = freshFetchWorkChunk(createResult.workChunkId);
assertThat(firstChunk.getStatus(), equalTo(WorkChunkStatusEnum.READY));
assertNull(firstChunk.getData(), "First chunk data is null - only uses parameters");
}
@Test
default void testCreateInstance_firstChunkDequeued_movesToInProgress() {
// given
JobDefinition<?> jd = withJobDefinition();
IJobPersistence.CreateResult createResult = newTxTemplate().execute(status->
getSvc().onCreateWithFirstChunk(jd, "{}"));
assertNotNull(createResult);
// when
newTxTemplate().execute(status -> getSvc().onChunkDequeued(createResult.jobInstanceId));
// then
JobInstance jobInstance = freshFetchJobInstance(createResult.jobInstanceId);
assertThat(jobInstance.getStatus(), equalTo(StatusEnum.IN_PROGRESS));
}
@ParameterizedTest
@EnumSource(StatusEnum.class)
default void cancelRequest_cancelsJob_whenNotFinalState(StatusEnum theState) {
// given
JobInstance cancelledInstance = createInstance();
cancelledInstance.setStatus(theState);
String instanceId1 = getSvc().storeNewInstance(cancelledInstance);
getSvc().cancelInstance(instanceId1);
JobInstance normalInstance = createInstance();
normalInstance.setStatus(theState);
String instanceId2 = getSvc().storeNewInstance(normalInstance);
JobDefinitionRegistry jobDefinitionRegistry = new JobDefinitionRegistry();
jobDefinitionRegistry.addJobDefinitionIfNotRegistered(withJobDefinition());
// when
runInTransaction(()-> {
new JobInstanceProcessor(
getSvc(),
null,
instanceId1,
new JobChunkProgressAccumulator(),
null,
jobDefinitionRegistry,
getTransactionManager()
).process();
});
// then
JobInstance freshInstance1 = getSvc().fetchInstance(instanceId1).orElseThrow();
if (theState.isCancellable()) {
assertEquals(StatusEnum.CANCELLED, freshInstance1.getStatus(), "cancel request processed");
assertThat(freshInstance1.getErrorMessage(), containsString("Job instance cancelled"));
} else {
assertEquals(theState, freshInstance1.getStatus(), "cancel request ignored - state unchanged");
assertNull(freshInstance1.getErrorMessage(), "no error message");
}
JobInstance freshInstance2 = getSvc().fetchInstance(instanceId2).orElseThrow();
assertEquals(theState, freshInstance2.getStatus(), "cancel request ignored - cancelled not set");
}
default JobInstance createInstance() {
JobInstance instance = new JobInstance();
instance.setJobDefinitionId(JOB_DEFINITION_ID);
instance.setStatus(StatusEnum.QUEUED);
instance.setJobDefinitionVersion(JOB_DEF_VER);
instance.setParameters(CHUNK_DATA);
instance.setReport("TEST");
return instance;
}
}

View File

@ -0,0 +1,46 @@
package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters;
import org.springframework.transaction.support.TransactionTemplate;
public interface IWorkChunkCommon extends WorkChunkTestConstants {
String createAndStoreJobInstance();
String createAndDequeueWorkChunk(String theJobInstanceId);
WorkChunk freshFetchWorkChunk(String theChunkId);
JobInstance createInstance();
String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData);
void runInTransaction(Runnable theRunnable);
public void sleepUntilTimeChanges();
JobDefinition<TestJobParameters> withJobDefinition();
TransactionTemplate newTxTemplate();
JobInstance freshFetchJobInstance(String theInstanceId);
void runMaintenancePass();
IHapiTransactionService getTransactionManager();
IJobPersistence getSvc();
/**
* This assumes a creation of JOB_DEFINITION already
* @param theJobInstanceId
* @return
*/
default String createChunk(String theJobInstanceId) {
return storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, theJobInstanceId, 0, CHUNK_DATA);
}
}

View File

@ -0,0 +1,110 @@
package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
public interface IWorkChunkErrorActionsTests extends IWorkChunkCommon, WorkChunkTestConstants {
/**
* The consumer will retry after a retryable error is thrown
*/
@Test
default void errorRetry_errorToInProgress() {
String jobId = createAndStoreJobInstance();
String myChunkId = createAndDequeueWorkChunk(jobId);
getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, FIRST_ERROR_MESSAGE));
// when consumer restarts chunk
WorkChunk chunk = getSvc().onWorkChunkDequeue(myChunkId).orElseThrow(IllegalArgumentException::new);
// then
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
// verify the db state, error message, and error count
var workChunkEntity = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, workChunkEntity.getStatus());
assertEquals(FIRST_ERROR_MESSAGE, workChunkEntity.getErrorMessage(), "Original error message kept");
assertEquals(1, workChunkEntity.getErrorCount(), "error count kept");
}
@Test
default void errorRetry_repeatError_increasesErrorCount() {
String jobId = createAndStoreJobInstance();
String myChunkId = createAndDequeueWorkChunk(jobId);
getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, FIRST_ERROR_MESSAGE));
// setup - the consumer is re-trying, and marks it IN_PROGRESS
getSvc().onWorkChunkDequeue(myChunkId);
// when another error happens
getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B));
// verify the state, new message, and error count
var workChunkEntity = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, workChunkEntity.getStatus());
assertEquals(ERROR_MESSAGE_B, workChunkEntity.getErrorMessage(), "new error message");
assertEquals(2, workChunkEntity.getErrorCount(), "error count inc");
}
@Test
default void errorThenRetryAndComplete_addsErrorCounts() {
String jobId = createAndStoreJobInstance();
String myChunkId = createAndDequeueWorkChunk(jobId);
getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, FIRST_ERROR_MESSAGE));
// setup - the consumer is re-trying, and marks it IN_PROGRESS
getSvc().onWorkChunkDequeue(myChunkId);
// then it completes ok.
getSvc().onWorkChunkCompletion(new WorkChunkCompletionEvent(myChunkId, 3, 1));
// verify the state, new message, and error count
var workChunkEntity = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.COMPLETED, workChunkEntity.getStatus());
assertEquals(FIRST_ERROR_MESSAGE, workChunkEntity.getErrorMessage(), "Error message kept.");
assertEquals(2, workChunkEntity.getErrorCount(), "error combined with earlier error");
}
@Test
default void errorRetry_maxErrors_movesToFailed() {
// we start with 1 error already
String jobId = createAndStoreJobInstance();
String myChunkId = createAndDequeueWorkChunk(jobId);
getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, FIRST_ERROR_MESSAGE));
// 2nd try
getSvc().onWorkChunkDequeue(myChunkId);
getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B));
var chunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, chunk.getStatus());
assertEquals(2, chunk.getErrorCount());
// 3rd try
getSvc().onWorkChunkDequeue(myChunkId);
getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B));
chunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, chunk.getStatus());
assertEquals(3, chunk.getErrorCount());
// 4th try
getSvc().onWorkChunkDequeue(myChunkId);
getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_C));
chunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.FAILED, chunk.getStatus());
assertEquals(4, chunk.getErrorCount());
assertThat("Error message contains last error", chunk.getErrorMessage(), containsString(ERROR_MESSAGE_C));
assertThat("Error message contains error count and complaint", chunk.getErrorMessage(), containsString("many errors: 4"));
}
}

View File

@ -0,0 +1,36 @@
package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkTestConstants {
@Test
default void chunkCreation_isQueued() {
String jobInstanceId = createAndStoreJobInstance();
String myChunkId = createChunk(jobInstanceId);
WorkChunk fetchedWorkChunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.READY, fetchedWorkChunk.getStatus(), "New chunks are READY");
}
@Test
default void chunkReceived_queuedToInProgress() {
String jobInstanceId = createAndStoreJobInstance();
String myChunkId = createChunk(jobInstanceId);
runMaintenancePass();
// the worker has received the chunk, and marks it started.
WorkChunk chunk = getSvc().onWorkChunkDequeue(myChunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
assertEquals(CHUNK_DATA, chunk.getData());
// verify the db was updated too
WorkChunk fetchedWorkChunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, fetchedWorkChunk.getStatus());
}
}

View File

@ -0,0 +1,202 @@
package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestConstants {
@Test
default void testStoreAndFetchWorkChunk_NoData() {
JobInstance instance = createInstance();
String instanceId = getSvc().storeNewInstance(instance);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null);
runMaintenancePass();
WorkChunk chunk = getSvc().onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
assertNull(chunk.getData());
}
@Test
default void testStoreAndFetchWorkChunk_WithData() {
JobInstance instance = createInstance();
String instanceId = getSvc().storeNewInstance(instance);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
assertNotNull(id);
runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(id).getStatus()));
WorkChunk chunk = getSvc().onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
assertEquals(36, chunk.getInstanceId().length());
assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId());
assertEquals(JOB_DEF_VER, chunk.getJobDefinitionVersion());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
assertEquals(CHUNK_DATA, chunk.getData());
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, freshFetchWorkChunk(id).getStatus()));
}
@Test
default void testMarkChunkAsCompleted_Success() {
JobInstance instance = createInstance();
String instanceId = getSvc().storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, TARGET_STEP_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA);
assertNotNull(chunkId);
runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(chunkId).getStatus()));
sleepUntilTimeChanges();
WorkChunk chunk = getSvc().onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
assertNotNull(chunk.getCreateTime());
assertNotNull(chunk.getStartTime());
assertNull(chunk.getEndTime());
assertNull(chunk.getRecordsProcessed());
assertNotNull(chunk.getData());
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, freshFetchWorkChunk(chunkId).getStatus()));
sleepUntilTimeChanges();
runInTransaction(() -> getSvc().onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 50, 0)));
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.COMPLETED, entity.getStatus());
assertEquals(50, entity.getRecordsProcessed());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
assertNotNull(entity.getEndTime());
assertNull(entity.getData());
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
}
@Test
default void testMarkChunkAsCompleted_Error() {
JobInstance instance = createInstance();
String instanceId = getSvc().storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, TARGET_STEP_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, freshFetchWorkChunk(chunkId).getStatus()));
runMaintenancePass();
sleepUntilTimeChanges();
WorkChunk chunk = getSvc().onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
WorkChunkErrorEvent request = new WorkChunkErrorEvent(chunkId, ERROR_MESSAGE_A);
getSvc().onWorkChunkError(request);
runInTransaction(() -> {
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus());
assertEquals(ERROR_MESSAGE_A, entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
assertNotNull(entity.getEndTime());
assertEquals(1, entity.getErrorCount());
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
});
// Mark errored again
WorkChunkErrorEvent request2 = new WorkChunkErrorEvent(chunkId, "This is an error message 2");
getSvc().onWorkChunkError(request2);
runInTransaction(() -> {
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus());
assertEquals("This is an error message 2", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
assertNotNull(entity.getEndTime());
assertEquals(2, entity.getErrorCount());
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
});
List<WorkChunk> chunks = ImmutableList.copyOf(getSvc().fetchAllWorkChunksIterator(instanceId, true));
assertEquals(1, chunks.size());
assertEquals(2, chunks.get(0).getErrorCount());
}
@Test
default void testMarkChunkAsCompleted_Fail() {
JobInstance instance = createInstance();
String instanceId = getSvc().storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, TARGET_STEP_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, freshFetchWorkChunk(chunkId).getStatus()));
runMaintenancePass();
sleepUntilTimeChanges();
WorkChunk chunk = getSvc().onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
getSvc().onWorkChunkFailed(chunkId, "This is an error message");
runInTransaction(() -> {
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.FAILED, entity.getStatus());
assertEquals("This is an error message", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
assertNotNull(entity.getEndTime());
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
});
}
@Test
default void markWorkChunksWithStatusAndWipeData_marksMultipleChunksWithStatus_asExpected() {
JobInstance instance = createInstance();
String instanceId = getSvc().storeNewInstance(instance);
ArrayList<String> chunkIds = new ArrayList<>();
for (int i = 0; i < 10; i++) {
WorkChunkCreateEvent chunk = new WorkChunkCreateEvent(
"defId",
1,
"stepId",
instanceId,
0,
"{}"
);
String id = getSvc().onWorkChunkCreate(chunk);
chunkIds.add(id);
}
runInTransaction(() -> getSvc().markWorkChunksWithStatusAndWipeData(instance.getInstanceId(), chunkIds, WorkChunkStatusEnum.COMPLETED, null));
Iterator<WorkChunk> reducedChunks = getSvc().fetchAllWorkChunksIterator(instanceId, true);
while (reducedChunks.hasNext()) {
WorkChunk reducedChunk = reducedChunks.next();
assertTrue(chunkIds.contains(reducedChunk.getId()));
assertEquals(WorkChunkStatusEnum.COMPLETED, reducedChunk.getStatus());
}
}
}

View File

@ -0,0 +1,16 @@
package ca.uhn.hapi.fhir.batch2.test;
public interface WorkChunkTestConstants {
public static final String JOB_DEFINITION_ID = "definition-id";
public static final String TARGET_STEP_ID = "step-id";
public static final String DEF_CHUNK_ID = "definition-chunkId";
// public static final String STEP_CHUNK_ID = "step-chunkId";
public static final int JOB_DEF_VER = 1;
public static final int SEQUENCE_NUMBER = 1;
public static final String CHUNK_DATA = "{\"key\":\"value\"}";
public static final String ERROR_MESSAGE_A = "This is an error message: A";
public static final String ERROR_MESSAGE_B = "This is a different error message: B";
public static final String ERROR_MESSAGE_C = "This is a different error message: C";
final String FIRST_ERROR_MESSAGE = ERROR_MESSAGE_A;
}