mirror of
https://github.com/hapifhir/hapi-fhir.git
synced 2025-02-18 02:45:07 +00:00
Merge remote-tracking branch 'origin/master' into do-20240206-core-bump-6-2-16
This commit is contained in:
commit
8419104894
@ -0,0 +1,6 @@
|
||||
---
|
||||
type: fix
|
||||
issue: 5746
|
||||
title: "Batch2 jobs with reduction steps didn't fire the completion handler.
|
||||
This has been fixed.
|
||||
"
|
@ -28,9 +28,11 @@ import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
|
||||
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
|
||||
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
|
||||
import ca.uhn.fhir.model.api.IModelJson;
|
||||
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
|
||||
import ca.uhn.fhir.util.JsonUtil;
|
||||
import ca.uhn.test.concurrency.PointcutLatch;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import jakarta.annotation.Nonnull;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@ -42,7 +44,6 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.domain.Page;
|
||||
import org.springframework.data.domain.Sort;
|
||||
|
||||
import jakarta.annotation.Nonnull;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
@ -64,6 +65,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
|
||||
|
||||
public static final int TEST_JOB_VERSION = 1;
|
||||
public static final String FIRST_STEP_ID = "first-step";
|
||||
public static final String SECOND_STEP_ID = "second-step";
|
||||
public static final String LAST_STEP_ID = "last-step";
|
||||
@Autowired
|
||||
JobDefinitionRegistry myJobDefinitionRegistry;
|
||||
@ -94,7 +96,8 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
|
||||
public void before() throws Exception {
|
||||
super.before();
|
||||
|
||||
myCompletionHandler = details -> {};
|
||||
myCompletionHandler = details -> {
|
||||
};
|
||||
myWorkChannel = (LinkedBlockingChannel) myChannelFactory.getOrCreateReceiver(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, new ChannelConsumerSettings());
|
||||
myStorageSettings.setJobFastTrackingEnabled(true);
|
||||
}
|
||||
@ -139,7 +142,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
|
||||
List<String> jobIds = new ArrayList<>();
|
||||
for (int i = 0; i < maxJobsToSave; i++) {
|
||||
JobInstanceStartRequest request = buildRequest(jobId);
|
||||
Batch2JobStartResponse response = myJobCoordinator.startInstance(request);
|
||||
Batch2JobStartResponse response = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
|
||||
jobIds.add(response.getInstanceId());
|
||||
}
|
||||
|
||||
@ -188,7 +191,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
|
||||
JobInstanceStartRequest request = buildRequest(jobId);
|
||||
|
||||
myFirstStepLatch.setExpectedCount(1);
|
||||
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
|
||||
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
|
||||
myFirstStepLatch.awaitExpected();
|
||||
|
||||
myBatch2JobHelper.awaitJobCompletion(startResponse.getInstanceId());
|
||||
@ -212,7 +215,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
|
||||
|
||||
myFirstStepLatch.setExpectedCount(1);
|
||||
myLastStepLatch.setExpectedCount(1);
|
||||
String batchJobId = myJobCoordinator.startInstance(request).getInstanceId();
|
||||
String batchJobId = myJobCoordinator.startInstance(new SystemRequestDetails(), request).getInstanceId();
|
||||
myFirstStepLatch.awaitExpected();
|
||||
|
||||
myBatch2JobHelper.assertFastTracking(batchJobId);
|
||||
@ -231,76 +234,119 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
|
||||
assertEquals(1.0, jobInstance.getProgress());
|
||||
}
|
||||
|
||||
private void createThreeStepReductionJob(
|
||||
String theJobId,
|
||||
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep,
|
||||
IJobStepWorker<TestJobParameters, FirstStepOutput, SecondStepOutput> theSecondStep,
|
||||
IReductionStepWorker<TestJobParameters, SecondStepOutput, ReductionStepOutput> theReductionsStep
|
||||
) {
|
||||
// create job definition (it's the test method's name)
|
||||
JobDefinition<? extends IModelJson> jd = JobDefinition.newBuilder()
|
||||
.setJobDefinitionId(theJobId)
|
||||
.setJobDescription("test job")
|
||||
.setJobDefinitionVersion(TEST_JOB_VERSION)
|
||||
.setParametersType(TestJobParameters.class)
|
||||
.gatedExecution()
|
||||
.addFirstStep(
|
||||
FIRST_STEP_ID,
|
||||
"Test first step",
|
||||
FirstStepOutput.class,
|
||||
theFirstStep
|
||||
)
|
||||
.addIntermediateStep("SECOND",
|
||||
"Second step",
|
||||
SecondStepOutput.class,
|
||||
theSecondStep)
|
||||
.addFinalReducerStep(
|
||||
LAST_STEP_ID,
|
||||
"Test last step",
|
||||
ReductionStepOutput.class,
|
||||
theReductionsStep
|
||||
)
|
||||
.build();
|
||||
myJobDefinitionRegistry.addJobDefinition(jd);
|
||||
@Test
|
||||
public void testJobWithReductionStepFiresCompletionHandler() throws InterruptedException {
|
||||
// setup
|
||||
String jobId = new Exception().getStackTrace()[0].getMethodName();
|
||||
String testInfo = "test";
|
||||
int totalCalls = 2;
|
||||
AtomicInteger secondStepInt = new AtomicInteger();
|
||||
|
||||
AtomicBoolean completionBool = new AtomicBoolean();
|
||||
|
||||
myCompletionHandler = (params) -> {
|
||||
// ensure our completion handler fires
|
||||
completionBool.getAndSet(true);
|
||||
};
|
||||
|
||||
buildAndDefine3StepReductionJob(jobId, new IReductionStepHandler() {
|
||||
private final AtomicBoolean myBoolean = new AtomicBoolean();
|
||||
|
||||
private final AtomicInteger mySecondGate = new AtomicInteger();
|
||||
|
||||
@Override
|
||||
public void firstStep(StepExecutionDetails<TestJobParameters, VoidModel> theStep, IJobDataSink<FirstStepOutput> theDataSink) {
|
||||
for (int i = 0; i < totalCalls; i++) {
|
||||
theDataSink.accept(new FirstStepOutput());
|
||||
}
|
||||
callLatch(myFirstStepLatch, theStep);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void secondStep(StepExecutionDetails<TestJobParameters, FirstStepOutput> theStep, IJobDataSink<SecondStepOutput> theDataSink) {
|
||||
SecondStepOutput output = new SecondStepOutput();
|
||||
output.setValue(testInfo + secondStepInt.getAndIncrement());
|
||||
theDataSink.accept(output);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reductionStepConsume(ChunkExecutionDetails<TestJobParameters, SecondStepOutput> theChunkDetails, IJobDataSink<ReductionStepOutput> theDataSink) {
|
||||
int val = mySecondGate.getAndIncrement();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reductionStepRun(StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails, IJobDataSink<ReductionStepOutput> theDataSink) {
|
||||
boolean isRunAlready = myBoolean.getAndSet(true);
|
||||
assertFalse(isRunAlready, "Reduction step should only be called once!");
|
||||
|
||||
theDataSink.accept(new ReductionStepOutput(new ArrayList<>()));
|
||||
callLatch(myLastStepLatch, theStepExecutionDetails);
|
||||
}
|
||||
});
|
||||
|
||||
// test
|
||||
JobInstanceStartRequest request = buildRequest(jobId);
|
||||
myFirstStepLatch.setExpectedCount(1);
|
||||
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
|
||||
|
||||
String instanceId = startResponse.getInstanceId();
|
||||
myFirstStepLatch.awaitExpected();
|
||||
assertNotNull(instanceId);
|
||||
|
||||
myBatch2JobHelper.awaitGatedStepId(FIRST_STEP_ID, instanceId);
|
||||
|
||||
// wait for last step to finish
|
||||
ourLog.info("Setting last step latch");
|
||||
myLastStepLatch.setExpectedCount(1);
|
||||
|
||||
// waiting
|
||||
myBatch2JobHelper.awaitJobCompletion(instanceId);
|
||||
myLastStepLatch.awaitExpected();
|
||||
ourLog.info("awaited the last step");
|
||||
|
||||
// verify
|
||||
Optional<JobInstance> instanceOp = myJobPersistence.fetchInstance(instanceId);
|
||||
assertTrue(instanceOp.isPresent());
|
||||
JobInstance jobInstance = instanceOp.get();
|
||||
|
||||
// ensure our completion handler fires
|
||||
assertTrue(completionBool.get());
|
||||
|
||||
assertEquals(StatusEnum.COMPLETED, jobInstance.getStatus());
|
||||
assertEquals(1.0, jobInstance.getProgress());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = { true, false })
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testJobDefinitionWithReductionStepIT(boolean theDelayReductionStepBool) throws InterruptedException {
|
||||
// setup
|
||||
String jobId = new Exception().getStackTrace()[0].getMethodName() + "_" + theDelayReductionStepBool;
|
||||
String testInfo = "test";
|
||||
AtomicInteger secondStepInt = new AtomicInteger();
|
||||
|
||||
// step 1
|
||||
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> first = (step, sink) -> {
|
||||
sink.accept(new FirstStepOutput());
|
||||
sink.accept(new FirstStepOutput());
|
||||
callLatch(myFirstStepLatch, step);
|
||||
return RunOutcome.SUCCESS;
|
||||
};
|
||||
|
||||
// step 2
|
||||
IJobStepWorker<TestJobParameters, FirstStepOutput, SecondStepOutput> second = (step, sink) -> {
|
||||
SecondStepOutput output = new SecondStepOutput();
|
||||
output.setValue(testInfo + secondStepInt.getAndIncrement());
|
||||
sink.accept(output);
|
||||
|
||||
return RunOutcome.SUCCESS;
|
||||
};
|
||||
|
||||
// step 3
|
||||
IReductionStepWorker<TestJobParameters, SecondStepOutput, ReductionStepOutput> last = new IReductionStepWorker<>() {
|
||||
|
||||
buildAndDefine3StepReductionJob(jobId, new IReductionStepHandler() {
|
||||
private final ArrayList<SecondStepOutput> myOutput = new ArrayList<>();
|
||||
|
||||
private final AtomicBoolean myBoolean = new AtomicBoolean();
|
||||
|
||||
private final AtomicInteger mySecondGate = new AtomicInteger();
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public ChunkOutcome consume(ChunkExecutionDetails<TestJobParameters, SecondStepOutput> theChunkDetails) {
|
||||
public void firstStep(StepExecutionDetails<TestJobParameters, VoidModel> theStep, IJobDataSink<FirstStepOutput> theDataSink) {
|
||||
theDataSink.accept(new FirstStepOutput());
|
||||
theDataSink.accept(new FirstStepOutput());
|
||||
callLatch(myFirstStepLatch, theStep);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void secondStep(StepExecutionDetails<TestJobParameters, FirstStepOutput> theStep, IJobDataSink<SecondStepOutput> theDataSink) {
|
||||
SecondStepOutput output = new SecondStepOutput();
|
||||
output.setValue(testInfo + secondStepInt.getAndIncrement());
|
||||
theDataSink.accept(output);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reductionStepConsume(ChunkExecutionDetails<TestJobParameters, SecondStepOutput> theChunkDetails, IJobDataSink<ReductionStepOutput> theDataSink) {
|
||||
myOutput.add(theChunkDetails.getData());
|
||||
// 1 because we know 2 packets are coming.
|
||||
// we'll fire the second maintenance run on the second packet
|
||||
@ -309,20 +355,14 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
|
||||
ourLog.info("SECOND FORCED MAINTENANCE PASS FORCED");
|
||||
myBatch2JobHelper.forceRunMaintenancePass();
|
||||
}
|
||||
return ChunkOutcome.SUCCESS();
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public RunOutcome run(
|
||||
@Nonnull StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails,
|
||||
@Nonnull IJobDataSink<ReductionStepOutput> theDataSink
|
||||
) throws JobExecutionFailedException {
|
||||
public void reductionStepRun(StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails, IJobDataSink<ReductionStepOutput> theDataSink) {
|
||||
boolean isRunAlready = myBoolean.getAndSet(true);
|
||||
assertFalse(isRunAlready, "Reduction step should only be called once!");
|
||||
|
||||
complete(theStepExecutionDetails, theDataSink);
|
||||
return RunOutcome.SUCCESS;
|
||||
}
|
||||
|
||||
private void complete(
|
||||
@ -333,13 +373,12 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
|
||||
theDataSink.accept(new ReductionStepOutput(myOutput));
|
||||
callLatch(myLastStepLatch, theStepExecutionDetails);
|
||||
}
|
||||
};
|
||||
createThreeStepReductionJob(jobId, first, second, last);
|
||||
});
|
||||
|
||||
// run test
|
||||
JobInstanceStartRequest request = buildRequest(jobId);
|
||||
myFirstStepLatch.setExpectedCount(1);
|
||||
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
|
||||
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
|
||||
|
||||
String instanceId = startResponse.getInstanceId();
|
||||
myFirstStepLatch.awaitExpected();
|
||||
@ -398,7 +437,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
|
||||
JobInstanceStartRequest request = buildRequest(jobDefId);
|
||||
|
||||
myFirstStepLatch.setExpectedCount(1);
|
||||
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
|
||||
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
|
||||
String instanceId = startResponse.getInstanceId();
|
||||
myFirstStepLatch.awaitExpected();
|
||||
|
||||
@ -427,7 +466,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
|
||||
JobInstanceStartRequest request = buildRequest(jobDefId);
|
||||
|
||||
// execute
|
||||
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
|
||||
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
|
||||
String instanceId = startResponse.getInstanceId();
|
||||
|
||||
// validate
|
||||
@ -453,7 +492,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
|
||||
// execute
|
||||
ourLog.info("Starting job");
|
||||
myFirstStepLatch.setExpectedCount(1);
|
||||
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
|
||||
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
|
||||
String instanceId = startResponse.getInstanceId();
|
||||
myFirstStepLatch.awaitExpected();
|
||||
|
||||
@ -508,7 +547,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
|
||||
// test
|
||||
JobInstanceStartRequest request = buildRequest(jobDefId);
|
||||
myFirstStepLatch.setExpectedCount(1);
|
||||
Batch2JobStartResponse response = myJobCoordinator.startInstance(request);
|
||||
Batch2JobStartResponse response = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
|
||||
JobInstance instance = myBatch2JobHelper.awaitJobHasStatus(response.getInstanceId(),
|
||||
30, // we want to wait a long time (2 min here) cause backoff is incremental
|
||||
StatusEnum.FAILED
|
||||
@ -551,6 +590,80 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
private void buildAndDefine3StepReductionJob(
|
||||
String theJobId,
|
||||
IReductionStepHandler theHandler
|
||||
) {
|
||||
// step 1
|
||||
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> first = (step, sink) -> {
|
||||
theHandler.firstStep(step, sink);
|
||||
return RunOutcome.SUCCESS;
|
||||
};
|
||||
|
||||
// step 2
|
||||
IJobStepWorker<TestJobParameters, FirstStepOutput, SecondStepOutput> second = (step, sink) -> {
|
||||
theHandler.secondStep(step, sink);
|
||||
return RunOutcome.SUCCESS;
|
||||
};
|
||||
|
||||
// step 3
|
||||
IReductionStepWorker<TestJobParameters, SecondStepOutput, ReductionStepOutput> last = new IReductionStepWorker<>() {
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public ChunkOutcome consume(ChunkExecutionDetails<TestJobParameters, SecondStepOutput> theChunkDetails) {
|
||||
theHandler.reductionStepConsume(theChunkDetails, null);
|
||||
return ChunkOutcome.SUCCESS();
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public RunOutcome run(
|
||||
@Nonnull StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails,
|
||||
@Nonnull IJobDataSink<ReductionStepOutput> theDataSink
|
||||
) throws JobExecutionFailedException {
|
||||
theHandler.reductionStepRun(theStepExecutionDetails, theDataSink);
|
||||
return RunOutcome.SUCCESS;
|
||||
}
|
||||
};
|
||||
createThreeStepReductionJob(theJobId, first, second, last);
|
||||
}
|
||||
|
||||
private void createThreeStepReductionJob(
|
||||
String theJobId,
|
||||
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep,
|
||||
IJobStepWorker<TestJobParameters, FirstStepOutput, SecondStepOutput> theSecondStep,
|
||||
IReductionStepWorker<TestJobParameters, SecondStepOutput, ReductionStepOutput> theReductionsStep
|
||||
) {
|
||||
// create job definition (it's the test method's name)
|
||||
JobDefinition<? extends IModelJson> jd = JobDefinition.newBuilder()
|
||||
.setJobDefinitionId(theJobId)
|
||||
.setJobDescription("test job")
|
||||
.setJobDefinitionVersion(TEST_JOB_VERSION)
|
||||
.setParametersType(TestJobParameters.class)
|
||||
.gatedExecution()
|
||||
.addFirstStep(
|
||||
FIRST_STEP_ID,
|
||||
"Test first step",
|
||||
FirstStepOutput.class,
|
||||
theFirstStep
|
||||
)
|
||||
.addIntermediateStep(SECOND_STEP_ID,
|
||||
"Second step",
|
||||
SecondStepOutput.class,
|
||||
theSecondStep)
|
||||
.addFinalReducerStep(
|
||||
LAST_STEP_ID,
|
||||
"Test last step",
|
||||
ReductionStepOutput.class,
|
||||
theReductionsStep
|
||||
)
|
||||
.completionHandler(myCompletionHandler)
|
||||
.build();
|
||||
myJobDefinitionRegistry.addJobDefinition(jd);
|
||||
}
|
||||
|
||||
static class TestJobParameters implements IModelJson {
|
||||
TestJobParameters() {
|
||||
}
|
||||
@ -581,4 +694,14 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
|
||||
myResult = theResult;
|
||||
}
|
||||
}
|
||||
|
||||
private interface IReductionStepHandler {
|
||||
void firstStep(StepExecutionDetails<TestJobParameters, VoidModel> theStep, IJobDataSink<FirstStepOutput> theDataSink);
|
||||
|
||||
void secondStep(StepExecutionDetails<TestJobParameters, FirstStepOutput> theStep, IJobDataSink<SecondStepOutput> theDataSink);
|
||||
|
||||
void reductionStepConsume(ChunkExecutionDetails<TestJobParameters, SecondStepOutput> theChunkDetails, IJobDataSink<ReductionStepOutput> theDataSink);
|
||||
|
||||
void reductionStepRun(StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails, IJobDataSink<ReductionStepOutput> theDataSink);
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ import ca.uhn.fhir.test.utilities.HttpClientExtension;
|
||||
import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
|
||||
import ca.uhn.fhir.util.JsonUtil;
|
||||
import com.google.common.collect.Sets;
|
||||
import jakarta.annotation.Nonnull;
|
||||
import org.apache.commons.io.LineIterator;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
@ -66,7 +67,6 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import jakarta.annotation.Nonnull;
|
||||
import java.io.IOException;
|
||||
import java.io.StringReader;
|
||||
import java.util.ArrayList;
|
||||
@ -352,7 +352,6 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
|
||||
verifyBulkExportResults(options, List.of("Patient/PING1"), Collections.singletonList("Patient/PNING3"));
|
||||
} finally {
|
||||
myCaptureQueriesListener.logSelectQueries();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,58 +0,0 @@
|
||||
/*-
|
||||
* #%L
|
||||
* HAPI FHIR JPA Server - Batch2 Task Processor
|
||||
* %%
|
||||
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
|
||||
* %%
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* #L%
|
||||
*/
|
||||
package ca.uhn.fhir.batch2.api;
|
||||
|
||||
import ca.uhn.fhir.batch2.model.JobInstance;
|
||||
import ca.uhn.fhir.i18n.Msg;
|
||||
import ca.uhn.fhir.model.api.IModelJson;
|
||||
import jakarta.annotation.Nonnull;
|
||||
import jakarta.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* This class is used for Reduction Step for Batch2 Jobs.
|
||||
* @param <PT> - Job Parameters type
|
||||
* @param <IT> - Input data type
|
||||
* @param <OT> - Output data type. Output will actually be a ListResult of these objects.
|
||||
*/
|
||||
public class ReductionStepExecutionDetails<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson>
|
||||
extends StepExecutionDetails<PT, IT> {
|
||||
|
||||
public ReductionStepExecutionDetails(
|
||||
@Nonnull PT theParameters, @Nullable IT theData, @Nonnull JobInstance theInstance) {
|
||||
// TODO KHS shouldn't the chunkId be null?
|
||||
super(theParameters, theData, theInstance, "VOID");
|
||||
}
|
||||
|
||||
public ReductionStepExecutionDetails(@Nonnull PT theParameters, @Nonnull JobInstance theInstance) {
|
||||
this(theParameters, null, theInstance);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nonnull
|
||||
public final IT getData() {
|
||||
throw new UnsupportedOperationException(
|
||||
Msg.code(2099) + " Reduction steps should have all data by the time execution is called.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasAssociatedWorkChunk() {
|
||||
return false;
|
||||
}
|
||||
}
|
@ -20,9 +20,11 @@
|
||||
package ca.uhn.fhir.batch2.coordinator;
|
||||
|
||||
import ca.uhn.fhir.batch2.api.ChunkExecutionDetails;
|
||||
import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
|
||||
import ca.uhn.fhir.batch2.api.IJobPersistence;
|
||||
import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
|
||||
import ca.uhn.fhir.batch2.api.IReductionStepWorker;
|
||||
import ca.uhn.fhir.batch2.api.JobCompletionDetails;
|
||||
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
|
||||
import ca.uhn.fhir.batch2.model.ChunkOutcome;
|
||||
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
|
||||
@ -221,7 +223,6 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
|
||||
return null;
|
||||
});
|
||||
} finally {
|
||||
|
||||
executeInTransactionWithSynchronization(() -> {
|
||||
ourLog.info(
|
||||
"Reduction step for instance[{}] produced {} successful and {} failed chunks",
|
||||
@ -256,6 +257,18 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
|
||||
WorkChunkStatusEnum.FAILED,
|
||||
"JOB ABORTED");
|
||||
}
|
||||
|
||||
if (response.isSuccessful()) {
|
||||
/**
|
||||
* All reduction steps are final steps.
|
||||
*/
|
||||
IJobCompletionHandler<PT> completionHandler =
|
||||
theJobWorkCursor.getJobDefinition().getCompletionHandler();
|
||||
if (completionHandler != null) {
|
||||
completionHandler.jobComplete(new JobCompletionDetails<>(parameters, instance));
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
@ -74,7 +74,6 @@ public class ReductionStepExecutorServiceImplTest {
|
||||
mySvc = new ReductionStepExecutorServiceImpl(myJobPersistence, myTransactionService, myJobDefinitionRegistry);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void doExecution_reductionWithChunkFailed_marksAllFutureChunksAsFailedButPreviousAsSuccess() {
|
||||
// setup
|
||||
@ -87,7 +86,6 @@ public class ReductionStepExecutorServiceImplTest {
|
||||
jobInstance.setStatus(StatusEnum.IN_PROGRESS);
|
||||
JobWorkCursor<TestJobParameters, StepInputData, StepOutputData> workCursor = mock(JobWorkCursor.class);
|
||||
|
||||
|
||||
// when
|
||||
when(workCursor.getCurrentStep()).thenReturn((JobDefinitionStep<TestJobParameters, StepInputData, StepOutputData>) createJobDefinition().getSteps().get(1));
|
||||
when(workCursor.getJobDefinition()).thenReturn(createJobDefinition());
|
||||
|
Loading…
x
Reference in New Issue
Block a user