Merge branch '5745-ready-state-batch2' into 5767-add-poll-waiting-step

This commit is contained in:
leif stawnyczy 2024-03-15 09:45:40 -04:00
commit bb2a215d9f
33 changed files with 408 additions and 306 deletions

View File

@ -1,11 +1,9 @@
---
type: add
issue: 5745
title: "Added another work chunk state to Batch2 jobs: READY.
This work chunk state will be the initial state work chunks
are created. After which, they will be picked up by a maintenance
pass and have their states updated to QUEUED before sending them to
the queue.
title: "Added another state to the Batch2 work chunk state machine: READY.
This work chunk state will be the initial state on creation.
Once queued for deliver, they will transition to QUEUED.
The exception is for ReductionStep chunks (because reduction steps
are not read off of the queue, but executed by the maintenance job
inline.

View File

@ -47,7 +47,6 @@ stateDiagram-v2
title: Batch2 Job Work Chunk state transitions
---
stateDiagram-v2
[*]:
state READY
state QUEUED
state on_receive <<choice>>

View File

@ -24,12 +24,12 @@ The Batch Job Coordinator will then store two records in the database:
### The Maintenance Job
A Scheduled Job runs every so often (default once a minute), and does the following for each Job Instance in the database:
A Scheduled Job runs periodically (once a minute). For each Job Instance in the database, it:
1. Moves all `READY` work chunks into the `QUEUED` state and publishes a message to the Batch Notification Message Channel to inform worker threads that a work chunk is now ready for processing. \*
1. Calculates job progress (% of work chunks in `COMPLETE` status). If the job is finished, purges any left over work chunks still in the database.
1. Cleans up any complete, failed, or cancelled jobs that need to be removed.
1. Moves any gated jobs onto their next step.
1. Moves any gated jobs onto their next step when complete.
1. If the final step of a gated job is a reduction step, a reduction step execution will be triggered.
\* An exception is for the final reduction step, where work chunks are not published to the Batch Notification Message Channel,

View File

@ -54,7 +54,6 @@ import jakarta.persistence.LockModeType;
import jakarta.persistence.Query;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.Validate;
import org.hibernate.Session;
import org.slf4j.Logger;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
@ -65,7 +64,6 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.sql.Connection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
@ -129,6 +127,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
ourLog.trace(
"Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData());
myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> myWorkChunkRepository.save(entity));
return entity.getId();
}
@ -298,10 +297,6 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
int updated = myWorkChunkRepository.updateChunkStatus(
theChunkId, WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.READY);
theCallback.accept(updated);
if (updated == 1) {
myEntityManager.flush();
myEntityManager.unwrap(Session.class).doWork(Connection::commit);
}
}
@Override
@ -361,15 +356,15 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) {
myTransactionService
.withSystemRequestOnDefaultPartition()
.execute(() -> myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(
theEvent.getChunkId(),
new Date(),
theEvent.getRecordsProcessed(),
theEvent.getRecoveredErrorCount(),
WorkChunkStatusEnum.COMPLETED,
theEvent.getRecoveredWarningMessage()));
myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> {
myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(
theEvent.getChunkId(),
new Date(),
theEvent.getRecordsProcessed(),
theEvent.getRecoveredErrorCount(),
WorkChunkStatusEnum.COMPLETED,
theEvent.getRecoveredWarningMessage());
});
}
@Nullable
@ -401,7 +396,8 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId) {
if (getRunningJob(theInstanceId) == null) {
Batch2JobInstanceEntity jobInstanceEntity = getRunningJob(theInstanceId);
if (jobInstanceEntity == null) {
return false;
}
@ -413,7 +409,10 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
theInstanceId,
theCurrentStepId,
statusesForStep);
return statusesForStep.isEmpty() || statusesForStep.equals(Set.of(WorkChunkStatusEnum.COMPLETED));
return statusesForStep.isEmpty()
|| statusesForStep.equals(Set.of(WorkChunkStatusEnum.COMPLETED))
|| statusesForStep.equals(Set.of(WorkChunkStatusEnum.READY));
}
@Override

View File

