review fixes

This commit is contained in:
leif stawnyczy 2024-03-25 10:36:55 -04:00
parent 708a737e5f
commit db4ca99a3d
7 changed files with 37 additions and 264 deletions

View File

@@ -506,10 +506,9 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
 	@Override
 	public Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(
-			int thePageIndex, int thePageSize, String theInstanceId, Set<WorkChunkStatusEnum> theStates) {
-		Pageable request = PageRequest.of(thePageIndex, thePageSize);
+			Pageable thePageable, String theInstanceId, Set<WorkChunkStatusEnum> theStates) {
 		Page<Batch2WorkChunkMetadataView> page =
-				myWorkChunkMetadataViewRepo.fetchWorkChunkMetadataForJobInStates(request, theInstanceId, theStates);
+				myWorkChunkMetadataViewRepo.fetchWorkChunkMetadataForJobInStates(thePageable, theInstanceId, theStates);
 		return page.map(Batch2WorkChunkMetadataView::toChunkMetadata);
 	}
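The method now accepts a Spring Data Pageable instead of a raw page index/size pair, leaving paging policy to the caller. A minimal usage sketch against the new signature (the jobPersistence reference and instanceId are hypothetical; PageRequest is the stock Pageable factory, and only the fetch method itself comes from this commit):

	// hedged sketch: fetch the first page of up to 100 READY chunks
	Pageable firstPage = PageRequest.of(0, 100);
	Page<WorkChunkMetadata> chunks = jobPersistence.fetchAllWorkChunkMetadataForJobInStates(
			firstPage, instanceId, Set.of(WorkChunkStatusEnum.READY));
	chunks.getContent().forEach(chunk -> ourLog.info("READY chunk: {}", chunk.getId()));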

View File

@@ -14,6 +14,10 @@ import java.io.Serializable;
 import static ca.uhn.fhir.batch2.model.JobDefinition.ID_MAX_LENGTH;
+/**
+ * A view for a Work Chunk that contains only the minimal information
+ * needed to satisfy the no-data path.
+ */
 @Entity
 @Immutable
 @Subselect("SELECT e.id as id, "

View File

@@ -3,6 +3,7 @@ package ca.uhn.hapi.fhir.batch2.test;
 import ca.uhn.fhir.batch2.model.JobDefinition;
 import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
 import ca.uhn.test.concurrency.PointcutLatch;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -14,6 +15,11 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
 	Logger ourLog = LoggerFactory.getLogger(IJobMaintenanceActions.class);
+	@BeforeEach
+	default void before() {
+		enableMaintenanceRunner(false);
+	}
 	@Test
 	default void test_gatedJob_stepReady_advances() throws InterruptedException {
 		// setup
@@ -24,7 +30,6 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
 			2|READY,2|QUEUED
 			""";
 		int numToTransition = 2;
-		enableMaintenanceRunner(false);
 		PointcutLatch sendLatch = disableWorkChunkMessageHandler();
 		sendLatch.setExpectedCount(numToTransition);
 		JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(initialState, true);
@@ -101,7 +106,6 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
 	})
 	default void testGatedStep2NotReady_notAdvance(String theChunkState) throws InterruptedException {
 		// setup
-		enableMaintenanceRunner(false);
 		PointcutLatch sendingLatch = disableWorkChunkMessageHandler();
 		sendingLatch.setExpectedCount(0);
 		JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true);
@@ -145,7 +149,6 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
 	})
 	default void testGatedStep2ReadyToAdvance_advanceToStep3(String theChunkState) throws InterruptedException {
 		// setup
-		enableMaintenanceRunner(false);
 		PointcutLatch sendingLatch = disableWorkChunkMessageHandler();
 		JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true);
 		createChunksInStates(result);
@@ -160,7 +163,7 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
 	}
 	@Test
-	default void test_ungatedJob_advancesSteps() throws InterruptedException {
+	default void test_ungatedJob_queuesReadyChunks() throws InterruptedException {
 		// setup
 		String state = """
 			# READY chunks should transition; others should stay
@@ -174,7 +177,6 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
 		int expectedTransitions = 2;
 		JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(state, false);
-		enableMaintenanceRunner(false);
 		PointcutLatch sendLatch = disableWorkChunkMessageHandler();
 		sendLatch.setExpectedCount(expectedTransitions);
 		createChunksInStates(result);
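A note on the chunk-state strings used in these tests: the parser lives in JobMaintenanceStateInformation and is not part of this diff, so this reading is an inference. Each line appears to be stepIndex|CURRENT_STATE, optionally followed by ,stepIndex|EXPECTED_STATE giving the state the chunk must reach after the maintenance pass, with # starting a comment. For example:

	// hypothetical state string following the pattern visible in this diff
	String initialState = """
		1|COMPLETED       # step 1 chunk, should not move
		2|READY,2|QUEUED  # step 2 chunk starts READY, expected QUEUED afterwards
		""";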

View File

@@ -154,7 +154,7 @@ public interface IJobPersistence extends IWorkChunkPersistence {
 	 */
 	@Transactional(propagation = Propagation.SUPPORTS)
 	Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(
-			int thePageIndex, int thePageSize, String theInstanceId, Set<WorkChunkStatusEnum> theStates);
+			Pageable thePageable, String theInstanceId, Set<WorkChunkStatusEnum> theStates);
 	/**
 	 * Callback to update a JobInstance within a locked transaction.

View File

@@ -34,11 +34,13 @@ import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
 import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
 import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
 import ca.uhn.fhir.model.api.IModelJson;
+import ca.uhn.fhir.model.api.PagingIterator;
 import ca.uhn.fhir.util.Logs;
 import ca.uhn.fhir.util.StopWatch;
 import org.apache.commons.lang3.time.DateUtils;
 import org.slf4j.Logger;
 import org.springframework.data.domain.Page;
+import org.springframework.data.domain.Pageable;
 import java.util.Iterator;
 import java.util.List;
@@ -50,57 +52,8 @@ public class JobInstanceProcessor {
 	private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
 	public static final long PURGE_THRESHOLD = 7L * DateUtils.MILLIS_PER_DAY;
-	class ReadyChunkIterator implements Iterator<WorkChunkMetadata> {
-		// 10,000 - we want big batches
-		private static final int PAGE_SIZE = 10000;
-		private Page<WorkChunkMetadata> currentPage;
-		private int myPageIndex = 0;
-		private int myItemIndex = 0;
-		private final String myInstanceId;
-		public ReadyChunkIterator(String theInstanceId) {
-			myInstanceId = theInstanceId;
-		}
-		private void getNextPage() {
-			if (currentPage == null || currentPage.hasNext()) {
-				currentPage = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(
-						myPageIndex++, getPageSize(), myInstanceId, Set.of(WorkChunkStatusEnum.READY));
-				myItemIndex = 0;
-			} else {
-				currentPage = Page.empty();
-			}
-		}
-		int getPageSize() {
-			return PAGE_SIZE;
-		}
-		@Override
-		public boolean hasNext() {
-			if (currentPage == null) {
-				getNextPage();
-			}
-			return currentPage.getContent().size() > myItemIndex || currentPage.hasNext();
-		}
-		@Override
-		public WorkChunkMetadata next() {
-			if (myItemIndex >= currentPage.getSize()) {
-				getNextPage();
-			}
-			if (myItemIndex < currentPage.getSize()) {
-				return currentPage.getContent().get(myItemIndex++);
-			}
-			return null;
-		}
-	}
+	// 10k; we want to get as many as we can
+	private static final int WORK_CHUNK_METADATA_BATCH_SIZE = 10000;
 	private final IJobPersistence myJobPersistence;
 	private final BatchJobSender myBatchJobSender;
 	private final JobChunkProgressAccumulator myProgressAccumulator;
@@ -329,8 +282,15 @@
 		return false;
 	}
-	protected ReadyChunkIterator getReadyChunks(String theInstanceId) {
-		return new ReadyChunkIterator(theInstanceId);
+	protected PagingIterator<WorkChunkMetadata> getReadyChunks() {
+		return new PagingIterator<>(WORK_CHUNK_METADATA_BATCH_SIZE, (index, batchsize, consumer) -> {
+			Pageable pageable = Pageable.ofSize(batchsize).withPage(index);
+			Page<WorkChunkMetadata> results = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(
+					pageable, myInstanceId, Set.of(WorkChunkStatusEnum.READY));
+			for (WorkChunkMetadata metadata : results) {
+				consumer.accept(metadata);
+			}
+		});
 	}
 	/**
@@ -345,7 +305,7 @@
 	 */
 	private void enqueueReadyChunks(
 			JobInstance theJobInstance, JobDefinition<?> theJobDefinition, boolean theIsGatedExecutionAdvancementBool) {
-		Iterator<WorkChunkMetadata> iter = getReadyChunks(theJobInstance.getInstanceId());
+		Iterator<WorkChunkMetadata> iter = getReadyChunks();
 		AtomicInteger counter = new AtomicInteger();
 		ConcurrentHashMap<String, JobWorkCursor<?, ?, ?>> stepToWorkCursor = new ConcurrentHashMap<>();
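For readers unfamiliar with PagingIterator: judging by the constructor used above, it takes a batch size plus a page-fetch lambda (page index, batch size, item consumer), invokes the lambda lazily as the caller iterates, and stops once a fetch returns fewer items than the batch size. A self-contained sketch of that contract against an in-memory list (illustrative assumption, not code from this commit):

	// hypothetical: replay a list three items at a time through the same contract
	List<String> data = List.of("a", "b", "c", "d", "e");
	Iterator<String> it = new PagingIterator<>(3, (index, batchSize, consumer) -> {
		data.stream()
				.skip((long) index * batchSize)
				.limit(batchSize)
				.forEach(consumer);
	});
	it.forEachRemaining(System.out::println); // prints a..e, fetching two pages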

View File

@@ -1,193 +0,0 @@
-package ca.uhn.fhir.batch2.maintenance;
-import ca.uhn.fhir.batch2.api.IJobPersistence;
-import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
-import ca.uhn.fhir.batch2.api.RunOutcome;
-import ca.uhn.fhir.batch2.api.VoidModel;
-import ca.uhn.fhir.batch2.channel.BatchJobSender;
-import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
-import ca.uhn.fhir.batch2.model.JobDefinition;
-import ca.uhn.fhir.batch2.model.JobInstance;
-import ca.uhn.fhir.batch2.model.StatusEnum;
-import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
-import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
-import ca.uhn.fhir.model.api.IModelJson;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.springframework.data.domain.Page;
-import org.springframework.data.domain.PageImpl;
-import org.springframework.data.domain.PageRequest;
-import org.springframework.data.domain.Pageable;
-import org.springframework.transaction.PlatformTransactionManager;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-@ExtendWith(MockitoExtension.class)
-public class JobInstanceProcessorTest {
-	private static final String JOB_ID = "job_id";
-	private static final String SECOND_STEP_ID = "second";
-	public static class TestJobInstanceProcessor extends JobInstanceProcessor {
-		private ReadyChunkIterator mySpy;
-		public TestJobInstanceProcessor(IJobPersistence theJobPersistence, BatchJobSender theBatchJobSender, String theInstanceId, JobChunkProgressAccumulator theProgressAccumulator, IReductionStepExecutorService theReductionStepExecutorService, JobDefinitionRegistry theJobDefinitionRegistry, PlatformTransactionManager theTransactionManager) {
-			super(theJobPersistence, theBatchJobSender, theInstanceId, theProgressAccumulator, theReductionStepExecutorService, theJobDefinitionRegistry, theTransactionManager);
-		}
-		public void setSpy(ReadyChunkIterator theSpy) {
-			mySpy = theSpy;
-		}
-		@Override
-		protected ReadyChunkIterator getReadyChunks(String theInstanceId) {
-			if (mySpy == null) {
-				return super.getReadyChunks(theInstanceId);
-			}
-			return mySpy;
-		}
-	}
-	@Mock
-	private IJobPersistence myJobPersistence;
-	@Mock
-	private JobDefinitionRegistry myJobDefinitionRegistry;
-	private TestJobInstanceProcessor myJobInstanceProcessor;
-	@Test
-	public void testLargeBatchesForEnqueue_pagesCorrectly() {
-		// setup
-		int batchSize = 2;
-		int total = 20;
-		String instanceId = "instanceId";
-		JobInstance instance = new JobInstance();
-		instance.setInstanceId(instanceId);
-		instance.setStatus(StatusEnum.QUEUED);
-		List<WorkChunkMetadata> workchunks = new ArrayList<>();
-		Set<String> expectedIds = new HashSet<>();
-		for (int i = 0; i < total; i++) {
-			String id = "id" + i;
-			workchunks.add(createWorkChunkMetadata(instanceId).setId(id));
-			expectedIds.add(id);
-		}
-		// create our object to test (it's not a bean)
-		myJobInstanceProcessor = new TestJobInstanceProcessor(
-			myJobPersistence,
-			mock(BatchJobSender.class),
-			instanceId,
-			mock(JobChunkProgressAccumulator.class),
-			mock(IReductionStepExecutorService.class),
-			myJobDefinitionRegistry,
-			null
-		);
-		// we need to spy our iterator so we don't use 1000s in this test
-		JobInstanceProcessor.ReadyChunkIterator chunkIterator = spy(myJobInstanceProcessor.getReadyChunks(instanceId));
-		// mock
-		when(myJobPersistence.fetchInstance(eq(instanceId)))
-			.thenReturn(Optional.of(instance));
-		when(myJobDefinitionRegistry.getJobDefinitionOrThrowException(any()))
-			.thenReturn((JobDefinition<IModelJson>) createDefinition());
-		when(chunkIterator.getPageSize())
-			.thenReturn(batchSize);
-		when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), eq(batchSize), eq(instanceId), eq(Set.of(WorkChunkStatusEnum.READY))))
-			.thenAnswer((args) -> {
-				int pageNum = args.getArgument(0);
-				int pageSize = args.getArgument(1);
-				assertEquals(batchSize, pageSize);
-				int indexLow = pageNum * pageSize;
-				int indexHigh = Math.min(indexLow + pageNum + pageSize, workchunks.size());
-				List<WorkChunkMetadata> subList = workchunks.subList(indexLow, indexHigh);
-				Page<WorkChunkMetadata> page = new PageImpl<>(subList,
-					PageRequest.of(pageNum, pageSize),
-					total);
-				return page;
-			});
-		myJobInstanceProcessor.setSpy(chunkIterator);
-		// test
-		myJobInstanceProcessor.process();
-		// verify
-		ArgumentCaptor<String> idCaptor = ArgumentCaptor.forClass(String.class);
-		verify(myJobPersistence, times(total))
-			.enqueueWorkChunkForProcessing(idCaptor.capture(), any());
-		assertEquals(total, idCaptor.getAllValues().size());
-		Set<String> actualIds = new HashSet<>(idCaptor.getAllValues());
-		assertEquals(expectedIds.size(), actualIds.size());
-	}
-	private WorkChunkMetadata createWorkChunkMetadata(String theInstanceId) {
-		WorkChunkMetadata metadata = new WorkChunkMetadata();
-		metadata.setInstanceId(theInstanceId);
-		metadata.setJobDefinitionId(JOB_ID);
-		metadata.setJobDefinitionVersion(1);
-		metadata.setId("id");
-		metadata.setTargetStepId(SECOND_STEP_ID);
-		metadata.setStatus(WorkChunkStatusEnum.READY);
-		metadata.setSequence(0);
-		return metadata;
-	}
-	private JobDefinition<?> createDefinition() {
-		return JobDefinition.newBuilder()
-			.setParametersType(IModelJson.class)
-			.setJobDefinitionId(JOB_ID)
-			.setJobDefinitionVersion(1)
-			.setJobDescription("a descriptive description")
-			.addFirstStep(
-				"first",
-				"first description",
-				IModelJson.class,
-				(details, sink) -> {
-					sink.accept(new IModelJson() {});
-					return RunOutcome.SUCCESS;
-				})
-			.addIntermediateStep(
-				SECOND_STEP_ID,
-				"second description",
-				IModelJson.class,
-				(details, sink) -> {
-					sink.accept(new IModelJson() {});
-					return RunOutcome.SUCCESS;
-				})
-			.addLastStep(
-				"last",
-				"last description",
-				(details, sink) -> {
-					sink.accept(new VoidModel());
-					return RunOutcome.SUCCESS;
-				})
-			.build();
-	}
-}

View File

@@ -38,6 +38,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import org.slf4j.LoggerFactory;
 import org.springframework.data.domain.Page;
 import org.springframework.data.domain.PageImpl;
+import org.springframework.data.domain.Pageable;
 import org.springframework.messaging.Message;
 import org.springframework.transaction.PlatformTransactionManager;
@@ -133,7 +134,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
 		JobInstance instance = createInstance();
 		when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(List.of(instance));
 		when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance));
-		when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(INSTANCE_ID), any()))
+		when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(any(Pageable.class), eq(INSTANCE_ID), any()))
 				.thenReturn(page);
 		mySvc.runMaintenancePass();
@@ -177,7 +178,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
 		when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
 		when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false)))
 				.thenReturn(chunks.iterator());
-		when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
+		when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(any(Pageable.class), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
 				.thenReturn(Page.empty());
 		stubUpdateInstanceCallback(instance);
@@ -221,7 +222,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
 		when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
 		when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false)))
 				.thenReturn(chunks.iterator());
-		when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
+		when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(any(Pageable.class), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
 				.thenReturn(Page.empty());
 		stubUpdateInstanceCallback(instance);
@@ -258,7 +259,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
 		instance1.setCurrentGatedStepId(STEP_1);
 		when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance1));
 		when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance1));
-		when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), anyString(), eq(Set.of(WorkChunkStatusEnum.READY))))
+		when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(any(Pageable.class), anyString(), eq(Set.of(WorkChunkStatusEnum.READY))))
 				.thenAnswer((args) -> {
 					// new page every time (called more than once)
 					return getPageOfData(new ArrayList<>(chunks));
@@ -293,7 +294,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
 		instance.setEndTime(parseTime("2001-01-01T12:12:12Z"));
 		when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
 		when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance));
-		when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
+		when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(any(Pageable.class), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
 				.thenReturn(Page.empty());
 		mySvc.runMaintenancePass();
@@ -318,7 +319,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
 		when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
 		when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean())).thenAnswer(t -> chunks.iterator());
 		when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance));
-		when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY))))
+		when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(any(Pageable.class), eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY))))
 				.thenReturn(Page.empty());
 		stubUpdateInstanceCallback(instance);
@@ -362,7 +363,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
 		when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
 		when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean()))
 				.thenAnswer(t -> chunks.iterator());
-		when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
+		when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(any(Pageable.class), eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
 				.thenReturn(Page.empty());
 		stubUpdateInstanceCallback(instance);
@@ -394,7 +395,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
 		// mocks
 		when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
 		when(myJobPersistence.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(instance));
-		when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyInt(), anyInt(), eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY))))
+		when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(any(Pageable.class), eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY))))
 				.thenReturn(getPageOfData(theChunks));
 		when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean()))
 				.thenAnswer(t -> theChunks.stream().map(c -> c.setStatus(WorkChunkStatusEnum.READY)).toList().iterator());