fix batch2 reduction step (#4499)

* fix bug where FINALIZE jobs are not cancellable

* moved reduction step to message hander

* moving reduction step to queue

* addingchangelog

* cleaning up

* review fixes

* review fix'

---------

Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-mbp.home>
This commit is contained in:
TipzCM 2023-02-03 14:00:29 -05:00 committed by GitHub
parent 7d554d56fd
commit 0996124778
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 253 additions and 106 deletions

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 4491
title: "Batch2 Jobs in the FINALIZE state can now be
cancelled."

View File

@ -0,0 +1,8 @@
---
type: fix
issue: 4491
title: "Moved batch2 reduction step logic to the messaging queue.
Before it was executed during the maintenance run directly.
This resulted in bugs with multiple reduction steps kicking
off for long running jobs.
"

View File

@ -34,6 +34,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -45,11 +47,13 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static ca.uhn.fhir.batch2.config.BaseBatch2Config.CHANNEL_NAME;
import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@ -216,9 +220,44 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myLastStepLatch.awaitExpected();
}
@Test
public void testJobDefinitionWithReductionStepIT() throws InterruptedException {
private void createThreeStepReductionJob(
String theJobId,
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep,
IJobStepWorker<TestJobParameters, FirstStepOutput, SecondStepOutput> theSecondStep,
IReductionStepWorker<TestJobParameters, SecondStepOutput, ReductionStepOutput> theReductionsStep
) {
// create job definition (it's the test method's name)
JobDefinition<? extends IModelJson> jd = JobDefinition.newBuilder()
.setJobDefinitionId(theJobId)
.setJobDescription("test job")
.setJobDefinitionVersion(TEST_JOB_VERSION)
.setParametersType(TestJobParameters.class)
.gatedExecution()
.addFirstStep(
FIRST_STEP_ID,
"Test first step",
FirstStepOutput.class,
theFirstStep
)
.addIntermediateStep("SECOND",
"Second step",
SecondStepOutput.class,
theSecondStep)
.addFinalReducerStep(
LAST_STEP_ID,
"Test last step",
ReductionStepOutput.class,
theReductionsStep
)
.build();
myJobDefinitionRegistry.addJobDefinition(jd);
}
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testJobDefinitionWithReductionStepIT(boolean theDelayReductionStepBool) throws InterruptedException {
// setup
String jobId = new Exception().getStackTrace()[0].getMethodName() + "_" + theDelayReductionStepBool;
String testInfo = "test";
AtomicInteger secondStepInt = new AtomicInteger();
@ -235,6 +274,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
SecondStepOutput output = new SecondStepOutput();
output.setValue(testInfo + secondStepInt.getAndIncrement());
sink.accept(output);
return RunOutcome.SUCCESS;
};
@ -243,63 +283,66 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
private final ArrayList<SecondStepOutput> myOutput = new ArrayList<>();
private final AtomicBoolean myBoolean = new AtomicBoolean();
private final AtomicInteger mySecondGate = new AtomicInteger();
@Override
public ChunkOutcome consume(ChunkExecutionDetails<TestJobParameters, SecondStepOutput> theChunkDetails) {
myOutput.add(theChunkDetails.getData());
// 1 because we know 2 packets are coming.
// we'll fire the second maintenance run on the second packet
// which should cause multiple maintenance runs to run simultaneously
if (theDelayReductionStepBool && mySecondGate.getAndIncrement() == 1) {
ourLog.info("SECOND FORCED MAINTENANCE PASS FORCED");
myBatch2JobHelper.forceRunMaintenancePass();
}
return ChunkOutcome.SUCCESS();
}
@Nonnull
@Override
public RunOutcome run(@Nonnull StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails,
@Nonnull IJobDataSink<ReductionStepOutput> theDataSink) throws JobExecutionFailedException {
theDataSink.accept(new ReductionStepOutput(myOutput));
callLatch(myLastStepLatch, theStepExecutionDetails);
public RunOutcome run(
@Nonnull StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails,
@Nonnull IJobDataSink<ReductionStepOutput> theDataSink
) throws JobExecutionFailedException {
boolean isRunAlready = myBoolean.getAndSet(true);
assertFalse(isRunAlready, "Reduction step should only be called once!");
complete(theStepExecutionDetails, theDataSink);
return RunOutcome.SUCCESS;
}
};
// create job definition
String jobId = new Exception().getStackTrace()[0].getMethodName();
JobDefinition<? extends IModelJson> jd = JobDefinition.newBuilder()
.setJobDefinitionId(jobId)
.setJobDescription("test job")
.setJobDefinitionVersion(TEST_JOB_VERSION)
.setParametersType(TestJobParameters.class)
.gatedExecution()
.addFirstStep(
FIRST_STEP_ID,
"Test first step",
FirstStepOutput.class,
first
)
.addIntermediateStep("SECOND",
"Second step",
SecondStepOutput.class,
second)
.addFinalReducerStep(
LAST_STEP_ID,
"Test last step",
ReductionStepOutput.class,
last
)
.build();
myJobDefinitionRegistry.addJobDefinition(jd);
private void complete(
@Nonnull StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails,
@Nonnull IJobDataSink<ReductionStepOutput> theDataSink
) {
assertTrue(myBoolean.get());
theDataSink.accept(new ReductionStepOutput(myOutput));
callLatch(myLastStepLatch, theStepExecutionDetails);
}
};
createThreeStepReductionJob(jobId, first, second, last);
// run test
JobInstanceStartRequest request = buildRequest(jobId);
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
String instanceId = startResponse.getJobId();
myFirstStepLatch.awaitExpected();
assertNotNull(instanceId);
myBatch2JobHelper.awaitGatedStepId(FIRST_STEP_ID, instanceId);
// wait for last step to finish
ourLog.info("Setting last step latch");
myLastStepLatch.setExpectedCount(1);
// waiting
myBatch2JobHelper.awaitJobCompletion(instanceId);
myLastStepLatch.awaitExpected();
ourLog.info("awaited the last step");
// verify
Optional<JobInstance> instanceOp = myJobPersistence.fetchInstance(instanceId);

