adding different paging mechanism

This commit is contained in:
leif stawnyczy 2024-03-22 13:57:52 -04:00
parent a00296f9fd
commit 7fb98340a8
7 changed files with 284 additions and 72 deletions

View File

@ -505,15 +505,11 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
}
@Override
public Iterator<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(String theInstanceId, Set<WorkChunkStatusEnum> theStates) {
// fetch 10,000 at a time
return new PagingIterator<>(10000, (thePageIndex, theBatchSize, theConsumer) -> {
List<Batch2WorkChunkMetadataView> results = myWorkChunkMetadataViewRepo.fetchWorkChunkMetadataForJobInStates(PageRequest.of(thePageIndex, theBatchSize), theInstanceId, theStates);
public Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(int thePageIndex, int thePageSize, String theInstanceId, Set<WorkChunkStatusEnum> theStates) {
Pageable request = PageRequest.of(thePageIndex, thePageSize);
Page<Batch2WorkChunkMetadataView> page = myWorkChunkMetadataViewRepo.fetchWorkChunkMetadataForJobInStates(request, theInstanceId, theStates);
for (Batch2WorkChunkMetadataView metaView : results) {
theConsumer.accept(metaView.toChunkMetadata());
}
});
return page.map(Batch2WorkChunkMetadataView::toChunkMetadata);
}
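For orientation, a minimal sketch (not part of this commit) of how a caller can drain the new Page-based signature; persistence, instanceId and handleChunk are placeholder names:
int pageIndex = 0;
Page<WorkChunkMetadata> page;
do {
// each call returns one page of READY work chunk metadata
page = persistence.fetchAllWorkChunkMetadataForJobInStates(
pageIndex++, 10000, instanceId, Set.of(WorkChunkStatusEnum.READY));
page.forEach(metadata -> handleChunk(metadata));
} while (page.hasNext());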
@Override

View File

@ -2,6 +2,7 @@ package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkMetadataView;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
@ -16,7 +17,7 @@ public interface IBatch2WorkChunkMetadataViewRepository extends JpaRepository<Ba
"SELECT v FROM Batch2WorkChunkMetadataView v WHERE v.myInstanceId = :instanceId AND v.myStatus IN :states "
+ " ORDER BY v.myInstanceId, v.mySequence, v.myTargetStepId, v.myStatus, v.myId ASC"
)
List<Batch2WorkChunkMetadataView> fetchWorkChunkMetadataForJobInStates(
Page<Batch2WorkChunkMetadataView> fetchWorkChunkMetadataForJobInStates(
Pageable thePageRequest,
@Param("instanceId") String theInstanceId,
@Param("states") Collection<WorkChunkStatusEnum> theStates

View File

@ -25,7 +25,7 @@ import static ca.uhn.fhir.batch2.model.JobDefinition.ID_MAX_LENGTH;
+ " e.definition_id as job_definition_id, "
+ " e.definition_ver as job_definition_version, "
+ " e.tgt_step_id as target_step_id "
+ "FROM BT2_WORK_CHUNK e WHERE (1=0) = false"
+ "FROM BT2_WORK_CHUNK e"
)
public class Batch2WorkChunkMetadataView implements Serializable {

View File

@ -153,7 +153,7 @@ public interface IJobPersistence extends IWorkChunkPersistence {
* @return a page of workchunk metadata
*/
@Transactional(propagation = Propagation.SUPPORTS)
Iterator<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(String theInstanceId, Set<WorkChunkStatusEnum> theStates);
Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(int thePageIndex, int thePageSize, String theInstanceId, Set<WorkChunkStatusEnum> theStates);
/**
* Callback to update a JobInstance within a locked transaction.

View File

@ -38,6 +38,7 @@ import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import org.springframework.data.domain.Page;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.Iterator;
@ -50,6 +51,57 @@ public class JobInstanceProcessor {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
public static final long PURGE_THRESHOLD = 7L * DateUtils.MILLIS_PER_DAY;
class ReadyChunkIterator implements Iterator<WorkChunkMetadata> {
// 10,000 - we want big batches
private static final int PAGE_SIZE = 10000;
private Page<WorkChunkMetadata> currentPage;
private int myPageIndex = 0;
private int myItemIndex = 0;
private final String myInstanceId;
public ReadyChunkIterator(String theInstanceId) {
myInstanceId = theInstanceId;
}
private void getNextPage() {
if (currentPage == null || currentPage.hasNext()) {
currentPage = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(
myPageIndex++, getPageSize(), myInstanceId, Set.of(WorkChunkStatusEnum.READY));
myItemIndex = 0;
} else {
currentPage = Page.empty();
}
}
int getPageSize() {
return PAGE_SIZE;
}
@Override
public boolean hasNext() {
if (currentPage == null) {
getNextPage();
}
return currentPage.getContent().size() > myItemIndex || currentPage.hasNext();
}
@Override
public WorkChunkMetadata next() {
if (myItemIndex >= currentPage.getSize()) {
getNextPage();
}
if (myItemIndex < currentPage.getSize()) {
return currentPage.getContent().get(myItemIndex++);
}
return null;
}
}
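The fallback in getNextPage() relies on Page.empty() reporting neither content nor a further page, which is what makes hasNext() return false and ends the iteration; a small sketch of that assumption (standard Spring Data behavior):
Page<WorkChunkMetadata> done = Page.empty();
// both conditions checked by hasNext() above are false for an empty page
boolean more = !done.getContent().isEmpty() || done.hasNext(); // evaluates to false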
private final IJobPersistence myJobPersistence;
private final BatchJobSender myBatchJobSender;
private final JobChunkProgressAccumulator myProgressAccumulator;
@ -283,6 +335,10 @@ public class JobInstanceProcessor {
return false;
}
protected ReadyChunkIterator getReadyChunks(String theInstanceId) {
return new ReadyChunkIterator(theInstanceId);
}
/**
* Chunks are initially created in READY state.
* We will move READY chunks to QUEUE'd and send them to the queue/topic (kafka)
@ -298,17 +354,12 @@ public class JobInstanceProcessor {
// we need a transaction to access the stream of workchunks
// because workchunks are created in READY state, there's an unknown
// number of them (and so we could be reading many from the db)
// TransactionStatus status = myTransactionManager.getTransaction(new DefaultTransactionDefinition());
Iterator<WorkChunkMetadata> readyChunkMetadata = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(
theJobInstance.getInstanceId(), Set.of(WorkChunkStatusEnum.READY)
);
Iterator<WorkChunkMetadata> iter = getReadyChunks(theJobInstance.getInstanceId());
AtomicInteger counter = new AtomicInteger();
ConcurrentHashMap<String, JobWorkCursor<?, ?, ?>> stepToWorkCursor = new ConcurrentHashMap<>();
while (readyChunkMetadata.hasNext()) {
WorkChunkMetadata metadata = readyChunkMetadata.next();
while (iter.hasNext()) {
WorkChunkMetadata metadata = iter.next();
JobWorkCursor<?, ?, ?> jobWorkCursor =
stepToWorkCursor.computeIfAbsent(metadata.getTargetStepId(), (e) -> {
return JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, metadata.getTargetStepId());
@ -335,41 +386,8 @@ public class JobInstanceProcessor {
*/
updateChunkAndSendToQueue(metadata);
}
// Stream<WorkChunk> readyChunks = myJobPersistence.fetchAllWorkChunksForJobInStates(
// theJobInstance.getInstanceId(), Set.of(WorkChunkStatusEnum.READY));
//
// AtomicInteger counter = new AtomicInteger();
// readyChunks.forEach(chunk -> {
// JobWorkCursor<?, ?, ?> jobWorkCursor =
// JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, chunk.getTargetStepId());
// counter.getAndIncrement();
// if (!theIsGatedExecutionAdvancementBool
// && (theJobDefinition.isGatedExecution() || jobWorkCursor.isReductionStep())) {
// /*
// * Gated executions are queued later when all work chunks are ready.
// *
// * Reduction steps are not submitted to the queue at all, but processed inline.
// * Currently all reduction steps are also gated, but this might not always
// * be true.
// */
// return;
// }
//
// /*
// * For each chunk id
// * * Move to QUEUE'd
// * * Send to topic
// * * flush changes
// * * commit
// */
// updateChunkAndSendToQueue(chunk, theJobInstance, theJobDefinition);
// });
// myTransactionManager.commit(status);
ourLog.debug(
"Encountered {} READY work chunks for job {}", counter.get(), theJobDefinition.getJobDefinitionId());
"Encountered {} READY work chunks for job {}", counter.get(), theJobDefinition.getJobDefinitionId());
}
/**

View File

@ -0,0 +1,193 @@
package ca.uhn.fhir.batch2.maintenance;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.model.api.IModelJson;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class JobInstanceProcessorTest {
private static final String JOB_ID = "job_id";
private static final String SECOND_STEP_ID = "second";
public static class TestJobInstanceProcessor extends JobInstanceProcessor {
private ReadyChunkIterator mySpy;
public TestJobInstanceProcessor(IJobPersistence theJobPersistence, BatchJobSender theBatchJobSender, String theInstanceId, JobChunkProgressAccumulator theProgressAccumulator, IReductionStepExecutorService theReductionStepExecutorService, JobDefinitionRegistry theJobDefinitionRegistry, PlatformTransactionManager theTransactionManager) {
super(theJobPersistence, theBatchJobSender, theInstanceId, theProgressAccumulator, theReductionStepExecutorService, theJobDefinitionRegistry, theTransactionManager);
}
public void setSpy(ReadyChunkIterator theSpy) {
mySpy = theSpy;
}
@Override
protected ReadyChunkIterator getReadyChunks(String theInstanceId) {
if (mySpy == null) {
return super.getReadyChunks(theInstanceId);
}
return mySpy;
}
}
@Mock
private IJobPersistence myJobPersistence;
@Mock
private JobDefinitionRegistry myJobDefinitionRegistry;
private TestJobInstanceProcessor myJobInstanceProcessor;
@Test
public void testLargeBatchesForEnqueue_pagesCorrectly() {
// setup
int batchSize = 2;
int total = 20;
String instanceId = "instanceId";
JobInstance instance = new JobInstance();
instance.setInstanceId(instanceId);
instance.setStatus(StatusEnum.QUEUED);
List<WorkChunkMetadata> workchunks = new ArrayList<>();
Set<String> expectedIds = new HashSet<>();
for (int i = 0; i < total; i++) {
String id = "id" + i;
workchunks.add(createWorkChunkMetadata(instanceId).setId(id));
expectedIds.add(id);
}
// create our object to test (it's not a bean)
myJobInstanceProcessor = new TestJobInstanceProcessor(
myJobPersistence,
mock(BatchJobSender.class),
instanceId,
mock(JobChunkProgressAccumulator.class),
mock(IReductionStepExecutorService.class),
myJobDefinitionRegistry,
null
);
// we need to spy our iterator so we don't use the 10,000 default page size in this test
JobInstanceProcessor.ReadyChunkIterator chunkIterator = spy(myJobInstanceProcessor.getReadyChunks(instanceId));
// mock
when(myJobPersistence.fetchInstance(eq(instanceId)))
.thenReturn(Optional.of(instance));
when(myJobDefinitionRegistry.getJobDefinitionOrThrowException(any()))
.thenReturn((JobDefinition<IModelJson>) createDefinition());
when(chunkIterator.getPageSize())
.thenReturn(batchSize);
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), eq(batchSize), eq(instanceId), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenAnswer((args) -> {
int pageNum = args.getArgument(0);
int pageSize = args.getArgument(1);
assertEquals(batchSize, pageSize);
int indexLow = pageNum * pageSize;
int indexHigh = Math.min(indexLow + pageSize, workchunks.size());
List<WorkChunkMetadata> subList = workchunks.subList(indexLow, indexHigh);
Page<WorkChunkMetadata> page = new PageImpl<>(subList,
PageRequest.of(pageNum, pageSize),
total);
return page;
});
myJobInstanceProcessor.setSpy(chunkIterator);
// test
myJobInstanceProcessor.process();
// verify
ArgumentCaptor<String> idCaptor = ArgumentCaptor.forClass(String.class);
verify(myJobPersistence, times(total))
.enqueueWorkChunkForProcessing(idCaptor.capture(), any());
assertEquals(total, idCaptor.getAllValues().size());
Set<String> actualIds = new HashSet<>(idCaptor.getAllValues());
assertEquals(expectedIds.size(), actualIds.size());
}
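The stubbed answer depends on the three-argument PageImpl constructor deriving the paging metadata that drives ReadyChunkIterator; a small sketch of that assumption (standard Spring Data behavior, workchunks as above):
// 20 items with page size 2 gives 10 pages; every page but the last reports hasNext() == true
Page<WorkChunkMetadata> first = new PageImpl<>(workchunks.subList(0, 2), PageRequest.of(0, 2), 20);
// first.getTotalPages() == 10, first.hasNext() == true
Page<WorkChunkMetadata> last = new PageImpl<>(workchunks.subList(18, 20), PageRequest.of(9, 2), 20);
// last.hasNext() == false, which is what terminates the iteration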
private WorkChunkMetadata createWorkChunkMetadata(String theInstanceId) {
WorkChunkMetadata metadata = new WorkChunkMetadata();
metadata.setInstanceId(theInstanceId);
metadata.setJobDefinitionId(JOB_ID);
metadata.setJobDefinitionVersion(1);
metadata.setId("id");
metadata.setTargetStepId(SECOND_STEP_ID);
metadata.setStatus(WorkChunkStatusEnum.READY);
metadata.setSequence(0);
return metadata;
}
private JobDefinition<?> createDefinition() {
return JobDefinition.newBuilder()
.setParametersType(IModelJson.class)
.setJobDefinitionId(JOB_ID)
.setJobDefinitionVersion(1)
.setJobDescription("a descriptive description")
.addFirstStep(
"first",
"first description",
IModelJson.class,
(details, sink) -> {
sink.accept(new IModelJson() {
});
return RunOutcome.SUCCESS;
})
.addIntermediateStep(
SECOND_STEP_ID,
"second description",
IModelJson.class,
(details, sink) -> {
sink.accept(new IModelJson() {
});
return RunOutcome.SUCCESS;
}
)
.addLastStep(
"last",
"last description",
(details, sink) -> {
sink.accept(new VoidModel());
return RunOutcome.SUCCESS;
}
)
.build();
}
}

View File

@ -36,6 +36,8 @@ import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.messaging.Message;
import org.springframework.transaction.PlatformTransactionManager;
@ -126,15 +128,14 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
);
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false)))
.thenReturn(chunks.iterator());
Page<WorkChunkMetadata> page = getPageOfData(chunks);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
JobInstance instance = createInstance();
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(List.of(instance));
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance));
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(eq(INSTANCE_ID), any()))
.thenReturn(chunks.stream()
.map(c -> (WorkChunkMetadata) c)
.iterator());
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(INSTANCE_ID), any()))
.thenReturn(page);
mySvc.runMaintenancePass();
@ -177,8 +178,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false)))
.thenReturn(chunks.iterator());
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Collections.emptyIterator());
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Page.empty());
stubUpdateInstanceCallback(instance);
mySvc.runMaintenancePass();
@ -221,8 +222,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false)))
.thenReturn(chunks.iterator());
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Collections.emptyIterator());
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Page.empty());
stubUpdateInstanceCallback(instance);
// Execute
@ -258,10 +259,10 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
instance1.setCurrentGatedStepId(STEP_1);
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance1));
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance1));
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyString(), eq(Set.of(WorkChunkStatusEnum.READY))))
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), anyString(), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenAnswer((args) -> {
// new iterator every time (called more than once)
return new ArrayList<>(chunks).iterator();
// new page every time (called more than once)
return getPageOfData(new ArrayList<>(chunks));
});
doAnswer(a -> {
Consumer<Integer> callback = a.getArgument(1);
@ -293,8 +294,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
instance.setEndTime(parseTime("2001-01-01T12:12:12Z"));
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance));
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Collections.emptyIterator());
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Page.empty());
mySvc.runMaintenancePass();
@ -318,8 +319,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean())).thenAnswer(t->chunks.iterator());
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance));
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Collections.emptyIterator());
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Page.empty());
stubUpdateInstanceCallback(instance);
// Execute
@ -362,8 +363,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean()))
.thenAnswer(t->chunks.iterator());
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Collections.emptyIterator());
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Page.empty());
stubUpdateInstanceCallback(instance);
mySvc.runMaintenancePass();
@ -394,8 +395,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
// mocks
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(instance));
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenAnswer(t -> theChunks.iterator());
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(getPageOfData(theChunks));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean()))
.thenAnswer(t -> theChunks.stream().map(c -> c.setStatus(WorkChunkStatusEnum.READY)).toList().iterator());
@ -564,4 +565,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
return new DateTimeType(theDate).getValue();
}
private Page<WorkChunkMetadata> getPageOfData(List<WorkChunk> theChunks) {
return new PageImpl<>(theChunks.stream().map(c -> (WorkChunkMetadata)c).collect(Collectors.toList()));
}
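Note that the single-argument PageImpl constructor used above wraps the whole list as one page whose hasNext() is false (standard Spring Data behavior), so the stubbed persistence layer hands everything back in a single fetch; a minimal sketch:
// one page containing all chunks; the iterator requests no further pages
Page<WorkChunkMetadata> onePage = new PageImpl<>(List.of(new WorkChunkMetadata()));
boolean more = onePage.hasNext(); // false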
}