fixing tests

This commit is contained in:
leif stawnyczy 2024-03-20 16:23:56 -04:00
parent ad470cff72
commit 2037c01454
6 changed files with 381 additions and 230 deletions

View File

@ -164,7 +164,7 @@ public class Batch2WorkChunkEntity implements Serializable {
}
public static Batch2WorkChunkEntity fromWorkChunk(WorkChunk theWorkChunk) {
return new Batch2WorkChunkEntity(
Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity(
theWorkChunk.getId(),
theWorkChunk.getSequence(),
theWorkChunk.getJobDefinitionId(),
@ -181,6 +181,9 @@ public class Batch2WorkChunkEntity implements Serializable {
theWorkChunk.getRecordsProcessed(),
theWorkChunk.getWarningMessage()
);
entity.setSerializedData(theWorkChunk.getData());
return entity;
}
public int getErrorCount() {

View File

@ -26,19 +26,19 @@ import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.hapi.fhir.batch2.test.models.JobMaintenanceStateInformation;
import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep2InputType;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep3InputType;
import jakarta.annotation.Nonnull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Nested;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -48,21 +48,20 @@ import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
* Specification tests for batch2 storage and event system.
* These tests are abstract, and do not depend on JPA.
* Test setups should use the public batch2 api to create scenarios.
*/
public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMaintenanceActions, IInProgressActionsTests, IInstanceStateTransitions, IWorkChunkStateTransitions, IWorkChunkStorageTests, IWorkChunkErrorActionsTests, WorkChunkTestConstants {
public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMaintenanceActions, IInProgressActionsTests, IInstanceStateTransitions, IWorkChunkCommon, WorkChunkTestConstants {
private static final Logger ourLog = LoggerFactory.getLogger(AbstractIJobPersistenceSpecificationTest.class);
@ -107,9 +106,43 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
@AfterEach
public void after() {
myJobDefinitionRegistry.removeJobDefinition(JOB_DEFINITION_ID, JOB_DEF_VER);
// re-enable our runner after every test (just in case)
myMaintenanceService.enableMaintenancePass(true);
}
@Nested
class WorkChunkStorage implements IWorkChunkStorageTests {
@Override
public IWorkChunkCommon getTestManager() {
return AbstractIJobPersistenceSpecificationTest.this;
}
@Nested
class StateTransitions implements IWorkChunkStateTransitions {
@Override
public IWorkChunkCommon getTestManager() {
return AbstractIJobPersistenceSpecificationTest.this;
}
@Nested
class ErrorActions implements IWorkChunkErrorActionsTests {
@Override
public IWorkChunkCommon getTestManager() {
return AbstractIJobPersistenceSpecificationTest.this;
}
}
}
}
@Override
public IWorkChunkCommon getTestManager() {
return this;
}
@Nonnull
public JobInstance createInstance(JobDefinition<?> theJobDefinition) {
JobDefinition<?> jobDefinition = theJobDefinition == null ? withJobDefinition(false)
@ -185,50 +218,24 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
return chunkId;
}
public String createChunk(String theInstanceId) {
return storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, theInstanceId, 0, CHUNK_DATA);
}
public void enableMaintenanceRunner(boolean theToEnable) {
myMaintenanceService.enableMaintenancePass(theToEnable);
}
public BatchJobSender getBatchJobSender() {
return myBatchJobSender;
public void disableWorkChunkMessageHandler() {
doNothing().when(myBatchJobSender).sendWorkChannelMessage(any(JobWorkNotification.class));
}
public void verifyWorkChunkMessageHandlerCalled(int theNumberOfTimes) {
verify(myBatchJobSender, times(theNumberOfTimes))
.sendWorkChannelMessage(any(JobWorkNotification.class));
}
public void createChunksInStates(JobMaintenanceStateInformation theJobMaintenanceStateInformation) {
// should have as many input workchunks as output workchunks
// unless we have newly created ones somewhere
assertEquals(theJobMaintenanceStateInformation.getInitialWorkChunks().size(), theJobMaintenanceStateInformation.getFinalWorkChunk().size());
Set<String> stepIds = new HashSet<>();
for (int i = 0; i < theJobMaintenanceStateInformation.getInitialWorkChunks().size(); i++) {
WorkChunk workChunk = theJobMaintenanceStateInformation.getInitialWorkChunks().get(i);
WorkChunk saved = mySvc.createWorkChunk(workChunk);
ourLog.info("Created WorkChunk: " + saved.toString());
workChunk.setId(saved.getId());
theJobMaintenanceStateInformation.getFinalWorkChunk().get(i)
.setId(saved.getId());
stepIds.add(workChunk.getTargetStepId());
}
// if it's a gated job, we'll manually set the step id for the instance
JobDefinition<?> jobDef = theJobMaintenanceStateInformation.getJobDefinition();
if (jobDef.isGatedExecution()) {
AtomicReference<String> latestStepId = new AtomicReference<>();
int totalSteps = jobDef.getSteps().size();
for (int i = totalSteps - 1; i >= 0; i--) {
JobDefinitionStep<?, ?, ?> step = jobDef.getSteps().get(i);
if (stepIds.contains(step.getStepId())) {
latestStepId.set(step.getStepId());
break;
}
}
// should def have a value
assertNotNull(latestStepId.get());
String instanceId = theJobMaintenanceStateInformation.getInstanceId();
mySvc.updateInstance(instanceId, instance -> {
instance.setCurrentGatedStepId(latestStepId.get());
return true;
});
}
theJobMaintenanceStateInformation.initialize(mySvc);
}
}

View File

@ -1,42 +1,27 @@
package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.hapi.fhir.batch2.test.models.JobMaintenanceStateInformation;
import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestConstants {
Logger ourLog = LoggerFactory.getLogger(IJobMaintenanceActions.class);
void enableMaintenanceRunner(boolean theToEnable);
void createChunksInStates(JobMaintenanceStateInformation theInitialState);
BatchJobSender getBatchJobSender();
@Test
default void test_gatedJob_stepReady_advances() {
// given
// setup
String initialState = """
# chunks ready - move to queued
1|COMPLETED
@ -44,18 +29,16 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
2|READY,2|QUEUED
""";
enableMaintenanceRunner(false);
disableWorkChunkMessageHandler();
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(initialState, true);
// setup
createChunksInStates(result);
// TEST run job maintenance - force transition
doNothing().when(getBatchJobSender()).sendWorkChannelMessage(any(JobWorkNotification.class));
enableMaintenanceRunner(true);
// test
runMaintenancePass();
// verify
verifyWorkChunkFinalStates(result);
verifyWorkChunkMessageHandlerCalled(2);
}
@ParameterizedTest
@ -120,21 +103,20 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
"""
})
default void testGatedStep2NotReady_notAdvance(String theChunkState) {
// given
// setup
enableMaintenanceRunner(false);
disableWorkChunkMessageHandler();
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true);
// setup
createChunksInStates(result);
// TEST run job maintenance - force transition
enableMaintenanceRunner(true);
lenient().doNothing().when(getBatchJobSender()).sendWorkChannelMessage(any(JobWorkNotification.class));
// test
runMaintenancePass();
// verify
verifyWorkChunkFinalStates(result);
// nothing ever queued -> nothing ever sent to queue
verifyWorkChunkMessageHandlerCalled(0);
}
@ParameterizedTest
@ -163,25 +145,25 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
"""
})
default void testGatedStep2ReadyToAdvance_advanceToStep3(String theChunkState) {
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true);
// setup
enableMaintenanceRunner(false);
disableWorkChunkMessageHandler();
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true);
createChunksInStates(result);
// TEST run job maintenance - force transition
enableMaintenanceRunner(true);
lenient().doNothing().when(getBatchJobSender()).sendWorkChannelMessage(any(JobWorkNotification.class));
// test
runMaintenancePass();
// verify
verifyWorkChunkFinalStates(result);
// things are being set to READY; is anything being queued?
verifyWorkChunkMessageHandlerCalled(0);
}
@ParameterizedTest
@ValueSource(strings = {
"""
@Test
default void test_ungatedJob_advancesSteps() {
// setup
String state = """
# READY chunks should transition; others should stay
1|COMPLETED
2|READY,2|QUEUED
@ -189,31 +171,28 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
2|COMPLETED
2|IN_PROGRESS
3|IN_PROGRESS
"""
})
default void test_ungatedJob_advancesSteps(String theChunkState) {
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, false);
""";
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(state, false);
// setup
enableMaintenanceRunner(false);
disableWorkChunkMessageHandler();
createChunksInStates(result);
// TEST run job maintenance - force transition
enableMaintenanceRunner(true);
lenient().doNothing().when(getBatchJobSender()).sendWorkChannelMessage(any(JobWorkNotification.class));
runMaintenancePass();
// verify
verifyWorkChunkFinalStates(result);
verifyWorkChunkMessageHandlerCalled(2);
}
private JobMaintenanceStateInformation setupGatedWorkChunkTransitionTest(String theChunkState, boolean theIsGated) {
// get the job def and store the instance
JobDefinition<?> definition = withJobDefinition(theIsGated);
String instanceId = createAndStoreJobInstance(definition);
JobMaintenanceStateInformation stateInformation = new JobMaintenanceStateInformation(instanceId, definition);
stateInformation.initialize(theChunkState);
JobMaintenanceStateInformation stateInformation = new JobMaintenanceStateInformation(instanceId, definition, theChunkState);
ourLog.info("Starting test case \n {}", theChunkState);
// display comments if there are any
@ -222,28 +201,6 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
}
private void verifyWorkChunkFinalStates(JobMaintenanceStateInformation theStateInformation) {
assertEquals(theStateInformation.getInitialWorkChunks().size(), theStateInformation.getFinalWorkChunk().size());
HashMap<String, WorkChunk> workchunkMap = new HashMap<>();
for (WorkChunk fs : theStateInformation.getFinalWorkChunk()) {
workchunkMap.put(fs.getId(), fs);
}
// fetch all workchunks
Iterator<WorkChunk> workChunkIterator = getSvc().fetchAllWorkChunksIterator(theStateInformation.getInstanceId(), true);
List<WorkChunk> workchunks = new ArrayList<>();
workChunkIterator.forEachRemaining(workchunks::add);
assertEquals(workchunks.size(), workchunkMap.size());
workchunks.forEach(c -> ourLog.info("Returned " + c.toString()));
for (WorkChunk wc : workchunks) {
WorkChunk expected = workchunkMap.get(wc.getId());
assertNotNull(expected);
// verify status and step id
assertEquals(expected.getTargetStepId(), wc.getTargetStepId());
assertEquals(expected.getStatus(), wc.getStatus());
}
theStateInformation.verifyFinalStates(getSvc());
}
}

View File

@ -1,40 +1,74 @@
package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
public interface IWorkChunkCommon extends WorkChunkTestConstants {
String createAndStoreJobInstance(JobDefinition<?> theJobDefinition);
String createAndDequeueWorkChunk(String theJobInstanceId);
/**
* Returns the concrete class that is implementing this stuff.
* Used primarily for structure
*/
IWorkChunkCommon getTestManager();
WorkChunk freshFetchWorkChunk(String theChunkId);
default String createAndStoreJobInstance(JobDefinition<?> theJobDefinition) {
return getTestManager().createAndStoreJobInstance(theJobDefinition);
}
JobInstance createInstance();
default String createAndDequeueWorkChunk(String theJobInstanceId) {
return getTestManager().createAndDequeueWorkChunk(theJobInstanceId);
}
String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData);
default WorkChunk freshFetchWorkChunk(String theChunkId) {
return getTestManager().freshFetchWorkChunk(theChunkId);
}
void runInTransaction(Runnable theRunnable);
default JobInstance createInstance() {
return getTestManager().createInstance();
}
public void sleepUntilTimeChanges();
default String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) {
return getTestManager().storeWorkChunk(theJobDefinitionId, theTargetStepId, theInstanceId, theSequence, theSerializedData);
}
JobDefinition<TestJobParameters> withJobDefinition(boolean theIsGatedJob);
default void runInTransaction(Runnable theRunnable) {
getTestManager().runInTransaction(theRunnable);
}
TransactionTemplate newTxTemplate();
default void sleepUntilTimeChanges() {
getTestManager().sleepUntilTimeChanges();
}
JobInstance freshFetchJobInstance(String theInstanceId);
default JobDefinition<TestJobParameters> withJobDefinition(boolean theIsGatedJob) {
return getTestManager().withJobDefinition(theIsGatedJob);
}
void runMaintenancePass();
default TransactionTemplate newTxTemplate() {
return getTestManager().newTxTemplate();
}
PlatformTransactionManager getTransactionManager();
default JobInstance freshFetchJobInstance(String theInstanceId) {
return getTestManager().freshFetchJobInstance(theInstanceId);
}
IJobPersistence getSvc();
default void runMaintenancePass() {
getTestManager().runMaintenancePass();
}
default PlatformTransactionManager getTransactionManager() {
return getTestManager().getTransactionManager();
}
default IJobPersistence getSvc() {
return getTestManager().getSvc();
}
/**
* This assumes a creation of JOB_DEFINITION already
@ -42,6 +76,36 @@ public interface IWorkChunkCommon extends WorkChunkTestConstants {
* @return
*/
default String createChunk(String theJobInstanceId) {
return storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, theJobInstanceId, 0, CHUNK_DATA);
return getTestManager().createChunk(theJobInstanceId);
}
/**
* Enable/disable the maintenance runner (So it doesn't run on a scheduler)
*/
default void enableMaintenanceRunner(boolean theToEnable) {
getTestManager().enableMaintenanceRunner(theToEnable);
}
/**
* Uses the JobMaintenanceState information and the format:
* "step_number|initialstate,step_number|finalstate" to construct
* an initial state for a test scenario
*/
default void createChunksInStates(JobMaintenanceStateInformation theInitialState) {
getTestManager().createChunksInStates(theInitialState);
}
/**
* Disables the workchunk message handler
* so that we do not actually send messages to the queue;
* useful if mocking state transitions and we don't want to test
* dequeuing.
*/
default void disableWorkChunkMessageHandler() {
getTestManager().disableWorkChunkMessageHandler();
}
default void verifyWorkChunkMessageHandlerCalled(int theNumberOfTimes) {
getTestManager().verifyWorkChunkMessageHandlerCalled(theNumberOfTimes);
}
}

