Merge branch '5745-ready-state-batch2' into 5767-add-poll-waiting-step

This commit is contained in:
leif stawnyczy 2024-04-05 14:27:33 -04:00
commit a791510111
3 changed files with 85 additions and 12 deletions

View File

@ -37,6 +37,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nonnull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@ -45,6 +46,9 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Sort;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.test.context.ContextConfiguration;
import org.testcontainers.shaded.org.awaitility.Awaitility;
@ -247,6 +251,84 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
assertEquals(1.0, jobInstance.getProgress());
}
/**
* This test verifies that if we have a workchunks being processed by the queue,
* and the maintenance job kicks in, it won't necessarily advance the steps.
*/
@Test
public void gatedJob_whenMaintenanceRunHappensDuringMsgProcessing_doesNotAdvance() throws InterruptedException {
// setup
String jobId = new Exception().getStackTrace()[0].getMethodName();
int chunksToMake = 5;
AtomicInteger secondGateCounter = new AtomicInteger();
AtomicBoolean reductionCheck = new AtomicBoolean(false);
// we will listen into the message queue so we can force actions on it
MessageHandler handler = message -> {
/*
* We will force a run of the maintenance job
* to simulate the situation in which a chunk is
* still being processed by the WorkChunkMessageHandler
* (and thus, not available yet).
*/
myBatch2JobHelper.forceRunMaintenancePass();
};
buildAndDefine3StepReductionJob(jobId, new IReductionStepHandler() {
@Override
public void firstStep(StepExecutionDetails<TestJobParameters, VoidModel> theStep, IJobDataSink<FirstStepOutput> theDataSink) {
for (int i = 0; i < chunksToMake; i++) {
theDataSink.accept(new FirstStepOutput());
}
}
@Override
public void secondStep(StepExecutionDetails<TestJobParameters, FirstStepOutput> theStep, IJobDataSink<SecondStepOutput> theDataSink) {
// no new chunks
SecondStepOutput output = new SecondStepOutput();
theDataSink.accept(output);
}
@Override
public void reductionStepConsume(ChunkExecutionDetails<TestJobParameters, SecondStepOutput> theChunkDetails, IJobDataSink<ReductionStepOutput> theDataSink) {
// we expect to get one here
int val = secondGateCounter.getAndIncrement();
}
@Override
public void reductionStepRun(StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails, IJobDataSink<ReductionStepOutput> theDataSink) {
reductionCheck.set(true);
theDataSink.accept(new ReductionStepOutput(new ArrayList<>()));
}
});
try {
myWorkChannel.subscribe(handler);
// test
JobInstanceStartRequest request = buildRequest(jobId);
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
String instanceId = startResponse.getInstanceId();
// wait
myBatch2JobHelper.awaitJobCompletion(instanceId);
// verify
Optional<JobInstance> instanceOp = myJobPersistence.fetchInstance(instanceId);
assertTrue(instanceOp.isPresent());
JobInstance jobInstance = instanceOp.get();
assertTrue(reductionCheck.get());
assertEquals(chunksToMake, secondGateCounter.get());
assertEquals(StatusEnum.COMPLETED, jobInstance.getStatus());
assertEquals(1.0, jobInstance.getProgress());
} finally {
myWorkChannel.unsubscribe(handler);
}
}
@Test
public void testJobWithReductionStepFiresCompletionHandler() throws InterruptedException {
// setup

View File

@ -240,18 +240,7 @@ public class JobInstanceProcessor {
Set<WorkChunkStatusEnum> workChunkStatuses = myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(
theInstance.getInstanceId(), currentGatedStepId);
if (workChunkStatuses.isEmpty()) {
// no work chunks = no output
// trivial to advance to next step
return true;
}
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) {
// all previous workchunks are complete;
// none in READY though -> still proceed
return true;
}
// we only advance if all of the current steps workchunks are READY
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.READY))) {
if (theWorkCursor.isFirstStep()) {
// first step - all ready means we're ready to proceed to the next step

View File

@ -492,6 +492,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
callback.doUpdate(instance);
return true;
}).when(myJobPersistence).updateInstance(any(), any());
when(myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(eq(instance.getInstanceId()), eq(STEP_2)))
.thenReturn(chunks.stream().map(WorkChunkMetadata::getStatus).collect(Collectors.toSet()));
// test
runEnqueueReadyChunksTest(chunks, createJobDefinitionWithReduction());