fix fast-tracked job record count reporting (#3645)

This commit is contained in:
Ken Stevens 2022-05-25 18:53:45 -04:00 committed by GitHub
parent 4eb099b900
commit 66b3fa680b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 15 additions and 9 deletions

View File

@ -28,6 +28,8 @@ import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.maintenance.JobChunkProgressAccumulator;
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
@ -101,9 +103,10 @@ public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT ex
// This job is defined to be gated, but so far every step has produced at most 1 work chunk, so it is
// eligible for fast tracking.
if (myCursor.isFinalStep()) {
if (updateInstanceStatus(jobInstance, StatusEnum.COMPLETED)) {
myJobPersistence.updateInstance(jobInstance);
}
// TODO KHS instance factory should set definition instead of setting it explicitly here and there
jobInstance.setJobDefinition(myDefinition);
JobInstanceProgressCalculator calculator = new JobInstanceProgressCalculator(myJobPersistence, jobInstance, new JobChunkProgressAccumulator());
calculator.calculateAndStoreInstanceProgress();
} else {
JobWorkNotification workNotification = new JobWorkNotification(jobInstance, myCursor.nextStep.getStepId(), ((JobDataSink<PT,IT,OT>) theDataSink).getOnlyChunkId());
myBatchJobSender.sendWorkChannelMessage(workNotification);

View File

@ -22,7 +22,7 @@ import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
* are found, so that they can be reused for maintenance jobs without
* needing to hit the database a second time.
*/
class JobChunkProgressAccumulator {
public class JobChunkProgressAccumulator {
private final Set<String> myConsumedInstanceAndChunkIds = new HashSet<>();
private final Multimap<String, ChunkStatusCountKey> myInstanceIdToChunkStatuses = ArrayListMultimap.create();

View File

@ -1,5 +1,6 @@
package ca.uhn.fhir.batch2.maintenance;
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobInstance;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.maintenance;
package ca.uhn.fhir.batch2.progress;
import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
import ca.uhn.fhir.batch2.api.JobCompletionDetails;

View File

@ -1,6 +1,8 @@
package ca.uhn.fhir.batch2.maintenance;
package ca.uhn.fhir.batch2.progress;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.maintenance.JobChunkProgressAccumulator;
import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
@ -9,18 +11,18 @@ import java.util.List;
import static ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor.updateInstanceStatus;
class JobInstanceProgressCalculator {
public class JobInstanceProgressCalculator {
private final IJobPersistence myJobPersistence;
private final JobInstance myInstance;
private final JobChunkProgressAccumulator myProgressAccumulator;
JobInstanceProgressCalculator(IJobPersistence theJobPersistence, JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) {
public JobInstanceProgressCalculator(IJobPersistence theJobPersistence, JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) {
myJobPersistence = theJobPersistence;
myInstance = theInstance;
myProgressAccumulator = theProgressAccumulator;
}
void calculateAndStoreInstanceProgress() {
public void calculateAndStoreInstanceProgress() {
InstanceProgress instanceProgress = new InstanceProgress();
for (int page = 0; ; page++) {