Batch 2 Maintenance refactor (#3631)

* SRP refactor maintenance

* SRP refactor maintenance

* cache jobDefinition in jobInstance when processing (rather than looking it up in a bunch of different places)

* begin with failing test

* test passes

* tests pass. Still need to add a few more tests

* add comment to trigger new ci

* added positive and negative tests for new behaviour

* fix IJ warnings

* change log

* change log

* fix test race condition

* resolved feedback

* review feedback
Ken Stevens 2022-05-25 00:16:58 -04:00 committed by GitHub
parent a19c1ec996
commit fb6e5cf6ce
48 changed files with 797 additions and 547 deletions

View File

@ -25,7 +25,7 @@ public final class Msg {
/**
* IMPORTANT: Please update the following comment after you add a new code
* Last code value: 2081
* Last code value: 2082
*/
private Msg() {}

View File

@ -0,0 +1,5 @@
---
type: add
issue: 3631
title: "If a gated Batch job step and all prior steps produced only a single chunk of work, notify the next step immediately
instead of waiting for the scheduled job maintenance to advance it."
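
For context, the new behaviour applies only to jobs built with gated execution. A minimal sketch of such a definition, assembled from the builder calls visible in the test changes below (the version setter, the step workers, and the trailing build() call are assumptions here):

JobDefinition<? extends IModelJson> definition = JobDefinition.newBuilder()
	.setJobDefinitionId("example-gated-job")
	.setJobDescription("example job")
	.setJobDefinitionVersion(1) // assumed setter, mirroring TEST_JOB_VERSION in the tests
	.setParametersType(TestJobParameters.class)
	.gatedExecution()
	.addFirstStep("first-step", "Test first step", FirstStepOutput.class, firstStep)
	.addLastStep("last-step", "Test last step", lastStep)
	.build();

If every step of such a job produces a single work chunk, the coordinator now sends the next step's work notification directly instead of waiting (up to a minute) for the scheduled maintenance pass.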

View File

@ -22,7 +22,7 @@ package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.config.BaseBatch2Config;
import ca.uhn.fhir.batch2.impl.SynchronizedJobPersistenceWrapper;
import ca.uhn.fhir.batch2.coordinator.SynchronizedJobPersistenceWrapper;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import org.springframework.context.annotation.Bean;

View File

@ -21,7 +21,7 @@ package ca.uhn.fhir.jpa.batch2;
*/
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.impl.BatchWorkChunk;
import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;

View File

@ -29,6 +29,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertNull;
public class Batch2JobHelper {
@ -38,13 +39,17 @@ public class Batch2JobHelper {
@Autowired
private IJobCoordinator myJobCoordinator;
public void awaitJobCompletion(String theId) {
public void awaitMultipleChunkJobCompletion(String theId) {
await().until(() -> {
myJobMaintenanceService.runMaintenancePass();
return myJobCoordinator.getInstance(theId).getStatus();
}, equalTo(StatusEnum.COMPLETED));
}
public void awaitSingleChunkJobCompletion(String theId) {
await().until(() -> myJobCoordinator.getInstance(theId).getStatus() == StatusEnum.COMPLETED);
}
public JobInstance awaitJobFailure(String theId) {
await().until(() -> {
myJobMaintenanceService.runMaintenancePass();
@ -66,4 +71,12 @@ public class Batch2JobHelper {
return myJobCoordinator.getInstance(theId).getStatus();
}, equalTo(StatusEnum.IN_PROGRESS));
}
public void assertNoGatedStep(String theInstanceId) {
assertNull(myJobCoordinator.getInstance(theInstanceId).getCurrentGatedStepId());
}
public void awaitGatedStepId(String theExpectedGatedStepId, String theInstanceId) {
await().until(() -> theExpectedGatedStepId.equals(myJobCoordinator.getInstance(theInstanceId).getCurrentGatedStepId()));
}
}
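
The split between the two await helpers matters: a fast-tracked single-chunk job completes without any maintenance pass, so that helper only polls, while a multi-chunk job still needs runMaintenancePass() to advance its gated steps. A sketch of how tests choose between them (instance ids assumed):

// steps each produced one chunk: the job completes on its own
myBatch2JobHelper.awaitSingleChunkJobCompletion(singleChunkInstanceId);
// steps produced several chunks: repeated maintenance passes drive completion
myBatch2JobHelper.awaitMultipleChunkJobCompletion(multiChunkInstanceId);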

View File

@ -6,7 +6,7 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.impl.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
@ -22,6 +22,8 @@ import static org.junit.jupiter.api.Assertions.fail;
public class Batch2CoordinatorIT extends BaseJpaR4Test {
public static final int TEST_JOB_VERSION = 1;
public static final String FIRST_STEP_ID = "first-step";
public static final String LAST_STEP_ID = "last-step";
@Autowired
JobDefinitionRegistry myJobDefinitionRegistry;
@Autowired
@ -43,7 +45,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> fail();
String jobId = "test-job-1";
JobDefinition<? extends IModelJson> definition = buildJobDefinition(jobId, firstStep, lastStep);
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
@ -53,12 +55,12 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
String instanceId = myJobCoordinator.startInstance(request);
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.awaitJobCompletion(instanceId);
myBatch2JobHelper.awaitSingleChunkJobCompletion(instanceId);
}
@Test
public void testFirstStepToSecondStep() throws InterruptedException {
public void testFirstStepToSecondStep_singleChunkFasttracks() throws InterruptedException {
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> firstStep = (step, sink) -> {
sink.accept(new FirstStepOutput());
callLatch(myFirstStepLatch, step);
@ -67,7 +69,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> callLatch(myLastStepLatch, step);
String jobId = "test-job-2";
JobDefinition<? extends IModelJson> definition = buildJobDefinition(jobId, firstStep, lastStep);
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
@ -77,8 +79,40 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
String instanceId = myJobCoordinator.startInstance(request);
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.assertNoGatedStep(instanceId);
myLastStepLatch.setExpectedCount(1);
myBatch2JobHelper.awaitJobCompletion(instanceId);
// Since there was only one chunk, the job should proceed without requiring a maintenance pass
myLastStepLatch.awaitExpected();
myBatch2JobHelper.awaitSingleChunkJobCompletion(instanceId);
}
@Test
public void testFirstStepToSecondStep_doubleChunk_doesNotFastTrack() throws InterruptedException {
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> firstStep = (step, sink) -> {
sink.accept(new FirstStepOutput());
sink.accept(new FirstStepOutput());
callLatch(myFirstStepLatch, step);
return RunOutcome.SUCCESS;
};
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> callLatch(myLastStepLatch, step);
String jobId = "test-job-5";
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
JobInstanceStartRequest request = buildRequest(jobId);
myFirstStepLatch.setExpectedCount(1);
String instanceId = myJobCoordinator.startInstance(request);
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.awaitGatedStepId(FIRST_STEP_ID, instanceId);
myLastStepLatch.setExpectedCount(2);
myBatch2JobHelper.awaitMultipleChunkJobCompletion(instanceId);
myLastStepLatch.awaitExpected();
}
@ -92,7 +126,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> fail();
String jobId = "test-job-3";
JobDefinition<? extends IModelJson> definition = buildJobDefinition(jobId, firstStep, lastStep);
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
@ -115,7 +149,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> fail();
String jobId = "test-job-4";
JobDefinition<? extends IModelJson> definition = buildJobDefinition(jobId, firstStep, lastStep);
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
@ -146,7 +180,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
}
@Nonnull
private JobDefinition<? extends IModelJson> buildJobDefinition(String theJobId, IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep, IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> theLastStep) {
private JobDefinition<? extends IModelJson> buildGatedJobDefinition(String theJobId, IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep, IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> theLastStep) {
return JobDefinition.newBuilder()
.setJobDefinitionId(theJobId)
.setJobDescription("test job")
@ -154,13 +188,13 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
.setParametersType(TestJobParameters.class)
.gatedExecution()
.addFirstStep(
"first-step",
FIRST_STEP_ID,
"Test first step",
FirstStepOutput.class,
theFirstStep
)
.addLastStep(
"last-step",
LAST_STEP_ID,
"Test last step",
theLastStep
)

View File

@ -1,7 +1,7 @@
package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.impl.BatchWorkChunk;
import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;
import ca.uhn.fhir.batch2.jobs.imprt.NdJsonFileJson;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
@ -171,9 +171,9 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
}
List<WorkChunk> chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 0);
assertEquals(null, chunks.get(0).getData());
assertEquals(null, chunks.get(1).getData());
assertEquals(null, chunks.get(2).getData());
assertNull(chunks.get(0).getData());
assertNull(chunks.get(1).getData());
assertNull(chunks.get(2).getData());
assertThat(chunks.stream().map(t -> t.getId()).collect(Collectors.toList()),
contains(ids.get(0), ids.get(1), ids.get(2)));
@ -207,7 +207,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null);
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(() -> new IllegalArgumentException());
assertEquals(null, chunk.getData());
assertNull(chunk.getData());
}
@Test

View File

@ -62,7 +62,7 @@ public class ReindexJobTest extends BaseJpaR4Test {
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setParameters(parameters);
String id = myJobCoordinator.startInstance(startRequest);
myBatch2JobHelper.awaitJobCompletion(id);
myBatch2JobHelper.awaitSingleChunkJobCompletion(id);
// validate
assertEquals(2, myObservationDao.search(SearchParameterMap.newSynchronous()).size());
@ -95,7 +95,7 @@ public class ReindexJobTest extends BaseJpaR4Test {
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setParameters(new ReindexJobParameters());
String id = myJobCoordinator.startInstance(startRequest);
myBatch2JobHelper.awaitJobCompletion(id);
myBatch2JobHelper.awaitSingleChunkJobCompletion(id);
// validate
assertEquals(50, myObservationDao.search(SearchParameterMap.newSynchronous()).size());

View File

@ -16,9 +16,7 @@ import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.test.utilities.BatchJobHelper;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.hapi.rest.server.helper.BatchHelperR4;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.DecimalType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.StringType;
@ -156,7 +154,7 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(response));
StringType jobId = (StringType) response.getParameter(ProviderConstants.OPERATION_REINDEX_RESPONSE_JOB_ID);
myBatch2JobHelper.awaitJobCompletion(jobId.getValue());
myBatch2JobHelper.awaitSingleChunkJobCompletion(jobId.getValue());
// validate
List<String> alleleObservationIds = reindexTestHelper.getAlleleObservationIds(myClient);
@ -180,7 +178,7 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(response));
jobId = (StringType) response.getParameter(ProviderConstants.OPERATION_REINDEX_RESPONSE_JOB_ID);
myBatch2JobHelper.awaitJobCompletion(jobId.getValue());
myBatch2JobHelper.awaitSingleChunkJobCompletion(jobId.getValue());
myTenantClientInterceptor.setTenantId(DEFAULT_PARTITION_NAME);
@ -223,7 +221,7 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(response));
StringType jobId = (StringType) response.getParameter(ProviderConstants.OPERATION_REINDEX_RESPONSE_JOB_ID);
myBatch2JobHelper.awaitJobCompletion(jobId.getValue());
myBatch2JobHelper.awaitSingleChunkJobCompletion(jobId.getValue());
// validate
List<String> alleleObservationIds = reindexTestHelper.getAlleleObservationIds(myClient);

View File

@ -200,6 +200,7 @@ public class RestHookTestWithInterceptorRegisteredToDaoConfigR4Test extends Base
assertFalse(observation2.getId().isEmpty());
}
// TODO KHS this test has been failing intermittently
@Test
public void testRestHookSubscriptionXml() throws Exception {
String payload = "application/xml";

View File

@ -26,7 +26,6 @@ import ca.uhn.fhir.jpa.searchparam.nickname.NicknameSvc;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
public class NicknameMatcher implements IMdmStringMatcher {

View File

@ -15,7 +15,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.hamcrest.CoreMatchers;
import org.hl7.fhir.r4.model.CodeType;
import org.hl7.fhir.r4.model.InstantType;
import org.hl7.fhir.r4.model.OperationOutcome;

View File

@ -20,7 +20,7 @@ package ca.uhn.fhir.batch2.api;
* #L%
*/
import ca.uhn.fhir.batch2.impl.BatchWorkChunk;
import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.impl;
package ca.uhn.fhir.batch2.channel;
/*-
* #%L
@ -36,7 +36,7 @@ public class BatchJobSender {
myWorkChannelProducer = theWorkChannelProducer;
}
void sendWorkChannelMessage(JobWorkNotification theJobWorkNotification) {
public void sendWorkChannelMessage(JobWorkNotification theJobWorkNotification) {
JobWorkNotificationJsonMessage message = new JobWorkNotificationJsonMessage();
message.setPayload(theJobWorkNotification);

View File

@ -23,10 +23,10 @@ package ca.uhn.fhir.batch2.config;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.impl.BatchJobSender;
import ca.uhn.fhir.batch2.impl.JobMaintenanceServiceImpl;
import ca.uhn.fhir.batch2.impl.JobCoordinatorImpl;
import ca.uhn.fhir.batch2.impl.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
import ca.uhn.fhir.batch2.coordinator.JobCoordinatorImpl;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;

View File

@ -20,7 +20,7 @@ package ca.uhn.fhir.batch2.config;
* #L%
*/
import ca.uhn.fhir.batch2.impl.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -45,8 +45,9 @@ public class Batch2JobRegisterer {
JobDefinitionRegistry jobRegistry = myApplicationContext.getBean(JobDefinitionRegistry.class);
for (Map.Entry<String, JobDefinition> next : batchJobs.entrySet()) {
ourLog.info("Registering Batch2 Job Definition: {} / {}", next.getValue().getJobDefinitionId(), next.getValue().getJobDefinitionVersion());
jobRegistry.addJobDefinition(next.getValue());
JobDefinition<?> jobDefinition = next.getValue();
ourLog.info("Registering Batch2 Job Definition: {} / {}", jobDefinition.getJobDefinitionId(), jobDefinition.getJobDefinitionVersion());
jobRegistry.addJobDefinition(jobDefinition);
}
}

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.impl;
package ca.uhn.fhir.batch2.coordinator;
/*-
* #%L
@ -22,6 +22,7 @@ package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.impl;
package ca.uhn.fhir.batch2.coordinator;
/*-
* #%L
@ -21,16 +21,15 @@ package ca.uhn.fhir.batch2.impl;
*/
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.WorkChunkData;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.*;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil;
import javax.annotation.Nonnull;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> extends BaseDataSink<PT,IT,OT> {
private final BatchJobSender myBatchJobSender;
@ -39,6 +38,7 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
private final int myJobDefinitionVersion;
private final JobDefinitionStep<PT, OT, ?> myTargetStep;
private final AtomicInteger myChunkCounter = new AtomicInteger(0);
private final AtomicReference<String> myLastChunkId = new AtomicReference<>();
private final boolean myGatedExecution;
JobDataSink(@Nonnull BatchJobSender theBatchJobSender, @Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinition<?> theDefinition, @Nonnull String theInstanceId, @Nonnull JobWorkCursor<PT, IT, OT> theJobWorkCursor) {
@ -61,6 +61,7 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
BatchWorkChunk batchWorkChunk = new BatchWorkChunk(myJobDefinitionId, myJobDefinitionVersion, targetStepId, instanceId, sequence, dataValueString);
String chunkId = myJobPersistence.storeWorkChunk(batchWorkChunk);
myLastChunkId.set(chunkId);
if (!myGatedExecution) {
JobWorkNotification workNotification = new JobWorkNotification(myJobDefinitionId, myJobDefinitionVersion, instanceId, targetStepId, chunkId);
@ -73,4 +74,11 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
return myChunkCounter.get();
}
public String getOnlyChunkId() {
if (getWorkChunkCount() != 1) {
String msg = String.format("Expected this sink to have exactly one work chunk but there are %d. Job %s v%s step %s", getWorkChunkCount(), myJobDefinitionId, myJobDefinitionVersion, myTargetStep);
throw new IllegalStateException(Msg.code(2082) + msg);
}
return myLastChunkId.get();
}
}
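
getOnlyChunkId() exists so the fast-track path can hand the single stored chunk id straight to a work notification. A sketch of the intended call pattern, mirroring JobStepExecutor further down in this diff:

if (dataSink.getWorkChunkCount() == 1) {
	// safe: the guard above throws Msg.code(2082) unless exactly one chunk was stored
	String chunkId = dataSink.getOnlyChunkId();
	// wrap chunkId in a JobWorkNotification for the next step and send it immediately
}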

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.impl;
package ca.uhn.fhir.batch2.coordinator;
/*-
* #%L
@ -22,6 +22,7 @@ package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
@ -88,4 +89,9 @@ public class JobDefinitionRegistry {
}
return opt.get();
}
public void setJobDefinition(JobInstance theInstance) {
JobDefinition<?> jobDefinition = getJobDefinitionOrThrowException(theInstance.getJobDefinitionId(), theInstance.getJobDefinitionVersion());
theInstance.setJobDefinition(jobDefinition);
}
}
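
This is the "cache jobDefinition in jobInstance" change from the commit log: the definition is resolved once per maintenance pass and stored on the instance, rather than being looked up again at every use site. A sketch of the call site as it appears in JobMaintenanceServiceImpl below:

myJobDefinitionRegistry.setJobDefinition(instance);
// downstream code (e.g. JobInstanceProcessor.triggerGatedExecutions()) can now
// call instance.getJobDefinition() directly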

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.impl;
package ca.uhn.fhir.batch2.coordinator;
/*-
* #%L
@ -27,10 +27,13 @@ import ca.uhn.fhir.batch2.api.JobStepFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
@ -41,6 +44,8 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.util.Optional;
import static ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor.updateInstanceStatus;
public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> {
private static final Logger ourLog = LoggerFactory.getLogger(JobStepExecutor.class);
@ -83,22 +88,57 @@ public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT ex
myJobPersistence.markInstanceAsCompleted(myInstanceId);
}
if (myDefinition.isGatedExecution() && myCursor.isFirstStep) {
initializeGatedExecution();
if (myDefinition.isGatedExecution()) {
handleGatedExecution(dataSink);
}
}
private void initializeGatedExecution() {
Optional<JobInstance> oInstance = myJobPersistence.fetchInstance(myInstanceId);
private void handleGatedExecution(BaseDataSink<PT, IT, OT> theDataSink) {
JobInstance jobInstance = initializeGatedExecutionIfRequired(theDataSink);
if (oInstance.isPresent()) {
JobInstance instance = oInstance.get();
instance.setCurrentGatedStepId(myCursor.getCurrentStepId());
myJobPersistence.updateInstance(instance);
if (eligibleForFastTracking(theDataSink, jobInstance)) {
ourLog.info("Gated job {} step {} produced at most one chunk: Fast tracking execution.", myDefinition.getJobDefinitionId(), myCursor.currentStep.getStepId());
// This job is defined to be gated, but so far every step has produced at most 1 work chunk, so it is
// eligible for fast tracking.
if (myCursor.isFinalStep()) {
if (updateInstanceStatus(jobInstance, StatusEnum.COMPLETED)) {
myJobPersistence.updateInstance(jobInstance);
}
} else {
JobWorkNotification workNotification = new JobWorkNotification(jobInstance, myCursor.nextStep.getStepId(), ((JobDataSink<PT,IT,OT>) theDataSink).getOnlyChunkId());
myBatchJobSender.sendWorkChannelMessage(workNotification);
}
}
}
private boolean eligibleForFastTracking(BaseDataSink<PT, IT, OT> theDataSink, JobInstance theJobInstance) {
return theJobInstance != null &&
!theJobInstance.hasGatedStep() &&
theDataSink.getWorkChunkCount() <= 1;
}
private JobInstance initializeGatedExecutionIfRequired(BaseDataSink<PT, IT, OT> theDataSink) {
Optional<JobInstance> oJobInstance = myJobPersistence.fetchInstance(myInstanceId);
if (oJobInstance.isEmpty()) {
return null;
}
JobInstance jobInstance = oJobInstance.get();
if (jobInstance.hasGatedStep()) {
// Gated execution is already initialized
return jobInstance;
}
if (theDataSink.getWorkChunkCount() <= 1) {
// Do not initialize gated execution for steps that produced only one chunk
return jobInstance;
}
jobInstance.setCurrentGatedStepId(myCursor.getCurrentStepId());
myJobPersistence.updateInstance(jobInstance);
return jobInstance;
}
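// Summary of the fast-track decision above (explanatory comment only; logic unchanged):
//  - instance already has a gated step id -> an earlier step produced more than
//    one chunk, so maintenance passes advance the job as before (no fast track)
//  - no gated step id and this step produced <= 1 chunk:
//      * final step        -> mark the instance COMPLETED immediately
//      * intermediate step -> send a JobWorkNotification for the only chunk,
//        skipping the scheduled maintenance pass entirely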
private boolean executeStep(String theJobDefinitionId, @Nonnull WorkChunk theWorkChunk, PT theParameters, BaseDataSink<PT,IT,OT> theDataSink) {
JobDefinitionStep<PT, IT, OT> theTargetStep = theDataSink.getTargetStep();
String targetStepId = theTargetStep.getStepId();

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.impl;
package ca.uhn.fhir.batch2.coordinator;
/*-
* #%L
@ -21,6 +21,7 @@ package ca.uhn.fhir.batch2.impl;
*/
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.WorkChunk;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.impl;
package ca.uhn.fhir.batch2.coordinator;
/*-
* #%L
@ -22,6 +22,7 @@ package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
@ -95,6 +96,6 @@ class WorkChannelMessageHandler implements MessageHandler {
JobDefinition<?> definition = myJobDefinitionRegistry.getJobDefinitionOrThrowException(jobDefinitionId, jobDefinitionVersion);
return JobWorkCursor.fromJobDefinitionAndWorkNotification(definition, workNotification);
return JobWorkCursor.fromJobDefinitionAndRequestedStepId(definition, workNotification.getTargetStepId());
}
}

View File

@ -1,435 +0,0 @@
package ca.uhn.fhir.batch2.impl;
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobCompletionDetails;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.DateUtils;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
/**
* This class performs regular polls of the stored jobs in order to
* perform maintenance. This includes two major functions.
*
* <p>
* First, we calculate statistics and delete expired tasks. This class does
* the following things:
* <ul>
* <li>For instances that are IN_PROGRESS, calculates throughput and percent complete</li>
* <li>For instances that are IN_PROGRESS where all chunks are COMPLETE, marks instance as COMPLETE</li>
* <li>For instances that are COMPLETE, purges chunk data</li>
* <li>For instances that are IN_PROGRESS where at least one chunk is FAILED, marks instance as FAILED and propagates the error message to the instance, and purges chunk data</li>
* <li>For instances that are IN_PROGRESS with an error message set where no chunks are ERRORED or FAILED, clears the error message in the instance (meaning presumably there was an error but it cleared)</li>
* <li>For instances that are IN_PROGRESS with the isCancelled flag set, marks them as CANCELLED, recording the current running step if any</li>
* <li>For instances that are COMPLETE or FAILED and are old, delete them entirely</li>
* </ul>
* </p>
*
* <p>
* Second, we check for any job instances where the job is configured to
* have gated execution. For these instances, we check if the current step
* is complete (all chunks are in COMPLETE status) and trigger the next step.
* </p>
*/
public class JobMaintenanceServiceImpl implements IJobMaintenanceService {
public static final int INSTANCES_PER_PASS = 100;
public static final long PURGE_THRESHOLD = 7L * DateUtils.MILLIS_PER_DAY;
private static final Logger ourLog = LoggerFactory.getLogger(JobMaintenanceServiceImpl.class);
private final IJobPersistence myJobPersistence;
private final ISchedulerService mySchedulerService;
private final JobDefinitionRegistry myJobDefinitionRegistry;
private final BatchJobSender myBatchJobSender;
/**
* Constructor
*/
public JobMaintenanceServiceImpl(@Nonnull ISchedulerService theSchedulerService, @Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinitionRegistry theJobDefinitionRegistry, @Nonnull BatchJobSender theBatchJobSender) {
Validate.notNull(theSchedulerService);
Validate.notNull(theJobPersistence);
Validate.notNull(theJobDefinitionRegistry);
Validate.notNull(theBatchJobSender);
myJobPersistence = theJobPersistence;
mySchedulerService = theSchedulerService;
myJobDefinitionRegistry = theJobDefinitionRegistry;
myBatchJobSender = theBatchJobSender;
}
@PostConstruct
public void start() {
ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
jobDefinition.setId(JobMaintenanceScheduledJob.class.getName());
jobDefinition.setJobClass(JobMaintenanceScheduledJob.class);
mySchedulerService.scheduleClusteredJob(DateUtils.MILLIS_PER_MINUTE, jobDefinition);
}
@Override
public void runMaintenancePass() {
// NB: If you add any new logic, update the class javadoc
Set<String> processedInstanceIds = new HashSet<>();
JobChunkProgressAccumulator progressAccumulator = new JobChunkProgressAccumulator();
for (int page = 0; ; page++) {
List<JobInstance> instances = myJobPersistence.fetchInstances(INSTANCES_PER_PASS, page);
for (JobInstance instance : instances) {
if (processedInstanceIds.add(instance.getInstanceId())) {
handleCancellation(instance);
cleanupInstance(instance, progressAccumulator);
triggerGatedExecutions(instance, progressAccumulator);
}
}
if (instances.size() < INSTANCES_PER_PASS) {
break;
}
}
}
private void handleCancellation(JobInstance theInstance) {
if (! theInstance.isCancelled()) { return; }
if (theInstance.getStatus() == StatusEnum.QUEUED || theInstance.getStatus() == StatusEnum.IN_PROGRESS) {
String msg = "Job instance cancelled";
if (theInstance.getCurrentGatedStepId() != null) {
msg += " while running step " + theInstance.getCurrentGatedStepId();
}
theInstance.setErrorMessage(msg);
theInstance.setStatus(StatusEnum.CANCELLED);
myJobPersistence.updateInstance(theInstance);
}
}
private void cleanupInstance(JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) {
switch (theInstance.getStatus()) {
case QUEUED:
break;
case IN_PROGRESS:
case ERRORED:
calculateInstanceProgress(theInstance, theProgressAccumulator);
break;
case COMPLETED:
case FAILED:
case CANCELLED:
if (theInstance.getEndTime() != null) {
long cutoff = System.currentTimeMillis() - PURGE_THRESHOLD;
if (theInstance.getEndTime().getTime() < cutoff) {
ourLog.info("Deleting old job instance {}", theInstance.getInstanceId());
myJobPersistence.deleteInstanceAndChunks(theInstance.getInstanceId());
return;
}
}
break;
}
if ((theInstance.getStatus() == StatusEnum.COMPLETED || theInstance.getStatus() == StatusEnum.FAILED
|| theInstance.getStatus() == StatusEnum.CANCELLED) && !theInstance.isWorkChunksPurged()) {
theInstance.setWorkChunksPurged(true);
myJobPersistence.deleteChunks(theInstance.getInstanceId());
myJobPersistence.updateInstance(theInstance);
}
}
private void calculateInstanceProgress(JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) {
int resourcesProcessed = 0;
int incompleteChunkCount = 0;
int completeChunkCount = 0;
int erroredChunkCount = 0;
int failedChunkCount = 0;
int errorCountForAllStatuses = 0;
Long earliestStartTime = null;
Long latestEndTime = null;
String errorMessage = null;
for (int page = 0; ; page++) {
List<WorkChunk> chunks = myJobPersistence.fetchWorkChunksWithoutData(theInstance.getInstanceId(), INSTANCES_PER_PASS, page);
for (WorkChunk chunk : chunks) {
theProgressAccumulator.addChunk(chunk.getInstanceId(), chunk.getId(), chunk.getTargetStepId(), chunk.getStatus());
errorCountForAllStatuses += chunk.getErrorCount();
if (chunk.getRecordsProcessed() != null) {
resourcesProcessed += chunk.getRecordsProcessed();
}
if (chunk.getStartTime() != null) {
if (earliestStartTime == null || earliestStartTime > chunk.getStartTime().getTime()) {
earliestStartTime = chunk.getStartTime().getTime();
}
}
if (chunk.getEndTime() != null) {
if (latestEndTime == null || latestEndTime < chunk.getEndTime().getTime()) {
latestEndTime = chunk.getEndTime().getTime();
}
}
switch (chunk.getStatus()) {
case QUEUED:
case IN_PROGRESS:
incompleteChunkCount++;
break;
case COMPLETED:
completeChunkCount++;
break;
case ERRORED:
erroredChunkCount++;
if (errorMessage == null) {
errorMessage = chunk.getErrorMessage();
}
break;
case FAILED:
failedChunkCount++;
errorMessage = chunk.getErrorMessage();
break;
case CANCELLED:
break;
}
}
if (chunks.size() < INSTANCES_PER_PASS) {
break;
}
}
if (earliestStartTime != null) {
theInstance.setStartTime(new Date(earliestStartTime));
}
theInstance.setErrorCount(errorCountForAllStatuses);
theInstance.setCombinedRecordsProcessed(resourcesProcessed);
boolean changedStatus = false;
if (completeChunkCount > 1 || erroredChunkCount > 1) {
double percentComplete = (double) (completeChunkCount) / (double) (incompleteChunkCount + completeChunkCount + failedChunkCount + erroredChunkCount);
theInstance.setProgress(percentComplete);
if (incompleteChunkCount == 0 && erroredChunkCount == 0 && failedChunkCount == 0) {
boolean completed = updateInstanceStatus(theInstance, StatusEnum.COMPLETED);
if (completed) {
JobDefinition<?> definition = myJobDefinitionRegistry.getJobDefinition(theInstance.getJobDefinitionId(), theInstance.getJobDefinitionVersion()).orElseThrow(() -> new IllegalStateException("Unknown job " + theInstance.getJobDefinitionId() + "/" + theInstance.getJobDefinitionVersion()));
invokeJobCompletionHandler(theInstance, definition);
}
changedStatus = completed;
} else if (erroredChunkCount > 0) {
changedStatus = updateInstanceStatus(theInstance, StatusEnum.ERRORED);
}
if (earliestStartTime != null && latestEndTime != null) {
long elapsedTime = latestEndTime - earliestStartTime;
if (elapsedTime > 0) {
double throughput = StopWatch.getThroughput(resourcesProcessed, elapsedTime, TimeUnit.SECONDS);
theInstance.setCombinedRecordsProcessedPerSecond(throughput);
String estimatedTimeRemaining = StopWatch.formatEstimatedTimeRemaining(completeChunkCount, (completeChunkCount + incompleteChunkCount), elapsedTime);
theInstance.setEstimatedTimeRemaining(estimatedTimeRemaining);
}
}
}
if (latestEndTime != null) {
if (failedChunkCount > 0) {
theInstance.setEndTime(new Date(latestEndTime));
} else if (completeChunkCount > 0 && incompleteChunkCount == 0 && erroredChunkCount == 0) {
theInstance.setEndTime(new Date(latestEndTime));
}
}
theInstance.setErrorMessage(errorMessage);
if (changedStatus || theInstance.getStatus() == StatusEnum.IN_PROGRESS) {
ourLog.info("Job {} of type {} has status {} - {} records processed ({}/sec) - ETA: {}", theInstance.getInstanceId(), theInstance.getJobDefinitionId(), theInstance.getStatus(), theInstance.getCombinedRecordsProcessed(), theInstance.getCombinedRecordsProcessedPerSecond(), theInstance.getEstimatedTimeRemaining());
}
if (failedChunkCount > 0) {
updateInstanceStatus(theInstance, StatusEnum.FAILED);
myJobPersistence.updateInstance(theInstance);
return;
}
if ((incompleteChunkCount + completeChunkCount + erroredChunkCount) >= 2 || errorCountForAllStatuses > 0) {
myJobPersistence.updateInstance(theInstance);
}
}
private <PT extends IModelJson> void invokeJobCompletionHandler(JobInstance theInstance, JobDefinition<PT> definition) {
IJobCompletionHandler<PT> completionHandler = definition.getCompletionHandler();
if (completionHandler != null) {
String instanceId = theInstance.getInstanceId();
PT jobParameters = theInstance.getParameters(definition.getParametersType());
JobCompletionDetails<PT> completionDetails = new JobCompletionDetails<>(jobParameters, instanceId);
completionHandler.jobComplete(completionDetails);
}
}
private boolean updateInstanceStatus(JobInstance theInstance, StatusEnum newStatus) {
if (theInstance.getStatus() != newStatus) {
ourLog.info("Marking job instance {} of type {} as {}", theInstance.getInstanceId(), theInstance.getJobDefinitionId(), newStatus);
theInstance.setStatus(newStatus);
return true;
}
return false;
}
private void triggerGatedExecutions(JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) {
if (!theInstance.isRunning()) {
return;
}
String jobDefinitionId = theInstance.getJobDefinitionId();
int jobDefinitionVersion = theInstance.getJobDefinitionVersion();
String instanceId = theInstance.getInstanceId();
JobDefinition<?> definition = myJobDefinitionRegistry.getJobDefinition(jobDefinitionId, jobDefinitionVersion).orElseThrow(() -> new IllegalStateException("Unknown job definition: " + jobDefinitionId + " " + jobDefinitionVersion));
if (!definition.isGatedExecution()) {
return;
}
String currentStepId = theInstance.getCurrentGatedStepId();
if (isBlank(currentStepId)) {
return;
}
if (definition.isLastStep(currentStepId)) {
return;
}
int incompleteChunks = theProgressAccumulator.countChunksWithStatus(instanceId, currentStepId, StatusEnum.getIncompleteStatuses());
if (incompleteChunks == 0) {
int currentStepIndex = definition.getStepIndex(currentStepId);
String nextStepId = definition.getSteps().get(currentStepIndex + 1).getStepId();
ourLog.info("All processing is complete for gated execution of instance {} step {}. Proceeding to step {}", instanceId, currentStepId, nextStepId);
List<String> chunksForNextStep = theProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, EnumSet.of(StatusEnum.QUEUED));
for (String nextChunkId : chunksForNextStep) {
JobWorkNotification workNotification = new JobWorkNotification(jobDefinitionId, jobDefinitionVersion, instanceId, nextStepId, nextChunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
}
theInstance.setCurrentGatedStepId(nextStepId);
myJobPersistence.updateInstance(theInstance);
}
}
public static class JobMaintenanceScheduledJob implements HapiJob {
@Autowired
private IJobMaintenanceService myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.runMaintenancePass();
}
}
/**
* While performing cleanup, the cleanup job loads all of the known
* work chunks to examine their status. This bean collects the counts that
* are found, so that they can be reused for maintenance jobs without
* needing to hit the database a second time.
*/
private static class JobChunkProgressAccumulator {
private final Set<String> myConsumedInstanceAndChunkIds = new HashSet<>();
private final Multimap<String, ChunkStatusCountKey> myInstanceIdToChunkStatuses = ArrayListMultimap.create();
public void addChunk(String theInstanceId, String theChunkId, String theStepId, StatusEnum theStatus) {
// Note: If chunks are being written while we're executing, we may see the same chunk twice. This
// check avoids adding it twice.
if (myConsumedInstanceAndChunkIds.add(theInstanceId + " " + theChunkId)) {
myInstanceIdToChunkStatuses.put(theInstanceId, new ChunkStatusCountKey(theChunkId, theStepId, theStatus));
}
}
public int countChunksWithStatus(String theInstanceId, String theStepId, Set<StatusEnum> theStatuses) {
return getChunkIdsWithStatus(theInstanceId, theStepId, theStatuses).size();
}
public List<String> getChunkIdsWithStatus(String theInstanceId, String theStepId, Set<StatusEnum> theStatuses) {
return getChunkStatuses(theInstanceId).stream().filter(t -> t.myStepId.equals(theStepId)).filter(t -> theStatuses.contains(t.myStatus)).map(t -> t.myChunkId).collect(Collectors.toList());
}
@Nonnull
private Collection<ChunkStatusCountKey> getChunkStatuses(String theInstanceId) {
Collection<ChunkStatusCountKey> chunkStatuses = myInstanceIdToChunkStatuses.get(theInstanceId);
chunkStatuses = defaultIfNull(chunkStatuses, emptyList());
return chunkStatuses;
}
private static class ChunkStatusCountKey {
public final String myChunkId;
public final String myStepId;
public final StatusEnum myStatus;
private ChunkStatusCountKey(String theChunkId, String theStepId, StatusEnum theStatus) {
myChunkId = theChunkId;
myStepId = theStepId;
myStatus = theStatus;
}
}
}
}

View File

@ -0,0 +1,168 @@
package ca.uhn.fhir.batch2.maintenance;
import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
import ca.uhn.fhir.batch2.api.JobCompletionDetails;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import static ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor.updateInstanceStatus;
class InstanceProgress {
private static final Logger ourLog = LoggerFactory.getLogger(InstanceProgress.class);
private int myRecordsProcessed = 0;
private int myIncompleteChunkCount = 0;
private int myCompleteChunkCount = 0;
private int myErroredChunkCount = 0;
private int myFailedChunkCount = 0;
private int myErrorCountForAllStatuses = 0;
private Long myEarliestStartTime = null;
private Long myLatestEndTime = null;
private String myErrormessage = null;
public void addChunk(WorkChunk theChunk) {
myErrorCountForAllStatuses += theChunk.getErrorCount();
updateRecordsProcessed(theChunk);
updateEarliestTime(theChunk);
updateLatestEndTime(theChunk);
updateCompletionStatus(theChunk);
}
private void updateCompletionStatus(WorkChunk theChunk) {
switch (theChunk.getStatus()) {
case QUEUED:
case IN_PROGRESS:
myIncompleteChunkCount++;
break;
case COMPLETED:
myCompleteChunkCount++;
break;
case ERRORED:
myErroredChunkCount++;
if (myErrormessage == null) {
myErrormessage = theChunk.getErrorMessage();
}
break;
case FAILED:
myFailedChunkCount++;
myErrormessage = theChunk.getErrorMessage();
break;
case CANCELLED:
break;
}
}
private void updateLatestEndTime(WorkChunk theChunk) {
if (theChunk.getEndTime() != null) {
if (myLatestEndTime == null || myLatestEndTime < theChunk.getEndTime().getTime()) {
myLatestEndTime = theChunk.getEndTime().getTime();
}
}
}
private void updateEarliestTime(WorkChunk theChunk) {
if (theChunk.getStartTime() != null) {
if (myEarliestStartTime == null || myEarliestStartTime > theChunk.getStartTime().getTime()) {
myEarliestStartTime = theChunk.getStartTime().getTime();
}
}
}
private void updateRecordsProcessed(WorkChunk theChunk) {
if (theChunk.getRecordsProcessed() != null) {
myRecordsProcessed += theChunk.getRecordsProcessed();
}
}
public void updateInstance(JobInstance theInstance) {
if (myEarliestStartTime != null) {
theInstance.setStartTime(new Date(myEarliestStartTime));
}
theInstance.setErrorCount(myErrorCountForAllStatuses);
theInstance.setCombinedRecordsProcessed(myRecordsProcessed);
boolean changedStatus = updateStatus(theInstance);
setEndTime(theInstance);
theInstance.setErrorMessage(myErrormessage);
if (changedStatus || theInstance.getStatus() == StatusEnum.IN_PROGRESS) {
ourLog.info("Job {} of type {} has status {} - {} records processed ({}/sec) - ETA: {}", theInstance.getInstanceId(), theInstance.getJobDefinitionId(), theInstance.getStatus(), theInstance.getCombinedRecordsProcessed(), theInstance.getCombinedRecordsProcessedPerSecond(), theInstance.getEstimatedTimeRemaining());
}
}
private void setEndTime(JobInstance theInstance) {
if (myLatestEndTime != null) {
if (myFailedChunkCount > 0) {
theInstance.setEndTime(new Date(myLatestEndTime));
} else if (myCompleteChunkCount > 0 && myIncompleteChunkCount == 0 && myErroredChunkCount == 0) {
theInstance.setEndTime(new Date(myLatestEndTime));
}
}
}
private boolean updateStatus(JobInstance theInstance) {
boolean changedStatus = false;
if (myCompleteChunkCount > 1 || myErroredChunkCount > 1) {
double percentComplete = (double) (myCompleteChunkCount) / (double) (myIncompleteChunkCount + myCompleteChunkCount + myFailedChunkCount + myErroredChunkCount);
theInstance.setProgress(percentComplete);
if (jobSuccessfullyCompleted()) {
boolean completed = updateInstanceStatus(theInstance, StatusEnum.COMPLETED);
if (completed) {
invokeJobCompletionHandler(theInstance);
}
changedStatus = completed;
} else if (myErroredChunkCount > 0) {
changedStatus = updateInstanceStatus(theInstance, StatusEnum.ERRORED);
}
if (myEarliestStartTime != null && myLatestEndTime != null) {
long elapsedTime = myLatestEndTime - myEarliestStartTime;
if (elapsedTime > 0) {
double throughput = StopWatch.getThroughput(myRecordsProcessed, elapsedTime, TimeUnit.SECONDS);
theInstance.setCombinedRecordsProcessedPerSecond(throughput);
String estimatedTimeRemaining = StopWatch.formatEstimatedTimeRemaining(myCompleteChunkCount, (myCompleteChunkCount + myIncompleteChunkCount), elapsedTime);
theInstance.setEstimatedTimeRemaining(estimatedTimeRemaining);
}
}
}
return changedStatus;
}
private boolean jobSuccessfullyCompleted() {
return myIncompleteChunkCount == 0 && myErroredChunkCount == 0 && myFailedChunkCount == 0;
}
private <PT extends IModelJson> void invokeJobCompletionHandler(JobInstance myInstance) {
JobDefinition<PT> definition = (JobDefinition<PT>) myInstance.getJobDefinition();
IJobCompletionHandler<PT> completionHandler = definition.getCompletionHandler();
if (completionHandler != null) {
String instanceId = myInstance.getInstanceId();
PT jobParameters = myInstance.getParameters(definition.getParametersType());
JobCompletionDetails<PT> completionDetails = new JobCompletionDetails<>(jobParameters, instanceId);
completionHandler.jobComplete(completionDetails);
}
}
public boolean failed() {
return myFailedChunkCount > 0;
}
public boolean changed() {
return (myIncompleteChunkCount + myCompleteChunkCount + myErroredChunkCount) >= 2 || myErrorCountForAllStatuses > 0;
}
}
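
updateStatus() only starts reporting progress once more than one chunk has completed or errored; the arithmetic itself is simple. A worked sketch with invented counts, using the same StopWatch helpers as the code above:

// 6 complete, 2 incomplete, 0 errored, 0 failed chunks:
double percentComplete = 6.0 / (2 + 6 + 0 + 0); // 0.75
// 300 records processed over 60 seconds of elapsed chunk time:
double throughput = StopWatch.getThroughput(300, 60_000, TimeUnit.SECONDS); // ~5 records/sec
String eta = StopWatch.formatEstimatedTimeRemaining(6, 6 + 2, 60_000); // time to finish the remaining 2 chunks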

View File

@ -0,0 +1,68 @@
package ca.uhn.fhir.batch2.maintenance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
/**
* While performing cleanup, the cleanup job loads all of the known
* work chunks to examine their status. This bean collects the counts that
* are found, so that they can be reused for maintenance jobs without
* needing to hit the database a second time.
*/
class JobChunkProgressAccumulator {
private final Set<String> myConsumedInstanceAndChunkIds = new HashSet<>();
private final Multimap<String, ChunkStatusCountKey> myInstanceIdToChunkStatuses = ArrayListMultimap.create();
int countChunksWithStatus(String theInstanceId, String theStepId, Set<StatusEnum> theStatuses) {
return getChunkIdsWithStatus(theInstanceId, theStepId, theStatuses).size();
}
List<String> getChunkIdsWithStatus(String theInstanceId, String theStepId, Set<StatusEnum> theStatuses) {
return getChunkStatuses(theInstanceId).stream().filter(t -> t.myStepId.equals(theStepId)).filter(t -> theStatuses.contains(t.myStatus)).map(t -> t.myChunkId).collect(Collectors.toList());
}
@Nonnull
private Collection<ChunkStatusCountKey> getChunkStatuses(String theInstanceId) {
Collection<ChunkStatusCountKey> chunkStatuses = myInstanceIdToChunkStatuses.get(theInstanceId);
chunkStatuses = defaultIfNull(chunkStatuses, emptyList());
return chunkStatuses;
}
public void addChunk(WorkChunk theChunk) {
String instanceId = theChunk.getInstanceId();
String chunkId = theChunk.getId();
// Note: If chunks are being written while we're executing, we may see the same chunk twice. This
// check avoids adding it twice.
if (myConsumedInstanceAndChunkIds.add(instanceId + " " + chunkId)) {
myInstanceIdToChunkStatuses.put(instanceId, new ChunkStatusCountKey(chunkId, theChunk.getTargetStepId(), theChunk.getStatus()));
}
}
private static class ChunkStatusCountKey {
public final String myChunkId;
public final String myStepId;
public final StatusEnum myStatus;
private ChunkStatusCountKey(String theChunkId, String theStepId, StatusEnum theStatus) {
myChunkId = theChunkId;
myStepId = theStepId;
myStatus = theStatus;
}
}
}
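
The accumulator is filled once during the progress sweep and then queried when gating decisions are made. A sketch of the two sides, matching the call sites in JobInstanceProgressCalculator and JobInstanceProcessor elsewhere in this diff:

accumulator.addChunk(chunk); // once per WorkChunk fetched during the progress sweep
int incomplete = accumulator.countChunksWithStatus(instanceId, currentStepId, StatusEnum.getIncompleteStatuses());
List<String> readyChunks = accumulator.getChunkIdsWithStatus(instanceId, nextStepId, EnumSet.of(StatusEnum.QUEUED));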

View File

@ -0,0 +1,137 @@
package ca.uhn.fhir.batch2.maintenance;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.EnumSet;
import java.util.List;
public class JobInstanceProcessor {
private static final Logger ourLog = LoggerFactory.getLogger(JobInstanceProcessor.class);
public static final long PURGE_THRESHOLD = 7L * DateUtils.MILLIS_PER_DAY;
private final IJobPersistence myJobPersistence;
private final BatchJobSender myBatchJobSender;
private final JobInstance myInstance;
private final JobChunkProgressAccumulator myProgressAccumulator;
private final JobInstanceProgressCalculator myJobInstanceProgressCalculator;
JobInstanceProcessor(IJobPersistence theJobPersistence, BatchJobSender theBatchJobSender, JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) {
myJobPersistence = theJobPersistence;
myBatchJobSender = theBatchJobSender;
myInstance = theInstance;
myProgressAccumulator = theProgressAccumulator;
myJobInstanceProgressCalculator = new JobInstanceProgressCalculator(theJobPersistence, theInstance, theProgressAccumulator);
}
public void process() {
handleCancellation();
cleanupInstance();
triggerGatedExecutions();
}
private void handleCancellation() {
if (myInstance.isPendingCancellation()) {
myInstance.setErrorMessage(buildCancelledMessage());
myInstance.setStatus(StatusEnum.CANCELLED);
myJobPersistence.updateInstance(myInstance);
}
}
private String buildCancelledMessage() {
String msg = "Job instance cancelled";
if (myInstance.hasGatedStep()) {
msg += " while running step " + myInstance.getCurrentGatedStepId();
}
return msg;
}
private void cleanupInstance() {
switch (myInstance.getStatus()) {
case QUEUED:
break;
case IN_PROGRESS:
case ERRORED:
myJobInstanceProgressCalculator.calculateAndStoreInstanceProgress();
break;
case COMPLETED:
case FAILED:
case CANCELLED:
if (purgeExpiredInstance()) {
return;
}
break;
}
if (myInstance.isFinished() && !myInstance.isWorkChunksPurged()) {
myInstance.setWorkChunksPurged(true);
myJobPersistence.deleteChunks(myInstance.getInstanceId());
myJobPersistence.updateInstance(myInstance);
}
}
private boolean purgeExpiredInstance() {
if (myInstance.getEndTime() != null) {
long cutoff = System.currentTimeMillis() - PURGE_THRESHOLD;
if (myInstance.getEndTime().getTime() < cutoff) {
ourLog.info("Deleting old job instance {}", myInstance.getInstanceId());
myJobPersistence.deleteInstanceAndChunks(myInstance.getInstanceId());
return true;
}
}
return false;
}
private void triggerGatedExecutions() {
if (!myInstance.isRunning()) {
return;
}
if (!myInstance.hasGatedStep()) {
return;
}
JobWorkCursor<?,?,?> jobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(myInstance.getJobDefinition(), myInstance.getCurrentGatedStepId());
if (jobWorkCursor.isFinalStep()) {
return;
}
String instanceId = myInstance.getInstanceId();
String currentStepId = jobWorkCursor.getCurrentStepId();
int incompleteChunks = myProgressAccumulator.countChunksWithStatus(instanceId, currentStepId, StatusEnum.getIncompleteStatuses());
if (incompleteChunks == 0) {
String nextStepId = jobWorkCursor.nextStep.getStepId();
ourLog.info("All processing is complete for gated execution of instance {} step {}. Proceeding to step {}", instanceId, currentStepId, nextStepId);
List<String> chunksForNextStep = myProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, EnumSet.of(StatusEnum.QUEUED));
for (String nextChunkId : chunksForNextStep) {
JobWorkNotification workNotification = new JobWorkNotification(myInstance, nextStepId, nextChunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
}
myInstance.setCurrentGatedStepId(nextStepId);
myJobPersistence.updateInstance(myInstance);
}
}
public static boolean updateInstanceStatus(JobInstance myInstance, StatusEnum newStatus) {
if (myInstance.getStatus() != newStatus) {
ourLog.info("Marking job instance {} of type {} as {}", myInstance.getInstanceId(), myInstance.getJobDefinitionId(), newStatus);
myInstance.setStatus(newStatus);
return true;
}
return false;
}
}

View File

@ -0,0 +1,51 @@
package ca.uhn.fhir.batch2.maintenance;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import java.util.List;
import static ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor.updateInstanceStatus;
class JobInstanceProgressCalculator {
private final IJobPersistence myJobPersistence;
private final JobInstance myInstance;
private final JobChunkProgressAccumulator myProgressAccumulator;
JobInstanceProgressCalculator(IJobPersistence theJobPersistence, JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) {
myJobPersistence = theJobPersistence;
myInstance = theInstance;
myProgressAccumulator = theProgressAccumulator;
}
void calculateAndStoreInstanceProgress() {
InstanceProgress instanceProgress = new InstanceProgress();
for (int page = 0; ; page++) {
List<WorkChunk> chunks = myJobPersistence.fetchWorkChunksWithoutData(myInstance.getInstanceId(), JobMaintenanceServiceImpl.INSTANCES_PER_PASS, page);
for (WorkChunk chunk : chunks) {
myProgressAccumulator.addChunk(chunk);
instanceProgress.addChunk(chunk);
}
if (chunks.size() < JobMaintenanceServiceImpl.INSTANCES_PER_PASS) {
break;
}
}
instanceProgress.updateInstance(myInstance);
if (instanceProgress.failed()) {
updateInstanceStatus(myInstance, StatusEnum.FAILED);
myJobPersistence.updateInstance(myInstance);
return;
}
if (instanceProgress.changed()) {
myJobPersistence.updateInstance(myInstance);
}
}
}

View File

@ -0,0 +1,133 @@
package ca.uhn.fhir.batch2.maintenance;
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.DateUtils;
import org.quartz.JobExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* This class performs regular polls of the stored jobs in order to
* perform maintenance. This includes two major functions.
*
* <p>
* First, we calculate statistics and delete expired tasks. This class does
* the following things:
* <ul>
* <li>For instances that are IN_PROGRESS, calculates throughput and percent complete</li>
* <li>For instances that are IN_PROGRESS where all chunks are COMPLETED, marks the instance as COMPLETED</li>
* <li>For instances that are COMPLETED, purges chunk data</li>
* <li>For instances that are IN_PROGRESS where at least one chunk is FAILED, marks the instance as FAILED, propagates the error message to the instance, and purges chunk data</li>
* <li>For instances that are IN_PROGRESS with an error message set where no chunks are ERRORED or FAILED, clears the error message on the instance (on the assumption that the error was transient and has since cleared)</li>
* <li>For instances that are IN_PROGRESS where the isCancelled flag is set, marks them as ERRORED and records the current running step, if any</li>
* <li>For instances that are COMPLETED or FAILED and are old, deletes them entirely</li>
* </ul>
* </p>
*
* <p>
* Second, we check for any job instances where the job is configured to
* have gated execution. For these instances, we check if the current step
* is complete (all chunks are in COMPLETE status) and trigger the next step.
* </p>
*/
public class JobMaintenanceServiceImpl implements IJobMaintenanceService {
public static final int INSTANCES_PER_PASS = 100;
private final IJobPersistence myJobPersistence;
private final ISchedulerService mySchedulerService;
private final JobDefinitionRegistry myJobDefinitionRegistry;
private final BatchJobSender myBatchJobSender;
/**
* Constructor
*/
public JobMaintenanceServiceImpl(@Nonnull ISchedulerService theSchedulerService, @Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinitionRegistry theJobDefinitionRegistry, @Nonnull BatchJobSender theBatchJobSender) {
Validate.notNull(theSchedulerService);
Validate.notNull(theJobPersistence);
Validate.notNull(theJobDefinitionRegistry);
Validate.notNull(theBatchJobSender);
myJobPersistence = theJobPersistence;
mySchedulerService = theSchedulerService;
myJobDefinitionRegistry = theJobDefinitionRegistry;
myBatchJobSender = theBatchJobSender;
}
@PostConstruct
public void start() {
ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
jobDefinition.setId(JobMaintenanceScheduledJob.class.getName());
jobDefinition.setJobClass(JobMaintenanceScheduledJob.class);
mySchedulerService.scheduleClusteredJob(DateUtils.MILLIS_PER_MINUTE, jobDefinition);
}
@Override
public void runMaintenancePass() {
// NB: If you add any new logic, update the class javadoc
Set<String> processedInstanceIds = new HashSet<>();
JobChunkProgressAccumulator progressAccumulator = new JobChunkProgressAccumulator();
for (int page = 0; ; page++) {
List<JobInstance> instances = myJobPersistence.fetchInstances(INSTANCES_PER_PASS, page);
for (JobInstance instance : instances) {
if (processedInstanceIds.add(instance.getInstanceId())) {
myJobDefinitionRegistry.setJobDefinition(instance);
JobInstanceProcessor jobInstanceProcessor = new JobInstanceProcessor(myJobPersistence, myBatchJobSender, instance, progressAccumulator);
jobInstanceProcessor.process();
}
}
if (instances.size() < INSTANCES_PER_PASS) {
break;
}
}
}
public static class JobMaintenanceScheduledJob implements HapiJob {
@Autowired
private IJobMaintenanceService myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.runMaintenancePass();
}
}
}
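The paging contract in runMaintenancePass can be pinned down with mocks alone: when the first page comes back empty, the pass makes exactly one fetch and exits. A mock-based sketch in the style of JobMaintenanceServiceImplTest below (the test class itself is illustrative):

import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

class MaintenancePassPagingSketchTest {
    @Test
    void emptyFirstPageEndsThePass() {
        IJobPersistence persistence = mock(IJobPersistence.class);
        when(persistence.fetchInstances(anyInt(), eq(0))).thenReturn(Collections.emptyList());
        JobMaintenanceServiceImpl svc = new JobMaintenanceServiceImpl(
            mock(ISchedulerService.class), persistence, mock(JobDefinitionRegistry.class), mock(BatchJobSender.class));
        svc.runMaintenancePass();
        // A short (here: empty) page terminates the loop after a single fetch
        verify(persistence, times(1)).fetchInstances(JobMaintenanceServiceImpl.INSTANCES_PER_PASS, 0);
    }
}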

View File

@ -23,6 +23,7 @@ package ca.uhn.fhir.batch2.model;
import ca.uhn.fhir.jpa.util.JsonDateDeserializer;
import ca.uhn.fhir.jpa.util.JsonDateSerializer;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
@ -31,6 +32,8 @@ import org.apache.commons.lang3.builder.ToStringStyle;
import java.util.Date;
import static org.apache.commons.lang3.StringUtils.isBlank;
public class JobInstance extends JobInstanceStartRequest implements IModelJson {
@JsonProperty(value = "jobDefinitionVersion")
@ -83,6 +86,9 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
@JsonProperty(value = "estimatedCompletion", access = JsonProperty.Access.READ_ONLY)
private String myEstimatedTimeRemaining;
@JsonIgnore
private JobDefinition<?> myJobDefinition;
/**
* Constructor
*/
@ -111,16 +117,12 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
setTotalElapsedMillis(theJobInstance.getTotalElapsedMillis());
setWorkChunksPurged(theJobInstance.isWorkChunksPurged());
setCurrentGatedStepId(theJobInstance.getCurrentGatedStepId());
myJobDefinition = theJobInstance.getJobDefinition();
}
public static JobInstance fromJobDefinition(JobDefinition<?> theJobDefinition) {
JobInstance instance = new JobInstance();
instance.setJobDefinitionId(theJobDefinition.getJobDefinitionId());
instance.setJobDefinitionVersion(theJobDefinition.getJobDefinitionVersion());
if (theJobDefinition.isGatedExecution()) {
instance.setCurrentGatedStepId(theJobDefinition.getFirstStepId());
}
instance.setJobDefinition(theJobDefinition);
return instance;
}
@ -250,6 +252,17 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
return this;
}
public void setJobDefinition(JobDefinition<?> theJobDefinition) {
myJobDefinition = theJobDefinition;
setJobDefinitionId(theJobDefinition.getJobDefinitionId());
setJobDefinitionVersion(theJobDefinition.getJobDefinitionVersion());
}
public JobDefinition<?> getJobDefinition() {
return myJobDefinition;
}
public boolean isCancelled() {
return myCancelled;
}
@ -284,4 +297,18 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
public boolean isRunning() {
return getStatus() == StatusEnum.IN_PROGRESS && !isCancelled();
}
public boolean isFinished() {
return myStatus == StatusEnum.COMPLETED ||
myStatus == StatusEnum.FAILED ||
myStatus == StatusEnum.CANCELLED;
}
public boolean hasGatedStep() {
return !isBlank(myCurrentGatedStepId);
}
public boolean isPendingCancellation() {
return myCancelled && (myStatus == StatusEnum.QUEUED || myStatus == StatusEnum.IN_PROGRESS);
}
}
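Taken together, the fromJobDefinition change and the new accessors mean a gated definition seeds the first gate, and the definition object itself rides along on the instance (JsonIgnore'd, so it never serializes). A sketch with a mocked definition, assuming JobDefinition is mockable and using illustrative ids:

import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class JobInstanceGatingSketchTest {
    @Test
    void gatedDefinitionSeedsFirstGate() {
        JobDefinition<?> definition = mock(JobDefinition.class);
        when(definition.getJobDefinitionId()).thenReturn("example-job");
        when(definition.getJobDefinitionVersion()).thenReturn(1);
        when(definition.isGatedExecution()).thenReturn(true);
        when(definition.getFirstStepId()).thenReturn("step-1");
        JobInstance instance = JobInstance.fromJobDefinition(definition);
        // Gated jobs start with the first step as the current gate
        assertTrue(instance.hasGatedStep());
        assertEquals("step-1", instance.getCurrentGatedStepId());
        // The definition is cached on the instance, and id/version are copied from it
        assertSame(definition, instance.getJobDefinition());
        assertEquals("example-job", instance.getJobDefinitionId());
    }
}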

View File

@ -40,7 +40,6 @@ import java.util.List;
*/
public class JobWorkCursor<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> {
private static final Logger ourLog = LoggerFactory.getLogger(JobWorkCursor.class);
public final JobDefinition<PT> jobDefinition;
public final boolean isFirstStep;
public final JobDefinitionStep<PT, IT, OT> currentStep;
@ -65,8 +64,7 @@ public class JobWorkCursor<PT extends IModelJson, IT extends IModelJson, OT exte
}
}
public static <PT extends IModelJson> JobWorkCursor<PT,?,?> fromJobDefinitionAndWorkNotification(JobDefinition<PT> theJobDefinition, JobWorkNotification theWorkNotification) {
String requestedStepId = theWorkNotification.getTargetStepId();
public static <PT extends IModelJson> JobWorkCursor<PT,?,?> fromJobDefinitionAndRequestedStepId(JobDefinition<PT> theJobDefinition, String theRequestedStepId) {
boolean isFirstStep = false;
JobDefinitionStep<PT,?,?> currentStep = null;
JobDefinitionStep<PT,?,?> nextStep = null;
@ -74,7 +72,7 @@ public class JobWorkCursor<PT extends IModelJson, IT extends IModelJson, OT exte
List<JobDefinitionStep<PT, ?, ?>> steps = theJobDefinition.getSteps();
for (int i = 0; i < steps.size(); i++) {
JobDefinitionStep<PT, ?, ?> step = steps.get(i);
if (step.getStepId().equals(requestedStepId)) {
if (step.getStepId().equals(theRequestedStepId)) {
currentStep = step;
if (i == 0) {
isFirstStep = true;
@ -87,7 +85,7 @@ public class JobWorkCursor<PT extends IModelJson, IT extends IModelJson, OT exte
}
if (currentStep == null) {
String msg = "Unknown step[" + requestedStepId + "] for job definition ID[" + theJobDefinition.getJobDefinitionId() + "] version[" + theJobDefinition.getJobDefinitionVersion() + "]";
String msg = "Unknown step[" + theRequestedStepId + "] for job definition ID[" + theJobDefinition.getJobDefinitionId() + "] version[" + theJobDefinition.getJobDefinitionVersion() + "]";
ourLog.warn(msg);
throw new InternalErrorException(Msg.code(2042) + msg);
}

View File

@ -53,7 +53,11 @@ public class JobWorkNotification implements IModelJson {
setTargetStepId(theTargetStepId);
}
public static JobWorkNotification firstStepNotification(JobDefinition<?> theJobDefinition, String theInstanceId, String theChunkId) {
public JobWorkNotification(JobInstance theInstance, String theNextStepId, String theNextChunkId) {
this(theInstance.getJobDefinitionId(), theInstance.getJobDefinitionVersion(), theInstance.getInstanceId(), theNextStepId, theNextChunkId);
}
public static JobWorkNotification firstStepNotification(JobDefinition<?> theJobDefinition, String theInstanceId, String theChunkId) {
String firstStepId = theJobDefinition.getFirstStepId();
String jobDefinitionId = theJobDefinition.getJobDefinitionId();
int jobDefinitionVersion = theJobDefinition.getJobDefinitionVersion();
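The new convenience constructor simply fans the instance's identifiers out to the existing five-argument constructor, which is what lets JobInstanceProcessor write new JobWorkNotification(myInstance, nextStepId, nextChunkId) earlier in this diff. A quick equivalence sketch (the five-argument constructor's visibility and the setInstanceId setter are assumptions inferred from the delegating call):

import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;

class JobWorkNotificationSketchTest {
    @Test
    void instanceConstructorMirrorsExplicitFields() {
        JobInstance instance = new JobInstance();
        instance.setJobDefinitionId("example-job"); // illustrative values throughout
        instance.setJobDefinitionVersion(1);
        instance.setInstanceId("instance-1"); // setter assumed to exist
        JobWorkNotification fromInstance = new JobWorkNotification(instance, "step-2", "chunk-42");
        // Equivalent to spelling out all five fields pulled from the instance
        JobWorkNotification explicit = new JobWorkNotification("example-job", 1, "instance-1", "step-2", "chunk-42");
        assertEquals(explicit.getTargetStepId(), fromInstance.getTargetStepId());
    }
}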

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.impl;
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.VoidModel;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.impl;
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobParametersValidator;
@ -6,6 +6,7 @@ import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.impl;
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobPersistence;
@ -7,6 +7,7 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobWorkCursor;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.impl;
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.VoidModel;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.impl;
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.model.JobDefinition;
import org.junit.jupiter.api.Test;

View File

@ -1,8 +1,10 @@
package ca.uhn.fhir.batch2.impl;
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobCompletionDetails;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
@ -23,10 +25,10 @@ import org.springframework.messaging.Message;
import java.util.Date;
import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.createWorkChunk;
import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.createWorkChunkStep1;
import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.createWorkChunkStep2;
import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.createWorkChunkStep3;
import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunk;
import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep1;
import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep2;
import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep3;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -168,6 +170,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
@Test
public void testFailed_PurgeOldInstance() {
myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
JobInstance instance = createInstance();
instance.setStatus(StatusEnum.FAILED);
instance.setEndTime(parseTime("2001-01-01T12:12:12Z"));
@ -220,6 +223,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
@Test
public void testInProgress_CalculateProgress_OneStepFailed() {
myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(createInstance()));
when(myJobPersistence.fetchWorkChunksWithoutData(eq(INSTANCE_ID), anyInt(), eq(0))).thenReturn(Lists.newArrayList(
createWorkChunkStep1().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:01:00-04:00")).setRecordsProcessed(25),

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.impl;
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobDefinition;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.impl;
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.model.api.annotation.PasswordField;
import ca.uhn.fhir.model.api.IModelJson;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.impl;
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.impl;
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;

View File

@ -1,7 +1,7 @@
package ca.uhn.fhir.batch2.model;
import ca.uhn.fhir.batch2.impl.BaseBatch2Test;
import ca.uhn.fhir.batch2.impl.TestJobParameters;
import ca.uhn.fhir.batch2.coordinator.BaseBatch2Test;
import ca.uhn.fhir.batch2.coordinator.TestJobParameters;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -21,12 +21,8 @@ class JobWorkCursorTest extends BaseBatch2Test {
@Test
public void createCursorStep1() {
// setup
JobWorkNotification workNotification = new JobWorkNotification();
workNotification.setTargetStepId(STEP_1);
// execute
JobWorkCursor<TestJobParameters, ?, ?> cursor = JobWorkCursor.fromJobDefinitionAndWorkNotification(myDefinition, workNotification);
JobWorkCursor<TestJobParameters, ?, ?> cursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(myDefinition, STEP_1);
// verify
assertCursor(cursor, true, false, STEP_1, STEP_2);
@ -34,12 +30,8 @@ class JobWorkCursorTest extends BaseBatch2Test {
@Test
public void createCursorStep2() {
// setup
JobWorkNotification workNotification = new JobWorkNotification();
workNotification.setTargetStepId(STEP_2);
// execute
JobWorkCursor<TestJobParameters, ?, ?> cursor = JobWorkCursor.fromJobDefinitionAndWorkNotification(myDefinition, workNotification);
JobWorkCursor<TestJobParameters, ?, ?> cursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(myDefinition, STEP_2);
// verify
assertCursor(cursor, false, false, STEP_2, STEP_3);
@ -47,12 +39,8 @@ class JobWorkCursorTest extends BaseBatch2Test {
@Test
public void createCursorStep3() {
// setup
JobWorkNotification workNotification = new JobWorkNotification();
workNotification.setTargetStepId(STEP_3);
// execute
JobWorkCursor<TestJobParameters, ?, ?> cursor = JobWorkCursor.fromJobDefinitionAndWorkNotification(myDefinition, workNotification);
JobWorkCursor<TestJobParameters, ?, ?> cursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(myDefinition, STEP_3);
// verify
assertCursor(cursor, false, true, STEP_3, null);
@ -61,13 +49,11 @@ class JobWorkCursorTest extends BaseBatch2Test {
@Test
public void unknownStep() {
// setup
JobWorkNotification workNotification = new JobWorkNotification();
String targetStepId = "Made a searching and fearless moral inventory of ourselves";
workNotification.setTargetStepId(targetStepId);
// execute
try {
JobWorkCursor.fromJobDefinitionAndWorkNotification(myDefinition, workNotification);
JobWorkCursor.fromJobDefinitionAndRequestedStepId(myDefinition, targetStepId);
// verify
fail();