View File

@ -71,7 +71,6 @@ public class Batch2JobHelper {
return awaitJobHasStatusWithoutMaintenancePass(theBatchJobId, StatusEnum.COMPLETED);
}
public JobInstance awaitJobCancelled(String theBatchJobId) {
return awaitJobHasStatus(theBatchJobId, StatusEnum.CANCELLED);
}
@ -106,7 +105,6 @@ public class Batch2JobHelper {
return myJobCoordinator.getInstance(theBatchJobId);
}
public JobInstance awaitJobawaitJobHasStatusWithoutMaintenancePass(String theBatchJobId, int theSecondsToWait, StatusEnum... theExpectedStatus) {
assert !TransactionSynchronizationManager.isActualTransactionActive();
@ -168,7 +166,6 @@ public class Batch2JobHelper {
public long getCombinedRecordsProcessed(String theJobId) {
JobInstance job = myJobCoordinator.getInstance(theJobId);
return job.getCombinedRecordsProcessed();
}
public void awaitAllJobsOfJobDefinitionIdToComplete(String theJobDefinitionId) {
@ -243,6 +240,14 @@ public class Batch2JobHelper {
myJobMaintenanceService.runMaintenancePass();
}
/**
* Forces a run of the maintenance pass without waiting for
* the semaphore to release
*/
public void forceRunMaintenancePass() {
myJobMaintenanceService.forceMaintenancePass();
}
public void cancelAllJobsAndAwaitCancellation() {
List<JobInstance> instances = myJobPersistence.fetchInstances(1000, 0);
for (JobInstance next : instances) {

View File

@ -0,0 +1,37 @@
package ca.uhn.fhir.batch2.jobs.export;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.jpa.api.svc.IBatch2JobRunner;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum;
import org.slf4j.Logger;
import static org.slf4j.LoggerFactory.getLogger;
public class BulkExportUtil {
private static final Logger ourLog = getLogger(BulkExportUtil.class);
private BulkExportUtil() {
}
/**
* Converts Batch2 StatusEnum -> BulkExportJobStatusEnum
*/
public static BulkExportJobStatusEnum fromBatchStatus(StatusEnum status) {
switch (status) {
case QUEUED:
case FINALIZE:
return BulkExportJobStatusEnum.SUBMITTED;
case COMPLETED :
return BulkExportJobStatusEnum.COMPLETE;
case IN_PROGRESS:
return BulkExportJobStatusEnum.BUILDING;
default:
ourLog.warn("Unrecognized status {}; treating as FAILED/CANCELLED/ERRORED", status.name());
case FAILED:
case CANCELLED:
case ERRORED:
return BulkExportJobStatusEnum.ERROR;
}
}
}

View File

@ -22,10 +22,10 @@ package ca.uhn.fhir.batch2.jobs.services;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.jobs.export.BulkExportUtil;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportJobParameters;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.model.Batch2JobInfo;
import ca.uhn.fhir.jpa.api.model.Batch2JobOperationResult;
@ -33,7 +33,6 @@ import ca.uhn.fhir.jpa.api.model.BulkExportParameters;
import ca.uhn.fhir.jpa.api.svc.IBatch2JobRunner;
import ca.uhn.fhir.jpa.batch.models.Batch2BaseJobParameters;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
import org.slf4j.Logger;
@ -97,7 +96,10 @@ public class Batch2JobRunnerImpl implements IBatch2JobRunner {
private Batch2JobInfo fromJobInstanceToBatch2JobInfo(@Nonnull JobInstance theInstance) {
Batch2JobInfo info = new Batch2JobInfo();
info.setJobId(theInstance.getInstanceId());
info.setStatus(fromBatchStatus(theInstance.getStatus()));
// should convert this to a more generic enum for all batch2 (which is what it seems like)
// or use the status enum only (combine with bulk export enum)
// on the Batch2JobInfo
info.setStatus(BulkExportUtil.fromBatchStatus(theInstance.getStatus()));
info.setCancelled(theInstance.isCancelled());
info.setStartTime(theInstance.getStartTime());
info.setEndTime(theInstance.getEndTime());
@ -106,22 +108,6 @@ public class Batch2JobRunnerImpl implements IBatch2JobRunner {
return info;
}
public static BulkExportJobStatusEnum fromBatchStatus(StatusEnum status) {
switch (status) {
case QUEUED:
return BulkExportJobStatusEnum.SUBMITTED;
case COMPLETED :
return BulkExportJobStatusEnum.COMPLETE;
case IN_PROGRESS:
return BulkExportJobStatusEnum.BUILDING;
case FAILED:
case CANCELLED:
case ERRORED:
default:
return BulkExportJobStatusEnum.ERROR;
}
}
private Batch2JobStartResponse startBatch2BulkExportJob(BulkExportParameters theParameters) {
JobInstanceStartRequest request = createStartRequest(theParameters);
request.setParameters(BulkExportJobParameters.createFromExportJobParameters(theParameters));

View File

@ -20,6 +20,8 @@ package ca.uhn.fhir.batch2.api;
* #L%
*/
import com.google.common.annotations.VisibleForTesting;
public interface IJobMaintenanceService {
/**
* Do not wait for the next scheduled time for maintenance. Trigger it immediately.
@ -29,4 +31,10 @@ public interface IJobMaintenanceService {
void runMaintenancePass();
/**
* Forces a second maintenance run.
* Only to be used in tests to simulate a long running maintenance step
*/
@VisibleForTesting
void forceMaintenancePass();
}

View File

@ -28,8 +28,8 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import org.slf4j.Logger;
import javax.annotation.Nonnull;
@ -53,7 +53,7 @@ public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT ex
JobStepExecutor(@Nonnull IJobPersistence theJobPersistence,
@Nonnull BatchJobSender theBatchJobSender,
@Nonnull JobInstance theInstance,
@Nonnull WorkChunk theWorkChunk,
WorkChunk theWorkChunk,
@Nonnull JobWorkCursor<PT, IT, OT> theCursor,
@Nonnull WorkChunkProcessor theExecutor, IJobMaintenanceService theJobMaintenanceService) {
myJobPersistence = theJobPersistence;

View File

@ -46,7 +46,7 @@ public class JobStepExecutorFactory {
myJobMaintenanceService = theJobMaintenanceService;
}
public <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> JobStepExecutor<PT,IT,OT> newJobStepExecutor(@Nonnull JobInstance theInstance, @Nonnull WorkChunk theWorkChunk, @Nonnull JobWorkCursor<PT, IT, OT> theCursor) {
public <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> JobStepExecutor<PT,IT,OT> newJobStepExecutor(@Nonnull JobInstance theInstance, WorkChunk theWorkChunk, @Nonnull JobWorkCursor<PT, IT, OT> theCursor) {
return new JobStepExecutor<>(myJobPersistence, myBatchJobSender, theInstance, theWorkChunk, theCursor, myJobStepExecutorSvc, myJobMaintenanceService);
}
}

View File

@ -28,8 +28,8 @@ import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import org.slf4j.Logger;
import java.util.ArrayList;
@ -55,10 +55,14 @@ public class ReductionStepExecutor {
) {
IReductionStepWorker<PT, IT, OT> reductionStepWorker = (IReductionStepWorker<PT, IT, OT>) theStep.getJobStepWorker();
// we mark it first so that no other maintenance passes will pick this job up!
// if we shut down mid process, though, it will be stuck in FINALIZE forever :(
if (!myJobPersistence.markInstanceAsStatus(theInstance.getInstanceId(), StatusEnum.FINALIZE)) {
ourLog.warn("JobInstance[{}] is already in FINALIZE state, no reducer action performed.", theInstance.getInstanceId());
ourLog.warn(
"JobInstance[{}] is already in FINALIZE state. In memory status is {}. Reduction step will not rerun!"
+ " This could be a long running reduction job resulting in the processed msg not being acknowledge,"
+ " or the result of a failed process or server restarting.",
theInstance.getInstanceId(),
theInstance.getStatus().name()
);
return false;
}
theInstance.setStatus(StatusEnum.FINALIZE);
@ -106,6 +110,8 @@ public class ReductionStepExecutor {
break;
case FAIL:
// non-idempotent; but failed chunks will be
// ignored on a second runthrough of reduction step
myJobPersistence.markWorkChunkAsFailed(chunk.getId(),
"Step worker failed to process work chunk " + chunk.getId());
retval = false;

View File

@ -32,8 +32,9 @@ import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.batch2.util.Batch2Constants;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.Logs;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.springframework.messaging.Message;
@ -75,17 +76,28 @@ class WorkChannelMessageHandler implements MessageHandler {
String chunkId = workNotification.getChunkId();
Validate.notNull(chunkId);
Optional<WorkChunk> chunkOpt = myJobPersistence.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId);
if (chunkOpt.isEmpty()) {
ourLog.error("Unable to find chunk with ID {} - Aborting", chunkId);
return;
boolean isReductionWorkNotification = Batch2Constants.REDUCTION_STEP_CHUNK_ID_PLACEHOLDER.equals(chunkId);
JobWorkCursor<?, ?, ?> cursor = null;
WorkChunk workChunk = null;
if (!isReductionWorkNotification) {
Optional<WorkChunk> chunkOpt = myJobPersistence.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId);
if (chunkOpt.isEmpty()) {
ourLog.error("Unable to find chunk with ID {} - Aborting", chunkId);
return;
}
workChunk = chunkOpt.get();
ourLog.debug("Worker picked up chunk. [chunkId={}, stepId={}, startTime={}]", chunkId, workChunk.getTargetStepId(), workChunk.getStartTime());
cursor = buildCursorFromNotification(workNotification);
Validate.isTrue(workChunk.getTargetStepId().equals(cursor.getCurrentStepId()), "Chunk %s has target step %s but expected %s", chunkId, workChunk.getTargetStepId(), cursor.getCurrentStepId());
} else {
ourLog.debug("Processing reduction step work notification. No associated workchunks.");
cursor = buildCursorFromNotification(workNotification);
}
WorkChunk workChunk = chunkOpt.get();
ourLog.debug("Worker picked up chunk. [chunkId={}, stepId={}, startTime={}]", chunkId, workChunk.getTargetStepId(), workChunk.getStartTime());
JobWorkCursor<?, ?, ?> cursor = buildCursorFromNotification(workNotification);
Validate.isTrue(workChunk.getTargetStepId().equals(cursor.getCurrentStepId()), "Chunk %s has target step %s but expected %s", chunkId, workChunk.getTargetStepId(), cursor.getCurrentStepId());
Optional<JobInstance> instanceOpt = myJobPersistence.fetchInstance(workNotification.getInstanceId());
JobInstance instance = instanceOpt.orElseThrow(() -> new InternalErrorException("Unknown instance: " + workNotification.getInstanceId()));

View File

@ -32,13 +32,12 @@ import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import javax.annotation.Nullable;
import java.util.Optional;
import static org.apache.commons.lang3.StringUtils.isBlank;
@ -102,7 +101,7 @@ public class WorkChunkProcessor {
boolean success = myReductionStepExecutor.executeReductionStep(theInstance, step, inputType, parameters);
if (success) {
// Now call call the normal step executor
// Now call the normal step executor
// the data sink stores the report on the instance (i.e. not chunks).
// Assume the OT (report) data is smaller than the list of all IT data
@ -113,7 +112,6 @@ public class WorkChunkProcessor {
}
return new JobStepExecutorOutput<>(success, dataSink);
} else {
// all other kinds of steps
Validate.notNull(theWorkChunk);

View File

@ -35,7 +35,7 @@ import org.slf4j.Logger;
import javax.annotation.Nonnull;
import java.util.Date;
import static ca.uhn.fhir.batch2.config.Batch2Constants.BATCH_START_DATE;
import static ca.uhn.fhir.batch2.util.Batch2Constants.BATCH_START_DATE;
public class GenerateRangeChunksStep<PT extends PartitionedUrlListJobParameters> implements IFirstJobStepWorker<PT, PartitionedUrlChunkRangeJson> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();

View File

@ -22,7 +22,6 @@ package ca.uhn.fhir.batch2.maintenance;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobStepExecutorOutput;
import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
@ -30,11 +29,11 @@ import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.batch2.util.Batch2Constants;
import ca.uhn.fhir.util.Logs;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import java.util.Date;
import java.util.List;
public class JobInstanceProcessor {
@ -125,6 +124,8 @@ public class JobInstanceProcessor {
private void triggerGatedExecutions() {
if (!myInstance.isRunning()) {
ourLog.debug("JobInstance {} is not in a \"running\" state. Status {}",
myInstance.getInstanceId(), myInstance.getStatus().name());
return;
}
@ -136,9 +137,12 @@ public class JobInstanceProcessor {
// final step
if (jobWorkCursor.isFinalStep() && !jobWorkCursor.isReductionStep()) {
ourLog.debug("Job instance {} is in final step and it's not a reducer step", myInstance.getInstanceId());
return;
}
// we should not be sending a second reduction step
// to the queue if it's in finalize status
if (jobWorkCursor.isReductionStep() && myInstance.getStatus() == StatusEnum.FINALIZE) {
ourLog.warn("Job instance {} is still finalizing - a second reduction job will not be started.", myInstance.getInstanceId());
return;
@ -179,16 +183,13 @@ public class JobInstanceProcessor {
myJobPersistence.updateInstance(myInstance);
}
private void processReductionStep(JobWorkCursor<?, ?, ?> jobWorkCursor) {
// do execution of the final step now
// (ie, we won't send to job workers)
JobStepExecutorOutput<?, ?, ?> result = myJobExecutorSvc.doExecution(
JobWorkCursor.fromJobDefinitionAndRequestedStepId(myInstance.getJobDefinition(), jobWorkCursor.nextStep.getStepId()),
private void processReductionStep(JobWorkCursor<?, ?, ?> theWorkCursor) {
JobWorkNotification workNotification = new JobWorkNotification(
myInstance,
null);
if (!result.isSuccessful()) {
myInstance.setEndTime(new Date());
myJobInstanceStatusUpdater.setFailed(myInstance);
}
theWorkCursor.nextStep.getStepId(),
Batch2Constants.REDUCTION_STEP_CHUNK_ID_PLACEHOLDER // chunk id; we don't need it
);
ourLog.debug("Submitting a Work Notification for a job reduction step. No associated work chunk ids are available.");
myBatchJobSender.sendWorkChannelMessage(workNotification);
}
}

View File

@ -179,6 +179,16 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
return myRunMaintenanceSemaphore.getQueueLength();
}
@VisibleForTesting
public void forceMaintenancePass() {
// to simulate a long running job!
ourLog.info(
"Forcing a maintenance pass run; semaphore at {}",
getQueueLength()
);
doMaintenancePass();
}
@Override
public void runMaintenancePass() {
if (!myRunMaintenanceSemaphore.tryAcquire()) {
@ -204,6 +214,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
myJobDefinitionRegistry.setJobDefinition(instance);
JobInstanceProcessor jobInstanceProcessor = new JobInstanceProcessor(myJobPersistence,
myBatchJobSender, instance, progressAccumulator, myJobExecutorSvc);
ourLog.debug("Triggering maintenance process for instance {} in status {}", instance.getInstanceId(), instance.getStatus().name());
jobInstanceProcessor.process();
}
}

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.batch2.api.IJobInstance;
import ca.uhn.fhir.jpa.util.JsonDateDeserializer;
import ca.uhn.fhir.jpa.util.JsonDateSerializer;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
@ -32,7 +33,6 @@ import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import java.util.Date;
import java.util.Objects;
import static org.apache.commons.lang3.StringUtils.isBlank;
@ -359,10 +359,24 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson,
}
/**
* Returns true if the job instance is in {@link StatusEnum#IN_PROGRESS} and is not cancelled
* Returns true if the job instance is in:
* {@link StatusEnum#IN_PROGRESS}
* {@link StatusEnum#FINALIZE}
* and is not cancelled
*/
public boolean isRunning() {
return getStatus() == StatusEnum.IN_PROGRESS && !isCancelled();
if (isCancelled()) {
return false;
}
switch (getStatus()) {
case IN_PROGRESS:
case FINALIZE:
return true;
default:
Logs.getBatchTroubleshootingLog().debug("Status {} is considered \"not running\"", getStatus().name());
}
return false;
}
public boolean isFinished() {
@ -376,7 +390,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson,
}
public boolean isPendingCancellationRequest() {
return myCancelled && (myStatus == StatusEnum.QUEUED || myStatus == StatusEnum.IN_PROGRESS);
return myCancelled && myStatus.isCancellable();
}
/**

View File

@ -34,51 +34,53 @@ public enum StatusEnum {
/**
* Task is waiting to execute and should begin with no intervention required.
*/
QUEUED(true, false),
QUEUED(true, false, true),
/**
* Task is current executing
*/
IN_PROGRESS(true, false),
IN_PROGRESS(true, false, true),
/**
* For reduction steps
*/
FINALIZE(true, false),
FINALIZE(true, false, true),
/**
* Task completed successfully
*/
COMPLETED(false, true),
COMPLETED(false, true, false),
/**
* Task execution resulted in an error but the error may be transient (or transient status is unknown).
* Retrying may result in success.
*/
ERRORED(true, true),
ERRORED(true, true, false),
/**
* Task has failed and is known to be unrecoverable. There is no reason to believe that retrying will
* result in a different outcome.
*/
FAILED(true, true),
FAILED(true, true, false),
/**
* Task has been cancelled.
*/
CANCELLED(true, true);
CANCELLED(true, true, false);
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
private final boolean myIncomplete;
private final boolean myEnded;
private final boolean myIsCancellable;
private static StatusEnum[] ourIncompleteStatuses;
private static Set<StatusEnum> ourEndedStatuses;
private static Set<StatusEnum> ourNotEndedStatuses;
StatusEnum(boolean theIncomplete, boolean theEnded) {
StatusEnum(boolean theIncomplete, boolean theEnded, boolean theIsCancellable) {
myIncomplete = theIncomplete;
myEnded = theEnded;
myIsCancellable = theIsCancellable;
}
/**
@ -186,4 +188,8 @@ public enum StatusEnum {
public boolean isIncomplete() {
return myIncomplete;
}
public boolean isCancellable() {
return myIsCancellable;
}
}

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.config;
package ca.uhn.fhir.batch2.util;
/*-
* #%L
@ -30,4 +30,10 @@ public class Batch2Constants {
* date when performing operations that pull resources by time windows.
*/
public static final Date BATCH_START_DATE = new InstantType("2000-01-01T00:00:00Z").getValue();
/**
* This is a placeholder chunkid for the reduction step to allow it to be
* used in the message handling
*/
public static final String REDUCTION_STEP_CHUNK_ID_PLACEHOLDER = "REDUCTION";
}

View File

@ -26,7 +26,7 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.config.Batch2Constants;
import ca.uhn.fhir.batch2.util.Batch2Constants;
import ca.uhn.fhir.mdm.batch2.clear.MdmClearJobParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -369,9 +369,10 @@ public class BulkDataExportProvider {
myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToWriter(oo, response.getWriter());
response.getWriter().close();
break;
default:
ourLog.warn("Unrecognized status encountered: {}. Treating as BUILDING/SUBMITTED", info.getStatus().name());
case BUILDING:
case SUBMITTED:
default:
if (theRequestDetails.getRequestType() == RequestTypeEnum.DELETE) {
handleDeleteRequest(theJobId, response, info.getStatus());
} else {