diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/i18n/Msg.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/i18n/Msg.java
index 54c46c6535b..318fdf1bfc8 100644
--- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/i18n/Msg.java
+++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/i18n/Msg.java
@@ -25,7 +25,7 @@ public final class Msg {
/**
* IMPORTANT: Please update the following comment after you add a new code
- * Last code value: 2081
+ * Last code value: 2082
*/
private Msg() {}
diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_1_0/3631-fasttrack-single-chunk-jobs.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_1_0/3631-fasttrack-single-chunk-jobs.yaml
new file mode 100644
index 00000000000..b1640145a8b
--- /dev/null
+++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_1_0/3631-fasttrack-single-chunk-jobs.yaml
@@ -0,0 +1,5 @@
+---
+type: add
+issue: 3631
+title: "If a gated Batch job step and all prior steps produced only a single chunk of work, notify the next step immediately
+instead of waiting for the scheduled job maintenance to advance it."
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaBatch2Config.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaBatch2Config.java
index 3ef73f220b4..6dcd36085da 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaBatch2Config.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaBatch2Config.java
@@ -22,7 +22,7 @@ package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.config.BaseBatch2Config;
-import ca.uhn.fhir.batch2.impl.SynchronizedJobPersistenceWrapper;
+import ca.uhn.fhir.batch2.coordinator.SynchronizedJobPersistenceWrapper;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import org.springframework.context.annotation.Bean;
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java
index 06642b0b4f5..35ebaf684fe 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java
@@ -21,7 +21,7 @@ package ca.uhn.fhir.jpa.batch2;
*/
import ca.uhn.fhir.batch2.api.IJobPersistence;
-import ca.uhn.fhir.batch2.impl.BatchWorkChunk;
+import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
diff --git a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/Batch2JobHelper.java b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/Batch2JobHelper.java
index 5de35466685..86c58e4d785 100644
--- a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/Batch2JobHelper.java
+++ b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/Batch2JobHelper.java
@@ -29,6 +29,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertNull;
public class Batch2JobHelper {
@@ -38,13 +39,17 @@ public class Batch2JobHelper {
@Autowired
private IJobCoordinator myJobCoordinator;
- public void awaitJobCompletion(String theId) {
+ public void awaitMultipleChunkJobCompletion(String theId) {
await().until(() -> {
myJobMaintenanceService.runMaintenancePass();
return myJobCoordinator.getInstance(theId).getStatus();
}, equalTo(StatusEnum.COMPLETED));
}
+ public void awaitSingleChunkJobCompletion(String theId) {
+ await().until(() -> myJobCoordinator.getInstance(theId).getStatus() == StatusEnum.COMPLETED);
+ }
+
public JobInstance awaitJobFailure(String theId) {
await().until(() -> {
myJobMaintenanceService.runMaintenancePass();
@@ -66,4 +71,12 @@ public class Batch2JobHelper {
return myJobCoordinator.getInstance(theId).getStatus();
}, equalTo(StatusEnum.IN_PROGRESS));
}
+
+ public void assertNoGatedStep(String theInstanceId) {
+ assertNull(myJobCoordinator.getInstance(theInstanceId).getCurrentGatedStepId());
+ }
+
+ public void awaitGatedStepId(String theExpectedGatedStepId, String theInstanceId) {
+ await().until(() -> theExpectedGatedStepId.equals(myJobCoordinator.getInstance(theInstanceId).getCurrentGatedStepId()));
+ }
}
diff --git a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java
index 660a2641cc1..7d85a0aea60 100644
--- a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java
+++ b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java
@@ -6,7 +6,7 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
-import ca.uhn.fhir.batch2.impl.JobDefinitionRegistry;
+import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
@@ -22,6 +22,8 @@ import static org.junit.jupiter.api.Assertions.fail;
public class Batch2CoordinatorIT extends BaseJpaR4Test {
public static final int TEST_JOB_VERSION = 1;
+ public static final String FIRST_STEP_ID = "first-step";
+ public static final String LAST_STEP_ID = "last-step";
@Autowired
JobDefinitionRegistry myJobDefinitionRegistry;
@Autowired
@@ -43,7 +45,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
IJobStepWorker lastStep = (step, sink) -> fail();
String jobId = "test-job-1";
- JobDefinition extends IModelJson> definition = buildJobDefinition(jobId, firstStep, lastStep);
+ JobDefinition extends IModelJson> definition = buildGatedJobDefinition(jobId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
@@ -53,12 +55,12 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
String instanceId = myJobCoordinator.startInstance(request);
myFirstStepLatch.awaitExpected();
- myBatch2JobHelper.awaitJobCompletion(instanceId);
+ myBatch2JobHelper.awaitSingleChunkJobCompletion(instanceId);
}
@Test
- public void testFirstStepToSecondStep() throws InterruptedException {
+ public void testFirstStepToSecondStep_singleChunkFasttracks() throws InterruptedException {
IJobStepWorker firstStep = (step, sink) -> {
sink.accept(new FirstStepOutput());
callLatch(myFirstStepLatch, step);
@@ -67,7 +69,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
IJobStepWorker lastStep = (step, sink) -> callLatch(myLastStepLatch, step);
String jobId = "test-job-2";
- JobDefinition extends IModelJson> definition = buildJobDefinition(jobId, firstStep, lastStep);
+ JobDefinition extends IModelJson> definition = buildGatedJobDefinition(jobId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
@@ -77,8 +79,40 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
String instanceId = myJobCoordinator.startInstance(request);
myFirstStepLatch.awaitExpected();
+ myBatch2JobHelper.assertNoGatedStep(instanceId);
+
myLastStepLatch.setExpectedCount(1);
- myBatch2JobHelper.awaitJobCompletion(instanceId);
+ // Since there was only one chunk, the job should proceed without requiring a maintenance pass
+ myLastStepLatch.awaitExpected();
+ myBatch2JobHelper.awaitSingleChunkJobCompletion(instanceId);
+ }
+
+
+ @Test
+ public void testFirstStepToSecondStep_doubleChunk_doesNotFastTrack() throws InterruptedException {
+ IJobStepWorker firstStep = (step, sink) -> {
+ sink.accept(new FirstStepOutput());
+ sink.accept(new FirstStepOutput());
+ callLatch(myFirstStepLatch, step);
+ return RunOutcome.SUCCESS;
+ };
+ IJobStepWorker lastStep = (step, sink) -> callLatch(myLastStepLatch, step);
+
+ String jobId = "test-job-5";
+ JobDefinition extends IModelJson> definition = buildGatedJobDefinition(jobId, firstStep, lastStep);
+
+ myJobDefinitionRegistry.addJobDefinition(definition);
+
+ JobInstanceStartRequest request = buildRequest(jobId);
+
+ myFirstStepLatch.setExpectedCount(1);
+ String instanceId = myJobCoordinator.startInstance(request);
+ myFirstStepLatch.awaitExpected();
+
+ myBatch2JobHelper.awaitGatedStepId(FIRST_STEP_ID, instanceId);
+
+ myLastStepLatch.setExpectedCount(2);
+ myBatch2JobHelper.awaitMultipleChunkJobCompletion(instanceId);
myLastStepLatch.awaitExpected();
}
@@ -92,7 +126,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
IJobStepWorker lastStep = (step, sink) -> fail();
String jobId = "test-job-3";
- JobDefinition extends IModelJson> definition = buildJobDefinition(jobId, firstStep, lastStep);
+ JobDefinition extends IModelJson> definition = buildGatedJobDefinition(jobId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
@@ -115,7 +149,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
IJobStepWorker lastStep = (step, sink) -> fail();
String jobId = "test-job-4";
- JobDefinition extends IModelJson> definition = buildJobDefinition(jobId, firstStep, lastStep);
+ JobDefinition extends IModelJson> definition = buildGatedJobDefinition(jobId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
@@ -146,7 +180,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
}
@Nonnull
- private JobDefinition extends IModelJson> buildJobDefinition(String theJobId, IJobStepWorker theFirstStep, IJobStepWorker theLastStep) {
+ private JobDefinition extends IModelJson> buildGatedJobDefinition(String theJobId, IJobStepWorker theFirstStep, IJobStepWorker theLastStep) {
return JobDefinition.newBuilder()
.setJobDefinitionId(theJobId)
.setJobDescription("test job")
@@ -154,13 +188,13 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
.setParametersType(TestJobParameters.class)
.gatedExecution()
.addFirstStep(
- "first-step",
+ FIRST_STEP_ID,
"Test first step",
FirstStepOutput.class,
theFirstStep
)
.addLastStep(
- "last-step",
+ LAST_STEP_ID,
"Test last step",
theLastStep
)
diff --git a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java
index 447e7c9db22..24aad38b882 100644
--- a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java
+++ b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java
@@ -1,7 +1,7 @@
package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobPersistence;
-import ca.uhn.fhir.batch2.impl.BatchWorkChunk;
+import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;
import ca.uhn.fhir.batch2.jobs.imprt.NdJsonFileJson;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
@@ -171,9 +171,9 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
}
List chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 0);
- assertEquals(null, chunks.get(0).getData());
- assertEquals(null, chunks.get(1).getData());
- assertEquals(null, chunks.get(2).getData());
+ assertNull(chunks.get(0).getData());
+ assertNull(chunks.get(1).getData());
+ assertNull(chunks.get(2).getData());
assertThat(chunks.stream().map(t -> t.getId()).collect(Collectors.toList()),
contains(ids.get(0), ids.get(1), ids.get(2)));
@@ -207,7 +207,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null);
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(() -> new IllegalArgumentException());
- assertEquals(null, chunk.getData());
+ assertNull(chunk.getData());
}
@Test
diff --git a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java
index 1c5d228dd06..33e5b128174 100644
--- a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java
+++ b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java
@@ -62,7 +62,7 @@ public class ReindexJobTest extends BaseJpaR4Test {
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setParameters(parameters);
String id = myJobCoordinator.startInstance(startRequest);
- myBatch2JobHelper.awaitJobCompletion(id);
+ myBatch2JobHelper.awaitSingleChunkJobCompletion(id);
// validate
assertEquals(2, myObservationDao.search(SearchParameterMap.newSynchronous()).size());
@@ -95,7 +95,7 @@ public class ReindexJobTest extends BaseJpaR4Test {
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setParameters(new ReindexJobParameters());
String id = myJobCoordinator.startInstance(startRequest);
- myBatch2JobHelper.awaitJobCompletion(id);
+ myBatch2JobHelper.awaitSingleChunkJobCompletion(id);
// validate
assertEquals(50, myObservationDao.search(SearchParameterMap.newSynchronous()).size());
diff --git a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/provider/r4/MultitenantBatchOperationR4Test.java b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/provider/r4/MultitenantBatchOperationR4Test.java
index 7ab06c62a62..c46294572da 100644
--- a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/provider/r4/MultitenantBatchOperationR4Test.java
+++ b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/provider/r4/MultitenantBatchOperationR4Test.java
@@ -16,9 +16,7 @@ import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.test.utilities.BatchJobHelper;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.hapi.rest.server.helper.BatchHelperR4;
-import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Bundle;
-import org.hl7.fhir.r4.model.DecimalType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.StringType;
@@ -156,7 +154,7 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(response));
StringType jobId = (StringType) response.getParameter(ProviderConstants.OPERATION_REINDEX_RESPONSE_JOB_ID);
- myBatch2JobHelper.awaitJobCompletion(jobId.getValue());
+ myBatch2JobHelper.awaitSingleChunkJobCompletion(jobId.getValue());
// validate
List alleleObservationIds = reindexTestHelper.getAlleleObservationIds(myClient);
@@ -180,7 +178,7 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(response));
jobId = (StringType) response.getParameter(ProviderConstants.OPERATION_REINDEX_RESPONSE_JOB_ID);
- myBatch2JobHelper.awaitJobCompletion(jobId.getValue());
+ myBatch2JobHelper.awaitSingleChunkJobCompletion(jobId.getValue());
myTenantClientInterceptor.setTenantId(DEFAULT_PARTITION_NAME);
@@ -223,7 +221,7 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(response));
StringType jobId = (StringType) response.getParameter(ProviderConstants.OPERATION_REINDEX_RESPONSE_JOB_ID);
- myBatch2JobHelper.awaitJobCompletion(jobId.getValue());
+ myBatch2JobHelper.awaitSingleChunkJobCompletion(jobId.getValue());
// validate
List alleleObservationIds = reindexTestHelper.getAlleleObservationIds(myClient);
diff --git a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestWithInterceptorRegisteredToDaoConfigR4Test.java b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestWithInterceptorRegisteredToDaoConfigR4Test.java
index 56e4704b9dc..387e8411a11 100644
--- a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestWithInterceptorRegisteredToDaoConfigR4Test.java
+++ b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestWithInterceptorRegisteredToDaoConfigR4Test.java
@@ -200,6 +200,7 @@ public class RestHookTestWithInterceptorRegisteredToDaoConfigR4Test extends Base
assertFalse(observation2.getId().isEmpty());
}
+ // TODO KHS this test has been failing intermittently
@Test
public void testRestHookSubscriptionXml() throws Exception {
String payload = "application/xml";
diff --git a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/rules/matcher/NicknameMatcher.java b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/rules/matcher/NicknameMatcher.java
index b5aa75c29fa..66835dd61e4 100644
--- a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/rules/matcher/NicknameMatcher.java
+++ b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/rules/matcher/NicknameMatcher.java
@@ -26,7 +26,6 @@ import ca.uhn.fhir.jpa.searchparam.nickname.NicknameSvc;
import java.io.IOException;
import java.util.Collection;
-import java.util.List;
import java.util.Locale;
public class NicknameMatcher implements IMdmStringMatcher {
diff --git a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/imprt/BulkDataImportProviderTest.java b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/imprt/BulkDataImportProviderTest.java
index 22c2710c0ec..37d575c65e3 100644
--- a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/imprt/BulkDataImportProviderTest.java
+++ b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/imprt/BulkDataImportProviderTest.java
@@ -15,7 +15,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
-import org.hamcrest.CoreMatchers;
import org.hl7.fhir.r4.model.CodeType;
import org.hl7.fhir.r4.model.InstantType;
import org.hl7.fhir.r4.model.OperationOutcome;
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java
index 9f65ad2db23..972bbb7ac1c 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java
@@ -20,7 +20,7 @@ package ca.uhn.fhir.batch2.api;
* #L%
*/
-import ca.uhn.fhir.batch2.impl.BatchWorkChunk;
+import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BatchJobSender.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/channel/BatchJobSender.java
similarity index 92%
rename from hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BatchJobSender.java
rename to hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/channel/BatchJobSender.java
index eb6a14a4faf..68622c763a7 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BatchJobSender.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/channel/BatchJobSender.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.channel;
/*-
* #%L
@@ -36,7 +36,7 @@ public class BatchJobSender {
myWorkChannelProducer = theWorkChannelProducer;
}
- void sendWorkChannelMessage(JobWorkNotification theJobWorkNotification) {
+ public void sendWorkChannelMessage(JobWorkNotification theJobWorkNotification) {
JobWorkNotificationJsonMessage message = new JobWorkNotificationJsonMessage();
message.setPayload(theJobWorkNotification);
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java
index 54569124be8..8329c8fcc15 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java
@@ -23,10 +23,10 @@ package ca.uhn.fhir.batch2.config;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
-import ca.uhn.fhir.batch2.impl.BatchJobSender;
-import ca.uhn.fhir.batch2.impl.JobMaintenanceServiceImpl;
-import ca.uhn.fhir.batch2.impl.JobCoordinatorImpl;
-import ca.uhn.fhir.batch2.impl.JobDefinitionRegistry;
+import ca.uhn.fhir.batch2.channel.BatchJobSender;
+import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
+import ca.uhn.fhir.batch2.coordinator.JobCoordinatorImpl;
+import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/Batch2JobRegisterer.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/Batch2JobRegisterer.java
index a2867a50b6c..c42fc3c4908 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/Batch2JobRegisterer.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/Batch2JobRegisterer.java
@@ -20,7 +20,7 @@ package ca.uhn.fhir.batch2.config;
* #L%
*/
-import ca.uhn.fhir.batch2.impl.JobDefinitionRegistry;
+import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,8 +45,9 @@ public class Batch2JobRegisterer {
JobDefinitionRegistry jobRegistry = myApplicationContext.getBean(JobDefinitionRegistry.class);
for (Map.Entry next : batchJobs.entrySet()) {
- ourLog.info("Registering Batch2 Job Definition: {} / {}", next.getValue().getJobDefinitionId(), next.getValue().getJobDefinitionVersion());
- jobRegistry.addJobDefinition(next.getValue());
+ JobDefinition> jobDefinition = next.getValue();
+ ourLog.info("Registering Batch2 Job Definition: {} / {}", jobDefinition.getJobDefinitionId(), jobDefinition.getJobDefinitionVersion());
+ jobRegistry.addJobDefinition(jobDefinition);
}
}
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BaseDataSink.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/BaseDataSink.java
similarity index 98%
rename from hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BaseDataSink.java
rename to hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/BaseDataSink.java
index 19048ca0214..dadbc038c1e 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BaseDataSink.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/BaseDataSink.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
/*-
* #%L
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BatchWorkChunk.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/BatchWorkChunk.java
similarity index 98%
rename from hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BatchWorkChunk.java
rename to hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/BatchWorkChunk.java
index 565073dadc7..b0aea096463 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BatchWorkChunk.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/BatchWorkChunk.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
/*-
* #%L
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/FinalStepDataSink.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/FinalStepDataSink.java
similarity index 97%
rename from hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/FinalStepDataSink.java
rename to hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/FinalStepDataSink.java
index aaec803de92..2696c719a49 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/FinalStepDataSink.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/FinalStepDataSink.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
/*-
* #%L
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java
similarity index 98%
rename from hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImpl.java
rename to hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java
index 28781e7e357..d0494c6e6c8 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImpl.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
/*-
* #%L
@@ -22,6 +22,7 @@ package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
+import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobDataSink.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobDataSink.java
similarity index 80%
rename from hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobDataSink.java
rename to hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobDataSink.java
index a39b176500a..16b16ab88f1 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobDataSink.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobDataSink.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
/*-
* #%L
@@ -21,16 +21,15 @@ package ca.uhn.fhir.batch2.impl;
*/
import ca.uhn.fhir.batch2.api.IJobPersistence;
-import ca.uhn.fhir.batch2.model.JobDefinition;
-import ca.uhn.fhir.batch2.model.JobDefinitionStep;
-import ca.uhn.fhir.batch2.model.JobWorkCursor;
-import ca.uhn.fhir.batch2.model.JobWorkNotification;
-import ca.uhn.fhir.batch2.model.WorkChunkData;
+import ca.uhn.fhir.batch2.channel.BatchJobSender;
+import ca.uhn.fhir.batch2.model.*;
+import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil;
import javax.annotation.Nonnull;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
class JobDataSink extends BaseDataSink {
private final BatchJobSender myBatchJobSender;
@@ -39,6 +38,7 @@ class JobDataSink myTargetStep;
private final AtomicInteger myChunkCounter = new AtomicInteger(0);
+ private final AtomicReference myLastChunkId = new AtomicReference<>();
private final boolean myGatedExecution;
JobDataSink(@Nonnull BatchJobSender theBatchJobSender, @Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinition> theDefinition, @Nonnull String theInstanceId, @Nonnull JobWorkCursor theJobWorkCursor) {
@@ -61,6 +61,7 @@ class JobDataSink jobDefinition = getJobDefinitionOrThrowException(theInstance.getJobDefinitionId(), theInstance.getJobDefinitionVersion());
+ theInstance.setJobDefinition(jobDefinition);
+ }
}
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobParameterJsonValidator.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobParameterJsonValidator.java
similarity index 98%
rename from hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobParameterJsonValidator.java
rename to hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobParameterJsonValidator.java
index 388b29f088c..b28763748e9 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobParameterJsonValidator.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobParameterJsonValidator.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
/*-
* #%L
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobQuerySvc.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobQuerySvc.java
similarity index 98%
rename from hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobQuerySvc.java
rename to hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobQuerySvc.java
index f428bc0e72f..017539daaee 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobQuerySvc.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobQuerySvc.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
/*-
* #%L
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobStepExecutor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobStepExecutor.java
similarity index 70%
rename from hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobStepExecutor.java
rename to hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobStepExecutor.java
index 241f01743cc..3e4245bb3ef 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobStepExecutor.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobStepExecutor.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
/*-
* #%L
@@ -27,10 +27,13 @@ import ca.uhn.fhir.batch2.api.JobStepFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
+import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
+import ca.uhn.fhir.batch2.model.JobWorkNotification;
+import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
@@ -41,6 +44,8 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.util.Optional;
+import static ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor.updateInstanceStatus;
+
public class JobStepExecutor {
private static final Logger ourLog = LoggerFactory.getLogger(JobStepExecutor.class);
@@ -83,22 +88,57 @@ public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> {
- Optional<JobInstance> oInstance = myJobPersistence.fetchInstance(myInstanceId);
+ private void handleGatedExecution(BaseDataSink<OT> theDataSink) {
+ JobInstance jobInstance = initializeGatedExecutionIfRequired(theDataSink);
- if (oInstance.isPresent()) {
- JobInstance instance = oInstance.get();
- instance.setCurrentGatedStepId(myCursor.getCurrentStepId());
- myJobPersistence.updateInstance(instance);
+ if (eligibleForFastTracking(theDataSink, jobInstance)) {
+ ourLog.info("Gated job {} step {} produced at most one chunk: Fast tracking execution.", myDefinition.getJobDefinitionId(), myCursor.currentStep.getStepId());
+ // This job is defined to be gated, but so far every step has produced at most 1 work chunk, so it is
+ // eligible for fast tracking.
+ if (myCursor.isFinalStep()) {
+ if (updateInstanceStatus(jobInstance, StatusEnum.COMPLETED)) {
+ myJobPersistence.updateInstance(jobInstance);
+ }
+ } else {
+ JobWorkNotification workNotification = new JobWorkNotification(jobInstance, myCursor.nextStep.getStepId(), ((JobDataSink<OT>) theDataSink).getOnlyChunkId());
+ myBatchJobSender.sendWorkChannelMessage(workNotification);
+ }
}
}
+ private boolean eligibleForFastTracking(BaseDataSink<OT> theDataSink, JobInstance theJobInstance) {
+ return theJobInstance != null &&
+ !theJobInstance.hasGatedStep() &&
+ theDataSink.getWorkChunkCount() <= 1;
+ }
+
+ private JobInstance initializeGatedExecutionIfRequired(BaseDataSink<OT> theDataSink) {
+ Optional<JobInstance> oJobInstance = myJobPersistence.fetchInstance(myInstanceId);
+ if (oJobInstance.isEmpty()) {
+ return null;
+ }
+
+ JobInstance jobInstance = oJobInstance.get();
+ if (jobInstance.hasGatedStep()) {
+ // Gated execution is already initialized
+ return jobInstance;
+ }
+
+ if (theDataSink.getWorkChunkCount() <= 1) {
+ // Do not initialize gated execution for steps that produced only one chunk
+ return jobInstance;
+ }
+
+ jobInstance.setCurrentGatedStepId(myCursor.getCurrentStepId());
+ myJobPersistence.updateInstance(jobInstance);
+ return jobInstance;
+ }
+
private boolean executeStep(String theJobDefinitionId, @Nonnull WorkChunk theWorkChunk, PT theParameters, BaseDataSink theDataSink) {
JobDefinitionStep theTargetStep = theDataSink.getTargetStep();
String targetStepId = theTargetStep.getStepId();
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobStepExecutorFactory.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobStepExecutorFactory.java
similarity index 94%
rename from hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobStepExecutorFactory.java
rename to hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobStepExecutorFactory.java
index b8c648900d4..ecc6c96845d 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobStepExecutorFactory.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobStepExecutorFactory.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
/*-
* #%L
@@ -21,6 +21,7 @@ package ca.uhn.fhir.batch2.impl;
*/
import ca.uhn.fhir.batch2.api.IJobPersistence;
+import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.WorkChunk;
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/SynchronizedJobPersistenceWrapper.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/SynchronizedJobPersistenceWrapper.java
similarity index 98%
rename from hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/SynchronizedJobPersistenceWrapper.java
rename to hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/SynchronizedJobPersistenceWrapper.java
index 99cc71af692..13ea1030db2 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/SynchronizedJobPersistenceWrapper.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/SynchronizedJobPersistenceWrapper.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
/*-
* #%L
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/WorkChannelMessageHandler.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandler.java
similarity index 95%
rename from hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/WorkChannelMessageHandler.java
rename to hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandler.java
index 24b2824059d..5ce1ea34892 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/WorkChannelMessageHandler.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandler.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
/*-
* #%L
@@ -22,6 +22,7 @@ package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.IJobPersistence;
+import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
@@ -95,6 +96,6 @@ class WorkChannelMessageHandler implements MessageHandler {
JobDefinition<?> definition = myJobDefinitionRegistry.getJobDefinitionOrThrowException(jobDefinitionId, jobDefinitionVersion);
- return JobWorkCursor.fromJobDefinitionAndWorkNotification(definition, workNotification);
+ return JobWorkCursor.fromJobDefinitionAndRequestedStepId(definition, workNotification.getTargetStepId());
}
}
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobMaintenanceServiceImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobMaintenanceServiceImpl.java
deleted file mode 100644
index a7edd6ac2c8..00000000000
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobMaintenanceServiceImpl.java
+++ /dev/null
@@ -1,435 +0,0 @@
-package ca.uhn.fhir.batch2.impl;
-
-/*-
- * #%L
- * HAPI FHIR JPA Server - Batch2 Task Processor
- * %%
- * Copyright (C) 2014 - 2022 Smile CDR, Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
-import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
-import ca.uhn.fhir.batch2.api.IJobPersistence;
-import ca.uhn.fhir.batch2.api.JobCompletionDetails;
-import ca.uhn.fhir.batch2.model.JobDefinition;
-import ca.uhn.fhir.batch2.model.JobInstance;
-import ca.uhn.fhir.batch2.model.JobWorkNotification;
-import ca.uhn.fhir.batch2.model.StatusEnum;
-import ca.uhn.fhir.batch2.model.WorkChunk;
-import ca.uhn.fhir.jpa.model.sched.HapiJob;
-import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
-import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
-import ca.uhn.fhir.model.api.IModelJson;
-import ca.uhn.fhir.util.StopWatch;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.commons.lang3.Validate;
-import org.apache.commons.lang3.time.DateUtils;
-import org.quartz.JobExecutionContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import javax.annotation.Nonnull;
-import javax.annotation.PostConstruct;
-import java.util.Collection;
-import java.util.Date;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static java.util.Collections.emptyList;
-import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
-import static org.apache.commons.lang3.StringUtils.isBlank;
-
-/**
- * This class performs regular polls of the stored jobs in order to
- * perform maintenance. This includes two major functions.
- *
- *
- * First, we calculate statistics and delete expired tasks. This class does
- * the following things:
- *
- * - For instances that are IN_PROGRESS, calculates throughput and percent complete
- * - For instances that are IN_PROGRESS where all chunks are COMPLETE, marks instance as COMPLETE
- * - For instances that are COMPLETE, purges chunk data
- * - For instances that are IN_PROGRESS where at least one chunk is FAILED, marks instance as FAILED and propagates the error message to the instance, and purges chunk data
- * - For instances that are IN_PROGRESS with an error message set where no chunks are ERRORED or FAILED, clears the error message in the instance (meaning presumably there was an error but it cleared)
- * - For instances that are IN_PROGRESS and isCancelled flag is set marks them as ERRORED and indicating the current running step if any
- * - For instances that are COMPLETE or FAILED and are old, delete them entirely
- *
- *
- *
- *
- * Second, we check for any job instances where the job is configured to
- * have gated execution. For these instances, we check if the current step
- * is complete (all chunks are in COMPLETE status) and trigger the next step.
- *
- */
-public class JobMaintenanceServiceImpl implements IJobMaintenanceService {
-
- public static final int INSTANCES_PER_PASS = 100;
- public static final long PURGE_THRESHOLD = 7L * DateUtils.MILLIS_PER_DAY;
- private static final Logger ourLog = LoggerFactory.getLogger(JobMaintenanceServiceImpl.class);
- private final IJobPersistence myJobPersistence;
- private final ISchedulerService mySchedulerService;
- private final JobDefinitionRegistry myJobDefinitionRegistry;
- private final BatchJobSender myBatchJobSender;
-
-
- /**
- * Constructor
- */
- public JobMaintenanceServiceImpl(@Nonnull ISchedulerService theSchedulerService, @Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinitionRegistry theJobDefinitionRegistry, @Nonnull BatchJobSender theBatchJobSender) {
- Validate.notNull(theSchedulerService);
- Validate.notNull(theJobPersistence);
- Validate.notNull(theJobDefinitionRegistry);
- Validate.notNull(theBatchJobSender);
-
- myJobPersistence = theJobPersistence;
- mySchedulerService = theSchedulerService;
- myJobDefinitionRegistry = theJobDefinitionRegistry;
- myBatchJobSender = theBatchJobSender;
- }
-
- @PostConstruct
- public void start() {
- ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
- jobDefinition.setId(JobMaintenanceScheduledJob.class.getName());
- jobDefinition.setJobClass(JobMaintenanceScheduledJob.class);
- mySchedulerService.scheduleClusteredJob(DateUtils.MILLIS_PER_MINUTE, jobDefinition);
- }
-
- @Override
- public void runMaintenancePass() {
-
- // NB: If you add any new logic, update the class javadoc
-
- Set<String> processedInstanceIds = new HashSet<>();
- JobChunkProgressAccumulator progressAccumulator = new JobChunkProgressAccumulator();
- for (int page = 0; ; page++) {
- List<JobInstance> instances = myJobPersistence.fetchInstances(INSTANCES_PER_PASS, page);
-
- for (JobInstance instance : instances) {
- if (processedInstanceIds.add(instance.getInstanceId())) {
- handleCancellation(instance);
- cleanupInstance(instance, progressAccumulator);
- triggerGatedExecutions(instance, progressAccumulator);
- }
- }
-
- if (instances.size() < INSTANCES_PER_PASS) {
- break;
- }
- }
- }
-
- private void handleCancellation(JobInstance theInstance) {
- if (! theInstance.isCancelled()) { return; }
-
- if (theInstance.getStatus() == StatusEnum.QUEUED || theInstance.getStatus() == StatusEnum.IN_PROGRESS) {
- String msg = "Job instance cancelled";
- if (theInstance.getCurrentGatedStepId() != null) {
- msg += " while running step " + theInstance.getCurrentGatedStepId();
- }
- theInstance.setErrorMessage(msg);
- theInstance.setStatus(StatusEnum.CANCELLED);
- myJobPersistence.updateInstance(theInstance);
- }
-
- }
-
- private void cleanupInstance(JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) {
- switch (theInstance.getStatus()) {
- case QUEUED:
- break;
- case IN_PROGRESS:
- case ERRORED:
- calculateInstanceProgress(theInstance, theProgressAccumulator);
- break;
- case COMPLETED:
- case FAILED:
- case CANCELLED:
- if (theInstance.getEndTime() != null) {
- long cutoff = System.currentTimeMillis() - PURGE_THRESHOLD;
- if (theInstance.getEndTime().getTime() < cutoff) {
- ourLog.info("Deleting old job instance {}", theInstance.getInstanceId());
- myJobPersistence.deleteInstanceAndChunks(theInstance.getInstanceId());
- return;
- }
- }
- break;
- }
-
- if ((theInstance.getStatus() == StatusEnum.COMPLETED || theInstance.getStatus() == StatusEnum.FAILED
- || theInstance.getStatus() == StatusEnum.CANCELLED) && !theInstance.isWorkChunksPurged()) {
- theInstance.setWorkChunksPurged(true);
- myJobPersistence.deleteChunks(theInstance.getInstanceId());
- myJobPersistence.updateInstance(theInstance);
- }
-
- }
-
- private void calculateInstanceProgress(JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) {
- int resourcesProcessed = 0;
- int incompleteChunkCount = 0;
- int completeChunkCount = 0;
- int erroredChunkCount = 0;
- int failedChunkCount = 0;
- int errorCountForAllStatuses = 0;
- Long earliestStartTime = null;
- Long latestEndTime = null;
- String errorMessage = null;
- for (int page = 0; ; page++) {
- List<WorkChunk> chunks = myJobPersistence.fetchWorkChunksWithoutData(theInstance.getInstanceId(), INSTANCES_PER_PASS, page);
-
- for (WorkChunk chunk : chunks) {
- theProgressAccumulator.addChunk(chunk.getInstanceId(), chunk.getId(), chunk.getTargetStepId(), chunk.getStatus());
-
- errorCountForAllStatuses += chunk.getErrorCount();
-
- if (chunk.getRecordsProcessed() != null) {
- resourcesProcessed += chunk.getRecordsProcessed();
- }
- if (chunk.getStartTime() != null) {
- if (earliestStartTime == null || earliestStartTime > chunk.getStartTime().getTime()) {
- earliestStartTime = chunk.getStartTime().getTime();
- }
- }
- if (chunk.getEndTime() != null) {
- if (latestEndTime == null || latestEndTime < chunk.getEndTime().getTime()) {
- latestEndTime = chunk.getEndTime().getTime();
- }
- }
- switch (chunk.getStatus()) {
- case QUEUED:
- case IN_PROGRESS:
- incompleteChunkCount++;
- break;
- case COMPLETED:
- completeChunkCount++;
- break;
- case ERRORED:
- erroredChunkCount++;
- if (errorMessage == null) {
- errorMessage = chunk.getErrorMessage();
- }
- break;
- case FAILED:
- failedChunkCount++;
- errorMessage = chunk.getErrorMessage();
- break;
- case CANCELLED:
- break;
- }
- }
-
- if (chunks.size() < INSTANCES_PER_PASS) {
- break;
- }
- }
-
- if (earliestStartTime != null) {
- theInstance.setStartTime(new Date(earliestStartTime));
- }
- theInstance.setErrorCount(errorCountForAllStatuses);
- theInstance.setCombinedRecordsProcessed(resourcesProcessed);
-
- boolean changedStatus = false;
- if (completeChunkCount > 1 || erroredChunkCount > 1) {
-
- double percentComplete = (double) (completeChunkCount) / (double) (incompleteChunkCount + completeChunkCount + failedChunkCount + erroredChunkCount);
- theInstance.setProgress(percentComplete);
-
- if (incompleteChunkCount == 0 && erroredChunkCount == 0 && failedChunkCount == 0) {
- boolean completed = updateInstanceStatus(theInstance, StatusEnum.COMPLETED);
- if (completed) {
- JobDefinition<?> definition = myJobDefinitionRegistry.getJobDefinition(theInstance.getJobDefinitionId(), theInstance.getJobDefinitionVersion()).orElseThrow(() -> new IllegalStateException("Unknown job " + theInstance.getJobDefinitionId() + "/" + theInstance.getJobDefinitionVersion()));
- invokeJobCompletionHandler(theInstance, definition);
- }
- changedStatus = completed;
- } else if (erroredChunkCount > 0) {
- changedStatus = updateInstanceStatus(theInstance, StatusEnum.ERRORED);
- }
-
- if (earliestStartTime != null && latestEndTime != null) {
- long elapsedTime = latestEndTime - earliestStartTime;
- if (elapsedTime > 0) {
- double throughput = StopWatch.getThroughput(resourcesProcessed, elapsedTime, TimeUnit.SECONDS);
- theInstance.setCombinedRecordsProcessedPerSecond(throughput);
-
- String estimatedTimeRemaining = StopWatch.formatEstimatedTimeRemaining(completeChunkCount, (completeChunkCount + incompleteChunkCount), elapsedTime);
- theInstance.setEstimatedTimeRemaining(estimatedTimeRemaining);
- }
- }
-
- }
-
- if (latestEndTime != null) {
- if (failedChunkCount > 0) {
- theInstance.setEndTime(new Date(latestEndTime));
- } else if (completeChunkCount > 0 && incompleteChunkCount == 0 && erroredChunkCount == 0) {
- theInstance.setEndTime(new Date(latestEndTime));
- }
- }
-
- theInstance.setErrorMessage(errorMessage);
-
- if (changedStatus || theInstance.getStatus() == StatusEnum.IN_PROGRESS) {
- ourLog.info("Job {} of type {} has status {} - {} records processed ({}/sec) - ETA: {}", theInstance.getInstanceId(), theInstance.getJobDefinitionId(), theInstance.getStatus(), theInstance.getCombinedRecordsProcessed(), theInstance.getCombinedRecordsProcessedPerSecond(), theInstance.getEstimatedTimeRemaining());
- }
-
- if (failedChunkCount > 0) {
- updateInstanceStatus(theInstance, StatusEnum.FAILED);
- myJobPersistence.updateInstance(theInstance);
- return;
- }
-
- if ((incompleteChunkCount + completeChunkCount + erroredChunkCount) >= 2 || errorCountForAllStatuses > 0) {
- myJobPersistence.updateInstance(theInstance);
- }
-
- }
-
- private <PT extends IModelJson> void invokeJobCompletionHandler(JobInstance theInstance, JobDefinition<PT> definition) {
- IJobCompletionHandler<PT> completionHandler = definition.getCompletionHandler();
- if (completionHandler != null) {
-
- String instanceId = theInstance.getInstanceId();
- PT jobParameters = theInstance.getParameters(definition.getParametersType());
- JobCompletionDetails<PT> completionDetails = new JobCompletionDetails<>(jobParameters, instanceId);
- completionHandler.jobComplete(completionDetails);
- }
- }
-
- private boolean updateInstanceStatus(JobInstance theInstance, StatusEnum newStatus) {
- if (theInstance.getStatus() != newStatus) {
- ourLog.info("Marking job instance {} of type {} as {}", theInstance.getInstanceId(), theInstance.getJobDefinitionId(), newStatus);
- theInstance.setStatus(newStatus);
- return true;
- }
- return false;
- }
-
- private void triggerGatedExecutions(JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) {
- if (!theInstance.isRunning()) {
- return;
- }
-
- String jobDefinitionId = theInstance.getJobDefinitionId();
- int jobDefinitionVersion = theInstance.getJobDefinitionVersion();
- String instanceId = theInstance.getInstanceId();
-
- JobDefinition<?> definition = myJobDefinitionRegistry.getJobDefinition(jobDefinitionId, jobDefinitionVersion).orElseThrow(() -> new IllegalStateException("Unknown job definition: " + jobDefinitionId + " " + jobDefinitionVersion));
- if (!definition.isGatedExecution()) {
- return;
- }
-
- String currentStepId = theInstance.getCurrentGatedStepId();
- if (isBlank(currentStepId)) {
- return;
- }
-
- if (definition.isLastStep(currentStepId)) {
- return;
- }
-
- int incompleteChunks = theProgressAccumulator.countChunksWithStatus(instanceId, currentStepId, StatusEnum.getIncompleteStatuses());
- if (incompleteChunks == 0) {
-
- int currentStepIndex = definition.getStepIndex(currentStepId);
- String nextStepId = definition.getSteps().get(currentStepIndex + 1).getStepId();
-
- ourLog.info("All processing is complete for gated execution of instance {} step {}. Proceeding to step {}", instanceId, currentStepId, nextStepId);
- List<String> chunksForNextStep = theProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, EnumSet.of(StatusEnum.QUEUED));
- for (String nextChunkId : chunksForNextStep) {
- JobWorkNotification workNotification = new JobWorkNotification(jobDefinitionId, jobDefinitionVersion, instanceId, nextStepId, nextChunkId);
- myBatchJobSender.sendWorkChannelMessage(workNotification);
- }
-
- theInstance.setCurrentGatedStepId(nextStepId);
- myJobPersistence.updateInstance(theInstance);
- }
-
- }
-
-
- public static class JobMaintenanceScheduledJob implements HapiJob {
- @Autowired
- private IJobMaintenanceService myTarget;
-
- @Override
- public void execute(JobExecutionContext theContext) {
- myTarget.runMaintenancePass();
- }
- }
-
-
- /**
- * While performing cleanup, the cleanup job loads all of the known
- * work chunks to examine their status. This bean collects the counts that
- * are found, so that they can be reused for maintenance jobs without
- * needing to hit the database a second time.
- */
- private static class JobChunkProgressAccumulator {
-
- private final Set<String> myConsumedInstanceAndChunkIds = new HashSet<>();
- private final Multimap<String, ChunkStatusCountKey> myInstanceIdToChunkStatuses = ArrayListMultimap.create();
-
- public void addChunk(String theInstanceId, String theChunkId, String theStepId, StatusEnum theStatus) {
- // Note: If chunks are being written while we're executing, we may see the same chunk twice. This
- // check avoids adding it twice.
- if (myConsumedInstanceAndChunkIds.add(theInstanceId + " " + theChunkId)) {
- myInstanceIdToChunkStatuses.put(theInstanceId, new ChunkStatusCountKey(theChunkId, theStepId, theStatus));
- }
- }
-
- public int countChunksWithStatus(String theInstanceId, String theStepId, Set<StatusEnum> theStatuses) {
- return getChunkIdsWithStatus(theInstanceId, theStepId, theStatuses).size();
- }
-
- public List<String> getChunkIdsWithStatus(String theInstanceId, String theStepId, Set<StatusEnum> theStatuses) {
- return getChunkStatuses(theInstanceId).stream().filter(t -> t.myStepId.equals(theStepId)).filter(t -> theStatuses.contains(t.myStatus)).map(t -> t.myChunkId).collect(Collectors.toList());
- }
-
- @Nonnull
- private Collection<ChunkStatusCountKey> getChunkStatuses(String theInstanceId) {
- Collection<ChunkStatusCountKey> chunkStatuses = myInstanceIdToChunkStatuses.get(theInstanceId);
- chunkStatuses = defaultIfNull(chunkStatuses, emptyList());
- return chunkStatuses;
- }
-
-
- private static class ChunkStatusCountKey {
- public final String myChunkId;
- public final String myStepId;
- public final StatusEnum myStatus;
-
- private ChunkStatusCountKey(String theChunkId, String theStepId, StatusEnum theStatus) {
- myChunkId = theChunkId;
- myStepId = theStepId;
- myStatus = theStatus;
- }
- }
-
-
- }
-
-
-}
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/InstanceProgress.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/InstanceProgress.java
new file mode 100644
index 00000000000..7c749c6699c
--- /dev/null
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/InstanceProgress.java
@@ -0,0 +1,168 @@
+package ca.uhn.fhir.batch2.maintenance;
+
+import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
+import ca.uhn.fhir.batch2.api.JobCompletionDetails;
+import ca.uhn.fhir.batch2.model.JobDefinition;
+import ca.uhn.fhir.batch2.model.JobInstance;
+import ca.uhn.fhir.batch2.model.StatusEnum;
+import ca.uhn.fhir.batch2.model.WorkChunk;
+import ca.uhn.fhir.model.api.IModelJson;
+import ca.uhn.fhir.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import static ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor.updateInstanceStatus;
+
+class InstanceProgress {
+ private static final Logger ourLog = LoggerFactory.getLogger(InstanceProgress.class);
+
+ private int myRecordsProcessed = 0;
+ private int myIncompleteChunkCount = 0;
+ private int myCompleteChunkCount = 0;
+ private int myErroredChunkCount = 0;
+ private int myFailedChunkCount = 0;
+ private int myErrorCountForAllStatuses = 0;
+ private Long myEarliestStartTime = null;
+ private Long myLatestEndTime = null;
+ private String myErrormessage = null;
+
+ public void addChunk(WorkChunk theChunk) {
+ myErrorCountForAllStatuses += theChunk.getErrorCount();
+
+ updateRecordsProcessed(theChunk);
+ updateEarliestTime(theChunk);
+ updateLatestEndTime(theChunk);
+ updateCompletionStatus(theChunk);
+ }
+
+ private void updateCompletionStatus(WorkChunk theChunk) {
+ switch (theChunk.getStatus()) {
+ case QUEUED:
+ case IN_PROGRESS:
+ myIncompleteChunkCount++;
+ break;
+ case COMPLETED:
+ myCompleteChunkCount++;
+ break;
+ case ERRORED:
+ myErroredChunkCount++;
+ if (myErrormessage == null) {
+ myErrormessage = theChunk.getErrorMessage();
+ }
+ break;
+ case FAILED:
+ myFailedChunkCount++;
+ myErrormessage = theChunk.getErrorMessage();
+ break;
+ case CANCELLED:
+ break;
+ }
+ }
+
+ private void updateLatestEndTime(WorkChunk theChunk) {
+ if (theChunk.getEndTime() != null) {
+ if (myLatestEndTime == null || myLatestEndTime < theChunk.getEndTime().getTime()) {
+ myLatestEndTime = theChunk.getEndTime().getTime();
+ }
+ }
+ }
+
+ private void updateEarliestTime(WorkChunk theChunk) {
+ if (theChunk.getStartTime() != null) {
+ if (myEarliestStartTime == null || myEarliestStartTime > theChunk.getStartTime().getTime()) {
+ myEarliestStartTime = theChunk.getStartTime().getTime();
+ }
+ }
+ }
+
+ private void updateRecordsProcessed(WorkChunk theChunk) {
+ if (theChunk.getRecordsProcessed() != null) {
+ myRecordsProcessed += theChunk.getRecordsProcessed();
+ }
+ }
+
+ public void updateInstance(JobInstance theInstance) {
+ if (myEarliestStartTime != null) {
+ theInstance.setStartTime(new Date(myEarliestStartTime));
+ }
+ theInstance.setErrorCount(myErrorCountForAllStatuses);
+ theInstance.setCombinedRecordsProcessed(myRecordsProcessed);
+
+ boolean changedStatus = updateStatus(theInstance);
+
+ setEndTime(theInstance);
+
+ theInstance.setErrorMessage(myErrormessage);
+
+ if (changedStatus || theInstance.getStatus() == StatusEnum.IN_PROGRESS) {
+ ourLog.info("Job {} of type {} has status {} - {} records processed ({}/sec) - ETA: {}", theInstance.getInstanceId(), theInstance.getJobDefinitionId(), theInstance.getStatus(), theInstance.getCombinedRecordsProcessed(), theInstance.getCombinedRecordsProcessedPerSecond(), theInstance.getEstimatedTimeRemaining());
+ }
+ }
+
+ private void setEndTime(JobInstance theInstance) {
+ if (myLatestEndTime != null) {
+ if (myFailedChunkCount > 0) {
+ theInstance.setEndTime(new Date(myLatestEndTime));
+ } else if (myCompleteChunkCount > 0 && myIncompleteChunkCount == 0 && myErroredChunkCount == 0) {
+ theInstance.setEndTime(new Date(myLatestEndTime));
+ }
+ }
+ }
+
+ private boolean updateStatus(JobInstance theInstance) {
+ boolean changedStatus = false;
+ if (myCompleteChunkCount > 1 || myErroredChunkCount > 1) {
+
+ double percentComplete = (double) (myCompleteChunkCount) / (double) (myIncompleteChunkCount + myCompleteChunkCount + myFailedChunkCount + myErroredChunkCount);
+ theInstance.setProgress(percentComplete);
+
+ if (jobSuccessfullyCompleted()) {
+ boolean completed = updateInstanceStatus(theInstance, StatusEnum.COMPLETED);
+ if (completed) {
+ invokeJobCompletionHandler(theInstance);
+ }
+ changedStatus = completed;
+ } else if (myErroredChunkCount > 0) {
+ changedStatus = updateInstanceStatus(theInstance, StatusEnum.ERRORED);
+ }
+
+ if (myEarliestStartTime != null && myLatestEndTime != null) {
+ long elapsedTime = myLatestEndTime - myEarliestStartTime;
+ if (elapsedTime > 0) {
+ double throughput = StopWatch.getThroughput(myRecordsProcessed, elapsedTime, TimeUnit.SECONDS);
+ theInstance.setCombinedRecordsProcessedPerSecond(throughput);
+
+ String estimatedTimeRemaining = StopWatch.formatEstimatedTimeRemaining(myCompleteChunkCount, (myCompleteChunkCount + myIncompleteChunkCount), elapsedTime);
+ theInstance.setEstimatedTimeRemaining(estimatedTimeRemaining);
+ }
+ }
+ }
+ return changedStatus;
+ }
+
+ private boolean jobSuccessfullyCompleted() {
+ return myIncompleteChunkCount == 0 && myErroredChunkCount == 0 && myFailedChunkCount == 0;
+ }
+
+ private <PT extends IModelJson> void invokeJobCompletionHandler(JobInstance myInstance) {
+ JobDefinition<PT> definition = (JobDefinition<PT>) myInstance.getJobDefinition();
+ IJobCompletionHandler<PT> completionHandler = definition.getCompletionHandler();
+ if (completionHandler != null) {
+ String instanceId = myInstance.getInstanceId();
+ PT jobParameters = myInstance.getParameters(definition.getParametersType());
+ JobCompletionDetails<PT> completionDetails = new JobCompletionDetails<>(jobParameters, instanceId);
+ completionHandler.jobComplete(completionDetails);
+ }
+ }
+
+ public boolean failed() {
+ return myFailedChunkCount > 0;
+ }
+
+ public boolean changed() {
+ return (myIncompleteChunkCount + myCompleteChunkCount + myErroredChunkCount) >= 2 || myErrorCountForAllStatuses > 0;
+ }
+}
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobChunkProgressAccumulator.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobChunkProgressAccumulator.java
new file mode 100644
index 00000000000..38f5e985e47
--- /dev/null
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobChunkProgressAccumulator.java
@@ -0,0 +1,68 @@
+package ca.uhn.fhir.batch2.maintenance;
+
+
+import ca.uhn.fhir.batch2.model.StatusEnum;
+import ca.uhn.fhir.batch2.model.WorkChunk;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+import javax.annotation.Nonnull;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyList;
+import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
+
+/**
+ * While performing cleanup, the cleanup job loads all of the known
+ * work chunks to examine their status. This bean collects the counts that
+ * are found, so that they can be reused for maintenance jobs without
+ * needing to hit the database a second time.
+ */
+class JobChunkProgressAccumulator {
+
+ private final Set<String> myConsumedInstanceAndChunkIds = new HashSet<>();
+ private final Multimap<String, ChunkStatusCountKey> myInstanceIdToChunkStatuses = ArrayListMultimap.create();
+
+ int countChunksWithStatus(String theInstanceId, String theStepId, Set<StatusEnum> theStatuses) {
+ return getChunkIdsWithStatus(theInstanceId, theStepId, theStatuses).size();
+ }
+
+ List<String> getChunkIdsWithStatus(String theInstanceId, String theStepId, Set<StatusEnum> theStatuses) {
+ return getChunkStatuses(theInstanceId).stream().filter(t -> t.myStepId.equals(theStepId)).filter(t -> theStatuses.contains(t.myStatus)).map(t -> t.myChunkId).collect(Collectors.toList());
+ }
+
+ @Nonnull
+ private Collection<ChunkStatusCountKey> getChunkStatuses(String theInstanceId) {
+ Collection<ChunkStatusCountKey> chunkStatuses = myInstanceIdToChunkStatuses.get(theInstanceId);
+ chunkStatuses = defaultIfNull(chunkStatuses, emptyList());
+ return chunkStatuses;
+ }
+
+ public void addChunk(WorkChunk theChunk) {
+ String instanceId = theChunk.getInstanceId();
+ String chunkId = theChunk.getId();
+ // Note: If chunks are being written while we're executing, we may see the same chunk twice. This
+ // check avoids adding it twice.
+ if (myConsumedInstanceAndChunkIds.add(instanceId + " " + chunkId)) {
+ myInstanceIdToChunkStatuses.put(instanceId, new ChunkStatusCountKey(chunkId, theChunk.getTargetStepId(), theChunk.getStatus()));
+ }
+ }
+
+ private static class ChunkStatusCountKey {
+ public final String myChunkId;
+ public final String myStepId;
+ public final StatusEnum myStatus;
+
+ private ChunkStatusCountKey(String theChunkId, String theStepId, StatusEnum theStatus) {
+ myChunkId = theChunkId;
+ myStepId = theStepId;
+ myStatus = theStatus;
+ }
+ }
+
+
+}
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
new file mode 100644
index 00000000000..840459d664e
--- /dev/null
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
@@ -0,0 +1,137 @@
+package ca.uhn.fhir.batch2.maintenance;
+
+import ca.uhn.fhir.batch2.api.IJobPersistence;
+import ca.uhn.fhir.batch2.channel.BatchJobSender;
+import ca.uhn.fhir.batch2.model.JobInstance;
+import ca.uhn.fhir.batch2.model.JobWorkCursor;
+import ca.uhn.fhir.batch2.model.JobWorkNotification;
+import ca.uhn.fhir.batch2.model.StatusEnum;
+import org.apache.commons.lang3.time.DateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.EnumSet;
+import java.util.List;
+
+/**
+ * Performs one maintenance pass over a single job instance: applies pending
+ * cancellation, recalculates progress / purges expired data, and advances
+ * gated executions to their next step once the current step is complete.
+ */
+public class JobInstanceProcessor {
+	private static final Logger ourLog = LoggerFactory.getLogger(JobInstanceProcessor.class);
+
+	/** Finished instances whose end time is older than this are deleted entirely. */
+	public static final long PURGE_THRESHOLD = 7L * DateUtils.MILLIS_PER_DAY;
+
+	private final IJobPersistence myJobPersistence;
+	private final BatchJobSender myBatchJobSender;
+
+	private final JobInstance myInstance;
+	private final JobChunkProgressAccumulator myProgressAccumulator;
+	private final JobInstanceProgressCalculator myJobInstanceProgressCalculator;
+
+	JobInstanceProcessor(IJobPersistence theJobPersistence, BatchJobSender theBatchJobSender, JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) {
+		myJobPersistence = theJobPersistence;
+		myBatchJobSender = theBatchJobSender;
+		myInstance = theInstance;
+		myProgressAccumulator = theProgressAccumulator;
+		myJobInstanceProgressCalculator = new JobInstanceProgressCalculator(theJobPersistence, theInstance, theProgressAccumulator);
+	}
+
+	/** Runs the full maintenance sequence for this instance. */
+	public void process() {
+		handleCancellation();
+		cleanupInstance();
+		triggerGatedExecutions();
+	}
+
+	// If a cancellation was requested but not yet applied, mark the instance CANCELLED and persist
+	private void handleCancellation() {
+		if (myInstance.isPendingCancellation()) {
+			myInstance.setErrorMessage(buildCancelledMessage());
+			myInstance.setStatus(StatusEnum.CANCELLED);
+			myJobPersistence.updateInstance(myInstance);
+		}
+	}
+
+	private String buildCancelledMessage() {
+		String msg = "Job instance cancelled";
+		if (myInstance.hasGatedStep()) {
+			msg += " while running step " + myInstance.getCurrentGatedStepId();
+		}
+		return msg;
+	}
+
+	private void cleanupInstance() {
+		switch (myInstance.getStatus()) {
+			case QUEUED:
+				break;
+			case IN_PROGRESS:
+			case ERRORED:
+				myJobInstanceProgressCalculator.calculateAndStoreInstanceProgress();
+				break;
+			case COMPLETED:
+			case FAILED:
+			case CANCELLED:
+				if (purgeExpiredInstance()) {
+					// The instance (and its chunks) no longer exist - nothing further to do
+					return;
+				}
+				break;
+		}
+
+		// For any finished instance, the chunk data is no longer needed and can be dropped
+		if (myInstance.isFinished() && !myInstance.isWorkChunksPurged()) {
+			myInstance.setWorkChunksPurged(true);
+			myJobPersistence.deleteChunks(myInstance.getInstanceId());
+			myJobPersistence.updateInstance(myInstance);
+		}
+	}
+
+	/**
+	 * Deletes the instance and its chunks if it ended more than {@link #PURGE_THRESHOLD} ago.
+	 *
+	 * @return true if the instance was deleted
+	 */
+	private boolean purgeExpiredInstance() {
+		if (myInstance.getEndTime() != null) {
+			long cutoff = System.currentTimeMillis() - PURGE_THRESHOLD;
+			if (myInstance.getEndTime().getTime() < cutoff) {
+				ourLog.info("Deleting old job instance {}", myInstance.getInstanceId());
+				myJobPersistence.deleteInstanceAndChunks(myInstance.getInstanceId());
+				return true;
+			}
+		}
+		return false;
+	}
+
+	/**
+	 * For gated executions: once no incomplete chunks remain in the current step,
+	 * sends work notifications for the next step's already-queued chunks and
+	 * advances the instance's gate to that step.
+	 */
+	private void triggerGatedExecutions() {
+		if (!myInstance.isRunning()) {
+			return;
+		}
+
+		if (!myInstance.hasGatedStep()) {
+			return;
+		}
+
+		JobWorkCursor<?, ?, ?> jobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(myInstance.getJobDefinition(), myInstance.getCurrentGatedStepId());
+
+		if (jobWorkCursor.isFinalStep()) {
+			// Already on the last step - there is nothing to advance to
+			return;
+		}
+
+		String instanceId = myInstance.getInstanceId();
+		String currentStepId = jobWorkCursor.getCurrentStepId();
+		int incompleteChunks = myProgressAccumulator.countChunksWithStatus(instanceId, currentStepId, StatusEnum.getIncompleteStatuses());
+		if (incompleteChunks == 0) {
+
+			String nextStepId = jobWorkCursor.nextStep.getStepId();
+
+			ourLog.info("All processing is complete for gated execution of instance {} step {}. Proceeding to step {}", instanceId, currentStepId, nextStepId);
+			List<String> chunksForNextStep = myProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, EnumSet.of(StatusEnum.QUEUED));
+			for (String nextChunkId : chunksForNextStep) {
+				JobWorkNotification workNotification = new JobWorkNotification(myInstance, nextStepId, nextChunkId);
+				myBatchJobSender.sendWorkChannelMessage(workNotification);
+			}
+
+			myInstance.setCurrentGatedStepId(nextStepId);
+			myJobPersistence.updateInstance(myInstance);
+		}
+	}
+
+	/**
+	 * Sets the given status on the instance if it differs from the current one.
+	 *
+	 * @return true if the status changed (the caller is responsible for persisting the instance)
+	 */
+	public static boolean updateInstanceStatus(JobInstance theInstance, StatusEnum theNewStatus) {
+		if (theInstance.getStatus() != theNewStatus) {
+			ourLog.info("Marking job instance {} of type {} as {}", theInstance.getInstanceId(), theInstance.getJobDefinitionId(), theNewStatus);
+			theInstance.setStatus(theNewStatus);
+			return true;
+		}
+		return false;
+	}
+
+}
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProgressCalculator.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProgressCalculator.java
new file mode 100644
index 00000000000..0d9f22beaa3
--- /dev/null
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProgressCalculator.java
@@ -0,0 +1,51 @@
+package ca.uhn.fhir.batch2.maintenance;
+
+import ca.uhn.fhir.batch2.api.IJobPersistence;
+import ca.uhn.fhir.batch2.model.JobInstance;
+import ca.uhn.fhir.batch2.model.StatusEnum;
+import ca.uhn.fhir.batch2.model.WorkChunk;
+
+import java.util.List;
+
+import static ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor.updateInstanceStatus;
+
+/**
+ * Recalculates progress for a single job instance by paging over all of its work
+ * chunks (without their data payloads), then persists the updated instance when
+ * anything changed. Also feeds every chunk into the shared progress accumulator
+ * used by the rest of the maintenance pass.
+ */
+class JobInstanceProgressCalculator {
+	private final IJobPersistence myJobPersistence;
+	private final JobInstance myInstance;
+	private final JobChunkProgressAccumulator myProgressAccumulator;
+
+	JobInstanceProgressCalculator(IJobPersistence theJobPersistence, JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) {
+		myJobPersistence = theJobPersistence;
+		myInstance = theInstance;
+		myProgressAccumulator = theProgressAccumulator;
+	}
+
+	void calculateAndStoreInstanceProgress() {
+		InstanceProgress instanceProgress = new InstanceProgress();
+
+		for (int page = 0; ; page++) {
+			List<WorkChunk> chunks = myJobPersistence.fetchWorkChunksWithoutData(myInstance.getInstanceId(), JobMaintenanceServiceImpl.INSTANCES_PER_PASS, page);
+
+			for (WorkChunk chunk : chunks) {
+				myProgressAccumulator.addChunk(chunk);
+				instanceProgress.addChunk(chunk);
+			}
+
+			// A short page means we have seen the last chunk
+			if (chunks.size() < JobMaintenanceServiceImpl.INSTANCES_PER_PASS) {
+				break;
+			}
+		}
+
+		instanceProgress.updateInstance(myInstance);
+
+		if (instanceProgress.failed()) {
+			updateInstanceStatus(myInstance, StatusEnum.FAILED);
+			myJobPersistence.updateInstance(myInstance);
+			return;
+		}
+
+		if (instanceProgress.changed()) {
+			myJobPersistence.updateInstance(myInstance);
+		}
+	}
+}
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImpl.java
new file mode 100644
index 00000000000..09872b53890
--- /dev/null
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImpl.java
@@ -0,0 +1,133 @@
+package ca.uhn.fhir.batch2.maintenance;
+
+/*-
+ * #%L
+ * HAPI FHIR JPA Server - Batch2 Task Processor
+ * %%
+ * Copyright (C) 2014 - 2022 Smile CDR, Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
+import ca.uhn.fhir.batch2.api.IJobPersistence;
+import ca.uhn.fhir.batch2.channel.BatchJobSender;
+import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
+import ca.uhn.fhir.batch2.model.JobInstance;
+import ca.uhn.fhir.jpa.model.sched.HapiJob;
+import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
+import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
+import org.apache.commons.lang3.Validate;
+import org.apache.commons.lang3.time.DateUtils;
+import org.quartz.JobExecutionContext;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import javax.annotation.Nonnull;
+import javax.annotation.PostConstruct;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class performs regular polls of the stored jobs in order to
+ * perform maintenance. This includes two major functions.
+ *
+ *
+ * First, we calculate statistics and delete expired tasks. This class does
+ * the following things:
+ *
+ * - For instances that are IN_PROGRESS, calculates throughput and percent complete
+ * - For instances that are IN_PROGRESS where all chunks are COMPLETE, marks instance as COMPLETE
+ * - For instances that are COMPLETE, purges chunk data
+ * - For instances that are IN_PROGRESS where at least one chunk is FAILED, marks instance as FAILED and propagates the error message to the instance, and purges chunk data
+ * - For instances that are IN_PROGRESS with an error message set where no chunks are ERRORED or FAILED, clears the error message in the instance (meaning presumably there was an error but it cleared)
+ * - For instances that are QUEUED or IN_PROGRESS where the isCancelled flag is set, marks them as CANCELLED, indicating the current running step if any
+ * - For instances that are COMPLETE or FAILED and are old, delete them entirely
+ *
+ *
+ *
+ *
+ * Second, we check for any job instances where the job is configured to
+ * have gated execution. For these instances, we check if the current step
+ * is complete (all chunks are in COMPLETE status) and trigger the next step.
+ *
+ */
+public class JobMaintenanceServiceImpl implements IJobMaintenanceService {
+
+	/** Page size used when fetching instances (and, downstream, chunks) during a pass */
+	public static final int INSTANCES_PER_PASS = 100;
+
+	private final IJobPersistence myJobPersistence;
+	private final ISchedulerService mySchedulerService;
+	private final JobDefinitionRegistry myJobDefinitionRegistry;
+	private final BatchJobSender myBatchJobSender;
+
+	/**
+	 * Constructor
+	 */
+	public JobMaintenanceServiceImpl(@Nonnull ISchedulerService theSchedulerService, @Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinitionRegistry theJobDefinitionRegistry, @Nonnull BatchJobSender theBatchJobSender) {
+		Validate.notNull(theSchedulerService);
+		Validate.notNull(theJobPersistence);
+		Validate.notNull(theJobDefinitionRegistry);
+		Validate.notNull(theBatchJobSender);
+
+		myJobPersistence = theJobPersistence;
+		mySchedulerService = theSchedulerService;
+		myJobDefinitionRegistry = theJobDefinitionRegistry;
+		myBatchJobSender = theBatchJobSender;
+	}
+
+	/** Registers the clustered scheduled job that triggers a maintenance pass every minute. */
+	@PostConstruct
+	public void start() {
+		ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
+		jobDefinition.setId(JobMaintenanceScheduledJob.class.getName());
+		jobDefinition.setJobClass(JobMaintenanceScheduledJob.class);
+		mySchedulerService.scheduleClusteredJob(DateUtils.MILLIS_PER_MINUTE, jobDefinition);
+	}
+
+	@Override
+	public void runMaintenancePass() {
+
+		// NB: If you add any new logic, update the class javadoc
+
+		Set<String> processedInstanceIds = new HashSet<>();
+		JobChunkProgressAccumulator progressAccumulator = new JobChunkProgressAccumulator();
+		for (int page = 0; ; page++) {
+			List<JobInstance> instances = myJobPersistence.fetchInstances(INSTANCES_PER_PASS, page);
+
+			for (JobInstance instance : instances) {
+				// Instances can shift between pages while we iterate, so guard against
+				// processing the same instance twice in one pass
+				if (processedInstanceIds.add(instance.getInstanceId())) {
+					myJobDefinitionRegistry.setJobDefinition(instance);
+					JobInstanceProcessor jobInstanceProcessor = new JobInstanceProcessor(myJobPersistence, myBatchJobSender, instance, progressAccumulator);
+					jobInstanceProcessor.process();
+				}
+			}
+
+			// A short page means there are no further instances to fetch
+			if (instances.size() < INSTANCES_PER_PASS) {
+				break;
+			}
+		}
+	}
+
+	/** Quartz job shim: delegates each scheduled firing to the maintenance service. */
+	public static class JobMaintenanceScheduledJob implements HapiJob {
+		@Autowired
+		private IJobMaintenanceService myTarget;
+
+		@Override
+		public void execute(JobExecutionContext theContext) {
+			myTarget.runMaintenancePass();
+		}
+	}
+
+}
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobInstance.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobInstance.java
index 83ade549023..09adb83f15c 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobInstance.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobInstance.java
@@ -23,6 +23,7 @@ package ca.uhn.fhir.batch2.model;
import ca.uhn.fhir.jpa.util.JsonDateDeserializer;
import ca.uhn.fhir.jpa.util.JsonDateSerializer;
import ca.uhn.fhir.model.api.IModelJson;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
@@ -31,6 +32,8 @@ import org.apache.commons.lang3.builder.ToStringStyle;
import java.util.Date;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
public class JobInstance extends JobInstanceStartRequest implements IModelJson {
@JsonProperty(value = "jobDefinitionVersion")
@@ -83,6 +86,9 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
@JsonProperty(value = "estimatedCompletion", access = JsonProperty.Access.READ_ONLY)
private String myEstimatedTimeRemaining;
+ @JsonIgnore
+ private JobDefinition<?> myJobDefinition;
+
/**
* Constructor
*/
@@ -111,16 +117,12 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
setTotalElapsedMillis(theJobInstance.getTotalElapsedMillis());
setWorkChunksPurged(theJobInstance.isWorkChunksPurged());
setCurrentGatedStepId(theJobInstance.getCurrentGatedStepId());
+ myJobDefinition = theJobInstance.getJobDefinition();
}
 public static JobInstance fromJobDefinition(JobDefinition<?> theJobDefinition) {
JobInstance instance = new JobInstance();
-
- instance.setJobDefinitionId(theJobDefinition.getJobDefinitionId());
- instance.setJobDefinitionVersion(theJobDefinition.getJobDefinitionVersion());
- if (theJobDefinition.isGatedExecution()) {
- instance.setCurrentGatedStepId(theJobDefinition.getFirstStepId());
- }
+ instance.setJobDefinition(theJobDefinition);
return instance;
}
@@ -250,6 +252,17 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
return this;
}
+
+ public void setJobDefinition(JobDefinition<?> theJobDefinition) {
+ myJobDefinition = theJobDefinition;
+ setJobDefinitionId(theJobDefinition.getJobDefinitionId());
+ setJobDefinitionVersion(theJobDefinition.getJobDefinitionVersion());
+ }
+
+ public JobDefinition<?> getJobDefinition() {
+ return myJobDefinition;
+ }
+
public boolean isCancelled() {
return myCancelled;
}
@@ -284,4 +297,18 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
public boolean isRunning() {
return getStatus() == StatusEnum.IN_PROGRESS && !isCancelled();
}
+
+ /** True once the instance has reached a terminal status (COMPLETED, FAILED or CANCELLED). */
+ public boolean isFinished() {
+ return myStatus == StatusEnum.COMPLETED ||
+ myStatus == StatusEnum.FAILED ||
+ myStatus == StatusEnum.CANCELLED;
+ }
+
+ /** True if this instance is executing a gated (step-by-step) job and has a current gated step set. */
+ public boolean hasGatedStep() {
+ return !isBlank(myCurrentGatedStepId);
+ }
+
+ /** True if cancellation was requested but the instance has not yet been moved to CANCELLED. */
+ public boolean isPendingCancellation() {
+ return myCancelled && (myStatus == StatusEnum.QUEUED || myStatus == StatusEnum.IN_PROGRESS);
+ }
}
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobWorkCursor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobWorkCursor.java
index b4e43e6a482..4e2dece47be 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobWorkCursor.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobWorkCursor.java
@@ -40,7 +40,6 @@ import java.util.List;
*/
public class JobWorkCursor {
private static final Logger ourLog = LoggerFactory.getLogger(JobWorkCursor.class);
-
public final JobDefinition jobDefinition;
public final boolean isFirstStep;
public final JobDefinitionStep currentStep;
@@ -65,8 +64,7 @@ public class JobWorkCursor JobWorkCursor fromJobDefinitionAndWorkNotification(JobDefinition theJobDefinition, JobWorkNotification theWorkNotification) {
- String requestedStepId = theWorkNotification.getTargetStepId();
+ public static JobWorkCursor fromJobDefinitionAndRequestedStepId(JobDefinition theJobDefinition, String theRequestedStepId) {
boolean isFirstStep = false;
JobDefinitionStep currentStep = null;
JobDefinitionStep nextStep = null;
@@ -74,7 +72,7 @@ public class JobWorkCursor> steps = theJobDefinition.getSteps();
for (int i = 0; i < steps.size(); i++) {
JobDefinitionStep step = steps.get(i);
- if (step.getStepId().equals(requestedStepId)) {
+ if (step.getStepId().equals(theRequestedStepId)) {
currentStep = step;
if (i == 0) {
isFirstStep = true;
@@ -87,7 +85,7 @@ public class JobWorkCursor theJobDefinition, String theInstanceId, String theChunkId) {
+ public JobWorkNotification(JobInstance theInstance, String theNextStepId, String theNextChunkId) {
+ this(theInstance.getJobDefinitionId(), theInstance.getJobDefinitionVersion(), theInstance.getInstanceId(), theNextStepId, theNextChunkId);
+ }
+
+ public static JobWorkNotification firstStepNotification(JobDefinition<?> theJobDefinition, String theInstanceId, String theChunkId) {
String firstStepId = theJobDefinition.getFirstStepId();
String jobDefinitionId = theJobDefinition.getJobDefinitionId();
int jobDefinitionVersion = theJobDefinition.getJobDefinitionVersion();
diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/BaseBatch2Test.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/BaseBatch2Test.java
similarity index 98%
rename from hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/BaseBatch2Test.java
rename to hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/BaseBatch2Test.java
index 85c64dbe63a..f79d941387c 100644
--- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/BaseBatch2Test.java
+++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/BaseBatch2Test.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.VoidModel;
diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImplTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImplTest.java
similarity index 99%
rename from hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImplTest.java
rename to hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImplTest.java
index 8e8df003b03..c2bcf821e3f 100644
--- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImplTest.java
+++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImplTest.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobParametersValidator;
@@ -6,6 +6,7 @@ import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
+import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDataSinkTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDataSinkTest.java
similarity index 98%
rename from hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDataSinkTest.java
rename to hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDataSinkTest.java
index 8412ffc91d3..7d69bda87a5 100644
--- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDataSinkTest.java
+++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDataSinkTest.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobPersistence;
@@ -7,6 +7,7 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
+import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDefinitionRegistryTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDefinitionRegistryTest.java
similarity index 98%
rename from hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDefinitionRegistryTest.java
rename to hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDefinitionRegistryTest.java
index 4c7760f96e7..39a40d65215 100644
--- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDefinitionRegistryTest.java
+++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDefinitionRegistryTest.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.VoidModel;
diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDefinitionTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDefinitionTest.java
similarity index 95%
rename from hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDefinitionTest.java
rename to hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDefinitionTest.java
index 07123ea0fd8..62444c2ca1d 100644
--- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDefinitionTest.java
+++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDefinitionTest.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.model.JobDefinition;
import org.junit.jupiter.api.Test;
diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobMaintenanceServiceImplTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobMaintenanceServiceImplTest.java
similarity index 95%
rename from hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobMaintenanceServiceImplTest.java
rename to hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobMaintenanceServiceImplTest.java
index 863b1987c03..c70bd4edee8 100644
--- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobMaintenanceServiceImplTest.java
+++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobMaintenanceServiceImplTest.java
@@ -1,8 +1,10 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobCompletionDetails;
+import ca.uhn.fhir.batch2.channel.BatchJobSender;
+import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
@@ -23,10 +25,10 @@ import org.springframework.messaging.Message;
import java.util.Date;
-import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.createWorkChunk;
-import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.createWorkChunkStep1;
-import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.createWorkChunkStep2;
-import static ca.uhn.fhir.batch2.impl.JobCoordinatorImplTest.createWorkChunkStep3;
+import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunk;
+import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep1;
+import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep2;
+import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep3;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -168,6 +170,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
@Test
public void testFailed_PurgeOldInstance() {
+ myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
JobInstance instance = createInstance();
instance.setStatus(StatusEnum.FAILED);
instance.setEndTime(parseTime("2001-01-01T12:12:12Z"));
@@ -220,6 +223,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
@Test
public void testInProgress_CalculateProgress_OneStepFailed() {
+ myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(createInstance()));
when(myJobPersistence.fetchWorkChunksWithoutData(eq(INSTANCE_ID), anyInt(), eq(0))).thenReturn(Lists.newArrayList(
createWorkChunkStep1().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:01:00-04:00")).setRecordsProcessed(25),
diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobQuerySvcTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobQuerySvcTest.java
similarity index 98%
rename from hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobQuerySvcTest.java
rename to hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobQuerySvcTest.java
index ef93f5e2a0b..e917f73b85c 100644
--- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobQuerySvcTest.java
+++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobQuerySvcTest.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobDefinition;
diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/TestJobParameters.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/TestJobParameters.java
similarity index 96%
rename from hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/TestJobParameters.java
rename to hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/TestJobParameters.java
index b9a9303d4ef..bea3a00065f 100644
--- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/TestJobParameters.java
+++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/TestJobParameters.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.model.api.annotation.PasswordField;
import ca.uhn.fhir.model.api.IModelJson;
diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/TestJobStep2InputType.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/TestJobStep2InputType.java
similarity index 94%
rename from hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/TestJobStep2InputType.java
rename to hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/TestJobStep2InputType.java
index 64d4e0671f7..2b023499871 100644
--- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/TestJobStep2InputType.java
+++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/TestJobStep2InputType.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/TestJobStep3InputType.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/TestJobStep3InputType.java
similarity index 93%
rename from hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/TestJobStep3InputType.java
rename to hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/TestJobStep3InputType.java
index 2ddccebdfe1..ae6a406f713 100644
--- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/TestJobStep3InputType.java
+++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/TestJobStep3InputType.java
@@ -1,4 +1,4 @@
-package ca.uhn.fhir.batch2.impl;
+package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/model/JobWorkCursorTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/model/JobWorkCursorTest.java
index 5a5d2f1dee9..071eb93419b 100644
--- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/model/JobWorkCursorTest.java
+++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/model/JobWorkCursorTest.java
@@ -1,7 +1,7 @@
package ca.uhn.fhir.batch2.model;
-import ca.uhn.fhir.batch2.impl.BaseBatch2Test;
-import ca.uhn.fhir.batch2.impl.TestJobParameters;
+import ca.uhn.fhir.batch2.coordinator.BaseBatch2Test;
+import ca.uhn.fhir.batch2.coordinator.TestJobParameters;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -21,12 +21,8 @@ class JobWorkCursorTest extends BaseBatch2Test {
@Test
public void createCursorStep1() {
- // setup
- JobWorkNotification workNotification = new JobWorkNotification();
- workNotification.setTargetStepId(STEP_1);
-
// execute
- JobWorkCursor cursor = JobWorkCursor.fromJobDefinitionAndWorkNotification(myDefinition, workNotification);
+ JobWorkCursor cursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(myDefinition, STEP_1);
// verify
assertCursor(cursor, true, false, STEP_1, STEP_2);
@@ -34,12 +30,8 @@ class JobWorkCursorTest extends BaseBatch2Test {
@Test
public void createCursorStep2() {
- // setup
- JobWorkNotification workNotification = new JobWorkNotification();
- workNotification.setTargetStepId(STEP_2);
-
// execute
- JobWorkCursor cursor = JobWorkCursor.fromJobDefinitionAndWorkNotification(myDefinition, workNotification);
+ JobWorkCursor cursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(myDefinition, STEP_2);
// verify
assertCursor(cursor, false, false, STEP_2, STEP_3);
@@ -47,12 +39,8 @@ class JobWorkCursorTest extends BaseBatch2Test {
@Test
public void createCursorStep3() {
- // setup
- JobWorkNotification workNotification = new JobWorkNotification();
- workNotification.setTargetStepId(STEP_3);
-
// execute
- JobWorkCursor cursor = JobWorkCursor.fromJobDefinitionAndWorkNotification(myDefinition, workNotification);
+ JobWorkCursor cursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(myDefinition, STEP_3);
// verify
assertCursor(cursor, false, true, STEP_3, null);
@@ -61,13 +49,11 @@ class JobWorkCursorTest extends BaseBatch2Test {
@Test
public void unknownStep() {
// setup
- JobWorkNotification workNotification = new JobWorkNotification();
String targetStepId = "Made a searching and fearless moral inventory of ourselves";
- workNotification.setTargetStepId(targetStepId);
// execute
try {
- JobWorkCursor.fromJobDefinitionAndWorkNotification(myDefinition, workNotification);
+ JobWorkCursor.fromJobDefinitionAndRequestedStepId(myDefinition, targetStepId);
// verify
fail();