This commit is contained in:
leif stawnyczy 2024-03-19 16:13:33 -04:00
parent 3f6c2f984b
commit 32994ee69f
3 changed files with 67 additions and 7 deletions

View File

@ -38,6 +38,8 @@ import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep2InputType;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep3InputType;
import jakarta.annotation.Nonnull;
import org.junit.jupiter.api.AfterEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
@ -48,10 +50,10 @@ import org.springframework.transaction.support.TransactionTemplate;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
/**
@ -61,6 +63,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
*/
public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMaintenanceActions, IInProgressActionsTests, IInstanceStateTransitions, IWorkChunkStateTransitions, IWorkChunkStorageTests, IWorkChunkErrorActionsTests, WorkChunkTestConstants {
private static final Logger ourLog = LoggerFactory.getLogger(AbstractIJobPersistenceSpecificationTest.class);
@Autowired
private IJobPersistence mySvc;
@ -182,9 +186,20 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
}
public void createChunksInStates(JobMaintenanceStateInformation theJobMaintenanceStateInformation) {
// should have as many input workchunks as output workchunks
// unless we have newly created ones somewhere
assertEquals(theJobMaintenanceStateInformation.getInitialWorkChunks().size(), theJobMaintenanceStateInformation.getFinalWorkChunk().size());
Set<String> stepIds = new HashSet<>();
for (WorkChunk workChunk : theJobMaintenanceStateInformation.getInitialWorkChunks()) {
mySvc.createWorkChunk(workChunk);
for (int i = 0; i < theJobMaintenanceStateInformation.getInitialWorkChunks().size(); i++) {
WorkChunk workChunk = theJobMaintenanceStateInformation.getInitialWorkChunks().get(i);
WorkChunk saved = mySvc.createWorkChunk(workChunk);
ourLog.info("Created WorkChunk: " + saved.toString());
workChunk.setId(saved.getId());
theJobMaintenanceStateInformation.getFinalWorkChunk().get(i)
.setId(saved.getId());
stepIds.add(workChunk.getTargetStepId());
}
// if it's a gated job, we'll manually set the step id for the instance

View File

@ -14,6 +14,7 @@ import java.util.Iterator;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestConstants {
@ -23,8 +24,31 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
void createChunksInStates(JobMaintenanceStateInformation theInitialState);
// fixme step 1 is special - only 1 chunk.
// fixme cover step 1 to step 2 and step 2->3
@ParameterizedTest
@ValueSource(strings = {
"""
# chunks ready - move to queued
1|COMPLETED
2|READY,2|QUEUED
2|READY,2|QUEUED
"""
})
default void test_gatedJob_stepReady_advances(String theChunkState) {
// given
enableMaintenanceRunner(false);
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState);
// setup
createChunksInStates(result);
// TEST run job maintenance - force transition
enableMaintenanceRunner(true);
runMaintenancePass();
// verify
verifyWorkChunkFinalStates(result);
}
@ParameterizedTest
@ValueSource(strings = {
// """
@ -32,20 +56,37 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
// 2|GATED
// """,
"""
# Chunk already queued -> waiting for complete
1|COMPLETED
2|QUEUED
""",
"""
# Chunks in progress, complete, errored -> cannot advance
1|COMPLETED
2|COMPLETED
2|ERRORED
2|IN_PROGRESS
""",
"""
# Chunk in errored/already queued -> cannot advance
1|COMPLETED
2|ERRORED # equivalent of QUEUED
2|COMPLETED
""",
"""
# Not all steps ready to advance
1|COMPLETED
2|READY # a single ready chunk
2|IN_PROGRESS
""",
"""
# Previous step not ready -> do not advance
1|COMPLETED
2|COMPLETED
2|IN_PROGRESS
3|READY
3|READY
"""
// """
// 1|COMPLETED
// 2|READY
@ -153,9 +194,12 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
workChunkIterator.forEachRemaining(workchunks::add);
assertEquals(workchunks.size(), workchunkMap.size());
workchunks.forEach(c -> ourLog.info("Returned " + c.toString()));
for (WorkChunk wc : workchunks) {
WorkChunk expected = workchunkMap.get(wc.getId());
assertNotNull(expected);
// verify status and step id
assertEquals(expected.getTargetStepId(), wc.getTargetStepId());
assertEquals(expected.getStatus(), wc.getStatus());

View File

@ -125,7 +125,8 @@ public class JobMaintenanceStateInformation {
private String getJobStepId(String theIndexId) {
try {
int index = Integer.parseInt(theIndexId.trim());
// -1 because code is 0 indexed, but people think in 1 indexed
int index = Integer.parseInt(theIndexId.trim()) - 1;
if (index >= myJobDefinition.getSteps().size()) {
throw new RuntimeException("Unable to find step with index " + index);
@ -149,7 +150,7 @@ public class JobMaintenanceStateInformation {
private WorkChunk createBaseWorkChunk() {
WorkChunk chunk = new WorkChunk();
chunk.setId(UUID.randomUUID().toString());
// chunk.setId(UUID.randomUUID().toString());
chunk.setJobDefinitionId(myJobDefinition.getJobDefinitionId());
chunk.setInstanceId(myInstanceId);
chunk.setJobDefinitionVersion(myJobDefinition.getJobDefinitionVersion());