diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java index 34dee1392cd..39a777180d4 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java @@ -506,10 +506,9 @@ public class JpaJobPersistenceImpl implements IJobPersistence { @Override public Page fetchAllWorkChunkMetadataForJobInStates( - int thePageIndex, int thePageSize, String theInstanceId, Set theStates) { - Pageable request = PageRequest.of(thePageIndex, thePageSize); + Pageable thePageable, String theInstanceId, Set theStates) { Page page = - myWorkChunkMetadataViewRepo.fetchWorkChunkMetadataForJobInStates(request, theInstanceId, theStates); + myWorkChunkMetadataViewRepo.fetchWorkChunkMetadataForJobInStates(thePageable, theInstanceId, theStates); return page.map(Batch2WorkChunkMetadataView::toChunkMetadata); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Batch2WorkChunkMetadataView.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Batch2WorkChunkMetadataView.java index d9f2b638830..07f87340a39 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Batch2WorkChunkMetadataView.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Batch2WorkChunkMetadataView.java @@ -14,6 +14,10 @@ import java.io.Serializable; import static ca.uhn.fhir.batch2.model.JobDefinition.ID_MAX_LENGTH; +/** + * A view for a Work Chunk that contains only the most necessary information + * to satisfy the no-data path. + */ @Entity @Immutable @Subselect("SELECT e.id as id, " diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java index a1f0ce2de1c..6392c24428f 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java @@ -3,6 +3,7 @@ package ca.uhn.hapi.fhir.batch2.test; import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation; import ca.uhn.test.concurrency.PointcutLatch; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -14,6 +15,11 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC Logger ourLog = LoggerFactory.getLogger(IJobMaintenanceActions.class); + @BeforeEach + default void before() { + enableMaintenanceRunner(false); + } + @Test default void test_gatedJob_stepReady_advances() throws InterruptedException { // setup @@ -24,7 +30,6 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC 2|READY,2|QUEUED """; int numToTransition = 2; - enableMaintenanceRunner(false); PointcutLatch sendLatch = disableWorkChunkMessageHandler(); sendLatch.setExpectedCount(numToTransition); JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(initialState, true); @@ -101,7 +106,6 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC }) default void testGatedStep2NotReady_notAdvance(String theChunkState) throws InterruptedException { // setup - enableMaintenanceRunner(false); PointcutLatch sendingLatch = disableWorkChunkMessageHandler(); sendingLatch.setExpectedCount(0); JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true); @@ -145,7 +149,6 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC }) default void testGatedStep2ReadyToAdvance_advanceToStep3(String theChunkState) throws InterruptedException { // setup - enableMaintenanceRunner(false); PointcutLatch sendingLatch = disableWorkChunkMessageHandler(); JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true); createChunksInStates(result); @@ -160,7 +163,7 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC } @Test - default void test_ungatedJob_advancesSteps() throws InterruptedException { + default void test_ungatedJob_queuesReadyChunks() throws InterruptedException { // setup String state = """ # READY chunks should transition; others should stay @@ -174,7 +177,6 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC int expectedTransitions = 2; JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(state, false); - enableMaintenanceRunner(false); PointcutLatch sendLatch = disableWorkChunkMessageHandler(); sendLatch.setExpectedCount(expectedTransitions); createChunksInStates(result); diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java index 3b29172a2b7..1a3562152d2 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java @@ -154,7 +154,7 @@ public interface IJobPersistence extends IWorkChunkPersistence { */ @Transactional(propagation = Propagation.SUPPORTS) Page fetchAllWorkChunkMetadataForJobInStates( - int thePageIndex, int thePageSize, String theInstanceId, Set theStates); + Pageable thePageable, String theInstanceId, Set theStates); /** * Callback to update a JobInstance within a locked transaction. diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java index 63a2d6605d2..246c27c7f97 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java @@ -34,11 +34,13 @@ import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator; import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater; import ca.uhn.fhir.model.api.IModelJson; +import ca.uhn.fhir.model.api.PagingIterator; import ca.uhn.fhir.util.Logs; import ca.uhn.fhir.util.StopWatch; import org.apache.commons.lang3.time.DateUtils; import org.slf4j.Logger; import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; import java.util.Iterator; import java.util.List; @@ -50,57 +52,8 @@ public class JobInstanceProcessor { private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); public static final long PURGE_THRESHOLD = 7L * DateUtils.MILLIS_PER_DAY; - class ReadyChunkIterator implements Iterator { - - // 10,000 - we want big batches - private static final int PAGE_SIZE = 10000; - - private Page currentPage; - - private int myPageIndex = 0; - - private int myItemIndex = 0; - - private final String myInstanceId; - - public ReadyChunkIterator(String theInstanceId) { - myInstanceId = theInstanceId; - } - - private void getNextPage() { - if (currentPage == null || currentPage.hasNext()) { - currentPage = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates( - myPageIndex++, getPageSize(), myInstanceId, Set.of(WorkChunkStatusEnum.READY)); - myItemIndex = 0; - } else { - currentPage = Page.empty(); - } - } - - int getPageSize() { - return PAGE_SIZE; - } - - @Override - public boolean hasNext() { - if (currentPage == null) { - getNextPage(); - } - return currentPage.getContent().size() > myItemIndex || currentPage.hasNext(); - } - - @Override - public WorkChunkMetadata next() { - if (myItemIndex >= currentPage.getSize()) { - getNextPage(); - } - if (myItemIndex < currentPage.getSize()) { - return currentPage.getContent().get(myItemIndex++); - } - return null; - } - } - + // 10k; we want to get as many as we can + private static final int WORK_CHUNK_METADATA_BATCH_SIZE = 10000; private final IJobPersistence myJobPersistence; private final BatchJobSender myBatchJobSender; private final JobChunkProgressAccumulator myProgressAccumulator; @@ -329,8 +282,15 @@ public class JobInstanceProcessor { return false; } - protected ReadyChunkIterator getReadyChunks(String theInstanceId) { - return new ReadyChunkIterator(theInstanceId); + protected PagingIterator getReadyChunks() { + return new PagingIterator<>(WORK_CHUNK_METADATA_BATCH_SIZE, (index, batchsize, consumer) -> { + Pageable pageable = Pageable.ofSize(batchsize).withPage(index); + Page results = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates( + pageable, myInstanceId, Set.of(WorkChunkStatusEnum.READY)); + for (WorkChunkMetadata metadata : results) { + consumer.accept(metadata); + } + }); } /** @@ -345,7 +305,7 @@ public class JobInstanceProcessor { */ private void enqueueReadyChunks( JobInstance theJobInstance, JobDefinition theJobDefinition, boolean theIsGatedExecutionAdvancementBool) { - Iterator iter = getReadyChunks(theJobInstance.getInstanceId()); + Iterator iter = getReadyChunks(); AtomicInteger counter = new AtomicInteger(); ConcurrentHashMap> stepToWorkCursor = new ConcurrentHashMap<>(); diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessorTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessorTest.java deleted file mode 100644 index a9b663f2ea3..00000000000 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessorTest.java +++ /dev/null @@ -1,193 +0,0 @@ -package ca.uhn.fhir.batch2.maintenance; - -import ca.uhn.fhir.batch2.api.IJobPersistence; -import ca.uhn.fhir.batch2.api.IReductionStepExecutorService; -import ca.uhn.fhir.batch2.api.RunOutcome; -import ca.uhn.fhir.batch2.api.VoidModel; -import ca.uhn.fhir.batch2.channel.BatchJobSender; -import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry; -import ca.uhn.fhir.batch2.model.JobDefinition; -import ca.uhn.fhir.batch2.model.JobInstance; -import ca.uhn.fhir.batch2.model.StatusEnum; -import ca.uhn.fhir.batch2.model.WorkChunkMetadata; -import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; -import ca.uhn.fhir.model.api.IModelJson; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.springframework.data.domain.Page; -import org.springframework.data.domain.PageImpl; -import org.springframework.data.domain.PageRequest; -import org.springframework.data.domain.Pageable; -import org.springframework.transaction.PlatformTransactionManager; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -public class JobInstanceProcessorTest { - - private static final String JOB_ID = "job_id"; - private static final String SECOND_STEP_ID = "second"; - - public static class TestJobInstanceProcessor extends JobInstanceProcessor { - - private ReadyChunkIterator mySpy; - - public TestJobInstanceProcessor(IJobPersistence theJobPersistence, BatchJobSender theBatchJobSender, String theInstanceId, JobChunkProgressAccumulator theProgressAccumulator, IReductionStepExecutorService theReductionStepExecutorService, JobDefinitionRegistry theJobDefinitionRegistry, PlatformTransactionManager theTransactionManager) { - super(theJobPersistence, theBatchJobSender, theInstanceId, theProgressAccumulator, theReductionStepExecutorService, theJobDefinitionRegistry, theTransactionManager); - } - - public void setSpy(ReadyChunkIterator theSpy) { - mySpy = theSpy; - } - - @Override - protected ReadyChunkIterator getReadyChunks(String theInstanceId) { - if (mySpy == null) { - return super.getReadyChunks(theInstanceId); - } - return mySpy; - } - } - - @Mock - private IJobPersistence myJobPersistence; - - @Mock - private JobDefinitionRegistry myJobDefinitionRegistry; - - private TestJobInstanceProcessor myJobInstanceProcessor; - - @Test - public void testLargeBatchesForEnqueue_pagesCorrectly() { - // setup - int batchSize = 2; - int total = 20; - String instanceId = "instanceId"; - JobInstance instance = new JobInstance(); - instance.setInstanceId(instanceId); - instance.setStatus(StatusEnum.QUEUED); - List workchunks = new ArrayList<>(); - Set expectedIds = new HashSet<>(); - for (int i = 0; i < total; i++) { - String id = "id" + i; - workchunks.add(createWorkChunkMetadata(instanceId).setId(id)); - expectedIds.add(id); - } - - // create out object to test (it's not a bean) - myJobInstanceProcessor = new TestJobInstanceProcessor( - myJobPersistence, - mock(BatchJobSender.class), - instanceId, - mock(JobChunkProgressAccumulator.class), - mock(IReductionStepExecutorService.class), - myJobDefinitionRegistry, - null - ); - - // we need to spy our iterator so we don't use 1000s in this test - JobInstanceProcessor.ReadyChunkIterator chunkIterator = spy(myJobInstanceProcessor.getReadyChunks(instanceId)); - - // mock - when(myJobPersistence.fetchInstance(eq(instanceId))) - .thenReturn(Optional.of(instance)); - when(myJobDefinitionRegistry.getJobDefinitionOrThrowException(any())) - .thenReturn((JobDefinition) createDefinition()); - - when(chunkIterator.getPageSize()) - .thenReturn(batchSize); - when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), eq(batchSize), eq(instanceId), eq(Set.of(WorkChunkStatusEnum.READY)))) - .thenAnswer((args) -> { - int pageNum = args.getArgument(0); - int pageSize = args.getArgument(1); - assertEquals(batchSize, pageSize); - int indexLow = pageNum * pageSize; - int indexHigh = Math.min(indexLow + pageNum + pageSize, workchunks.size()); - List subList = workchunks.subList(indexLow, indexHigh); - - Page page = new PageImpl<>(subList, - PageRequest.of(pageNum, pageSize), - total); - return page; - }); - myJobInstanceProcessor.setSpy(chunkIterator); - - // test - myJobInstanceProcessor.process(); - - // verify - ArgumentCaptor idCaptor = ArgumentCaptor.forClass(String.class); - verify(myJobPersistence, times(total)) - .enqueueWorkChunkForProcessing(idCaptor.capture(), any()); - assertEquals(total, idCaptor.getAllValues().size()); - Set actualIds = new HashSet<>(idCaptor.getAllValues()); - assertEquals(expectedIds.size(), actualIds.size()); - } - - private WorkChunkMetadata createWorkChunkMetadata(String theInstanceId) { - WorkChunkMetadata metadata = new WorkChunkMetadata(); - metadata.setInstanceId(theInstanceId); - metadata.setJobDefinitionId(JOB_ID); - metadata.setJobDefinitionVersion(1); - metadata.setId("id"); - metadata.setTargetStepId(SECOND_STEP_ID); - metadata.setStatus(WorkChunkStatusEnum.READY); - metadata.setSequence(0); - return metadata; - } - - private JobDefinition createDefinition() { - return JobDefinition.newBuilder() - .setParametersType(IModelJson.class) - .setJobDefinitionId(JOB_ID) - .setJobDefinitionVersion(1) - .setJobDescription("a descriptive description") - .addFirstStep( - "first", - "first description", - IModelJson.class, - (details, sink) -> { - sink.accept(new IModelJson() { - - }); - return RunOutcome.SUCCESS; - }) - .addIntermediateStep( - SECOND_STEP_ID, - "second description", - IModelJson.class, - (details, sink) -> { - sink.accept(new IModelJson() { - }); - return RunOutcome.SUCCESS; - } - ) - .addLastStep( - "last", - "last description", - (details, sink) -> { - sink.accept(new VoidModel()); - return RunOutcome.SUCCESS; - } - ) - .build(); - } -} diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java index 76645ce431d..a02ebfcbb80 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java @@ -38,6 +38,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.slf4j.LoggerFactory; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageImpl; +import org.springframework.data.domain.Pageable; import org.springframework.messaging.Message; import org.springframework.transaction.PlatformTransactionManager; @@ -133,7 +134,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { JobInstance instance = createInstance(); when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(List.of(instance)); when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance)); - when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(INSTANCE_ID), any())) + when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(any(Pageable.class), eq(INSTANCE_ID), any())) .thenReturn(page); mySvc.runMaintenancePass(); @@ -177,7 +178,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance)); when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false))) .thenReturn(chunks.iterator()); - when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY)))) + when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(any(Pageable.class), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY)))) .thenReturn(Page.empty()); stubUpdateInstanceCallback(instance); @@ -221,7 +222,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance)); when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false))) .thenReturn(chunks.iterator()); - when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY)))) + when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(any(Pageable.class), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY)))) .thenReturn(Page.empty()); stubUpdateInstanceCallback(instance); @@ -258,7 +259,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { instance1.setCurrentGatedStepId(STEP_1); when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance1)); when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance1)); - when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), anyString(), eq(Set.of(WorkChunkStatusEnum.READY)))) + when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(any(Pageable.class), anyString(), eq(Set.of(WorkChunkStatusEnum.READY)))) .thenAnswer((args) -> { // new page every time (called more than once) return getPageOfData(new ArrayList<>(chunks)); @@ -293,7 +294,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { instance.setEndTime(parseTime("2001-01-01T12:12:12Z")); when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance)); when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance)); - when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY)))) + when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(any(Pageable.class), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY)))) .thenReturn(Page.empty()); mySvc.runMaintenancePass(); @@ -318,7 +319,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance)); when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean())).thenAnswer(t->chunks.iterator()); when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance)); - when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY)))) + when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(any(Pageable.class), eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY)))) .thenReturn(Page.empty()); stubUpdateInstanceCallback(instance); @@ -362,7 +363,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance)); when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean())) .thenAnswer(t->chunks.iterator()); - when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY)))) + when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(any(Pageable.class), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY)))) .thenReturn(Page.empty()); stubUpdateInstanceCallback(instance); @@ -394,7 +395,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { // mocks when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance)); when(myJobPersistence.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(instance)); - when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY)))) + when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(any(Pageable.class), eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY)))) .thenReturn(getPageOfData(theChunks)); when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean())) .thenAnswer(t -> theChunks.stream().map(c -> c.setStatus(WorkChunkStatusEnum.READY)).toList().iterator());