5746 fixing reducer step (#5753)

* fixing reduction step to fire job completion handler

* unwinding

* fixing changelog

* spotless

* review fixes

---------

Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-mbp.home>
This commit is contained in:
TipzCM 2024-03-04 09:12:31 -05:00 committed by GitHub
parent 3a1a0418a0
commit 370d2c16b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 217 additions and 136 deletions

View File

@ -0,0 +1,6 @@
---
type: fix
issue: 5746
title: "Batch2 jobs with reduction steps didn't fire the completion handler. This has been fixed."

View File

@ -28,9 +28,11 @@ import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.test.concurrency.PointcutLatch;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nonnull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -42,7 +44,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Sort;
import jakarta.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@ -64,6 +65,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
public static final int TEST_JOB_VERSION = 1;
public static final String FIRST_STEP_ID = "first-step";
public static final String SECOND_STEP_ID = "second-step";
public static final String LAST_STEP_ID = "last-step";
@Autowired
JobDefinitionRegistry myJobDefinitionRegistry;
@ -94,7 +96,8 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
public void before() throws Exception {
super.before();
myCompletionHandler = details -> {};
myCompletionHandler = details -> {
};
myWorkChannel = (LinkedBlockingChannel) myChannelFactory.getOrCreateReceiver(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, new ChannelConsumerSettings());
myStorageSettings.setJobFastTrackingEnabled(true);
}
@ -139,7 +142,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
List<String> jobIds = new ArrayList<>();
for (int i = 0; i < maxJobsToSave; i++) {
JobInstanceStartRequest request = buildRequest(jobId);
Batch2JobStartResponse response = myJobCoordinator.startInstance(request);
Batch2JobStartResponse response = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
jobIds.add(response.getInstanceId());
}
@ -188,7 +191,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
JobInstanceStartRequest request = buildRequest(jobId);
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.awaitJobCompletion(startResponse.getInstanceId());
@ -212,7 +215,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myFirstStepLatch.setExpectedCount(1);
myLastStepLatch.setExpectedCount(1);
String batchJobId = myJobCoordinator.startInstance(request).getInstanceId();
String batchJobId = myJobCoordinator.startInstance(new SystemRequestDetails(), request).getInstanceId();
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.assertFastTracking(batchJobId);
@ -231,76 +234,119 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
assertEquals(1.0, jobInstance.getProgress());
}
private void createThreeStepReductionJob(
String theJobId,
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep,
IJobStepWorker<TestJobParameters, FirstStepOutput, SecondStepOutput> theSecondStep,
IReductionStepWorker<TestJobParameters, SecondStepOutput, ReductionStepOutput> theReductionsStep
) {
// create job definition (it's the test method's name)
JobDefinition<? extends IModelJson> jd = JobDefinition.newBuilder()
.setJobDefinitionId(theJobId)
.setJobDescription("test job")
.setJobDefinitionVersion(TEST_JOB_VERSION)
.setParametersType(TestJobParameters.class)
.gatedExecution()
.addFirstStep(
FIRST_STEP_ID,
"Test first step",
FirstStepOutput.class,
theFirstStep
)
.addIntermediateStep("SECOND",
"Second step",
SecondStepOutput.class,
theSecondStep)
.addFinalReducerStep(
LAST_STEP_ID,
"Test last step",
ReductionStepOutput.class,
theReductionsStep
)
.build();
myJobDefinitionRegistry.addJobDefinition(jd);
@Test
public void testJobWithReductionStepFiresCompletionHandler() throws InterruptedException {
	// setup
	// use the test method's own name as the job definition id so it is unique per test
	String jobId = new Exception().getStackTrace()[0].getMethodName();
	String testInfo = "test";
	int totalCalls = 2;
	AtomicInteger secondStepInt = new AtomicInteger();
	AtomicBoolean completionBool = new AtomicBoolean();
	myCompletionHandler = (params) -> {
		// record that the completion handler fired; this is the behavior under test
		completionBool.getAndSet(true);
	};

	buildAndDefine3StepReductionJob(jobId, new IReductionStepHandler() {
		// guards against the reduction step being run more than once
		private final AtomicBoolean myBoolean = new AtomicBoolean();

		@Override
		public void firstStep(StepExecutionDetails<TestJobParameters, VoidModel> theStep, IJobDataSink<FirstStepOutput> theDataSink) {
			// emit one work chunk per expected call so the reducer has multiple chunks to consume
			for (int i = 0; i < totalCalls; i++) {
				theDataSink.accept(new FirstStepOutput());
			}
			callLatch(myFirstStepLatch, theStep);
		}

		@Override
		public void secondStep(StepExecutionDetails<TestJobParameters, FirstStepOutput> theStep, IJobDataSink<SecondStepOutput> theDataSink) {
			// give each chunk a distinct value so outputs are distinguishable
			SecondStepOutput output = new SecondStepOutput();
			output.setValue(testInfo + secondStepInt.getAndIncrement());
			theDataSink.accept(output);
		}

		@Override
		public void reductionStepConsume(ChunkExecutionDetails<TestJobParameters, SecondStepOutput> theChunkDetails, IJobDataSink<ReductionStepOutput> theDataSink) {
			// no per-chunk work needed for this test; chunks are simply accepted
			// (the original kept an unread counter here; it was removed as dead code)
		}

		@Override
		public void reductionStepRun(StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails, IJobDataSink<ReductionStepOutput> theDataSink) {
			boolean isRunAlready = myBoolean.getAndSet(true);
			assertFalse(isRunAlready, "Reduction step should only be called once!");
			theDataSink.accept(new ReductionStepOutput(new ArrayList<>()));
			callLatch(myLastStepLatch, theStepExecutionDetails);
		}
	});

	// test
	JobInstanceStartRequest request = buildRequest(jobId);
	myFirstStepLatch.setExpectedCount(1);
	Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
	String instanceId = startResponse.getInstanceId();
	myFirstStepLatch.awaitExpected();
	assertNotNull(instanceId);

	myBatch2JobHelper.awaitGatedStepId(FIRST_STEP_ID, instanceId);

	// wait for last step to finish
	ourLog.info("Setting last step latch");
	myLastStepLatch.setExpectedCount(1);

	// wait for the job to complete end-to-end
	myBatch2JobHelper.awaitJobCompletion(instanceId);
	myLastStepLatch.awaitExpected();
	ourLog.info("awaited the last step");

	// verify
	Optional<JobInstance> instanceOp = myJobPersistence.fetchInstance(instanceId);
	assertTrue(instanceOp.isPresent());
	JobInstance jobInstance = instanceOp.get();

	// ensure our completion handler fired
	assertTrue(completionBool.get());

	assertEquals(StatusEnum.COMPLETED, jobInstance.getStatus());
	assertEquals(1.0, jobInstance.getProgress());
}
@ParameterizedTest
@ValueSource(booleans = { true, false })
@ValueSource(booleans = {true, false})
public void testJobDefinitionWithReductionStepIT(boolean theDelayReductionStepBool) throws InterruptedException {
// setup
String jobId = new Exception().getStackTrace()[0].getMethodName() + "_" + theDelayReductionStepBool;
String testInfo = "test";
AtomicInteger secondStepInt = new AtomicInteger();
// step 1
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> first = (step, sink) -> {
sink.accept(new FirstStepOutput());
sink.accept(new FirstStepOutput());
callLatch(myFirstStepLatch, step);
return RunOutcome.SUCCESS;
};
// step 2
IJobStepWorker<TestJobParameters, FirstStepOutput, SecondStepOutput> second = (step, sink) -> {
SecondStepOutput output = new SecondStepOutput();
output.setValue(testInfo + secondStepInt.getAndIncrement());
sink.accept(output);
return RunOutcome.SUCCESS;
};
// step 3
IReductionStepWorker<TestJobParameters, SecondStepOutput, ReductionStepOutput> last = new IReductionStepWorker<>() {
buildAndDefine3StepReductionJob(jobId, new IReductionStepHandler() {
private final ArrayList<SecondStepOutput> myOutput = new ArrayList<>();
private final AtomicBoolean myBoolean = new AtomicBoolean();
private final AtomicInteger mySecondGate = new AtomicInteger();
@Nonnull
@Override
public ChunkOutcome consume(ChunkExecutionDetails<TestJobParameters, SecondStepOutput> theChunkDetails) {
public void firstStep(StepExecutionDetails<TestJobParameters, VoidModel> theStep, IJobDataSink<FirstStepOutput> theDataSink) {
theDataSink.accept(new FirstStepOutput());
theDataSink.accept(new FirstStepOutput());
callLatch(myFirstStepLatch, theStep);
}
@Override
public void secondStep(StepExecutionDetails<TestJobParameters, FirstStepOutput> theStep, IJobDataSink<SecondStepOutput> theDataSink) {
SecondStepOutput output = new SecondStepOutput();
output.setValue(testInfo + secondStepInt.getAndIncrement());
theDataSink.accept(output);
}
@Override
public void reductionStepConsume(ChunkExecutionDetails<TestJobParameters, SecondStepOutput> theChunkDetails, IJobDataSink<ReductionStepOutput> theDataSink) {
myOutput.add(theChunkDetails.getData());
// 1 because we know 2 packets are coming.
// we'll fire the second maintenance run on the second packet
@ -309,20 +355,14 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
ourLog.info("SECOND FORCED MAINTENANCE PASS FORCED");
myBatch2JobHelper.forceRunMaintenancePass();
}
return ChunkOutcome.SUCCESS();
}
@Nonnull
@Override
public RunOutcome run(
@Nonnull StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails,
@Nonnull IJobDataSink<ReductionStepOutput> theDataSink
) throws JobExecutionFailedException {
public void reductionStepRun(StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails, IJobDataSink<ReductionStepOutput> theDataSink) {
boolean isRunAlready = myBoolean.getAndSet(true);
assertFalse(isRunAlready, "Reduction step should only be called once!");
complete(theStepExecutionDetails, theDataSink);
return RunOutcome.SUCCESS;
}
private void complete(
@ -333,13 +373,12 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
theDataSink.accept(new ReductionStepOutput(myOutput));
callLatch(myLastStepLatch, theStepExecutionDetails);
}
};
createThreeStepReductionJob(jobId, first, second, last);
});
// run test
JobInstanceStartRequest request = buildRequest(jobId);
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
String instanceId = startResponse.getInstanceId();
myFirstStepLatch.awaitExpected();
@ -398,7 +437,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
JobInstanceStartRequest request = buildRequest(jobDefId);
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
String instanceId = startResponse.getInstanceId();
myFirstStepLatch.awaitExpected();
@ -427,7 +466,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
JobInstanceStartRequest request = buildRequest(jobDefId);
// execute
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
String instanceId = startResponse.getInstanceId();
// validate
@ -453,7 +492,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
// execute
ourLog.info("Starting job");
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
String instanceId = startResponse.getInstanceId();
myFirstStepLatch.awaitExpected();
@ -508,7 +547,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
// test
JobInstanceStartRequest request = buildRequest(jobDefId);
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse response = myJobCoordinator.startInstance(request);
Batch2JobStartResponse response = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
JobInstance instance = myBatch2JobHelper.awaitJobHasStatus(response.getInstanceId(),
30, // we want to wait a long time (2 min here) cause backoff is incremental
StatusEnum.FAILED
@ -551,6 +590,80 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
.build();
}
/**
 * Registers a gated three-step reduction job under the given id, delegating
 * the work of each step to the supplied handler. Step workers always report
 * {@code RunOutcome.SUCCESS} / {@code ChunkOutcome.SUCCESS()} after delegating.
 */
private void buildAndDefine3StepReductionJob(
	String theJobId,
	IReductionStepHandler theHandler
) {
	// step 1: delegate, then report success
	IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> firstWorker = (theStep, theSink) -> {
		theHandler.firstStep(theStep, theSink);
		return RunOutcome.SUCCESS;
	};

	// step 2: delegate, then report success
	IJobStepWorker<TestJobParameters, FirstStepOutput, SecondStepOutput> secondWorker = (theStep, theSink) -> {
		theHandler.secondStep(theStep, theSink);
		return RunOutcome.SUCCESS;
	};

	// step 3: the final reduction step
	IReductionStepWorker<TestJobParameters, SecondStepOutput, ReductionStepOutput> reductionWorker = new IReductionStepWorker<>() {
		@Nonnull
		@Override
		public ChunkOutcome consume(ChunkExecutionDetails<TestJobParameters, SecondStepOutput> theChunkDetails) {
			// no data sink exists during consume(), so null is passed to the handler
			theHandler.reductionStepConsume(theChunkDetails, null);
			return ChunkOutcome.SUCCESS();
		}

		@Nonnull
		@Override
		public RunOutcome run(
			@Nonnull StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails,
			@Nonnull IJobDataSink<ReductionStepOutput> theDataSink
		) throws JobExecutionFailedException {
			theHandler.reductionStepRun(theStepExecutionDetails, theDataSink);
			return RunOutcome.SUCCESS;
		}
	};

	createThreeStepReductionJob(theJobId, firstWorker, secondWorker, reductionWorker);
}
/**
 * Builds and registers a gated {@code JobDefinition} wired with the three
 * supplied step workers (first step, intermediate step, final reduction step).
 * The registered definition uses this test class's {@code myCompletionHandler}
 * as its completion handler.
 */
private void createThreeStepReductionJob(
	String theJobId,
	IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep,
	IJobStepWorker<TestJobParameters, FirstStepOutput, SecondStepOutput> theSecondStep,
	IReductionStepWorker<TestJobParameters, SecondStepOutput, ReductionStepOutput> theReductionsStep
) {
	// job definition id is conventionally the calling test method's name
	JobDefinition<? extends IModelJson> jobDefinition = JobDefinition.newBuilder()
		.setJobDefinitionId(theJobId)
		.setJobDescription("test job")
		.setJobDefinitionVersion(TEST_JOB_VERSION)
		.setParametersType(TestJobParameters.class)
		.gatedExecution()
		.addFirstStep(FIRST_STEP_ID, "Test first step", FirstStepOutput.class, theFirstStep)
		.addIntermediateStep(SECOND_STEP_ID, "Second step", SecondStepOutput.class, theSecondStep)
		.addFinalReducerStep(LAST_STEP_ID, "Test last step", ReductionStepOutput.class, theReductionsStep)
		.completionHandler(myCompletionHandler)
		.build();
	myJobDefinitionRegistry.addJobDefinition(jobDefinition);
}
static class TestJobParameters implements IModelJson {
TestJobParameters() {
}
@ -581,4 +694,14 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myResult = theResult;
}
}
/**
 * Test callback for plugging per-step behavior into a generically-built
 * three-step reduction job. Each method is invoked by the corresponding
 * step worker of the registered job definition.
 */
private interface IReductionStepHandler {
// invoked by the first (gated) step worker; implementations emit work chunks into the sink
void firstStep(StepExecutionDetails<TestJobParameters, VoidModel> theStep, IJobDataSink<FirstStepOutput> theDataSink);
// invoked once per chunk by the intermediate step worker
void secondStep(StepExecutionDetails<TestJobParameters, FirstStepOutput> theStep, IJobDataSink<SecondStepOutput> theDataSink);
// invoked as each chunk is consumed by the reduction step; NOTE(review): callers may pass a null sink here — do not use it
void reductionStepConsume(ChunkExecutionDetails<TestJobParameters, SecondStepOutput> theChunkDetails, IJobDataSink<ReductionStepOutput> theDataSink);
// invoked once when the reduction step runs after all chunks are consumed
void reductionStepRun(StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails, IJobDataSink<ReductionStepOutput> theDataSink);
}
}

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.test.utilities.HttpClientExtension;
import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
import ca.uhn.fhir.util.JsonUtil;
import com.google.common.collect.Sets;
import jakarta.annotation.Nonnull;
import org.apache.commons.io.LineIterator;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
@ -66,7 +67,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
@ -352,7 +352,6 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
verifyBulkExportResults(options, List.of("Patient/PING1"), Collections.singletonList("Patient/PNING3"));
} finally {
myCaptureQueriesListener.logSelectQueries();
}
}

