This commit is contained in:
leif stawnyczy 2024-04-08 15:56:13 -04:00
parent 8d84ed0b94
commit 999d6efc25
6 changed files with 42 additions and 47 deletions

View File

@ -29,7 +29,6 @@ import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.StopWatch;
@ -128,7 +127,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
class WorkChunkStorage implements IWorkChunkStorageTests {
@Override
public IWorkChunkCommon getTestManager() {
public ITestFixture getTestManager() {
return AbstractIJobPersistenceSpecificationTest.this;
}
@ -136,7 +135,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
class StateTransitions implements IWorkChunkStateTransitions {
@Override
public IWorkChunkCommon getTestManager() {
public ITestFixture getTestManager() {
return AbstractIJobPersistenceSpecificationTest.this;
}
@ -144,18 +143,13 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
class ErrorActions implements IWorkChunkErrorActionsTests {
@Override
public IWorkChunkCommon getTestManager() {
public ITestFixture getTestManager() {
return AbstractIJobPersistenceSpecificationTest.this;
}
}
}
}
@Override
public IWorkChunkCommon getTestManager() {
return this;
}
@Nonnull
public JobInstance createInstance(JobDefinition<?> theJobDefinition) {
JobDefinition<?> jobDefinition = theJobDefinition == null ? withJobDefinition(false)
@ -260,6 +254,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
.sendWorkChannelMessage(notificationCaptor.capture());
}
@Override
public void createChunksInStates(JobMaintenanceStateInformation theJobMaintenanceStateInformation) {
theJobMaintenanceStateInformation.initialize(mySvc);
}

View File

@ -47,13 +47,6 @@ public interface IInProgressActionsTests extends IWorkChunkCommon, WorkChunkTest
@Test
default void processingRetryableError_inProgressToError_bumpsCountRecordsMessage() {
String jobId = createAndStoreJobInstance(null);
String myChunkId = createAndDequeueWorkChunk(jobId);
// execution had a retryable error
getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_A));
// verify the db was updated
var workChunkEntity = freshFetchWorkChunk(myChunkId);
String jobId = getTestManager().createAndStoreJobInstance(null);
String myChunkId = getTestManager().createAndDequeueWorkChunk(jobId);
// execution had a retryable error

View File

@ -50,12 +50,12 @@ public interface IInstanceStateTransitions extends IWorkChunkCommon, WorkChunkTe
@Test
default void createInstance_createsInQueuedWithChunkInReady() {
// given
JobDefinition<?> jd = withJobDefinition(false);
JobDefinition<?> jd = getTestManager().withJobDefinition(false);
// when
IJobPersistence.CreateResult createResult =
newTxTemplate().execute(status->
getSvc().onCreateWithFirstChunk(jd, "{}"));
getTestManager().newTxTemplate().execute(status->
getTestManager().getSvc().onCreateWithFirstChunk(jd, "{}"));
// then
ourLog.info("job and chunk created {}", createResult);
@ -63,11 +63,11 @@ public interface IInstanceStateTransitions extends IWorkChunkCommon, WorkChunkTe
assertThat(createResult.jobInstanceId, not(emptyString()));
assertThat(createResult.workChunkId, not(emptyString()));
JobInstance jobInstance = freshFetchJobInstance(createResult.jobInstanceId);
JobInstance jobInstance = getTestManager().freshFetchJobInstance(createResult.jobInstanceId);
assertThat(jobInstance.getStatus(), equalTo(StatusEnum.QUEUED));
assertThat(jobInstance.getParameters(), equalTo("{}"));
WorkChunk firstChunk = freshFetchWorkChunk(createResult.workChunkId);
WorkChunk firstChunk = getTestManager().freshFetchWorkChunk(createResult.workChunkId);
assertThat(firstChunk.getStatus(), equalTo(WorkChunkStatusEnum.READY));
assertNull(firstChunk.getData(), "First chunk data is null - only uses parameters");
}

View File

@ -17,7 +17,7 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
@BeforeEach
default void before() {
enableMaintenanceRunner(false);
getTestManager().enableMaintenanceRunner(false);
}
@Test
@ -30,16 +30,16 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
2|READY,2|QUEUED
""";
int numToTransition = 2;
PointcutLatch sendLatch = disableWorkChunkMessageHandler();
PointcutLatch sendLatch = getTestManager().disableWorkChunkMessageHandler();
sendLatch.setExpectedCount(numToTransition);
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(initialState, true);
createChunksInStates(result);
getTestManager().createChunksInStates(result);
// test
runMaintenancePass();
getTestManager().runMaintenancePass();
// verify
verifyWorkChunkMessageHandlerCalled(sendLatch, numToTransition);
getTestManager().verifyWorkChunkMessageHandlerCalled(sendLatch, numToTransition);
verifyWorkChunkFinalStates(result);
}
@ -106,18 +106,18 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
})
default void testGatedStep2NotReady_notAdvance(String theChunkState) throws InterruptedException {
// setup
PointcutLatch sendingLatch = disableWorkChunkMessageHandler();
PointcutLatch sendingLatch = getTestManager().disableWorkChunkMessageHandler();
sendingLatch.setExpectedCount(0);
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true);
createChunksInStates(result);
getTestManager().createChunksInStates(result);
// test
runMaintenancePass();
getTestManager().runMaintenancePass();
// verify
// nothing ever queued -> nothing ever sent to queue
verifyWorkChunkMessageHandlerCalled(sendingLatch, 0);
getTestManager().verifyWorkChunkMessageHandlerCalled(sendingLatch, 0);
verifyWorkChunkFinalStates(result);
}
@ -149,16 +149,16 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
})
default void testGatedStep2ReadyToAdvance_advanceToStep3(String theChunkState) throws InterruptedException {
// setup
PointcutLatch sendingLatch = disableWorkChunkMessageHandler();
PointcutLatch sendingLatch = getTestManager().disableWorkChunkMessageHandler();
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true);
createChunksInStates(result);
getTestManager().createChunksInStates(result);
// test
runMaintenancePass();
getTestManager().runMaintenancePass();
// verify
// things are being set to READY; is anything being queued?
verifyWorkChunkMessageHandlerCalled(sendingLatch, 0);
getTestManager().verifyWorkChunkMessageHandlerCalled(sendingLatch, 0);
verifyWorkChunkFinalStates(result);
}
@ -177,24 +177,24 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
int expectedTransitions = 2;
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(state, false);
PointcutLatch sendLatch = disableWorkChunkMessageHandler();
PointcutLatch sendLatch = getTestManager().disableWorkChunkMessageHandler();
sendLatch.setExpectedCount(expectedTransitions);
createChunksInStates(result);
getTestManager().createChunksInStates(result);
// TEST run job maintenance - force transition
enableMaintenanceRunner(true);
getTestManager().enableMaintenanceRunner(true);
runMaintenancePass();
getTestManager().runMaintenancePass();
// verify
verifyWorkChunkMessageHandlerCalled(sendLatch, expectedTransitions);
getTestManager().verifyWorkChunkMessageHandlerCalled(sendLatch, expectedTransitions);
verifyWorkChunkFinalStates(result);
}
private JobMaintenanceStateInformation setupGatedWorkChunkTransitionTest(String theChunkState, boolean theIsGated) {
// get the job def and store the instance
JobDefinition<?> definition = withJobDefinition(theIsGated);
String instanceId = createAndStoreJobInstance(definition);
JobDefinition<?> definition = getTestManager().withJobDefinition(theIsGated);
String instanceId = getTestManager().createAndStoreJobInstance(definition);
JobMaintenanceStateInformation stateInformation = new JobMaintenanceStateInformation(instanceId, definition, theChunkState);
ourLog.info("Starting test case \n {}", theChunkState);
@ -204,6 +204,6 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
}
private void verifyWorkChunkFinalStates(JobMaintenanceStateInformation theStateInformation) {
theStateInformation.verifyFinalStates(getSvc());
theStateInformation.verifyFinalStates(getTestManager().getSvc());
}
}

View File

@ -23,6 +23,7 @@ import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters;
import ca.uhn.test.concurrency.PointcutLatch;
import org.springframework.transaction.PlatformTransactionManager;
@ -81,4 +82,10 @@ public interface ITestFixture {
* @param theNumberOfTimes the number of invocations to expect
*/
void verifyWorkChunkMessageHandlerCalled(PointcutLatch theSendingLatch, int theNumberOfTimes) throws InterruptedException;
/**
* Uses the JobMaintenanceStateInformation to setup a test.
* @param theJobMaintenanceStateInformation
*/
void createChunksInStates(JobMaintenanceStateInformation theJobMaintenanceStateInformation);
}

View File

@ -76,7 +76,7 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
String id = getTestManager().storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
assertNotNull(id);
getTestManager().runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, freshFetchWorkChunk(id).getStatus()));
getTestManager().runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, getTestManager().freshFetchWorkChunk(id).getStatus()));
}
@Test
@ -95,7 +95,7 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
String id = stateInformation.getInitialWorkChunks().stream().findFirst().orElseThrow().getId();
// verify created in ready
getTestManager().runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, freshFetchWorkChunk(id).getStatus()));
getTestManager().runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, getTestManager().freshFetchWorkChunk(id).getStatus()));
// test
getTestManager().runMaintenancePass();
@ -124,10 +124,10 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
String id = info.getInitialWorkChunks().stream().findFirst().orElseThrow().getId();
// verify created in QUEUED
getTestManager().runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(id).getStatus()));
getTestManager().runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, getTestManager().freshFetchWorkChunk(id).getStatus()));
// test; manually dequeue chunk
WorkChunk chunk = getSvc().onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
WorkChunk chunk = getTestManager().getSvc().onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
// verify
assertEquals(36, chunk.getInstanceId().length());
@ -136,7 +136,7 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
assertEquals(CHUNK_DATA, chunk.getData());
getTestManager().runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, freshFetchWorkChunk(id).getStatus()));
getTestManager().runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, getTestManager().freshFetchWorkChunk(id).getStatus()));
}
@Test