@ -2215,28 +2215,28 @@ public class FhirResourceDaoDstu2Test extends BaseJpaDstu2Test {
p.addName().addFamily(methodName);
IIdType id1 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
sleepUntilTimeChanges();
sleep1MS();
p = new Patient();
p.addIdentifier().setSystem("urn:system2").setValue(methodName);
p.addName().addFamily(methodName);
IIdType id2 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
sleepUntilTimeChanges();
sleep1MS();
p = new Patient();
p.addIdentifier().setSystem("urn:system3").setValue(methodName);
p.addName().addFamily(methodName);
IIdType id3 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
sleepUntilTimeChanges();
sleep1MS();
p = new Patient();
p.addIdentifier().setSystem("urn:system4").setValue(methodName);
p.addName().addFamily(methodName);
IIdType id4 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
sleepUntilTimeChanges();
sleep1MS();
SearchParameterMap pm;
List<IIdType> actual;

View File

@ -249,15 +249,10 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
AtomicBoolean completionBool = new AtomicBoolean();
AtomicBoolean jobStatusBool = new AtomicBoolean();
myCompletionHandler = (params) -> {
// ensure our completion handler fires
// ensure our completion handler gets the right status
assertEquals(StatusEnum.COMPLETED, params.getInstance().getStatus());
completionBool.getAndSet(true);
if (StatusEnum.COMPLETED.equals(params.getInstance().getStatus())){
jobStatusBool.getAndSet(true);
}
};
buildAndDefine3StepReductionJob(jobId, new IReductionStepHandler() {
@ -305,7 +300,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myFirstStepLatch.awaitExpected();
assertNotNull(instanceId);
myBatch2JobHelper.awaitGatedStepId(FIRST_STEP_ID, instanceId);
myBatch2JobHelper.awaitGatedStepId(SECOND_STEP_ID, instanceId);
// wait for last step to finish
ourLog.info("Setting last step latch");
@ -313,17 +308,16 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
// waiting
myBatch2JobHelper.awaitJobCompletion(instanceId);
myLastStepLatch.awaitExpected();
ourLog.info("awaited the last step");
myLastStepLatch.awaitExpected();
// verify
Optional<JobInstance> instanceOp = myJobPersistence.fetchInstance(instanceId);
assertTrue(instanceOp.isPresent());
JobInstance jobInstance = instanceOp.get();
// ensure our completion handler fires with the up-to-date job instance
// ensure our completion handler fired
assertTrue(completionBool.get());
assertTrue(jobStatusBool.get());
assertEquals(StatusEnum.COMPLETED, jobInstance.getStatus());
assertEquals(1.0, jobInstance.getProgress());
@ -397,7 +391,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myFirstStepLatch.awaitExpected();
assertNotNull(instanceId);
myBatch2JobHelper.awaitGatedStepId(FIRST_STEP_ID, instanceId);
myBatch2JobHelper.awaitGatedStepId(SECOND_STEP_ID, instanceId);
// wait for last step to finish
ourLog.info("Setting last step latch");

View File

@ -11,6 +11,7 @@ import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.batch2.model.StatusEnum;
@ -23,6 +24,10 @@ import ca.uhn.fhir.jpa.test.config.Batch2FastSchedulerConfig;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.test.utilities.UnregisterScheduledProcessor;
import ca.uhn.fhir.testjob.TestJobDefinitionUtils;
import ca.uhn.fhir.testjob.models.FirstStepOutput;
import ca.uhn.fhir.testjob.models.ReductionStepOutput;
import ca.uhn.fhir.testjob.models.TestJobParameters;
import ca.uhn.test.concurrency.PointcutLatch;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nonnull;
@ -43,9 +48,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* The on-enter actions are defined in
* {@link ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater#handleStatusChange}
* {@link ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater#handleStatusChange(JobInstance)}}
* {@link ca.uhn.fhir.batch2.progress.InstanceProgress#updateStatus(JobInstance)}
* {@link JobInstanceProcessor#cleanupInstance()}
* {@link ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor#cleanupInstance()}
* For chunks:
* {@link ca.uhn.fhir.jpa.batch2.JpaJobPersistenceImpl#onWorkChunkCreate}
@ -59,9 +64,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(Batch2JobMaintenanceIT.class);
public static final int TEST_JOB_VERSION = 1;
public static final String FIRST_STEP_ID = "first-step";
public static final String LAST_STEP_ID = "last-step";
@Autowired
JobDefinitionRegistry myJobDefinitionRegistry;
@Autowired
@ -89,6 +91,7 @@ public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
@BeforeEach
public void before() {
myStorageSettings.setJobFastTrackingEnabled(true);
myCompletionHandler = details -> {};
myWorkChannel = (LinkedBlockingChannel) myChannelFactory.getOrCreateReceiver(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, new ChannelConsumerSettings());
JobMaintenanceServiceImpl jobMaintenanceService = (JobMaintenanceServiceImpl) myJobMaintenanceService;
@ -101,7 +104,6 @@ public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
@AfterEach
public void after() {
myWorkChannel.clearInterceptorsForUnitTest();
myStorageSettings.setJobFastTrackingEnabled(true);
JobMaintenanceServiceImpl jobMaintenanceService = (JobMaintenanceServiceImpl) myJobMaintenanceService;
jobMaintenanceService.setMaintenanceJobStartedCallback(() -> {});
}
@ -125,6 +127,7 @@ public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
myFirstStepLatch.setExpectedCount(1);
myLastStepLatch.setExpectedCount(1);
String batchJobId = myJobCoordinator.startInstance(new SystemRequestDetails(), request).getInstanceId();
myBatch2JobHelper.awaitJobHasStatus(batchJobId, StatusEnum.QUEUED);
myFirstStepLatch.awaitExpected();
@ -159,12 +162,12 @@ public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
public void testFirstStepToSecondStepFasttrackingDisabled_singleChunkDoesNotFasttrack() throws InterruptedException {
myStorageSettings.setJobFastTrackingEnabled(false);
IJobStepWorker<Batch2JobMaintenanceIT.TestJobParameters, VoidModel, Batch2JobMaintenanceIT.FirstStepOutput> firstStep = (step, sink) -> {
sink.accept(new Batch2JobMaintenanceIT.FirstStepOutput());
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> firstStep = (step, sink) -> {
sink.accept(new FirstStepOutput());
callLatch(myFirstStepLatch, step);
return RunOutcome.SUCCESS;
};
IJobStepWorker<Batch2JobMaintenanceIT.TestJobParameters, Batch2JobMaintenanceIT.FirstStepOutput, VoidModel> lastStep = (step, sink) -> callLatch(myLastStepLatch, step);
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> callLatch(myLastStepLatch, step);
String jobDefId = new Exception().getStackTrace()[0].getMethodName();
@ -176,7 +179,7 @@ public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
myFirstStepLatch.setExpectedCount(1);
myLastStepLatch.setExpectedCount(1);
String batchJobId = myJobCoordinator.startInstance(request).getInstanceId();
String batchJobId = myJobCoordinator.startInstance(new SystemRequestDetails(), request).getInstanceId();
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.assertFastTracking(batchJobId);
@ -203,54 +206,19 @@ public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
@Nonnull
private JobDefinition<? extends IModelJson> buildGatedJobDefinition(String theJobId, IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep, IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> theLastStep) {
return JobDefinition.newBuilder()
.setJobDefinitionId(theJobId)
.setJobDescription("test job")
.setJobDefinitionVersion(TEST_JOB_VERSION)
.setParametersType(TestJobParameters.class)
.gatedExecution()
.addFirstStep(
FIRST_STEP_ID,
"Test first step",
FirstStepOutput.class,
theFirstStep
)
.addLastStep(
LAST_STEP_ID,
"Test last step",
theLastStep
)
.completionHandler(myCompletionHandler)
.build();
return TestJobDefinitionUtils.buildGatedJobDefinition(
theJobId,
theFirstStep,
theLastStep,
myCompletionHandler
);
}
static class TestJobParameters implements IModelJson {
TestJobParameters() {
}
}
static class FirstStepOutput implements IModelJson {
FirstStepOutput() {
}
}
static class SecondStepOutput implements IModelJson {
@JsonProperty("test")
private String myTestValue;
SecondStepOutput() {
}
public void setValue(String theV) {
myTestValue = theV;
}
}
static class ReductionStepOutput implements IModelJson {
static class OurReductionStepOutput extends ReductionStepOutput {
@JsonProperty("result")
private List<?> myResult;
ReductionStepOutput(List<?> theResult) {
OurReductionStepOutput(List<?> theResult) {
myResult = theResult;
}
}

View File

@ -2,7 +2,10 @@ package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.jobs.imprt.NdJsonFileJson;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
@ -19,10 +22,13 @@ import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
import ca.uhn.fhir.testjob.TestJobDefinitionUtils;
import ca.uhn.fhir.testjob.models.FirstStepOutput;
import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import jakarta.annotation.Nonnull;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
@ -36,7 +42,6 @@ import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.transaction.PlatformTransactionManager;
import jakarta.annotation.Nonnull;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
@ -64,9 +69,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
public static final String JOB_DEFINITION_ID = "definition-id";
public static final String TARGET_STEP_ID = "step-id";
public static final String TARGET_STEP_ID = TestJobDefinitionUtils.FIRST_STEP_ID;
public static final String DEF_CHUNK_ID = "definition-chunkId";
public static final String STEP_CHUNK_ID = "step-chunkId";
public static final String STEP_CHUNK_ID = TestJobDefinitionUtils.FIRST_STEP_ID;
public static final int JOB_DEF_VER = 1;
public static final int SEQUENCE_NUMBER = 1;
public static final String CHUNK_DATA = "{\"key\":\"value\"}";
@ -81,6 +86,9 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Autowired
public Batch2JobHelper myBatch2JobHelper;
@Autowired
public JobDefinitionRegistry myJobDefinitionRegistry;
@Test
public void testDeleteInstance() {
// Setup
@ -104,7 +112,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
}
private String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) {
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData);
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, TestJobDefinitionUtils.TEST_JOB_VERSION, theTargetStepId, theInstanceId, theSequence, theSerializedData);
return mySvc.onWorkChunkCreate(batchWorkChunk);
}
@ -350,7 +358,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
Date updateTime = runInTransaction(() -> new Date(myJobInstanceRepository.findById(instanceId).orElseThrow().getUpdateTime().getTime()));
sleepUntilTimeChanges();
sleep1MS();
// Test
runInTransaction(() -> mySvc.updateInstanceUpdateTime(instanceId));
@ -367,11 +375,15 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testStoreAndFetchWorkChunk_NoData() {
JobInstance instance = createInstance();
JobInstance instance = createInstance(true);
String instanceId = mySvc.storeNewInstance(instance);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
assertNull(chunk.getData());
}
@ -410,7 +422,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertEquals(instanceId, workChunk.getInstanceId());
assertEquals(TARGET_STEP_ID, workChunk.getTargetStepId());
assertEquals(0, workChunk.getSequence());
assertEquals(WorkChunkStatusEnum.QUEUED, workChunk.getStatus());
assertEquals(WorkChunkStatusEnum.READY, workChunk.getStatus());
assertNotNull(workChunk.getCreateTime());
@ -445,13 +457,14 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testStoreAndFetchWorkChunk_WithData() {
JobInstance instance = createInstance();
JobInstance instance = createInstance(true);
String instanceId = mySvc.storeNewInstance(instance);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
assertNotNull(id);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
assertEquals(36, chunk.getInstanceId().length());
assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId());
@ -464,14 +477,14 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testMarkChunkAsCompleted_Success() {
JobInstance instance = createInstance();
JobInstance instance = createInstance(true);
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
sleepUntilTimeChanges();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
@ -483,7 +496,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertNotNull(chunk.getData());
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
sleepUntilTimeChanges();
sleep1MS();
mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 50, 0));
runInTransaction(() -> {
@ -535,20 +548,20 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testMarkChunkAsCompleted_Error() {
JobInstance instance = createInstance();
JobInstance instance = createInstance(true);
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
String chunkId = storeWorkChunk(JOB_DEFINITION_ID, TestJobDefinitionUtils.FIRST_STEP_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
sleep1MS();
WorkChunkErrorEvent request = new WorkChunkErrorEvent(chunkId).setErrorMsg("This is an error message");
mySvc.onWorkChunkError(request);
@ -587,20 +600,20 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testMarkChunkAsCompleted_Fail() {
JobInstance instance = createInstance();
JobInstance instance = createInstance(true);
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
sleep1MS();
mySvc.onWorkChunkFailed(chunkId, "This is an error message");
runInTransaction(() -> {
@ -675,15 +688,38 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
.orElseThrow(IllegalArgumentException::new));
}
private JobInstance createInstance() {
return createInstance(false);
}
@Nonnull
private JobInstance createInstance() {
private JobInstance createInstance(boolean theCreateJobDefBool) {
JobInstance instance = new JobInstance();
instance.setJobDefinitionId(JOB_DEFINITION_ID);
instance.setStatus(StatusEnum.QUEUED);
instance.setJobDefinitionVersion(JOB_DEF_VER);
instance.setParameters(CHUNK_DATA);
instance.setReport("TEST");
if (theCreateJobDefBool) {
JobDefinition<?> jobDef = TestJobDefinitionUtils.buildGatedJobDefinition(
JOB_DEFINITION_ID,
(step, sink) -> {
sink.accept(new FirstStepOutput());
return RunOutcome.SUCCESS;
},
(step, sink) -> {
return RunOutcome.SUCCESS;
},
theDetails -> {
}
);
if (myJobDefinitionRegistry.getJobDefinition(jobDef.getJobDefinitionId(), jobDef.getJobDefinitionVersion()).isEmpty()) {
myJobDefinitionRegistry.addJobDefinition(jobDef);
}
}
return instance;
}

View File

@ -881,6 +881,7 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
options.setResourceTypes(Sets.newHashSet("Patient", "Observation", "CarePlan", "MedicationAdministration", "ServiceRequest"));
options.setExportStyle(BulkExportJobParameters.ExportStyle.PATIENT);
options.setOutputFormat(Constants.CT_FHIR_NDJSON);
verifyBulkExportResults(options, List.of("Patient/P1", carePlanId, medAdminId, sevReqId, obsSubId, obsPerId), Collections.emptyList());
}
@ -1079,7 +1080,6 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
String resourceType = file.getKey();
List<String> binaryIds = file.getValue();
for (var nextBinaryId : binaryIds) {
String nextBinaryIdPart = new IdType(nextBinaryId).getIdPart();
assertThat(nextBinaryIdPart, matchesPattern("[a-zA-Z0-9]{32}"));

View File

@ -93,7 +93,6 @@ import org.springframework.transaction.support.TransactionTemplate;
import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@ -582,13 +581,13 @@ public class FhirSystemDaoR4Test extends BaseJpaR4SystemTest {
p.addName().setFamily("family");
final IIdType id = myPatientDao.create(p, mySrd).getId().toUnqualified();
sleepUntilTimeChanges();
sleep1MS();
ValueSet vs = new ValueSet();
vs.setUrl("http://foo");
myValueSetDao.create(vs, mySrd);
sleepUntilTimeChanges();
sleep1MS();
ResourceTable entity = new TransactionTemplate(myTxManager).execute(t -> myEntityManager.find(ResourceTable.class, id.getIdPartAsLong()));
assertEquals(Long.valueOf(1), entity.getIndexStatus());

View File

@ -358,7 +358,7 @@ public class ReindexJobTest extends BaseJpaR4Test {
myReindexTestHelper.createObservationWithAlleleExtension(Observation.ObservationStatus.FINAL);
}
sleepUntilTimeChanges();
sleep1MS();
myReindexTestHelper.createAlleleSearchParameter();
mySearchParamRegistry.forceRefresh();

View File

@ -30,22 +30,22 @@ public class ResourceReindexSvcImplTest extends BaseJpaR4Test {
// Setup
createPatient(withActiveFalse());
sleepUntilTimeChanges();
sleep1MS();
Date start = new Date();
Long id0 = createPatient(withActiveFalse()).getIdPartAsLong();
sleepUntilTimeChanges();
sleep1MS();
Long id1 = createPatient(withActiveFalse()).getIdPartAsLong();
sleepUntilTimeChanges();
sleep1MS();
Date beforeLastInRange = new Date();
sleepUntilTimeChanges();
sleep1MS();
Long id2 = createObservation(withObservationCode("http://foo", "bar")).getIdPartAsLong();
sleepUntilTimeChanges();
sleep1MS();
Date end = new Date();
sleepUntilTimeChanges();
sleep1MS();
createPatient(withActiveFalse());
@ -103,26 +103,26 @@ public class ResourceReindexSvcImplTest extends BaseJpaR4Test {
// Setup
final Long patientId0 = createPatient(withActiveFalse()).getIdPartAsLong();
sleepUntilTimeChanges();
sleep1MS();
// Start of resources within range
Date start = new Date();
sleepUntilTimeChanges();
sleep1MS();
Long patientId1 = createPatient(withActiveFalse()).getIdPartAsLong();
createObservation(withObservationCode("http://foo", "bar"));
createObservation(withObservationCode("http://foo", "bar"));
sleepUntilTimeChanges();
sleep1MS();
Date beforeLastInRange = new Date();
sleepUntilTimeChanges();
sleep1MS();
Long patientId2 = createPatient(withActiveFalse()).getIdPartAsLong();
sleepUntilTimeChanges();
sleep1MS();
Date end = new Date();
sleepUntilTimeChanges();
sleep1MS();
// End of resources within range
createObservation(withObservationCode("http://foo", "bar"));
final Long patientId3 = createPatient(withActiveFalse()).getIdPartAsLong();
sleepUntilTimeChanges();
sleep1MS();
// Execute

View File

@ -0,0 +1,50 @@
package ca.uhn.fhir.testjob;
import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.testjob.models.FirstStepOutput;
import ca.uhn.fhir.testjob.models.TestJobParameters;
public class TestJobDefinitionUtils {
public static final int TEST_JOB_VERSION = 1;
public static final String FIRST_STEP_ID = "first-step";
public static final String LAST_STEP_ID = "last-step";
/**
* Creates a test job definition
* @param theJobId
* @param theFirstStep
* @param theLastStep
* @param theCompletionHandler
* @return
*/
public static JobDefinition<? extends IModelJson> buildGatedJobDefinition(
String theJobId,
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep,
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> theLastStep,
IJobCompletionHandler<TestJobParameters> theCompletionHandler) {
return JobDefinition.newBuilder()
.setJobDefinitionId(theJobId)
.setJobDescription("test job")
.setJobDefinitionVersion(TEST_JOB_VERSION)
.setParametersType(TestJobParameters.class)
.gatedExecution()
.addFirstStep(
FIRST_STEP_ID,
"Test first step",
FirstStepOutput.class,
theFirstStep
)
.addLastStep(
LAST_STEP_ID,
"Test last step",
theLastStep
)
.completionHandler(theCompletionHandler)
.build();
}
}

View File

@ -0,0 +1,6 @@
package ca.uhn.fhir.testjob.models;
import ca.uhn.fhir.model.api.IModelJson;
public class FirstStepOutput implements IModelJson {
}

View File

@ -0,0 +1,6 @@
package ca.uhn.fhir.testjob.models;
import ca.uhn.fhir.model.api.IModelJson;
public class ReductionStepOutput implements IModelJson {
}

View File

@ -0,0 +1,6 @@
package ca.uhn.fhir.testjob.models;
import ca.uhn.fhir.model.api.IModelJson;
public class TestJobParameters implements IModelJson {
}

View File

@ -609,7 +609,7 @@ public abstract class BaseJpaTest extends BaseTest {
/**
* Sleep until at least 1 ms has elapsed
*/
public void sleepUntilTimeChanges() {
public void sleep1MS() {
StopWatch sw = new StopWatch();
await().until(() -> sw.getMillis() > 0);
}

View File

@ -27,7 +27,6 @@ import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters;
@ -60,10 +59,10 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IInPro
private JobDefinitionRegistry myJobDefinitionRegistry;
@Autowired
private IHapiTransactionService myTransactionService;
private PlatformTransactionManager myTransactionManager;
public IHapiTransactionService getTransactionManager() {
return myTransactionService;
public PlatformTransactionManager getTransactionManager() {
return myTransactionManager;
}
public IJobPersistence getSvc() {

View File

@ -6,6 +6,7 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
public interface IWorkChunkCommon extends WorkChunkTestConstants {
@ -31,7 +32,7 @@ public interface IWorkChunkCommon extends WorkChunkTestConstants {
void runMaintenancePass();
IHapiTransactionService getTransactionManager();
PlatformTransactionManager getTransactionManager();
IJobPersistence getSvc();

View File

@ -63,7 +63,6 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(chunkId).getStatus()));
sleepUntilTimeChanges();
WorkChunk chunk = getSvc().onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());

View File

@ -118,6 +118,7 @@ public interface IJobPersistence extends IWorkChunkPersistence {
Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest);
// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
@Deprecated
boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId);
/**

View File

@ -41,6 +41,7 @@ import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
public abstract class BaseBatch2Config {
@ -104,7 +105,7 @@ public abstract class BaseBatch2Config {
BatchJobSender theBatchJobSender,
WorkChunkProcessor theExecutor,
IReductionStepExecutorService theReductionStepExecutorService,
IHapiTransactionService theTransactionService) {
PlatformTransactionManager theTransactionManager) {
return new JobMaintenanceServiceImpl(
theSchedulerService,
myPersistence,
@ -113,7 +114,7 @@ public abstract class BaseBatch2Config {
theBatchJobSender,
theExecutor,
theReductionStepExecutorService,
theTransactionService);
theTransactionManager);
}
@Bean

View File

@ -77,8 +77,6 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
String dataValueString = JsonUtil.serialize(dataValue, false);
// once finished, create workchunks in READY state
// the JobMaintenanceServiceImpl will transition these to
// QUEUED when necessary
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(
myJobDefinitionId, myJobDefinitionVersion, targetStepId, instanceId, sequence, dataValueString);
String chunkId = myHapiTransactionService

View File

@ -314,10 +314,10 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
ReductionStepChunkProcessingResponse theResponseObject,
JobWorkCursor<PT, IT, OT> theJobWorkCursor) {
if (!theChunk.getStatus().isIncomplete()) {
if (theChunk.getStatus() != WorkChunkStatusEnum.READY) {
// This should never happen since jobs with reduction are required to be gated
ourLog.error(
"Unexpected chunk {} with status {} found while reducing {}. No chunks feeding into a reduction step should be complete.",
"Unexpected chunk {} with status {} found while reducing {}. No chunks feeding into a reduction step should be in a state other than READY.",
theChunk.getId(),
theChunk.getStatus(),
theInstance);

View File

@ -32,17 +32,18 @@ import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
public class JobInstanceProcessor {
@ -58,7 +59,7 @@ public class JobInstanceProcessor {
private final String myInstanceId;
private final JobDefinitionRegistry myJobDefinitionegistry;
private final IHapiTransactionService myTransactionService;
private final PlatformTransactionManager myTransactionManager;
public JobInstanceProcessor(
IJobPersistence theJobPersistence,
@ -67,7 +68,7 @@ public class JobInstanceProcessor {
JobChunkProgressAccumulator theProgressAccumulator,
IReductionStepExecutorService theReductionStepExecutorService,
JobDefinitionRegistry theJobDefinitionRegistry,
IHapiTransactionService theTransactionService) {
PlatformTransactionManager theTransactionManager) {
myJobPersistence = theJobPersistence;
myBatchJobSender = theBatchJobSender;
myInstanceId = theInstanceId;
@ -78,7 +79,7 @@ public class JobInstanceProcessor {
new JobInstanceProgressCalculator(theJobPersistence, theProgressAccumulator, theJobDefinitionRegistry);
myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry);
myTransactionService = theTransactionService;
myTransactionManager = theTransactionManager;
}
public void process() {
@ -99,7 +100,7 @@ public class JobInstanceProcessor {
JobDefinition<? extends IModelJson> jobDefinition =
myJobDefinitionegistry.getJobDefinitionOrThrowException(theInstance);
enqueueReadyChunks(theInstance, jobDefinition);
enqueueReadyChunks(theInstance, jobDefinition, false);
cleanupInstance(theInstance);
triggerGatedExecutions(theInstance, jobDefinition);
@ -173,7 +174,9 @@ public class JobInstanceProcessor {
}
private void triggerGatedExecutions(JobInstance theInstance, JobDefinition<?> theJobDefinition) {
if (!theInstance.isRunning()) {
// QUEUE'd jobs that are gated need to start; this step will do that
if (!theInstance.isRunning()
&& (theInstance.getStatus() != StatusEnum.QUEUED && theJobDefinition.isGatedExecution())) {
ourLog.debug(
"JobInstance {} is not in a \"running\" state. Status {}",
theInstance.getInstanceId(),
@ -188,40 +191,74 @@ public class JobInstanceProcessor {
JobWorkCursor<?, ?, ?> jobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
theJobDefinition, theInstance.getCurrentGatedStepId());
// final step
if (jobWorkCursor.isFinalStep() && !jobWorkCursor.isReductionStep()) {
ourLog.debug("Job instance {} is in final step and it's not a reducer step", theInstance.getInstanceId());
return;
}
String instanceId = theInstance.getInstanceId();
String currentStepId = jobWorkCursor.getCurrentStepId();
boolean canAdvance = myJobPersistence.canAdvanceInstanceToNextStep(instanceId, currentStepId);
boolean canAdvance = canAdvanceGatedJob(theJobDefinition, theInstance, currentStepId);
if (canAdvance) {
String nextStepId = jobWorkCursor.nextStep.getStepId();
ourLog.info(
"All processing is complete for gated execution of instance {} step {}. Proceeding to step {}",
instanceId,
currentStepId,
nextStepId);
if (jobWorkCursor.nextStep.isReductionStep()) {
if (jobWorkCursor.isReductionStep()) {
// current step is the reduction step (all reduction steps are final)
JobWorkCursor<?, ?, ?> nextJobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
jobWorkCursor.getJobDefinition(), jobWorkCursor.nextStep.getStepId());
jobWorkCursor.getJobDefinition(), jobWorkCursor.getCurrentStepId());
myReductionStepExecutorService.triggerReductionStep(instanceId, nextJobWorkCursor);
} else if (jobWorkCursor.isFinalStep()) {
// current step is the final step in a non-reduction gated job
processChunksForNextGatedSteps(
theInstance, theJobDefinition, jobWorkCursor, jobWorkCursor.getCurrentStepId());
} else {
// all other gated job steps
String nextStepId = jobWorkCursor.nextStep.getStepId();
ourLog.info(
"All processing is complete for gated execution of instance {} step {}. Proceeding to step {}",
instanceId,
currentStepId,
nextStepId);
// otherwise, continue processing as expected
processChunksForNextSteps(theInstance, nextStepId);
processChunksForNextGatedSteps(theInstance, theJobDefinition, jobWorkCursor, nextStepId);
}
} else {
String stepId = jobWorkCursor.nextStep != null
? jobWorkCursor.nextStep.getStepId()
: jobWorkCursor.getCurrentStepId();
ourLog.debug(
"Not ready to advance gated execution of instance {} from step {} to {}.",
instanceId,
currentStepId,
jobWorkCursor.nextStep.getStepId());
stepId);
}
}
private boolean canAdvanceGatedJob(JobDefinition<?> theJobDefinition, JobInstance theInstance, String theStepId) {
// make sure our instance still exists
if (myJobPersistence.fetchInstance(theInstance.getInstanceId()).isEmpty()) {
// no more job
return false;
}
Set<WorkChunkStatusEnum> workChunkStatuses =
myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(theInstance.getInstanceId(), theStepId);
if (workChunkStatuses.isEmpty()) {
// no work chunks = no output
// trivial to advance to next step
return true;
}
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) {
// all previous workchunks are complete;
// none in READY though -> still proceed
return true;
}
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.READY))) {
// all workchunks ready -> proceed
return true;
}
// anything else
return false;
}
/**
* Chunks are initially created in READY state.
* We will move READY chunks to QUEUE'd and send them to the queue/topic (kafka)
@ -232,25 +269,46 @@ public class JobInstanceProcessor {
* we'd need a new GATE_WAITING state to move chunks to to prevent jobs from
* completing prematurely.
*/
private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition<?> theJobDefinition) {
private void enqueueReadyChunks(
JobInstance theJobInstance, JobDefinition<?> theJobDefinition, boolean theIsGatedExecutionAdvancementBool) {
// we need a transaction to access the stream of workchunks
// because workchunks are created in READY state, there's an unknown
// number of them (and so we could be reading many from the db)
getTxBuilder().withPropagation(Propagation.REQUIRES_NEW).execute(() -> {
Stream<WorkChunk> readyChunks = myJobPersistence.fetchAllWorkChunksForJobInStates(
theJobInstance.getInstanceId(), Set.of(WorkChunkStatusEnum.READY));
TransactionStatus status = myTransactionManager.getTransaction(new DefaultTransactionDefinition());
Stream<WorkChunk> readyChunks = myJobPersistence.fetchAllWorkChunksForJobInStates(
theJobInstance.getInstanceId(), Set.of(WorkChunkStatusEnum.READY));
readyChunks.forEach(chunk -> {
AtomicInteger counter = new AtomicInteger();
readyChunks.forEach(chunk -> {
JobWorkCursor<?, ?, ?> jobWorkCursor =
JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, chunk.getTargetStepId());
counter.getAndIncrement();
if (!theIsGatedExecutionAdvancementBool
&& (theJobDefinition.isGatedExecution() || jobWorkCursor.isReductionStep())) {
/*
* For each chunk id
* * Move to QUEUE'd
* * Send to topic
* * flush changes
* * commit
* Gated executions are queued later when all work chunks are ready.
*
* Reduction steps are not submitted to the queue at all, but processed inline.
* Currently all reduction steps are also gated, but this might not always
* be true.
*/
updateChunkAndSendToQueue(chunk, theJobInstance, theJobDefinition);
});
return;
}
/*
* For each chunk id
* * Move to QUEUE'd
* * Send to topic
* * flush changes
* * commit
*/
updateChunkAndSendToQueue(chunk, theJobInstance, theJobDefinition);
});
myTransactionManager.commit(status);
ourLog.debug(
"Encountered {} READY work chunks for job {}", counter.get(), theJobDefinition.getJobDefinitionId());
}
/**
@ -266,20 +324,8 @@ public class JobInstanceProcessor {
WorkChunk theChunk, JobInstance theInstance, JobDefinition<?> theJobDefinition) {
String chunkId = theChunk.getId();
myJobPersistence.enqueueWorkChunkForProcessing(chunkId, updated -> {
ourLog.info("Updated {} workchunk with id {}", updated, chunkId);
if (updated == 1) {
JobWorkCursor<?, ?, ?> jobWorkCursor =
JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, theChunk.getTargetStepId());
if (theJobDefinition.isGatedExecution()
&& jobWorkCursor.isFinalStep()
&& jobWorkCursor.isReductionStep()) {
// reduction steps are processed by
// ReductionStepExecutorServiceImpl
// which does not wait for steps off the queue but reads all the
// "QUEUE'd" chunks and processes them inline
return;
}
// send to the queue
// we use current step id because it has not been moved to the next step (yet)
JobWorkNotification workNotification = new JobWorkNotification(
@ -294,61 +340,53 @@ public class JobInstanceProcessor {
// we'll log and skip it. If it's still in the DB, the next pass
// will pick it up. Otherwise, it's no longer important
ourLog.error(
"Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; skipping work chunk.",
"Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; found {}, expected 1; skipping work chunk.",
theInstance.getInstanceId(),
theChunk.getId());
theChunk.getId(),
updated);
}
});
}
private IHapiTransactionService.IExecutionBuilder getTxBuilder() {
return myTransactionService.withSystemRequest().withRequestPartitionId(RequestPartitionId.allPartitions());
}
private void processChunksForNextSteps(JobInstance theInstance, String nextStepId) {
private void processChunksForNextGatedSteps(
JobInstance theInstance,
JobDefinition<?> theJobDefinition,
JobWorkCursor<?, ?, ?> theWorkCursor,
String nextStepId) {
String instanceId = theInstance.getInstanceId();
List<String> queuedChunksForNextStep =
myProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.QUEUED);
List<String> readyChunksForNextStep =
myProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.READY);
int totalChunksForNextStep = myProgressAccumulator.getTotalChunkCountForInstanceAndStep(instanceId, nextStepId);
if (totalChunksForNextStep != queuedChunksForNextStep.size()) {
if (totalChunksForNextStep != readyChunksForNextStep.size()) {
ourLog.debug(
"Total ProgressAccumulator QUEUED chunk count does not match QUEUED chunk size! [instanceId={}, stepId={}, totalChunks={}, queuedChunks={}]",
"Total ProgressAccumulator READY chunk count does not match READY chunk size! [instanceId={}, stepId={}, totalChunks={}, queuedChunks={}]",
instanceId,
nextStepId,
totalChunksForNextStep,
queuedChunksForNextStep.size());
readyChunksForNextStep.size());
}
// Note on sequence: we don't have XA transactions, and are talking to two stores (JPA + Queue)
// Sequence: 1 - So we run the query to minimize the work overlapping.
List<String> chunksToSubmit =
myJobPersistence.fetchAllChunkIdsForStepWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.QUEUED);
// Sequence: 2 - update the job step so the workers will process them.
boolean changed = myJobPersistence.updateInstance(instanceId, instance -> {
if (instance.getCurrentGatedStepId().equals(nextStepId)) {
// someone else beat us here. No changes
return false;
}
instance.setCurrentGatedStepId(nextStepId);
return true;
});
// update the job step so the workers will process them.
// if it's the last (gated) step, there will be no change - but we should
// queue up the chunks anyways
boolean changed = theWorkCursor.isFinalStep()
|| myJobPersistence.updateInstance(instanceId, instance -> {
if (instance.getCurrentGatedStepId().equals(nextStepId)) {
// someone else beat us here. No changes
return false;
}
instance.setCurrentGatedStepId(nextStepId);
return true;
});
if (!changed) {
// we collided with another maintenance job.
ourLog.warn("Skipping gate advance to {} for instance {} - already advanced.", nextStepId, instanceId);
return;
}
// DESIGN GAP: if we die here, these chunks will never be queued.
// Need a WAITING stage before QUEUED for chunks, so we can catch them.
// Sequence: 3 - send the notifications
for (String nextChunkId : chunksToSubmit) {
JobWorkNotification workNotification = new JobWorkNotification(theInstance, nextStepId, nextChunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
}
ourLog.debug(
"Submitted a batch of chunks for processing. [chunkCount={}, instanceId={}, stepId={}]",
chunksToSubmit.size(),
instanceId,
nextStepId);
// because we now have all gated job chunks in READY state,
// we can enqueue them
enqueueReadyChunks(theInstance, theJobDefinition, true);
}
}

View File

@ -28,7 +28,6 @@ import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
@ -41,6 +40,7 @@ import org.apache.commons.lang3.time.DateUtils;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.HashSet;
import java.util.List;
@ -90,7 +90,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
private final JobDefinitionRegistry myJobDefinitionRegistry;
private final BatchJobSender myBatchJobSender;
private final WorkChunkProcessor myJobExecutorSvc;
private final IHapiTransactionService myTransactionService;
private final PlatformTransactionManager myTransactionManager;
private final Semaphore myRunMaintenanceSemaphore = new Semaphore(1);
@ -110,7 +110,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
@Nonnull BatchJobSender theBatchJobSender,
@Nonnull WorkChunkProcessor theExecutor,
@Nonnull IReductionStepExecutorService theReductionStepExecutorService,
IHapiTransactionService theTransactionService) {
PlatformTransactionManager theTransactionService) {
myStorageSettings = theStorageSettings;
myReductionStepExecutorService = theReductionStepExecutorService;
Validate.notNull(theSchedulerService);
@ -123,7 +123,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
myJobDefinitionRegistry = theJobDefinitionRegistry;
myBatchJobSender = theBatchJobSender;
myJobExecutorSvc = theExecutor;
myTransactionService = theTransactionService;
myTransactionManager = theTransactionService;
}
@Override
@ -237,7 +237,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
progressAccumulator,
myReductionStepExecutorService,
myJobDefinitionRegistry,
myTransactionService);
myTransactionManager);
ourLog.debug(
"Triggering maintenance process for instance {} in status {}",
instanceId,

View File

@ -145,6 +145,13 @@ public class JobDefinition<PT extends IModelJson> {
return myGatedExecution;
}
public JobDefinitionStep<?, ?, ?> getStepById(String theId) {
return getSteps().stream()
.filter(s -> s.getStepId().equals(theId))
.findFirst()
.orElse(null);
}
public boolean isLastStepReduction() {
int stepCount = getSteps().size();
return stepCount >= 1 && getSteps().get(stepCount - 1).isReductionStep();

View File

@ -73,6 +73,7 @@ public class InstanceProgress {
statusToCountMap.put(theChunk.getStatus(), statusToCountMap.getOrDefault(theChunk.getStatus(), 0) + 1);
switch (theChunk.getStatus()) {
case READY:
case QUEUED:
case IN_PROGRESS:
myIncompleteChunkCount++;

View File

@ -98,6 +98,7 @@ public class JobInstanceProgressCalculator {
while (workChunkIterator.hasNext()) {
WorkChunk next = workChunkIterator.next();
// global stats
myProgressAccumulator.addChunk(next);
// instance stats

View File

@ -55,6 +55,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@ -183,7 +184,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
.thenReturn(Arrays.asList(existingInProgInstance));
// test
Batch2JobStartResponse startResponse = mySvc.startInstance(startRequest);
Batch2JobStartResponse startResponse = mySvc.startInstance(new SystemRequestDetails(), startRequest);
// verify
assertEquals(inProgressInstanceId, startResponse.getInstanceId()); // make sure it's the completed one
@ -476,6 +477,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
assertSame(jobDefinition, myJobDefinitionCaptor.getValue());
assertEquals(startRequest.getParameters(), myParametersJsonCaptor.getValue());
verify(myBatchJobSender, never()).sendWorkChannelMessage(any());
verifyNoMoreInteractions(myJobInstancePersister);
verifyNoMoreInteractions(myStep1Worker);
verifyNoMoreInteractions(myStep2Worker);

View File

@ -35,6 +35,8 @@ import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -111,14 +113,7 @@ class JobDataSinkTest {
// theDataSink.accept(output) called by firstStepWorker above calls two services. Let's validate them both.
verify(myBatchJobSender).sendWorkChannelMessage(myJobWorkNotificationCaptor.capture());
JobWorkNotification notification = myJobWorkNotificationCaptor.getValue();
assertEquals(JOB_DEF_ID, notification.getJobDefinitionId());
assertEquals(JOB_INSTANCE_ID, notification.getInstanceId());
assertEquals(CHUNK_ID, notification.getChunkId());
assertEquals(JOB_DEF_VERSION, notification.getJobDefinitionVersion());
assertEquals(LAST_STEP_ID, notification.getTargetStepId());
verify(myBatchJobSender, never()).sendWorkChannelMessage(any());
WorkChunkCreateEvent batchWorkChunk = myBatchWorkChunkCaptor.getValue();
assertEquals(JOB_DEF_VERSION, batchWorkChunk.jobDefinitionVersion);

View File

@ -488,7 +488,7 @@ public class WorkChunkProcessorTest {
WorkChunk chunk = new WorkChunk();
chunk.setInstanceId(INSTANCE_ID);
chunk.setId(theId);
chunk.setStatus(WorkChunkStatusEnum.QUEUED);
chunk.setStatus(WorkChunkStatusEnum.READY);
chunk.setData(JsonUtil.serialize(
new StepInputData()
));

View File

@ -17,8 +17,6 @@ import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.test.util.LogbackCaptureTestExtension;
@ -26,8 +24,6 @@ import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import com.google.common.collect.Lists;
import jakarta.persistence.EntityManager;
import org.hibernate.Session;
import org.hl7.fhir.r4.model.DateTimeType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -40,8 +36,9 @@ import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
@ -54,6 +51,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep1;
import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep2;
@ -70,8 +68,6 @@ import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -81,21 +77,6 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
private static class TestHapiTransactionservice extends HapiTransactionService {
@Override
protected <T> T doExecute(ExecutionBuilder theExecutionBuilder, TransactionCallback<T> theCallback) {
return overrideExecute(theCallback);
}
/**
* Override method for testing purposes (if needed)
*/
public <T> T overrideExecute(TransactionCallback<T> theCallback) {
return null;
}
}
@RegisterExtension
LogbackCaptureTestExtension myLogCapture = new LogbackCaptureTestExtension((Logger) LoggerFactory.getLogger("ca.uhn.fhir.log.batch_troubleshooting"), Level.WARN);
@Mock
@ -112,8 +93,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
private JobDefinitionRegistry myJobDefinitionRegistry;
@Mock
private IChannelProducer myWorkChannelProducer;
@Spy
private IHapiTransactionService myTransactionService = new TestHapiTransactionservice();
@Mock
private PlatformTransactionManager myTransactionService;
@Captor
private ArgumentCaptor<Message<JobWorkNotification>> myMessageCaptor;
@Captor
@ -192,6 +173,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false)))
.thenReturn(chunks.iterator());
when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Stream.empty());
stubUpdateInstanceCallback(instance);
mySvc.runMaintenancePass();
@ -234,6 +217,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false)))
.thenReturn(chunks.iterator());
when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Stream.empty());
stubUpdateInstanceCallback(instance);
// Execute
@ -255,23 +240,30 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
public void testInProgress_GatedExecution_FirstStepComplete() {
// Setup
List<WorkChunk> chunks = Arrays.asList(
JobCoordinatorImplTest.createWorkChunkStep1().setStatus(WorkChunkStatusEnum.COMPLETED).setId(CHUNK_ID + "abc"),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.QUEUED).setId(CHUNK_ID),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.QUEUED).setId(CHUNK_ID_2)
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.READY).setId(CHUNK_ID),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.READY).setId(CHUNK_ID_2)
);
when (myJobPersistence.canAdvanceInstanceToNextStep(any(), any())).thenReturn(true);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition(JobDefinition.Builder::gatedExecution));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false)))
.thenReturn(chunks.iterator());
when(myJobPersistence.fetchAllChunkIdsForStepWithStatus(eq(INSTANCE_ID), eq(STEP_2), eq(WorkChunkStatusEnum.QUEUED)))
.thenReturn(chunks.stream().filter(c->c.getTargetStepId().equals(STEP_2)).map(WorkChunk::getId).collect(Collectors.toList()));
when(myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(anyString(), anyString()))
.thenReturn(chunks.stream().map(WorkChunk::getStatus).collect(Collectors.toSet()));
JobInstance instance1 = createInstance();
instance1.setCurrentGatedStepId(STEP_1);
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance1));
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance1));
when(myJobPersistence.fetchAllWorkChunksForJobInStates(anyString(), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenAnswer((args) -> {
// new stream every time
return new ArrayList<>(chunks).stream();
});
doAnswer(a -> {
Consumer<Integer> callback = a.getArgument(1);
callback.accept(1);
return null;
}).when(myJobPersistence).enqueueWorkChunkForProcessing(anyString(), any());
stubUpdateInstanceCallback(instance1);
// Execute
@ -297,6 +289,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
instance.setEndTime(parseTime("2001-01-01T12:12:12Z"));
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance));
when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Stream.empty());
mySvc.runMaintenancePass();
@ -320,6 +314,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean())).thenAnswer(t->chunks.iterator());
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance));
when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Stream.empty());
stubUpdateInstanceCallback(instance);
// Execute
@ -362,6 +358,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean()))
.thenAnswer(t->chunks.iterator());
when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Stream.empty());
stubUpdateInstanceCallback(instance);
mySvc.runMaintenancePass();
@ -383,31 +381,26 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
private void runEnqueueReadyChunksTest(List<WorkChunk> theChunks, JobDefinition<TestJobParameters> theJobDefinition) {
myJobDefinitionRegistry.addJobDefinition(theJobDefinition);
JobInstance instance = createInstance();
// we'll set the instance to the first step id
theChunks.stream().findFirst().ifPresent(c -> {
instance.setCurrentGatedStepId(c.getTargetStepId());
});
instance.setJobDefinitionId(theJobDefinition.getJobDefinitionId());
Session sessionContract = mock(Session.class);
// mocks
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(instance));
when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenAnswer(t -> theChunks.stream());
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean()))
.thenAnswer(t -> theChunks.stream().map(c -> c.setStatus(WorkChunkStatusEnum.QUEUED)).toList().iterator());
// we just need it to fire, so we'll fire it manually
when(((TestHapiTransactionservice)myTransactionService).overrideExecute(any()))
.thenAnswer(args -> {
TransactionCallback<?> callback = args.getArgument(0);
callback.doInTransaction(null);
return null;
});
.thenAnswer(t -> theChunks.stream().map(c -> c.setStatus(WorkChunkStatusEnum.READY)).toList().iterator());
// test
mySvc.runMaintenancePass();
}
@Test
public void testMaintenancePass_withREADYworkChunksForReductionSteps_movedToQueueButNotPublished() {
public void testMaintenancePass_withREADYworkChunksForReductionSteps_notQueuedButProcessed() {
// setup
List<WorkChunk> chunks = List.of(
createWorkChunkStep3().setStatus(WorkChunkStatusEnum.READY),
@ -415,19 +408,17 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
);
// when
doAnswer(args -> {
Consumer<Integer> consumer = args.getArgument(1);
consumer.accept(1);
return 1;
}).when(myJobPersistence).enqueueWorkChunkForProcessing(anyString(), any());
when(myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(anyString(), anyString()))
.thenReturn(chunks.stream().map(WorkChunk::getStatus).collect(Collectors.toSet()));
// test
runEnqueueReadyChunksTest(chunks, createJobDefinitionWithReduction());
// verify
// saved, but not sent to the queue
verify(myJobPersistence, times(2)).enqueueWorkChunkForProcessing(anyString(), any());
// verify never updated (should remain in ready state)
verify(myJobPersistence, never()).enqueueWorkChunkForProcessing(anyString(), any());
verify(myWorkChannelProducer, never()).send(any());
verify(myReductionStepExecutorService)
.triggerReductionStep(anyString(), any());
}
@Test
@ -445,7 +436,6 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
return 1;
}).when(myJobPersistence).enqueueWorkChunkForProcessing(anyString(), any());
// test
runEnqueueReadyChunksTest(chunks, createJobDefinition());
@ -464,9 +454,11 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
public void testMaintenancePass_whenUpdateFails_skipsWorkChunkAndLogs() {
// setup
List<WorkChunk> chunks = List.of(
createWorkChunkStep3().setStatus(WorkChunkStatusEnum.READY),
createWorkChunkStep3().setStatus(WorkChunkStatusEnum.READY)
createWorkChunkStep2().setStatus(WorkChunkStatusEnum.READY),
createWorkChunkStep2().setStatus(WorkChunkStatusEnum.READY)
);
JobInstance instance = createInstance();
instance.setCurrentGatedStepId(STEP_2);
myLogCapture.setUp(Level.ERROR);
@ -476,6 +468,12 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
consumer.accept(0); // nothing processed
return 1;
}).when(myJobPersistence).enqueueWorkChunkForProcessing(anyString(), any());
doAnswer(args -> {
IJobPersistence.JobInstanceUpdateCallback callback = args.getArgument(1);
callback.doUpdate(instance);
return true;
}).when(myJobPersistence).updateInstance(any(), any());
// test
runEnqueueReadyChunksTest(chunks, createJobDefinitionWithReduction());