View File

@ -1,58 +0,0 @@
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.api;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
/**
 * Execution details passed to a Batch2 reduction step.
 * Unlike a normal step, a reduction step is not backed by a single work chunk:
 * {@link #hasAssociatedWorkChunk()} returns false and {@link #getData()} is
 * unsupported, since all chunk data has already been consumed by the time the
 * reduction step runs.
 * @param <PT> - Job Parameters type
 * @param <IT> - Input data type
 * @param <OT> - Output data type. Output will actually be a ListResult of these objects.
 */
public class ReductionStepExecutionDetails<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson>
extends StepExecutionDetails<PT, IT> {
public ReductionStepExecutionDetails(
@Nonnull PT theParameters, @Nullable IT theData, @Nonnull JobInstance theInstance) {
// TODO KHS shouldn't the chunkId be null?
// "VOID" is a placeholder chunk id; no real work chunk backs a reduction step
super(theParameters, theData, theInstance, "VOID");
}
// convenience constructor for the common case of no input data
public ReductionStepExecutionDetails(@Nonnull PT theParameters, @Nonnull JobInstance theInstance) {
this(theParameters, null, theInstance);
}
@Override
@Nonnull
public final IT getData() {
// deliberately unsupported: reduction steps receive chunk data via consume(), not here
throw new UnsupportedOperationException(
Msg.code(2099) + " Reduction steps should have all data by the time execution is called.");
}
@Override
public boolean hasAssociatedWorkChunk() {
// reduction steps aggregate across chunks rather than belonging to one
return false;
}
}