View File

@ -1,22 +1,29 @@
package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
import com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestConstants {
@ -26,28 +33,75 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
String instanceId = getSvc().storeNewInstance(instance);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null);
runMaintenancePass();
WorkChunk chunk = getSvc().onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
assertNull(chunk.getData());
runInTransaction(() -> {
WorkChunk chunk = freshFetchWorkChunk(id);
assertNull(chunk.getData());
});
}
// fixme add test - testWorkChunkCreate_inReady
// fixme add test - testNonGatedWorkChunkInReady_IsQueuedDuringMaintentaince
@Test
default void testStoreAndFetchWorkChunk_WithData() {
default void testWorkChunkCreate_inReadyState() {
JobInstance instance = createInstance();
String instanceId = getSvc().storeNewInstance(instance);
enableMaintenanceRunner(false);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
assertNotNull(id);
// fixme drop this verify against READY state.
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, freshFetchWorkChunk(id).getStatus()));
}
@Test
default void testNonGatedWorkChunkInReady_IsQueuedDuringMaintenance() {
// setup
enableMaintenanceRunner(false);
disableWorkChunkMessageHandler();
String state = "1|READY,1|QUEUED";
JobDefinition<?> jobDefinition = withJobDefinition(false);
String instanceId = createAndStoreJobInstance(jobDefinition);
JobMaintenanceStateInformation stateInformation = new JobMaintenanceStateInformation(instanceId, jobDefinition, state);
createChunksInStates(stateInformation);
String id = stateInformation.getInitialWorkChunks().stream().findFirst().orElseThrow().getId();
// verify created in ready
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, freshFetchWorkChunk(id).getStatus()));
// test
runMaintenancePass();
// verify it's in QUEUED now
stateInformation.verifyFinalStates(getSvc());
verifyWorkChunkMessageHandlerCalled(1);
}
@Test
default void testStoreAndFetchWorkChunk_WithData() {
// setup
disableWorkChunkMessageHandler();
enableMaintenanceRunner(false);
JobDefinition<?> jobDefinition = withJobDefinition(false);
JobInstance instance = createInstance();
String instanceId = getSvc().storeNewInstance(instance);
// we're not transitioning this state; we're just checking storage of data
JobMaintenanceStateInformation info = new JobMaintenanceStateInformation(instanceId, jobDefinition, "1|QUEUED");
info.addWorkChunkModifier((chunk) -> {
chunk.setData(CHUNK_DATA);
});
createChunksInStates(info);
String id = info.getInitialWorkChunks().stream().findFirst().orElseThrow().getId();
// verify created in QUEUED
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(id).getStatus()));
// test; manually dequeue chunk
WorkChunk chunk = getSvc().onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
// verify
assertEquals(36, chunk.getInstanceId().length());
assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId());
assertEquals(JOB_DEF_VER, chunk.getJobDefinitionVersion());
@ -59,69 +113,56 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
@Test
default void testMarkChunkAsCompleted_Success() {
JobInstance instance = createInstance();
String instanceId = getSvc().storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, TARGET_STEP_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA);
assertNotNull(chunkId);
// setup
String state = "2|IN_PROGRESS,2|COMPLETED";
disableWorkChunkMessageHandler();
enableMaintenanceRunner(false);
runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(chunkId).getStatus()));
sleepUntilTimeChanges();
JobDefinition<?> jobDefinition = withJobDefinition(false);
String instanceId = createAndStoreJobInstance(jobDefinition);
JobMaintenanceStateInformation info = new JobMaintenanceStateInformation(instanceId, jobDefinition, state);
info.addWorkChunkModifier(chunk -> {
chunk.setCreateTime(new Date());
chunk.setData(CHUNK_DATA);
});
createChunksInStates(info);
WorkChunk chunk = getSvc().onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
assertNotNull(chunk.getCreateTime());
assertNotNull(chunk.getStartTime());
assertNull(chunk.getEndTime());
assertNull(chunk.getRecordsProcessed());
assertNotNull(chunk.getData());
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, freshFetchWorkChunk(chunkId).getStatus()));
sleepUntilTimeChanges();
String chunkId = info.getInitialWorkChunks().stream().findFirst().orElseThrow().getId();
// run test
runInTransaction(() -> getSvc().onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 50, 0)));
// verify
info.verifyFinalStates(getSvc());
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.COMPLETED, entity.getStatus());
assertEquals(50, entity.getRecordsProcessed());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
assertNotNull(entity.getEndTime());
assertNull(entity.getData());
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
}
@Test
default void testMarkChunkAsCompleted_Error() {
JobInstance instance = createInstance();
String instanceId = getSvc().storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, TARGET_STEP_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, freshFetchWorkChunk(chunkId).getStatus()));
runMaintenancePass();
sleepUntilTimeChanges();
WorkChunk chunk = getSvc().onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
// setup
String state = "1|IN_PROGRESS,1|ERRORED";
disableWorkChunkMessageHandler();
enableMaintenanceRunner(false);
JobDefinition<?> jobDef = withJobDefinition(false);
String instanceId = createAndStoreJobInstance(jobDef);
JobMaintenanceStateInformation info = new JobMaintenanceStateInformation(
instanceId, jobDef, state
);
createChunksInStates(info);
String chunkId = info.getInitialWorkChunks().stream().findFirst().orElseThrow().getId();
// test
WorkChunkErrorEvent request = new WorkChunkErrorEvent(chunkId, ERROR_MESSAGE_A);
getSvc().onWorkChunkError(request);
runInTransaction(() -> {
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus());
assertEquals(ERROR_MESSAGE_A, entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
assertNotNull(entity.getEndTime());
assertEquals(1, entity.getErrorCount());
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
});
// Mark errored again
@ -132,68 +173,64 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus());
assertEquals("This is an error message 2", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
assertNotNull(entity.getEndTime());
assertEquals(2, entity.getErrorCount());
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
});
List<WorkChunk> chunks = ImmutableList.copyOf(getSvc().fetchAllWorkChunksIterator(instanceId, true));
assertEquals(1, chunks.size());
assertEquals(2, chunks.get(0).getErrorCount());
info.verifyFinalStates(getSvc());
}
@Test
default void testMarkChunkAsCompleted_Fail() {
JobInstance instance = createInstance();
String instanceId = getSvc().storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, TARGET_STEP_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, freshFetchWorkChunk(chunkId).getStatus()));
runMaintenancePass();
sleepUntilTimeChanges();
WorkChunk chunk = getSvc().onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
// setup
String state = "1|IN_PROGRESS,1|FAILED";
disableWorkChunkMessageHandler();
enableMaintenanceRunner(false);
JobDefinition<?> jobDef = withJobDefinition(false);
String instanceId = createAndStoreJobInstance(jobDef);
JobMaintenanceStateInformation info = new JobMaintenanceStateInformation(
instanceId, jobDef, state
);
createChunksInStates(info);
String chunkId = info.getInitialWorkChunks().stream().findFirst().orElseThrow().getId();
// test
getSvc().onWorkChunkFailed(chunkId, "This is an error message");
// verify
runInTransaction(() -> {
WorkChunk entity = freshFetchWorkChunk(chunkId);
assertEquals(WorkChunkStatusEnum.FAILED, entity.getStatus());
assertEquals("This is an error message", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
assertNotNull(entity.getStartTime());
assertNotNull(entity.getEndTime());
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
});
info.verifyFinalStates(getSvc());
}
@Test
default void markWorkChunksWithStatusAndWipeData_marksMultipleChunksWithStatus_asExpected() {
JobInstance instance = createInstance();
String instanceId = getSvc().storeNewInstance(instance);
ArrayList<String> chunkIds = new ArrayList<>();
for (int i = 0; i < 10; i++) {
WorkChunkCreateEvent chunk = new WorkChunkCreateEvent(
"defId",
1,
"stepId",
instanceId,
0,
"{}"
);
String id = getSvc().onWorkChunkCreate(chunk);
chunkIds.add(id);
}
// setup
String state = """
1|IN_PROGRESS,1|COMPLETED
1|ERRORED,1|COMPLETED
1|QUEUED,1|COMPLETED
1|IN_PROGRESS,1|COMPLETED
""";
disableWorkChunkMessageHandler();
enableMaintenanceRunner(false);
JobDefinition<?> jobDef = withJobDefinition(false);
String instanceId = createAndStoreJobInstance(jobDef);
JobMaintenanceStateInformation info = new JobMaintenanceStateInformation(
instanceId, jobDef, state
);
createChunksInStates(info);
List<String> chunkIds = info.getInitialWorkChunks().stream().map(WorkChunk::getId)
.collect(Collectors.toList());
runInTransaction(() -> getSvc().markWorkChunksWithStatusAndWipeData(instance.getInstanceId(), chunkIds, WorkChunkStatusEnum.COMPLETED, null));
runInTransaction(() -> getSvc().markWorkChunksWithStatusAndWipeData(instanceId, chunkIds, WorkChunkStatusEnum.COMPLETED, null));
Iterator<WorkChunk> reducedChunks = getSvc().fetchAllWorkChunksIterator(instanceId, true);

View File

@ -1,5 +1,6 @@
package ca.uhn.hapi.fhir.batch2.test.models;
package ca.uhn.hapi.fhir.batch2.test.support;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.WorkChunk;
@ -9,13 +10,19 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class JobMaintenanceStateInformation {
@ -33,9 +40,20 @@ public class JobMaintenanceStateInformation {
private final String myInstanceId;
public JobMaintenanceStateInformation(String theInstanceId, JobDefinition<?> theJobDefinition) {
private Consumer<WorkChunk> myWorkChunkModifier = (chunk) -> {};
public JobMaintenanceStateInformation(
String theInstanceId,
JobDefinition<?> theJobDefinition,
String theStateUnderTest) {
myInstanceId = theInstanceId;
myJobDefinition = theJobDefinition;
setState(theStateUnderTest);
}
public void addWorkChunkModifier(Consumer<WorkChunk> theModifier) {
myWorkChunkModifier = theModifier;
}
public List<String> getLineComments() {
@ -58,7 +76,73 @@ public class JobMaintenanceStateInformation {
return myInstanceId;
}
public void initialize(String theState) {
public void verifyFinalStates(IJobPersistence theJobPersistence) {
assertEquals(getInitialWorkChunks().size(), getFinalWorkChunk().size());
HashMap<String, WorkChunk> workchunkMap = new HashMap<>();
for (WorkChunk fs : getFinalWorkChunk()) {
workchunkMap.put(fs.getId(), fs);
}
// fetch all workchunks
Iterator<WorkChunk> workChunkIterator = theJobPersistence.fetchAllWorkChunksIterator(getInstanceId(), true);
List<WorkChunk> workchunks = new ArrayList<>();
workChunkIterator.forEachRemaining(workchunks::add);
assertEquals(workchunks.size(), workchunkMap.size());
workchunks.forEach(c -> ourLog.info("Returned " + c.toString()));
for (WorkChunk wc : workchunks) {
WorkChunk expected = workchunkMap.get(wc.getId());
assertNotNull(expected);
// verify status and step id
assertEquals(expected.getTargetStepId(), wc.getTargetStepId());
assertEquals(expected.getStatus(), wc.getStatus());
}
}
public void initialize(IJobPersistence theJobPersistence) {
// should have as many input workchunks as output workchunks
// unless we have newly created ones somewhere
assertEquals(getInitialWorkChunks().size(), getFinalWorkChunk().size());
Set<String> stepIds = new HashSet<>();
for (int i = 0; i < getInitialWorkChunks().size(); i++) {
WorkChunk workChunk = getInitialWorkChunks().get(i);
myWorkChunkModifier.accept(workChunk);
WorkChunk saved = theJobPersistence.createWorkChunk(workChunk);
ourLog.info("Created WorkChunk: " + saved.toString());
workChunk.setId(saved.getId());
getFinalWorkChunk().get(i)
.setId(saved.getId());
stepIds.add(workChunk.getTargetStepId());
}
// if it's a gated job, we'll manually set the step id for the instance
JobDefinition<?> jobDef = getJobDefinition();
if (jobDef.isGatedExecution()) {
AtomicReference<String> latestStepId = new AtomicReference<>();
int totalSteps = jobDef.getSteps().size();
for (int i = totalSteps - 1; i >= 0; i--) {
JobDefinitionStep<?, ?, ?> step = jobDef.getSteps().get(i);
if (stepIds.contains(step.getStepId())) {
latestStepId.set(step.getStepId());
break;
}
}
// should def have a value
assertNotNull(latestStepId.get());
String instanceId = getInstanceId();
theJobPersistence.updateInstance(instanceId, instance -> {
instance.setCurrentGatedStepId(latestStepId.get());
return true;
});
}
}
private void setState(String theState) {
String[] chunkLines = theState.split("\n");
Pattern pattern = Pattern.compile(COMMENT_PATTERN);
for (String chunkLine : chunkLines) {
@ -117,10 +201,10 @@ public class JobMaintenanceStateInformation {
String stepId = getJobStepId(parts[0]);
WorkChunkStatusEnum initialStatus = WorkChunkStatusEnum.valueOf(parts[1].trim());
WorkChunk initial = createBaseWorkChunk();
initial.setStatus(initialStatus);
initial.setTargetStepId(stepId);
theAdder.accept(initial);
WorkChunk chunk = createBaseWorkChunk();
chunk.setStatus(initialStatus);
chunk.setTargetStepId(stepId);
theAdder.accept(chunk);
}
private String getJobStepId(String theIndexId) {
@ -150,7 +234,6 @@ public class JobMaintenanceStateInformation {
private WorkChunk createBaseWorkChunk() {
WorkChunk chunk = new WorkChunk();
// chunk.setId(UUID.randomUUID().toString());
chunk.setJobDefinitionId(myJobDefinition.getJobDefinitionId());
chunk.setInstanceId(myInstanceId);
chunk.setJobDefinitionVersion(myJobDefinition.getJobDefinitionVersion());