fixing tests

This commit is contained in:
leif stawnyczy 2024-04-15 13:26:21 -04:00
parent 3889a7f25f
commit c1e9c526e3
2 changed files with 106 additions and 12 deletions

View File

@ -1,12 +1,15 @@
package ca.uhn.fhir.jpa.batch2; package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobOperationResultJson; import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.api.RunOutcome; import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry; import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.jobs.imprt.NdJsonFileJson; import ca.uhn.fhir.batch2.jobs.imprt.NdJsonFileJson;
import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
@ -28,6 +31,7 @@ import ca.uhn.fhir.testjob.models.FirstStepOutput;
import ca.uhn.fhir.util.JsonUtil; import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest; import ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest;
import ca.uhn.hapi.fhir.batch2.test.configs.SpyOverrideConfig; import ca.uhn.hapi.fhir.batch2.test.configs.SpyOverrideConfig;
import ca.uhn.test.concurrency.PointcutLatch;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import jakarta.annotation.Nonnull; import jakarta.annotation.Nonnull;
@ -70,6 +74,12 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@TestMethodOrder(MethodOrderer.MethodName.class) @TestMethodOrder(MethodOrderer.MethodName.class)
@ContextConfiguration(classes = { @ContextConfiguration(classes = {
@ -97,12 +107,20 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Autowired @Autowired
public Batch2JobHelper myBatch2JobHelper; public Batch2JobHelper myBatch2JobHelper;
// this is our spy
@Autowired
private BatchJobSender myBatchSender;
@Autowired
private IJobMaintenanceService myMaintenanceService;
@Autowired @Autowired
public JobDefinitionRegistry myJobDefinitionRegistry; public JobDefinitionRegistry myJobDefinitionRegistry;
@AfterEach @AfterEach
public void after() { public void after() {
myJobDefinitionRegistry.removeJobDefinition(JOB_DEFINITION_ID, JOB_DEF_VER); myJobDefinitionRegistry.removeJobDefinition(JOB_DEFINITION_ID, JOB_DEF_VER);
myMaintenanceService.enableMaintenancePass(true);
} }
@Test @Test
@ -243,10 +261,18 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@ParameterizedTest @ParameterizedTest
@MethodSource("provideStatuses") @MethodSource("provideStatuses")
public void testStartChunkOnlyWorksOnValidChunks(WorkChunkStatusEnum theStatus, boolean theShouldBeStartedByConsumer) { public void testStartChunkOnlyWorksOnValidChunks(WorkChunkStatusEnum theStatus, boolean theShouldBeStartedByConsumer) throws InterruptedException {
// Setup // Setup
JobInstance instance = createInstance(); JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance); String instanceId = mySvc.storeNewInstance(instance);
// PointcutLatch latch = new PointcutLatch("senderlatch");
// doAnswer(a -> {
// latch.call(1);
// return Void.class;
// }).when(myBatchSender).sendWorkChannelMessage(any(JobWorkNotification.class));
// latch.setExpectedCount(1);
myMaintenanceService.enableMaintenancePass(false);
storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, CHUNK_DATA, false); storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, CHUNK_DATA, false);
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(JOB_DEFINITION_ID, JOB_DEF_VER, FIRST_STEP_ID, instanceId, 0, CHUNK_DATA, false); WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(JOB_DEFINITION_ID, JOB_DEF_VER, FIRST_STEP_ID, instanceId, 0, CHUNK_DATA, false);
String chunkId = mySvc.onWorkChunkCreate(batchWorkChunk); String chunkId = mySvc.onWorkChunkCreate(batchWorkChunk);
@ -260,7 +286,9 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
// Verify // Verify
boolean chunkStarted = workChunk.isPresent(); boolean chunkStarted = workChunk.isPresent();
assertEquals(chunkStarted, theShouldBeStartedByConsumer); assertEquals(theShouldBeStartedByConsumer, chunkStarted);
verify(myBatchSender, never())
.sendWorkChannelMessage(any());
} }
@Test @Test
@ -450,11 +478,20 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
"false, READY, QUEUED", "false, READY, QUEUED",
"true, GATE_WAITING, QUEUED" "true, GATE_WAITING, QUEUED"
}) })
public void testStoreAndFetchWorkChunk_withOrWithoutGatedExecutionNoData_createdAndTransitionToExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum theExpectedStatusOnCreate, WorkChunkStatusEnum theExpectedStatusAfterTransition) { public void testStoreAndFetchWorkChunk_withOrWithoutGatedExecutionNoData_createdAndTransitionToExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum theExpectedStatusOnCreate, WorkChunkStatusEnum theExpectedStatusAfterTransition) throws InterruptedException {
// setup // setup
JobInstance instance = createInstance(true, theGatedExecution); JobInstance instance = createInstance(true, theGatedExecution);
String instanceId = mySvc.storeNewInstance(instance); String instanceId = mySvc.storeNewInstance(instance);
// when
PointcutLatch latch = new PointcutLatch("senderlatch");
doAnswer(a -> {
latch.call(1);
return Void.class;
}).when(myBatchSender).sendWorkChannelMessage(any(JobWorkNotification.class));
latch.setExpectedCount(1);
myMaintenanceService.enableMaintenancePass(false);
// execute & verify // execute & verify
String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, null); String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, null);
// mark the first chunk as COMPLETED to allow step advance // mark the first chunk as COMPLETED to allow step advance
@ -468,16 +505,27 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new); WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
// assert null since we did not input any data when creating the chunks // assert null since we did not input any data when creating the chunks
assertNull(chunk.getData()); assertNull(chunk.getData());
latch.awaitExpected();
verify(myBatchSender).sendWorkChannelMessage(any());
clearInvocations(myBatchSender);
} }
@Test @Test
public void testStoreAndFetchWorkChunk_withGatedJobMultipleChunk_correctTransitions() { public void testStoreAndFetchWorkChunk_withGatedJobMultipleChunk_correctTransitions() throws InterruptedException {
// setup // setup
boolean isGatedExecution = true; boolean isGatedExecution = true;
String expectedFirstChunkData = "IAmChunk1"; String expectedFirstChunkData = "IAmChunk1";
String expectedSecondChunkData = "IAmChunk2"; String expectedSecondChunkData = "IAmChunk2";
JobInstance instance = createInstance(true, isGatedExecution); JobInstance instance = createInstance(true, isGatedExecution);
String instanceId = mySvc.storeNewInstance(instance); String instanceId = mySvc.storeNewInstance(instance);
PointcutLatch latch = new PointcutLatch("senderlatch");
doAnswer(a -> {
latch.call(1);
return Void.class;
}).when(myBatchSender).sendWorkChannelMessage(any(JobWorkNotification.class));
latch.setExpectedCount(2);
myMaintenanceService.enableMaintenancePass(false);
// execute & verify // execute & verify
String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, expectedFirstChunkData); String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, expectedFirstChunkData);
@ -516,6 +564,11 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
WorkChunk actualSecondChunkData = mySvc.onWorkChunkDequeue(secondChunkId).orElseThrow(IllegalArgumentException::new); WorkChunk actualSecondChunkData = mySvc.onWorkChunkDequeue(secondChunkId).orElseThrow(IllegalArgumentException::new);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(secondChunkId).getStatus())); runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(secondChunkId).getStatus()));
assertEquals(expectedSecondChunkData, actualSecondChunkData.getData()); assertEquals(expectedSecondChunkData, actualSecondChunkData.getData());
latch.awaitExpected();
verify(myBatchSender, times(2))
.sendWorkChannelMessage(any());
clearInvocations(myBatchSender);
} }
@Test @Test
@ -562,7 +615,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertNull(workChunk.getEndTime()); assertNull(workChunk.getEndTime());
assertNull(workChunk.getErrorMessage()); assertNull(workChunk.getErrorMessage());
assertEquals(0, workChunk.getErrorCount()); assertEquals(0, workChunk.getErrorCount());
assertEquals(null, workChunk.getRecordsProcessed()); assertNull(workChunk.getRecordsProcessed());
} }
{ {
@ -570,7 +623,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertEquals(WorkChunkStatusEnum.ERRORED, workChunk1.getStatus()); assertEquals(WorkChunkStatusEnum.ERRORED, workChunk1.getStatus());
assertEquals("Our error message", workChunk1.getErrorMessage()); assertEquals("Our error message", workChunk1.getErrorMessage());
assertEquals(1, workChunk1.getErrorCount()); assertEquals(1, workChunk1.getErrorCount());
assertEquals(null, workChunk1.getRecordsProcessed()); assertNull(workChunk1.getRecordsProcessed());
assertNotNull(workChunk1.getEndTime()); assertNotNull(workChunk1.getEndTime());
} }
@ -582,7 +635,6 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertNull(workChunk2.getErrorMessage()); assertNull(workChunk2.getErrorMessage());
assertEquals(0, workChunk2.getErrorCount()); assertEquals(0, workChunk2.getErrorCount());
} }
} }
@ParameterizedTest @ParameterizedTest
@ -590,10 +642,17 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
"false, READY, QUEUED", "false, READY, QUEUED",
"true, GATE_WAITING, QUEUED" "true, GATE_WAITING, QUEUED"
}) })
public void testStoreAndFetchWorkChunk_withOrWithoutGatedExecutionwithData_createdAndTransitionToExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum theExpectedCreatedStatus, WorkChunkStatusEnum theExpectedTransitionStatus) { public void testStoreAndFetchWorkChunk_withOrWithoutGatedExecutionwithData_createdAndTransitionToExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum theExpectedCreatedStatus, WorkChunkStatusEnum theExpectedTransitionStatus) throws InterruptedException {
// setup // setup
JobInstance instance = createInstance(true, theGatedExecution); JobInstance instance = createInstance(true, theGatedExecution);
String instanceId = mySvc.storeNewInstance(instance); String instanceId = mySvc.storeNewInstance(instance);
PointcutLatch latch = new PointcutLatch("senderlatch");
doAnswer(a -> {
latch.call(1);
return Void.class;
}).when(myBatchSender).sendWorkChannelMessage(any(JobWorkNotification.class));
latch.setExpectedCount(1);
myMaintenanceService.enableMaintenancePass(false);
// execute & verify // execute & verify
String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, null); String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, null);
@ -614,15 +673,25 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertEquals(CHUNK_DATA, chunk.getData()); assertEquals(CHUNK_DATA, chunk.getData());
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(id).getStatus())); runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(id).getStatus()));
latch.awaitExpected();
verify(myBatchSender).sendWorkChannelMessage(any());
clearInvocations(myBatchSender);
} }
@Test @Test
public void testMarkChunkAsCompleted_Success() { public void testMarkChunkAsCompleted_Success() throws InterruptedException {
boolean isGatedExecution = false; boolean isGatedExecution = false;
JobInstance instance = createInstance(true, isGatedExecution); JobInstance instance = createInstance(true, isGatedExecution);
String instanceId = mySvc.storeNewInstance(instance); String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA, isGatedExecution); String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA, isGatedExecution);
assertNotNull(chunkId); assertNotNull(chunkId);
PointcutLatch latch = new PointcutLatch("senderlatch");
doAnswer(a -> {
latch.call(1);
return Void.class;
}).when(myBatchSender).sendWorkChannelMessage(any(JobWorkNotification.class));
latch.setExpectedCount(1);
myMaintenanceService.enableMaintenancePass(false);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkId).getStatus())); runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkId).getStatus()));
myBatch2JobHelper.runMaintenancePass(); myBatch2JobHelper.runMaintenancePass();
@ -652,11 +721,22 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime()); assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime()); assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
}); });
latch.awaitExpected();
verify(myBatchSender).sendWorkChannelMessage(any());
clearInvocations(myBatchSender);
} }
@Test @Test
public void testMarkChunkAsCompleted_Error() { public void testMarkChunkAsCompleted_Error() {
boolean isGatedExecution = false; boolean isGatedExecution = false;
PointcutLatch latch = new PointcutLatch("senderlatch");
doAnswer(a -> {
latch.call(1);
return Void.class;
}).when(myBatchSender).sendWorkChannelMessage(any(JobWorkNotification.class));
latch.setExpectedCount(1);
myMaintenanceService.enableMaintenancePass(false);
JobInstance instance = createInstance(true, isGatedExecution); JobInstance instance = createInstance(true, isGatedExecution);
String instanceId = mySvc.storeNewInstance(instance); String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(JOB_DEFINITION_ID, TestJobDefinitionUtils.FIRST_STEP_ID, instanceId, SEQUENCE_NUMBER, null, isGatedExecution); String chunkId = storeWorkChunk(JOB_DEFINITION_ID, TestJobDefinitionUtils.FIRST_STEP_ID, instanceId, SEQUENCE_NUMBER, null, isGatedExecution);
@ -705,15 +785,25 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
List<WorkChunk> chunks = ImmutableList.copyOf(mySvc.fetchAllWorkChunksIterator(instanceId, true)); List<WorkChunk> chunks = ImmutableList.copyOf(mySvc.fetchAllWorkChunksIterator(instanceId, true));
assertEquals(1, chunks.size()); assertEquals(1, chunks.size());
assertEquals(2, chunks.get(0).getErrorCount()); assertEquals(2, chunks.get(0).getErrorCount());
verify(myBatchSender).sendWorkChannelMessage(any());
clearInvocations(myBatchSender);
} }
@Test @Test
public void testMarkChunkAsCompleted_Fail() { public void testMarkChunkAsCompleted_Fail() throws InterruptedException {
boolean isGatedExecution = false; boolean isGatedExecution = false;
JobInstance instance = createInstance(true, isGatedExecution); JobInstance instance = createInstance(true, isGatedExecution);
String instanceId = mySvc.storeNewInstance(instance); String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null, isGatedExecution); String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null, isGatedExecution);
assertNotNull(chunkId); assertNotNull(chunkId);
PointcutLatch latch = new PointcutLatch("senderlatch");
doAnswer(a -> {
latch.call(1);
return Void.class;
}).when(myBatchSender).sendWorkChannelMessage(any(JobWorkNotification.class));
latch.setExpectedCount(1);
myMaintenanceService.enableMaintenancePass(false);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkId).getStatus())); runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkId).getStatus()));
myBatch2JobHelper.runMaintenancePass(); myBatch2JobHelper.runMaintenancePass();
@ -736,6 +826,10 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime()); assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime()); assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
}); });
latch.awaitExpected();
verify(myBatchSender)
.sendWorkChannelMessage(any());
clearInvocations(myBatchSender);
} }
@Test @Test

View File

@ -39,7 +39,6 @@ import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep3InputType;
import ca.uhn.test.concurrency.PointcutLatch; import ca.uhn.test.concurrency.PointcutLatch;
import jakarta.annotation.Nonnull; import jakarta.annotation.Nonnull;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Nested;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -65,7 +64,8 @@ import static org.mockito.Mockito.verify;
* These tests are abstract, and do not depend on JPA. * These tests are abstract, and do not depend on JPA.
* Test setups should use the public batch2 api to create scenarios. * Test setups should use the public batch2 api to create scenarios.
*/ */
public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMaintenanceActions, IInProgressActionsTests, IInstanceStateTransitions, ITestFixture, IWorkChunkCommon, WorkChunkTestConstants { public abstract class AbstractIJobPersistenceSpecificationTest
implements ITestFixture, IWorkChunkCommon, WorkChunkTestConstants, IJobMaintenanceActions, IInProgressActionsTests, IInstanceStateTransitions {
private static final Logger ourLog = LoggerFactory.getLogger(AbstractIJobPersistenceSpecificationTest.class); private static final Logger ourLog = LoggerFactory.getLogger(AbstractIJobPersistenceSpecificationTest.class);