diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_2_0/5746-reduction-jobs-should-fire-completion-handler.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_2_0/5746-reduction-jobs-should-fire-completion-handler.yaml
new file mode 100644
index 00000000000..b947964b795
--- /dev/null
+++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_2_0/5746-reduction-jobs-should-fire-completion-handler.yaml
@@ -0,0 +1,6 @@
+---
+type: fix
+issue: 5746
+title: "Batch2 jobs with reduction steps didn't fire the completion handler.
+ This has been fixed.
+"
diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java
index 1114a7e4d58..afda728dd7d 100644
--- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java
+++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java
@@ -28,9 +28,11 @@ import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
 import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
 import ca.uhn.fhir.jpa.test.Batch2JobHelper;
 import ca.uhn.fhir.model.api.IModelJson;
+import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
 import ca.uhn.fhir.util.JsonUtil;
 import ca.uhn.test.concurrency.PointcutLatch;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import jakarta.annotation.Nonnull;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -42,7 +44,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.domain.Page;
 import org.springframework.data.domain.Sort;
-import jakarta.annotation.Nonnull;
 
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -64,6 +65,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
 	public static final int TEST_JOB_VERSION = 1;
 	public static final String FIRST_STEP_ID = "first-step";
+	public static final String SECOND_STEP_ID = "second-step";
 	public static final String LAST_STEP_ID = "last-step";
 
 	@Autowired
 	JobDefinitionRegistry myJobDefinitionRegistry;
@@ -94,7 +96,8 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
 	public void before() throws Exception {
 		super.before();
 
-		myCompletionHandler = details -> {};
+		myCompletionHandler = details -> {
+		};
 		myWorkChannel = (LinkedBlockingChannel) myChannelFactory.getOrCreateReceiver(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, new ChannelConsumerSettings());
 		myStorageSettings.setJobFastTrackingEnabled(true);
 	}
@@ -139,7 +142,7 @@
 		List jobIds = new ArrayList<>();
 		for (int i = 0; i < maxJobsToSave; i++) {
 			JobInstanceStartRequest request = buildRequest(jobId);
-			Batch2JobStartResponse response = myJobCoordinator.startInstance(request);
+			Batch2JobStartResponse response = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
 			jobIds.add(response.getInstanceId());
 		}
 
@@ -188,7 +191,7 @@
 		JobInstanceStartRequest request = buildRequest(jobId);
 
 		myFirstStepLatch.setExpectedCount(1);
-		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
+		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
 		myFirstStepLatch.awaitExpected();
 
 		myBatch2JobHelper.awaitJobCompletion(startResponse.getInstanceId());
@@ -212,7 +215,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
 		myFirstStepLatch.setExpectedCount(1);
 		myLastStepLatch.setExpectedCount(1);
-		String batchJobId = myJobCoordinator.startInstance(request).getInstanceId();
+		String batchJobId = myJobCoordinator.startInstance(new SystemRequestDetails(), request).getInstanceId();
 		myFirstStepLatch.awaitExpected();
 
 		myBatch2JobHelper.assertFastTracking(batchJobId);
 
@@ -231,76 +234,119 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
 		assertEquals(1.0, jobInstance.getProgress());
 	}
 
-	private void createThreeStepReductionJob(
-		String theJobId,
-		IJobStepWorker theFirstStep,
-		IJobStepWorker theSecondStep,
-		IReductionStepWorker theReductionsStep
-	) {
-		// create job definition (it's the test method's name)
-		JobDefinition jd = JobDefinition.newBuilder()
-			.setJobDefinitionId(theJobId)
-			.setJobDescription("test job")
-			.setJobDefinitionVersion(TEST_JOB_VERSION)
-			.setParametersType(TestJobParameters.class)
-			.gatedExecution()
-			.addFirstStep(
-				FIRST_STEP_ID,
-				"Test first step",
-				FirstStepOutput.class,
-				theFirstStep
-			)
-			.addIntermediateStep("SECOND",
-				"Second step",
-				SecondStepOutput.class,
-				theSecondStep)
-			.addFinalReducerStep(
-				LAST_STEP_ID,
-				"Test last step",
-				ReductionStepOutput.class,
-				theReductionsStep
-			)
-			.build();
-		myJobDefinitionRegistry.addJobDefinition(jd);
+	@Test
+	public void testJobWithReductionStepFiresCompletionHandler() throws InterruptedException {
+		// setup
+		String jobId = new Exception().getStackTrace()[0].getMethodName();
+		String testInfo = "test";
+		int totalCalls = 2;
+		AtomicInteger secondStepInt = new AtomicInteger();
+
+		AtomicBoolean completionBool = new AtomicBoolean();
+
+		myCompletionHandler = (params) -> {
+			// ensure our completion handler fires
+			completionBool.getAndSet(true);
+		};
+
+		buildAndDefine3StepReductionJob(jobId, new IReductionStepHandler() {
+			private final AtomicBoolean myBoolean = new AtomicBoolean();
+
+			private final AtomicInteger mySecondGate = new AtomicInteger();
+
+			@Override
+			public void firstStep(StepExecutionDetails theStep, IJobDataSink theDataSink) {
+				for (int i = 0; i < totalCalls; i++) {
+					theDataSink.accept(new FirstStepOutput());
+				}
+				callLatch(myFirstStepLatch, theStep);
+			}
+
+			@Override
+			public void secondStep(StepExecutionDetails theStep, IJobDataSink theDataSink) {
+				SecondStepOutput output = new SecondStepOutput();
+				output.setValue(testInfo + secondStepInt.getAndIncrement());
+				theDataSink.accept(output);
+			}
+
+			@Override
+			public void reductionStepConsume(ChunkExecutionDetails theChunkDetails, IJobDataSink theDataSink) {
+				int val = mySecondGate.getAndIncrement();
+			}
+
+			@Override
+			public void reductionStepRun(StepExecutionDetails theStepExecutionDetails, IJobDataSink theDataSink) {
+				boolean isRunAlready = myBoolean.getAndSet(true);
+				assertFalse(isRunAlready, "Reduction step should only be called once!");
+
+				theDataSink.accept(new ReductionStepOutput(new ArrayList<>()));
+				callLatch(myLastStepLatch, theStepExecutionDetails);
+			}
+		});
+
+		// test
+		JobInstanceStartRequest request = buildRequest(jobId);
+		myFirstStepLatch.setExpectedCount(1);
+		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
+
+		String instanceId = startResponse.getInstanceId();
+		myFirstStepLatch.awaitExpected();
+		assertNotNull(instanceId);
+
+		myBatch2JobHelper.awaitGatedStepId(FIRST_STEP_ID, instanceId);
+
+		// wait for last step to finish
+		ourLog.info("Setting last step latch");
+		myLastStepLatch.setExpectedCount(1);
+
+		// waiting
+		myBatch2JobHelper.awaitJobCompletion(instanceId);
+		myLastStepLatch.awaitExpected();
+		ourLog.info("awaited the last step");
+
+		// verify
+		Optional instanceOp = myJobPersistence.fetchInstance(instanceId);
+		assertTrue(instanceOp.isPresent());
+		JobInstance jobInstance = instanceOp.get();
+
+		// ensure our completion handler fires
+		assertTrue(completionBool.get());
+
+		assertEquals(StatusEnum.COMPLETED, jobInstance.getStatus());
+		assertEquals(1.0, jobInstance.getProgress());
 	}
 
 	@ParameterizedTest
-	@ValueSource(booleans = { true, false })
+	@ValueSource(booleans = {true, false})
 	public void testJobDefinitionWithReductionStepIT(boolean theDelayReductionStepBool) throws InterruptedException {
 		// setup
 		String jobId = new Exception().getStackTrace()[0].getMethodName() + "_" + theDelayReductionStepBool;
 		String testInfo = "test";
 		AtomicInteger secondStepInt = new AtomicInteger();
 
-		// step 1
-		IJobStepWorker first = (step, sink) -> {
-			sink.accept(new FirstStepOutput());
-			sink.accept(new FirstStepOutput());
-			callLatch(myFirstStepLatch, step);
-			return RunOutcome.SUCCESS;
-		};
-
-		// step 2
-		IJobStepWorker second = (step, sink) -> {
-			SecondStepOutput output = new SecondStepOutput();
-			output.setValue(testInfo + secondStepInt.getAndIncrement());
-			sink.accept(output);
-
-			return RunOutcome.SUCCESS;
-		};
-
-		// step 3
-		IReductionStepWorker last = new IReductionStepWorker<>() {
-
+		buildAndDefine3StepReductionJob(jobId, new IReductionStepHandler() {
 			private final ArrayList myOutput = new ArrayList<>();
 
 			private final AtomicBoolean myBoolean = new AtomicBoolean();
 
 			private final AtomicInteger mySecondGate = new AtomicInteger();
 
-			@Nonnull
 			@Override
-			public ChunkOutcome consume(ChunkExecutionDetails theChunkDetails) {
+			public void firstStep(StepExecutionDetails theStep, IJobDataSink theDataSink) {
+				theDataSink.accept(new FirstStepOutput());
+				theDataSink.accept(new FirstStepOutput());
+				callLatch(myFirstStepLatch, theStep);
+			}
+
+			@Override
+			public void secondStep(StepExecutionDetails theStep, IJobDataSink theDataSink) {
+				SecondStepOutput output = new SecondStepOutput();
+				output.setValue(testInfo + secondStepInt.getAndIncrement());
+				theDataSink.accept(output);
+			}
+
+			@Override
+			public void reductionStepConsume(ChunkExecutionDetails theChunkDetails, IJobDataSink theDataSink) {
 				myOutput.add(theChunkDetails.getData());
 				// 1 because we know 2 packets are coming.
 				// we'll fire the second maintenance run on the second packet
@@ -309,20 +355,14 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
 					ourLog.info("SECOND FORCED MAINTENANCE PASS FORCED");
 					myBatch2JobHelper.forceRunMaintenancePass();
 				}
-				return ChunkOutcome.SUCCESS();
 			}
 
-			@Nonnull
 			@Override
-			public RunOutcome run(
-				@Nonnull StepExecutionDetails theStepExecutionDetails,
-				@Nonnull IJobDataSink theDataSink
-			) throws JobExecutionFailedException {
+			public void reductionStepRun(StepExecutionDetails theStepExecutionDetails, IJobDataSink theDataSink) {
 				boolean isRunAlready = myBoolean.getAndSet(true);
 				assertFalse(isRunAlready, "Reduction step should only be called once!");
 
 				complete(theStepExecutionDetails, theDataSink);
-				return RunOutcome.SUCCESS;
 			}
 
 			private void complete(
@@ -333,13 +373,12 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
 				theDataSink.accept(new ReductionStepOutput(myOutput));
 				callLatch(myLastStepLatch, theStepExecutionDetails);
 			}
-		};
-		createThreeStepReductionJob(jobId, first, second, last);
+		});
 
 		// run test
 		JobInstanceStartRequest request = buildRequest(jobId);
 		myFirstStepLatch.setExpectedCount(1);
-		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
+		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
 		String instanceId = startResponse.getInstanceId();
 		myFirstStepLatch.awaitExpected();
 
@@ -398,7 +437,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
 		JobInstanceStartRequest request = buildRequest(jobDefId);
 
 		myFirstStepLatch.setExpectedCount(1);
-		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
+		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
 		String instanceId = startResponse.getInstanceId();
 		myFirstStepLatch.awaitExpected();
 
@@ -427,7 +466,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
 		JobInstanceStartRequest request = buildRequest(jobDefId);
 
 		// execute
-		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
+		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
 		String instanceId = startResponse.getInstanceId();
 
 		// validate
@@ -453,7 +492,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
 		// execute
 		ourLog.info("Starting job");
 		myFirstStepLatch.setExpectedCount(1);
-		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
+		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
 		String instanceId = startResponse.getInstanceId();
 		myFirstStepLatch.awaitExpected();
 
@@ -508,7 +547,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
 		// test
 		JobInstanceStartRequest request = buildRequest(jobDefId);
 		myFirstStepLatch.setExpectedCount(1);
-		Batch2JobStartResponse response = myJobCoordinator.startInstance(request);
+		Batch2JobStartResponse response = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
 		JobInstance instance = myBatch2JobHelper.awaitJobHasStatus(response.getInstanceId(),
 			30, // we want to wait a long time (2 min here) cause backoff is incremental
 			StatusEnum.FAILED
@@ -551,6 +590,80 @@
 			.build();
 	}
+
+	private void buildAndDefine3StepReductionJob(
+		String theJobId,
+		IReductionStepHandler theHandler
+	) {
+		// step 1
+		IJobStepWorker first = (step, sink) -> {
+			theHandler.firstStep(step, sink);
+			return RunOutcome.SUCCESS;
+		};
+
+		// step 2
+		IJobStepWorker second = (step, sink) -> {
+			theHandler.secondStep(step, sink);
+			return RunOutcome.SUCCESS;
+		};
+
+		// step 3
+		IReductionStepWorker last = new IReductionStepWorker<>() {
+
+			@Nonnull
+			@Override
+			public ChunkOutcome consume(ChunkExecutionDetails theChunkDetails) {
+				theHandler.reductionStepConsume(theChunkDetails, null);
+				return ChunkOutcome.SUCCESS();
+			}
+
+			@Nonnull
+			@Override
+			public RunOutcome run(
+				@Nonnull StepExecutionDetails theStepExecutionDetails,
+				@Nonnull IJobDataSink theDataSink
+			) throws JobExecutionFailedException {
+				theHandler.reductionStepRun(theStepExecutionDetails, theDataSink);
+				return RunOutcome.SUCCESS;
+			}
+		};
+		createThreeStepReductionJob(theJobId, first, second, last);
+	}
+
+	private void createThreeStepReductionJob(
+		String theJobId,
+		IJobStepWorker theFirstStep,
+		IJobStepWorker theSecondStep,
+		IReductionStepWorker theReductionsStep
+	) {
+		// create job definition (it's the test method's name)
+		JobDefinition jd = JobDefinition.newBuilder()
+			.setJobDefinitionId(theJobId)
+			.setJobDescription("test job")
+			.setJobDefinitionVersion(TEST_JOB_VERSION)
+			.setParametersType(TestJobParameters.class)
+			.gatedExecution()
+			.addFirstStep(
+				FIRST_STEP_ID,
+				"Test first step",
+				FirstStepOutput.class,
+				theFirstStep
+			)
+			.addIntermediateStep(SECOND_STEP_ID,
+				"Second step",
+				SecondStepOutput.class,
+				theSecondStep)
+			.addFinalReducerStep(
+				LAST_STEP_ID,
+				"Test last step",
+				ReductionStepOutput.class,
+				theReductionsStep
+			)
+			.completionHandler(myCompletionHandler)
+			.build();
+		myJobDefinitionRegistry.addJobDefinition(jd);
+	}
+
 
 	static class TestJobParameters implements IModelJson {
 		TestJobParameters() {
 		}
@@ -581,4 +694,14 @@
 			myResult = theResult;
 		}
 	}
+
+	private interface IReductionStepHandler {
+		void firstStep(StepExecutionDetails theStep, IJobDataSink theDataSink);
+
+		void secondStep(StepExecutionDetails theStep, IJobDataSink theDataSink);
+
+		void reductionStepConsume(ChunkExecutionDetails theChunkDetails, IJobDataSink theDataSink);
+
+		void reductionStepRun(StepExecutionDetails theStepExecutionDetails, IJobDataSink theDataSink);
+	}
 }
diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportTest.java
index ee6275a72c7..dad2eba02bd 100644
--- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportTest.java
+++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportTest.java
@@ -24,6 +24,7 @@ import ca.uhn.fhir.test.utilities.HttpClientExtension;
 import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
 import ca.uhn.fhir.util.JsonUtil;
 import com.google.common.collect.Sets;
+import jakarta.annotation.Nonnull;
 import org.apache.commons.io.LineIterator;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
@@ -66,7 +67,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import jakarta.annotation.Nonnull;
 
 import java.io.IOException;
 import java.io.StringReader;
 import java.util.ArrayList;
@@ -352,7 +352,6 @@
 			verifyBulkExportResults(options, List.of("Patient/PING1"), Collections.singletonList("Patient/PNING3"));
 		} finally {
 			myCaptureQueriesListener.logSelectQueries();
-
 		}
 	}
 
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/ReductionStepExecutionDetails.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/ReductionStepExecutionDetails.java
deleted file mode 100644
index 2687d8eeb07..00000000000
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/ReductionStepExecutionDetails.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*-
- * #%L
- * HAPI FHIR JPA Server - Batch2 Task Processor
- * %%
- * Copyright (C) 2014 - 2024 Smile CDR, Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-package ca.uhn.fhir.batch2.api;
-
-import ca.uhn.fhir.batch2.model.JobInstance;
-import ca.uhn.fhir.i18n.Msg;
-import ca.uhn.fhir.model.api.IModelJson;
-import jakarta.annotation.Nonnull;
-import jakarta.annotation.Nullable;
-
-/**
- * This class is used for Reduction Step for Batch2 Jobs.
- * @param <PT> - Job Parameters type
- * @param <IT> - Input data type
- * @param <OT> - Output data type. Output will actually be a ListResult of these objects.
- */
-public class ReductionStepExecutionDetails<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson>
-		extends StepExecutionDetails<PT, IT> {
-
-	public ReductionStepExecutionDetails(
-			@Nonnull PT theParameters, @Nullable IT theData, @Nonnull JobInstance theInstance) {
-		// TODO KHS shouldn't the chunkId be null?
-		super(theParameters, theData, theInstance, "VOID");
-	}
-
-	public ReductionStepExecutionDetails(@Nonnull PT theParameters, @Nonnull JobInstance theInstance) {
-		this(theParameters, null, theInstance);
-	}
-
-	@Override
-	@Nonnull
-	public final IT getData() {
-		throw new UnsupportedOperationException(
-				Msg.code(2099) + " Reduction steps should have all data by the time execution is called.");
-	}
-
-	@Override
-	public boolean hasAssociatedWorkChunk() {
-		return false;
-	}
-}
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl.java
index 98503104fab..3d1d69c07a7 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl.java
@@ -20,9 +20,11 @@
 package ca.uhn.fhir.batch2.coordinator;
 
 import ca.uhn.fhir.batch2.api.ChunkExecutionDetails;
+import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
 import ca.uhn.fhir.batch2.api.IJobPersistence;
 import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
 import ca.uhn.fhir.batch2.api.IReductionStepWorker;
+import ca.uhn.fhir.batch2.api.JobCompletionDetails;
 import ca.uhn.fhir.batch2.api.StepExecutionDetails;
 import ca.uhn.fhir.batch2.model.ChunkOutcome;
 import ca.uhn.fhir.batch2.model.JobDefinitionStep;
@@ -221,7 +223,6 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
 					return null;
 				});
 			} finally {
-
 				executeInTransactionWithSynchronization(() -> {
 					ourLog.info(
 							"Reduction step for instance[{}] produced {} successful and {} failed chunks",
@@ -256,6 +257,18 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
 							WorkChunkStatusEnum.FAILED,
 							"JOB ABORTED");
 				}
+
+				if (response.isSuccessful()) {
+					/**
+					 * All reduction steps are final steps.
+					 */
+					IJobCompletionHandler completionHandler =
+							theJobWorkCursor.getJobDefinition().getCompletionHandler();
+					if (completionHandler != null) {
+						completionHandler.jobComplete(new JobCompletionDetails<>(parameters, instance));
+					}
+				}
+
 				return null;
 			});
 		}
diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImplTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImplTest.java
index 0b806c10848..22aa7ba9c30 100644
--- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImplTest.java
+++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImplTest.java
@@ -74,7 +74,6 @@ public class ReductionStepExecutorServiceImplTest {
 		mySvc = new ReductionStepExecutorServiceImpl(myJobPersistence, myTransactionService, myJobDefinitionRegistry);
 	}
 
-
 	@Test
 	public void doExecution_reductionWithChunkFailed_marksAllFutureChunksAsFailedButPreviousAsSuccess() {
 		// setup
@@ -87,7 +86,6 @@ public class ReductionStepExecutorServiceImplTest {
 		jobInstance.setStatus(StatusEnum.IN_PROGRESS);
 		JobWorkCursor workCursor = mock(JobWorkCursor.class);
 
-		// when
 		when(workCursor.getCurrentStep()).thenReturn((JobDefinitionStep) createJobDefinition().getSteps().get(1));
 		when(workCursor.getJobDefinition()).thenReturn(createJobDefinition());
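For reviewers who want the change summarized from the consumer's side: the patch makes ReductionStepExecutorServiceImpl invoke the job definition's completion handler once a final reduction step succeeds, which previously happened only for jobs without a reducer. Below is a minimal sketch of the wiring, assembled from the builder calls exercised in Batch2CoordinatorIT above; the worker variables (firstStepWorker, secondStepWorker, reductionStepWorker) and the handler body are illustrative placeholders, not code from this patch:

// A gated three-step Batch2 job whose final step is a reducer. Before this
// fix, the completionHandler registered below was never invoked for such
// jobs; ReductionStepExecutorServiceImpl now fires it when the reduction
// step finishes successfully. The step workers are assumed stand-ins for
// real IJobStepWorker / IReductionStepWorker implementations.
JobDefinition jobDefinition = JobDefinition.newBuilder()
	.setJobDefinitionId("example-reduction-job")
	.setJobDescription("example job with a final reduction step")
	.setJobDefinitionVersion(1)
	.setParametersType(TestJobParameters.class)
	.gatedExecution()
	.addFirstStep(FIRST_STEP_ID, "Test first step", FirstStepOutput.class, firstStepWorker)
	.addIntermediateStep(SECOND_STEP_ID, "Second step", SecondStepOutput.class, secondStepWorker)
	.addFinalReducerStep(LAST_STEP_ID, "Test last step", ReductionStepOutput.class, reductionStepWorker)
	.completionHandler(details -> ourLog.info("completion handler fired"))
	.build();
myJobDefinitionRegistry.addJobDefinition(jobDefinition);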