fixing some tests and adding a view

This commit is contained in:
leif stawnyczy 2024-03-21 16:40:25 -04:00
parent 8aed8bf6ac
commit a00296f9fd
24 changed files with 566 additions and 218 deletions

View File

@ -26,6 +26,9 @@ import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
/**
* This paging iterator only works with already ordered queries
*/
public class PagingIterator<T> implements Iterator<T> {
public interface PageFetcher<T> {
@ -42,8 +45,16 @@ public class PagingIterator<T> implements Iterator<T> {
private final PageFetcher<T> myFetcher;
private final int myPageSize;
public PagingIterator(PageFetcher<T> theFetcher) {
this(PAGE_SIZE, theFetcher);
}
public PagingIterator(int thePageSize, PageFetcher<T> theFetcher) {
assert thePageSize > 0 : "Page size must be a positive value";
myFetcher = theFetcher;
myPageSize = thePageSize;
}
@Override
@ -66,9 +77,9 @@ public class PagingIterator<T> implements Iterator<T> {
private void fetchNextBatch() {
if (!myIsFinished && myCurrentBatch.isEmpty()) {
myFetcher.fetchNextPage(myPage, PAGE_SIZE, myCurrentBatch::add);
myFetcher.fetchNextPage(myPage, myPageSize, myCurrentBatch::add);
myPage++;
myIsFinished = myCurrentBatch.size() < PAGE_SIZE;
myIsFinished = myCurrentBatch.size() < myPageSize;
}
}
}

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.batch2.config.BaseBatch2Config;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.jpa.bulk.export.job.BulkExportJobConfig;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkMetadataViewRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import jakarta.persistence.EntityManager;
@ -39,12 +40,14 @@ public class JpaBatch2Config extends BaseBatch2Config {
public IJobPersistence batch2JobInstancePersister(
IBatch2JobInstanceRepository theJobInstanceRepository,
IBatch2WorkChunkRepository theWorkChunkRepository,
IBatch2WorkChunkMetadataViewRepository theWorkChunkMetadataViewRepo,
IHapiTransactionService theTransactionService,
EntityManager theEntityManager,
IInterceptorBroadcaster theInterceptorBroadcaster) {
return new JpaJobPersistenceImpl(
theJobInstanceRepository,
theWorkChunkRepository,
theWorkChunkMetadataViewRepo,
theTransactionService,
theEntityManager,
theInterceptorBroadcaster);

View File

@ -28,16 +28,19 @@ import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkMetadataViewRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkMetadataView;
import ca.uhn.fhir.model.api.PagingIterator;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
@ -87,6 +90,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
private final IBatch2JobInstanceRepository myJobInstanceRepository;
private final IBatch2WorkChunkRepository myWorkChunkRepository;
private final IBatch2WorkChunkMetadataViewRepository myWorkChunkMetadataViewRepo;
private final EntityManager myEntityManager;
private final IHapiTransactionService myTransactionService;
private final IInterceptorBroadcaster myInterceptorBroadcaster;
@ -97,6 +101,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
public JpaJobPersistenceImpl(
IBatch2JobInstanceRepository theJobInstanceRepository,
IBatch2WorkChunkRepository theWorkChunkRepository,
IBatch2WorkChunkMetadataViewRepository theWorkChunkMetadataViewRepo,
IHapiTransactionService theTransactionService,
EntityManager theEntityManager,
IInterceptorBroadcaster theInterceptorBroadcaster) {
@ -104,6 +109,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
Validate.notNull(theWorkChunkRepository);
myJobInstanceRepository = theJobInstanceRepository;
myWorkChunkRepository = theWorkChunkRepository;
myWorkChunkMetadataViewRepo = theWorkChunkMetadataViewRepo;
myTransactionService = theTransactionService;
myEntityManager = theEntityManager;
myInterceptorBroadcaster = theInterceptorBroadcaster;
@ -499,11 +505,15 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
}
@Override
public Stream<WorkChunk> fetchAllWorkChunksForJobInStates(
String theInstanceId, Set<WorkChunkStatusEnum> theWorkChunkStatuses) {
return myWorkChunkRepository
.fetchChunksForJobInStates(theInstanceId, theWorkChunkStatuses)
.map(this::toChunk);
public Iterator<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(String theInstanceId, Set<WorkChunkStatusEnum> theStates) {
// fetch 10,000 at a time
return new PagingIterator<>(10000, (thePageIndex, theBatchSize, theConsumer) -> {
List<Batch2WorkChunkMetadataView> results = myWorkChunkMetadataViewRepo.fetchWorkChunkMetadataForJobInStates(PageRequest.of(thePageIndex, theBatchSize), theInstanceId, theStates);
for (Batch2WorkChunkMetadataView metaView : results) {
theConsumer.accept(metaView.toChunkMetadata());
}
});
}
@Override

View File

@ -0,0 +1,24 @@
package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkMetadataView;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.util.Collection;
import java.util.List;
public interface IBatch2WorkChunkMetadataViewRepository extends JpaRepository<Batch2WorkChunkMetadataView, String> {
@Query(
"SELECT v FROM Batch2WorkChunkMetadataView v WHERE v.myInstanceId = :instanceId AND v.myStatus IN :states "
+ " ORDER BY v.myInstanceId, v.mySequence, v.myTargetStepId, v.myStatus, v.myId ASC"
)
List<Batch2WorkChunkMetadataView> fetchWorkChunkMetadataForJobInStates(
Pageable thePageRequest,
@Param("instanceId") String theInstanceId,
@Param("states") Collection<WorkChunkStatusEnum> theStates
);
}

View File

@ -63,10 +63,6 @@ public interface IBatch2WorkChunkRepository
Stream<Batch2WorkChunkEntity> fetchChunksForStep(
@Param("instanceId") String theInstanceId, @Param("targetStepId") String theTargetStepId);
@Query("SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId AND e.myStatus IN :states")
Stream<Batch2WorkChunkEntity> fetchChunksForJobInStates(
@Param("instanceId") String theInstanceId, @Param("states") Collection<WorkChunkStatusEnum> theStates);
@Modifying
@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, "
+ "e.myRecordsProcessed = :rp, e.myErrorCount = e.myErrorCount + :errorRetries, e.mySerializedData = null, "

View File

@ -47,10 +47,15 @@ import static ca.uhn.fhir.batch2.model.JobDefinition.ID_MAX_LENGTH;
import static ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity.STATUS_MAX_LENGTH;
import static org.apache.commons.lang3.StringUtils.left;
// add index for instance,stepId,status,seq,id
@Entity
@Table(
name = "BT2_WORK_CHUNK",
indexes = {@Index(name = "IDX_BT2WC_II_SEQ", columnList = "INSTANCE_ID,SEQ")})
indexes = {
@Index(name = "IDX_BT2WC_II_SEQ", columnList = "INSTANCE_ID,SEQ"),
@Index(name = "IDX_BT2WC_II_SI_S_SEQ_ID", columnList = "INSTANCE_ID,TGT_STEP_ID,STAT,SEQ,ID")
}
)
public class Batch2WorkChunkEntity implements Serializable {
public static final int ERROR_MSG_MAX_LENGTH = 500;

View File

@ -0,0 +1,122 @@
package ca.uhn.fhir.jpa.entity;
import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.Id;
import org.hibernate.annotations.Immutable;
import org.hibernate.annotations.Subselect;
import java.io.Serializable;
import static ca.uhn.fhir.batch2.model.JobDefinition.ID_MAX_LENGTH;
@Entity
@Immutable
@Subselect(
"SELECT e.id as id, "
+ " e.seq as seq,"
+ " e.stat as status, "
+ " e.instance_id as instance_id, "
+ " e.definition_id as job_definition_id, "
+ " e.definition_ver as job_definition_version, "
+ " e.tgt_step_id as target_step_id "
+ "FROM BT2_WORK_CHUNK e WHERE (1=0) = false"
)
public class Batch2WorkChunkMetadataView implements Serializable {
@Id
@Column(name = "ID", length = ID_MAX_LENGTH)
private String myId;
@Column(name = "SEQ", nullable = false)
private int mySequence;
@Column(name = "STATUS", length = ID_MAX_LENGTH, nullable = false)
@Enumerated(EnumType.STRING)
private WorkChunkStatusEnum myStatus;
@Column(name = "INSTANCE_ID", length = ID_MAX_LENGTH, nullable = false)
private String myInstanceId;
@Column(name = "JOB_DEFINITION_ID", length = ID_MAX_LENGTH, nullable = false)
private String myJobDefinitionId;
@Column(name = "JOB_DEFINITION_VERSION", nullable = false)
private int myJobDefinitionVersion;
@Column(name = "TARGET_STEP_ID", length = ID_MAX_LENGTH, nullable = false)
private String myTargetStepId;
public String getId() {
return myId;
}
public void setId(String theId) {
myId = theId;
}
public int getSequence() {
return mySequence;
}
public void setSequence(int theSequence) {
mySequence = theSequence;
}
public WorkChunkStatusEnum getStatus() {
return myStatus;
}
public void setStatus(WorkChunkStatusEnum theStatus) {
myStatus = theStatus;
}
public String getInstanceId() {
return myInstanceId;
}
public void setInstanceId(String theInstanceId) {
myInstanceId = theInstanceId;
}
public String getJobDefinitionId() {
return myJobDefinitionId;
}
public void setJobDefinitionId(String theJobDefinitionId) {
myJobDefinitionId = theJobDefinitionId;
}
public int getJobDefinitionVersion() {
return myJobDefinitionVersion;
}
public void setJobDefinitionVersion(int theJobDefinitionVersion) {
myJobDefinitionVersion = theJobDefinitionVersion;
}
public String getTargetStepId() {
return myTargetStepId;
}
public void setTargetStepId(String theTargetStepId) {
myTargetStepId = theTargetStepId;
}
public WorkChunkMetadata toChunkMetadata() {
WorkChunkMetadata metadata = new WorkChunkMetadata();
metadata.setId(getId());
metadata.setInstanceId(getInstanceId());
metadata.setSequence(getSequence());
metadata.setStatus(getStatus());
metadata.setJobDefinitionId(getJobDefinitionId());
metadata.setJobDefinitionVersion(getJobDefinitionVersion());
metadata.setTargetStepId(getTargetStepId());
return metadata;
}
}

View File

@ -216,6 +216,14 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
// This fix will work for MSSQL or Oracle.
version.addTask(new ForceIdMigrationFixTask(version.getRelease(), "20231222.1"));
// add index to Batch2WorkChunkEntity
Builder.BuilderWithTableName workChunkTable = version.onTable("BT2_WORK_CHUNK");
workChunkTable
.addIndex("20240321.1", "IDX_BT2WC_II_SI_S_SEQ_ID")
.unique(true)
.withColumns("INSTANCE_ID", "TGT_STEP_ID", "STAT", "SEQ", "ID");
}
protected void init680() {

View File

@ -2215,28 +2215,28 @@ public class FhirResourceDaoDstu2Test extends BaseJpaDstu2Test {
p.addName().addFamily(methodName);
IIdType id1 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
sleep1MS();
sleepUntilTimeChange();
p = new Patient();
p.addIdentifier().setSystem("urn:system2").setValue(methodName);
p.addName().addFamily(methodName);
IIdType id2 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
sleep1MS();
sleepUntilTimeChange();
p = new Patient();
p.addIdentifier().setSystem("urn:system3").setValue(methodName);
p.addName().addFamily(methodName);
IIdType id3 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
sleep1MS();
sleepUntilTimeChange();
p = new Patient();
p.addIdentifier().setSystem("urn:system4").setValue(methodName);
p.addName().addFamily(methodName);
IIdType id4 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
sleep1MS();
sleepUntilTimeChange();
SearchParameterMap pm;
List<IIdType> actual;

View File

@ -366,7 +366,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
Date updateTime = runInTransaction(() -> new Date(myJobInstanceRepository.findById(instanceId).orElseThrow().getUpdateTime().getTime()));
sleep1MS();
sleepUntilTimeChange();
// Test
runInTransaction(() -> mySvc.updateInstanceUpdateTime(instanceId));
@ -504,7 +504,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertNotNull(chunk.getData());
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
sleep1MS();
sleepUntilTimeChange();
mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 50, 0));
runInTransaction(() -> {
@ -569,7 +569,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleep1MS();
sleepUntilTimeChange();
WorkChunkErrorEvent request = new WorkChunkErrorEvent(chunkId).setErrorMsg("This is an error message");
mySvc.onWorkChunkError(request);
@ -621,7 +621,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleep1MS();
sleepUntilTimeChange();
mySvc.onWorkChunkFailed(chunkId, "This is an error message");
runInTransaction(() -> {

View File

@ -581,13 +581,13 @@ public class FhirSystemDaoR4Test extends BaseJpaR4SystemTest {
p.addName().setFamily("family");
final IIdType id = myPatientDao.create(p, mySrd).getId().toUnqualified();
sleep1MS();
sleepUntilTimeChange();
ValueSet vs = new ValueSet();
vs.setUrl("http://foo");
myValueSetDao.create(vs, mySrd);
sleep1MS();
sleepUntilTimeChange();
ResourceTable entity = new TransactionTemplate(myTxManager).execute(t -> myEntityManager.find(ResourceTable.class, id.getIdPartAsLong()));
assertEquals(Long.valueOf(1), entity.getIndexStatus());

View File

@ -358,7 +358,7 @@ public class ReindexJobTest extends BaseJpaR4Test {
myReindexTestHelper.createObservationWithAlleleExtension(Observation.ObservationStatus.FINAL);
}
sleep1MS();
sleepUntilTimeChange();
myReindexTestHelper.createAlleleSearchParameter();
mySearchParamRegistry.forceRefresh();

View File

@ -30,22 +30,22 @@ public class ResourceReindexSvcImplTest extends BaseJpaR4Test {
// Setup
createPatient(withActiveFalse());
sleep1MS();
sleepUntilTimeChange();
Date start = new Date();
Long id0 = createPatient(withActiveFalse()).getIdPartAsLong();
sleep1MS();
sleepUntilTimeChange();
Long id1 = createPatient(withActiveFalse()).getIdPartAsLong();
sleep1MS();
sleepUntilTimeChange();
Date beforeLastInRange = new Date();
sleep1MS();
sleepUntilTimeChange();
Long id2 = createObservation(withObservationCode("http://foo", "bar")).getIdPartAsLong();
sleep1MS();
sleepUntilTimeChange();
Date end = new Date();
sleep1MS();
sleepUntilTimeChange();
createPatient(withActiveFalse());
@ -103,26 +103,26 @@ public class ResourceReindexSvcImplTest extends BaseJpaR4Test {
// Setup
final Long patientId0 = createPatient(withActiveFalse()).getIdPartAsLong();
sleep1MS();
sleepUntilTimeChange();
// Start of resources within range
Date start = new Date();
sleep1MS();
sleepUntilTimeChange();
Long patientId1 = createPatient(withActiveFalse()).getIdPartAsLong();
createObservation(withObservationCode("http://foo", "bar"));
createObservation(withObservationCode("http://foo", "bar"));
sleep1MS();
sleepUntilTimeChange();
Date beforeLastInRange = new Date();
sleep1MS();
sleepUntilTimeChange();
Long patientId2 = createPatient(withActiveFalse()).getIdPartAsLong();
sleep1MS();
sleepUntilTimeChange();
Date end = new Date();
sleep1MS();
sleepUntilTimeChange();
// End of resources within range
createObservation(withObservationCode("http://foo", "bar"));
final Long patientId3 = createPatient(withActiveFalse()).getIdPartAsLong();
sleep1MS();
sleepUntilTimeChange();
// Execute

View File

@ -607,9 +607,9 @@ public abstract class BaseJpaTest extends BaseTest {
}
/**
* Sleep until at least 1 ms has elapsed
* Sleep until time change on the clocks
*/
public void sleep1MS() {
public void sleepUntilTimeChange() {
StopWatch sw = new StopWatch();
await().until(() -> sw.getMillis() > 0);
}

View File

@ -36,9 +36,12 @@ import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep2InputType;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep3InputType;
import ca.uhn.test.concurrency.PointcutLatch;
import jakarta.annotation.Nonnull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Nested;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -52,7 +55,7 @@ import java.util.concurrent.Callable;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -109,6 +112,10 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
// re-enable our runner after every test (just in case)
myMaintenanceService.enableMaintenancePass(true);
// clear invocations on the batch sender from previous jobs that might be
// kicking around
Mockito.clearInvocations(myBatchJobSender);
}
@Nested
@ -226,13 +233,22 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
myMaintenanceService.enableMaintenancePass(theToEnable);
}
public void disableWorkChunkMessageHandler() {
doNothing().when(myBatchJobSender).sendWorkChannelMessage(any(JobWorkNotification.class));
public PointcutLatch disableWorkChunkMessageHandler() {
PointcutLatch latch = new PointcutLatch(new Exception().getStackTrace()[0].getMethodName());
doAnswer(a -> {
latch.call(1);
return Void.class;
}).when(myBatchJobSender).sendWorkChannelMessage(any(JobWorkNotification.class));
return latch;
}
public void verifyWorkChunkMessageHandlerCalled(int theNumberOfTimes) {
public void verifyWorkChunkMessageHandlerCalled(PointcutLatch theSendingLatch, int theNumberOfTimes) throws InterruptedException {
theSendingLatch.awaitExpected();
ArgumentCaptor<JobWorkNotification> notificationCaptor = ArgumentCaptor.forClass(JobWorkNotification.class);
verify(myBatchJobSender, times(theNumberOfTimes))
.sendWorkChannelMessage(any(JobWorkNotification.class));
.sendWorkChannelMessage(notificationCaptor.capture());
}
public void createChunksInStates(JobMaintenanceStateInformation theJobMaintenanceStateInformation) {

View File

@ -1,26 +1,21 @@
package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
import ca.uhn.test.concurrency.PointcutLatch;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.lenient;
public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestConstants {
Logger ourLog = LoggerFactory.getLogger(IJobMaintenanceActions.class);
@Test
default void test_gatedJob_stepReady_advances() {
default void test_gatedJob_stepReady_advances() throws InterruptedException {
// setup
String initialState = """
# chunks ready - move to queued
@ -28,8 +23,10 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
2|READY,2|QUEUED
2|READY,2|QUEUED
""";
int numToTransition = 2;
enableMaintenanceRunner(false);
disableWorkChunkMessageHandler();
PointcutLatch sendLatch = disableWorkChunkMessageHandler();
sendLatch.setExpectedCount(numToTransition);
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(initialState, true);
createChunksInStates(result);
@ -37,8 +34,8 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
runMaintenancePass();
// verify
verifyWorkChunkMessageHandlerCalled(sendLatch, numToTransition);
verifyWorkChunkFinalStates(result);
verifyWorkChunkMessageHandlerCalled(2);
}
@ParameterizedTest
@ -102,10 +99,11 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
3|GATED
"""
})
default void testGatedStep2NotReady_notAdvance(String theChunkState) {
default void testGatedStep2NotReady_notAdvance(String theChunkState) throws InterruptedException {
// setup
enableMaintenanceRunner(false);
disableWorkChunkMessageHandler();
PointcutLatch sendingLatch = disableWorkChunkMessageHandler();
sendingLatch.setExpectedCount(0);
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true);
createChunksInStates(result);
@ -114,11 +112,12 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
runMaintenancePass();
// verify
verifyWorkChunkFinalStates(result);
// nothing ever queued -> nothing ever sent to queue
verifyWorkChunkMessageHandlerCalled(0);
verifyWorkChunkMessageHandlerCalled(sendingLatch, 0);
verifyWorkChunkFinalStates(result);
}
@Disabled
@ParameterizedTest
@ValueSource(strings = {
"""
@ -144,10 +143,10 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
3|QUEUED|READY
"""
})
default void testGatedStep2ReadyToAdvance_advanceToStep3(String theChunkState) {
default void testGatedStep2ReadyToAdvance_advanceToStep3(String theChunkState) throws InterruptedException {
// setup
enableMaintenanceRunner(false);
disableWorkChunkMessageHandler();
PointcutLatch sendingLatch = disableWorkChunkMessageHandler();
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true);
createChunksInStates(result);
@ -155,13 +154,13 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
runMaintenancePass();
// verify
verifyWorkChunkFinalStates(result);
// things are being set to READY; is anything being queued?
verifyWorkChunkMessageHandlerCalled(0);
verifyWorkChunkMessageHandlerCalled(sendingLatch, 0);
verifyWorkChunkFinalStates(result);
}
@Test
default void test_ungatedJob_advancesSteps() {
default void test_ungatedJob_advancesSteps() throws InterruptedException {
// setup
String state = """
# READY chunks should transition; others should stay
@ -172,10 +171,12 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
2|IN_PROGRESS
3|IN_PROGRESS
""";
int expectedTransitions = 2;
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(state, false);
enableMaintenanceRunner(false);
disableWorkChunkMessageHandler();
PointcutLatch sendLatch = disableWorkChunkMessageHandler();
sendLatch.setExpectedCount(expectedTransitions);
createChunksInStates(result);
// TEST run job maintenance - force transition
@ -184,8 +185,8 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
runMaintenancePass();
// verify
verifyWorkChunkMessageHandlerCalled(sendLatch, expectedTransitions);
verifyWorkChunkFinalStates(result);
verifyWorkChunkMessageHandlerCalled(2);
}
private JobMaintenanceStateInformation setupGatedWorkChunkTransitionTest(String theChunkState, boolean theIsGated) {

View File

@ -7,6 +7,7 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters;
import ca.uhn.test.concurrency.PointcutLatch;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
@ -100,12 +101,18 @@ public interface IWorkChunkCommon extends WorkChunkTestConstants {
* so that we do not actually send messages to the queue;
* useful if mocking state transitions and we don't want to test
* dequeuing.
* Returns a latch that will fire each time a message is sent.
*/
default void disableWorkChunkMessageHandler() {
getTestManager().disableWorkChunkMessageHandler();
default PointcutLatch disableWorkChunkMessageHandler() {
return getTestManager().disableWorkChunkMessageHandler();
}
default void verifyWorkChunkMessageHandlerCalled(int theNumberOfTimes) {
getTestManager().verifyWorkChunkMessageHandlerCalled(theNumberOfTimes);
/**
*
* @param theSendingLatch the latch sent back from the disableWorkChunkMessageHandler method above
* @param theNumberOfTimes the number of invocations to expect
*/
default void verifyWorkChunkMessageHandlerCalled(PointcutLatch theSendingLatch, int theNumberOfTimes) throws InterruptedException {
getTestManager().verifyWorkChunkMessageHandlerCalled(theSendingLatch, theNumberOfTimes);
}
}

View File

@ -1,13 +1,20 @@
package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
import ca.uhn.test.concurrency.PointcutLatch;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkTestConstants {
Logger ourLog = LoggerFactory.getLogger(IWorkChunkStateTransitions.class);
@Test
default void chunkCreation_isQueued() {
String jobInstanceId = createAndStoreJobInstance(null);
@ -18,7 +25,9 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT
}
@Test
default void chunkReceived_queuedToInProgress() {
default void chunkReceived_queuedToInProgress() throws InterruptedException {
PointcutLatch sendLatch = disableWorkChunkMessageHandler();
sendLatch.setExpectedCount(1);
String jobInstanceId = createAndStoreJobInstance(null);
String myChunkId = createChunk(jobInstanceId);
@ -32,5 +41,48 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT
// verify the db was updated too
WorkChunk fetchedWorkChunk = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, fetchedWorkChunk.getStatus());
verifyWorkChunkMessageHandlerCalled(sendLatch, 1);
}
@Test
default void enqueueWorkChunkForProcessing_enqueuesOnlyREADYChunks() throws InterruptedException {
// setup
disableWorkChunkMessageHandler();
enableMaintenanceRunner(false);
StringBuilder sb = new StringBuilder();
// first step is always complete
sb.append("1|COMPLETED");
for (WorkChunkStatusEnum status : WorkChunkStatusEnum.values()) {
if (!sb.isEmpty()) {
sb.append("\n");
}
// second step for all other workchunks
sb.append("2|")
.append(status.name());
}
String state = sb.toString();
JobDefinition<?> jobDef = withJobDefinition(false);
String instanceId = createAndStoreJobInstance(jobDef);
JobMaintenanceStateInformation stateInformation = new JobMaintenanceStateInformation(
instanceId,
jobDef,
state
);
createChunksInStates(stateInformation);
// test
PointcutLatch latch = new PointcutLatch(new Exception().getStackTrace()[0].getMethodName());
latch.setExpectedCount(stateInformation.getInitialWorkChunks().size());
for (WorkChunk chunk : stateInformation.getInitialWorkChunks()) {
getSvc().enqueueWorkChunkForProcessing(chunk.getId(), updated -> {
// should not update non-ready chunks
ourLog.info("Enqueuing chunk with state {}; updated {}", chunk.getStatus().name(), updated);
int expected = chunk.getStatus() == WorkChunkStatusEnum.READY ? 1 : 0;
assertEquals(expected, updated);
latch.call(1);
});
}
latch.awaitExpected();
}
}

View File

@ -9,6 +9,7 @@ import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
import ca.uhn.test.concurrency.PointcutLatch;
import com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Test;
@ -54,10 +55,12 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
}
@Test
default void testNonGatedWorkChunkInReady_IsQueuedDuringMaintenance() {
default void testNonGatedWorkChunkInReady_IsQueuedDuringMaintenance() throws InterruptedException {
// setup
int expectedCalls = 1;
enableMaintenanceRunner(false);
disableWorkChunkMessageHandler();
PointcutLatch sendingLatch = disableWorkChunkMessageHandler();
sendingLatch.setExpectedCount(expectedCalls);
String state = "1|READY,1|QUEUED";
JobDefinition<?> jobDefinition = withJobDefinition(false);
String instanceId = createAndStoreJobInstance(jobDefinition);
@ -74,7 +77,7 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
// verify it's in QUEUED now
stateInformation.verifyFinalStates(getSvc());
verifyWorkChunkMessageHandlerCalled(1);
verifyWorkChunkMessageHandlerCalled(sendingLatch, expectedCalls);
}
@Test

View File

@ -25,6 +25,7 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import com.google.common.annotations.VisibleForTesting;
@ -146,15 +147,13 @@ public interface IJobPersistence extends IWorkChunkPersistence {
Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId);
/**
* Fetch all WorkChunks for job with instance id theInstanceId that are in
* theWorkChunkStatuses.
* @param theInstanceId the instance id of the job
* @param theWorkChunkStatuses the statuses of interest
* @return a stream of work chunks
* Fetches an iterator that retrieves WorkChunkMetadata from the db.
* @param theInstanceId instance id of job of interest
* @param theStates states of interset
* @return an iterator for the workchunks
*/
@Transactional
Stream<WorkChunk> fetchAllWorkChunksForJobInStates(
String theInstanceId, Set<WorkChunkStatusEnum> theWorkChunkStatuses);
@Transactional(propagation = Propagation.SUPPORTS)
Iterator<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(String theInstanceId, Set<WorkChunkStatusEnum> theStates);
/**
* Callback to update a JobInstance within a locked transaction.

View File

@ -29,7 +29,7 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
@ -39,13 +39,12 @@ import ca.uhn.fhir.util.StopWatch;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
public class JobInstanceProcessor {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@ -299,17 +298,24 @@ public class JobInstanceProcessor {
// we need a transaction to access the stream of workchunks
// because workchunks are created in READY state, there's an unknown
// number of them (and so we could be reading many from the db)
TransactionStatus status = myTransactionManager.getTransaction(new DefaultTransactionDefinition());
Stream<WorkChunk> readyChunks = myJobPersistence.fetchAllWorkChunksForJobInStates(
theJobInstance.getInstanceId(), Set.of(WorkChunkStatusEnum.READY));
// TransactionStatus status = myTransactionManager.getTransaction(new DefaultTransactionDefinition());
Iterator<WorkChunkMetadata> readyChunkMetadata = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(
theJobInstance.getInstanceId(), Set.of(WorkChunkStatusEnum.READY)
);
AtomicInteger counter = new AtomicInteger();
readyChunks.forEach(chunk -> {
ConcurrentHashMap<String, JobWorkCursor<?, ?, ?>> stepToWorkCursor = new ConcurrentHashMap<>();
while (readyChunkMetadata.hasNext()) {
WorkChunkMetadata metadata = readyChunkMetadata.next();
JobWorkCursor<?, ?, ?> jobWorkCursor =
JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, chunk.getTargetStepId());
stepToWorkCursor.computeIfAbsent(metadata.getTargetStepId(), (e) -> {
return JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, metadata.getTargetStepId());
});
counter.getAndIncrement();
if (!theIsGatedExecutionAdvancementBool
&& (theJobDefinition.isGatedExecution() || jobWorkCursor.isReductionStep())) {
&& (theJobDefinition.isGatedExecution() || jobWorkCursor.isReductionStep())) {
/*
* Gated executions are queued later when all work chunks are ready.
*
@ -327,10 +333,40 @@ public class JobInstanceProcessor {
* * flush changes
* * commit
*/
updateChunkAndSendToQueue(chunk, theJobInstance, theJobDefinition);
});
updateChunkAndSendToQueue(metadata);
}
myTransactionManager.commit(status);
// Stream<WorkChunk> readyChunks = myJobPersistence.fetchAllWorkChunksForJobInStates(
// theJobInstance.getInstanceId(), Set.of(WorkChunkStatusEnum.READY));
//
// AtomicInteger counter = new AtomicInteger();
// readyChunks.forEach(chunk -> {
// JobWorkCursor<?, ?, ?> jobWorkCursor =
// JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, chunk.getTargetStepId());
// counter.getAndIncrement();
// if (!theIsGatedExecutionAdvancementBool
// && (theJobDefinition.isGatedExecution() || jobWorkCursor.isReductionStep())) {
// /*
// * Gated executions are queued later when all work chunks are ready.
// *
// * Reduction steps are not submitted to the queue at all, but processed inline.
// * Currently all reduction steps are also gated, but this might not always
// * be true.
// */
// return;
// }
//
// /*
// * For each chunk id
// * * Move to QUEUE'd
// * * Send to topic
// * * flush changes
// * * commit
// */
// updateChunkAndSendToQueue(chunk, theJobInstance, theJobDefinition);
// });
// myTransactionManager.commit(status);
ourLog.debug(
"Encountered {} READY work chunks for job {}", counter.get(), theJobDefinition.getJobDefinitionId());
@ -345,34 +381,37 @@ public class JobInstanceProcessor {
*
* Returns true after processing.
*/
private void updateChunkAndSendToQueue(
WorkChunk theChunk, JobInstance theInstance, JobDefinition<?> theJobDefinition) {
private void updateChunkAndSendToQueue(WorkChunkMetadata theChunk) {
String chunkId = theChunk.getId();
myJobPersistence.enqueueWorkChunkForProcessing(chunkId, updated -> {
ourLog.info("Updated {} workchunk with id {}", updated, chunkId);
if (updated == 1) {
// send to the queue
// we use current step id because it has not been moved to the next step (yet)
JobWorkNotification workNotification = new JobWorkNotification(
theJobDefinition.getJobDefinitionId(),
theJobDefinition.getJobDefinitionVersion(),
theInstance.getInstanceId(),
theChunk.getTargetStepId(),
chunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
sendNotification(theChunk);
} else {
// means the work chunk is likely already gone...
// we'll log and skip it. If it's still in the DB, the next pass
// will pick it up. Otherwise, it's no longer important
ourLog.error(
"Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; found {}, expected 1; skipping work chunk.",
theInstance.getInstanceId(),
theChunk.getInstanceId(),
theChunk.getId(),
updated);
}
});
}
private void sendNotification(WorkChunkMetadata theChunk) {
// send to the queue
// we use current step id because it has not been moved to the next step (yet)
JobWorkNotification workNotification = new JobWorkNotification(
theChunk.getJobDefinitionId(),
theChunk.getJobDefinitionVersion(),
theChunk.getInstanceId(),
theChunk.getTargetStepId(),
theChunk.getId());
myBatchJobSender.sendWorkChannelMessage(workNotification);
}
private void processChunksForNextGatedSteps(
JobInstance theInstance,
JobDefinition<?> theJobDefinition,

View File

@ -39,30 +39,8 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
*
* @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md
*/
public class WorkChunk implements IModelJson {
public class WorkChunk extends WorkChunkMetadata {
@JsonProperty("id")
private String myId;
@JsonProperty("sequence")
// TODO MB danger - these repeat with a job or even a single step. They start at 0 for every parent chunk. Review
// after merge.
private int mySequence;
@JsonProperty("status")
private WorkChunkStatusEnum myStatus;
@JsonProperty("jobDefinitionId")
private String myJobDefinitionId;
@JsonProperty("jobDefinitionVersion")
private int myJobDefinitionVersion;
@JsonProperty("targetStepId")
private String myTargetStepId;
@JsonProperty("instanceId")
private String myInstanceId;
@JsonProperty("data")
private String myData;
@ -106,6 +84,36 @@ public class WorkChunk implements IModelJson {
super();
}
public WorkChunk setId(String theId) {
super.setId(theId);
return this;
}
public WorkChunk setStatus(WorkChunkStatusEnum theStatus) {
super.setStatus(theStatus);
return this;
}
public WorkChunk setInstanceId(String theInstanceId) {
super.setInstanceId(theInstanceId);
return this;
}
public WorkChunk setTargetStepId(String theTargetStepId) {
super.setTargetStepId(theTargetStepId);
return this;
}
public WorkChunk setJobDefinitionVersion(int theJobDefinitionVersion) {
super.setJobDefinitionVersion(theJobDefinitionVersion);
return this;
}
public WorkChunk setJobDefinitionId(String theJobDefinitionId) {
super.setJobDefinitionId(theJobDefinitionId);
return this;
}
public int getErrorCount() {
return myErrorCount;
}
@ -142,45 +150,6 @@ public class WorkChunk implements IModelJson {
return this;
}
public WorkChunkStatusEnum getStatus() {
return myStatus;
}
public WorkChunk setStatus(WorkChunkStatusEnum theStatus) {
myStatus = theStatus;
return this;
}
public String getJobDefinitionId() {
return myJobDefinitionId;
}
public WorkChunk setJobDefinitionId(String theJobDefinitionId) {
Validate.notBlank(theJobDefinitionId);
myJobDefinitionId = theJobDefinitionId;
return this;
}
public int getJobDefinitionVersion() {
return myJobDefinitionVersion;
}
public WorkChunk setJobDefinitionVersion(int theJobDefinitionVersion) {
Validate.isTrue(theJobDefinitionVersion >= 1);
myJobDefinitionVersion = theJobDefinitionVersion;
return this;
}
public String getTargetStepId() {
return myTargetStepId;
}
public WorkChunk setTargetStepId(String theTargetStepId) {
Validate.notBlank(theTargetStepId);
myTargetStepId = theTargetStepId;
return this;
}
public String getData() {
return myData;
}
@ -199,33 +168,6 @@ public class WorkChunk implements IModelJson {
return JsonUtil.deserialize(getData(), theType);
}
public String getInstanceId() {
return myInstanceId;
}
public WorkChunk setInstanceId(String theInstanceId) {
myInstanceId = theInstanceId;
return this;
}
public String getId() {
return myId;
}
public WorkChunk setId(String theId) {
Validate.notBlank(theId);
myId = theId;
return this;
}
public int getSequence() {
return mySequence;
}
public void setSequence(int theSequence) {
mySequence = theSequence;
}
public Date getCreateTime() {
return myCreateTime;
}
@ -263,13 +205,13 @@ public class WorkChunk implements IModelJson {
@Override
public String toString() {
ToStringBuilder b = new ToStringBuilder(this);
b.append("Id", myId);
b.append("Sequence", mySequence);
b.append("Status", myStatus);
b.append("JobDefinitionId", myJobDefinitionId);
b.append("JobDefinitionVersion", myJobDefinitionVersion);
b.append("TargetStepId", myTargetStepId);
b.append("InstanceId", myInstanceId);
b.append("Id", getId());
b.append("Sequence", getSequence());
b.append("Status", getStatus());
b.append("JobDefinitionId", getJobDefinitionId());
b.append("JobDefinitionVersion", getJobDefinitionVersion());
b.append("TargetStepId", getTargetStepId());
b.append("InstanceId", getInstanceId());
b.append("Data", isNotBlank(myData) ? "(present)" : "(absent)");
b.append("CreateTime", myCreateTime);
b.append("StartTime", myStartTime);

View File

@ -0,0 +1,97 @@
package ca.uhn.fhir.batch2.model;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.Validate;
public class WorkChunkMetadata implements IModelJson {
@JsonProperty("id")
private String myId;
@JsonProperty("sequence")
// TODO MB danger - these repeat with a job or even a single step. They start at 0 for every parent chunk. Review
// after merge.
private int mySequence;
@JsonProperty("status")
private WorkChunkStatusEnum myStatus;
@JsonProperty("jobDefinitionId")
private String myJobDefinitionId;
@JsonProperty("jobDefinitionVersion")
private int myJobDefinitionVersion;
@JsonProperty("targetStepId")
private String myTargetStepId;
@JsonProperty("instanceId")
private String myInstanceId;
public WorkChunkStatusEnum getStatus() {
return myStatus;
}
public WorkChunkMetadata setStatus(WorkChunkStatusEnum theStatus) {
myStatus = theStatus;
return this;
}
public String getJobDefinitionId() {
return myJobDefinitionId;
}
public WorkChunkMetadata setJobDefinitionId(String theJobDefinitionId) {
Validate.notBlank(theJobDefinitionId);
myJobDefinitionId = theJobDefinitionId;
return this;
}
public int getJobDefinitionVersion() {
return myJobDefinitionVersion;
}
public WorkChunkMetadata setJobDefinitionVersion(int theJobDefinitionVersion) {
Validate.isTrue(theJobDefinitionVersion >= 1);
myJobDefinitionVersion = theJobDefinitionVersion;
return this;
}
public String getTargetStepId() {
return myTargetStepId;
}
public WorkChunkMetadata setTargetStepId(String theTargetStepId) {
Validate.notBlank(theTargetStepId);
myTargetStepId = theTargetStepId;
return this;
}
public String getInstanceId() {
return myInstanceId;
}
public WorkChunkMetadata setInstanceId(String theInstanceId) {
myInstanceId = theInstanceId;
return this;
}
public String getId() {
return myId;
}
public WorkChunkMetadata setId(String theId) {
Validate.notBlank(theId);
myId = theId;
return this;
}
public int getSequence() {
return mySequence;
}
public void setSequence(int theSequence) {
mySequence = theSequence;
}
}

View File

@ -15,6 +15,7 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
@ -51,7 +52,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep1;
import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep2;
@ -131,6 +131,10 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
JobInstance instance = createInstance();
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(List.of(instance));
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance));
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(eq(INSTANCE_ID), any()))
.thenReturn(chunks.stream()
.map(c -> (WorkChunkMetadata) c)
.iterator());
mySvc.runMaintenancePass();
@ -173,8 +177,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false)))
.thenReturn(chunks.iterator());
when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Stream.empty());
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Collections.emptyIterator());
stubUpdateInstanceCallback(instance);
mySvc.runMaintenancePass();
@ -217,8 +221,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false)))
.thenReturn(chunks.iterator());
when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Stream.empty());
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Collections.emptyIterator());
stubUpdateInstanceCallback(instance);
// Execute
@ -254,10 +258,10 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
instance1.setCurrentGatedStepId(STEP_1);
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance1));
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance1));
when(myJobPersistence.fetchAllWorkChunksForJobInStates(anyString(), eq(Set.of(WorkChunkStatusEnum.READY))))
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(anyString(), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenAnswer((args) -> {
// new stream every time
return new ArrayList<>(chunks).stream();
// new iterator every time (called more than once)
return new ArrayList<>(chunks).iterator();
});
doAnswer(a -> {
Consumer<Integer> callback = a.getArgument(1);
@ -289,8 +293,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
instance.setEndTime(parseTime("2001-01-01T12:12:12Z"));
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance));
when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Stream.empty());
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Collections.emptyIterator());
mySvc.runMaintenancePass();
@ -314,8 +318,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean())).thenAnswer(t->chunks.iterator());
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance));
when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Stream.empty());
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Collections.emptyIterator());
stubUpdateInstanceCallback(instance);
// Execute
@ -358,8 +362,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean()))
.thenAnswer(t->chunks.iterator());
when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Stream.empty());
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Collections.emptyIterator());
stubUpdateInstanceCallback(instance);
mySvc.runMaintenancePass();
@ -390,8 +394,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
// mocks
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(instance));
when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenAnswer(t -> theChunks.stream());
when(myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenAnswer(t -> theChunks.iterator());
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean()))
.thenAnswer(t -> theChunks.stream().map(c -> c.setStatus(WorkChunkStatusEnum.READY)).toList().iterator());
@ -406,10 +410,19 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
createWorkChunkStep3().setStatus(WorkChunkStatusEnum.READY),
createWorkChunkStep3().setStatus(WorkChunkStatusEnum.READY)
);
List<WorkChunk> previousChunks = List.of(
createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED),
createWorkChunkStep2().setStatus(WorkChunkStatusEnum.COMPLETED)
);
String lastStepId = chunks.get(0).getTargetStepId();
String previousStepId = previousChunks.get(0).getTargetStepId();
// when
when(myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(anyString(), anyString()))
when(myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(eq(INSTANCE_ID), eq(lastStepId)))
.thenReturn(chunks.stream().map(WorkChunk::getStatus).collect(Collectors.toSet()));
when(myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(eq(INSTANCE_ID), eq(previousStepId)))
.thenReturn(previousChunks.stream().map(WorkChunk::getStatus).collect(Collectors.toSet()));
// test
runEnqueueReadyChunksTest(chunks, createJobDefinitionWithReduction());