View File

@ -20,9 +20,11 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.ChunkExecutionDetails;
import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
import ca.uhn.fhir.batch2.api.IReductionStepWorker;
import ca.uhn.fhir.batch2.api.JobCompletionDetails;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.model.ChunkOutcome;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
@ -221,7 +223,6 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
return null;
});
} finally {
executeInTransactionWithSynchronization(() -> {
ourLog.info(
"Reduction step for instance[{}] produced {} successful and {} failed chunks",
@ -256,6 +257,18 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
WorkChunkStatusEnum.FAILED,
"JOB ABORTED");
}
if (response.isSuccessful()) {
/**
* All reduction steps are final steps.
*/
IJobCompletionHandler<PT> completionHandler =
theJobWorkCursor.getJobDefinition().getCompletionHandler();
if (completionHandler != null) {
completionHandler.jobComplete(new JobCompletionDetails<>(parameters, instance));
}
}
return null;
});
}

View File

@ -74,7 +74,6 @@ public class ReductionStepExecutorServiceImplTest {
mySvc = new ReductionStepExecutorServiceImpl(myJobPersistence, myTransactionService, myJobDefinitionRegistry);
}
@Test
public void doExecution_reductionWithChunkFailed_marksAllFutureChunksAsFailedButPreviousAsSuccess() {
// setup
@ -87,7 +86,6 @@ public class ReductionStepExecutorServiceImplTest {
jobInstance.setStatus(StatusEnum.IN_PROGRESS);
JobWorkCursor<TestJobParameters, StepInputData, StepOutputData> workCursor = mock(JobWorkCursor.class);
// when
when(workCursor.getCurrentStep()).thenReturn((JobDefinitionStep<TestJobParameters, StepInputData, StepOutputData>) createJobDefinition().getSteps().get(1));
when(workCursor.getJobDefinition()).thenReturn(createJobDefinition());