review comments 1

leif stawnyczy 2024-03-25 21:22:15 -04:00
parent 6ac3c73234
commit 4a4be77351
13 changed files with 140 additions and 93 deletions

View File

@ -399,28 +399,6 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
}
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId) {
Batch2JobInstanceEntity jobInstanceEntity = getRunningJob(theInstanceId);
if (jobInstanceEntity == null) {
return false;
}
Set<WorkChunkStatusEnum> statusesForStep =
getDistinctWorkChunkStatesForJobAndStep(theInstanceId, theCurrentStepId);
ourLog.debug(
"Checking whether gated job can advanced to next step. [instanceId={}, currentStepId={}, statusesForStep={}]",
theInstanceId,
theCurrentStepId,
statusesForStep);
return statusesForStep.isEmpty()
|| statusesForStep.equals(Set.of(WorkChunkStatusEnum.COMPLETED))
|| statusesForStep.equals(Set.of(WorkChunkStatusEnum.READY));
}
@Override
public Set<WorkChunkStatusEnum> getDistinctWorkChunkStatesForJobAndStep(
String theInstanceId, String theCurrentStepId) {

View File

@ -22,7 +22,7 @@ import static ca.uhn.fhir.batch2.model.JobDefinition.ID_MAX_LENGTH;
@Immutable
@Subselect("SELECT e.id as id, "
+ " e.seq as seq,"
+ " e.stat as status, "
+ " e.stat as state, "
+ " e.instance_id as instance_id, "
+ " e.definition_id as job_definition_id, "
+ " e.definition_ver as job_definition_version, "
@ -37,7 +37,7 @@ public class Batch2WorkChunkMetadataView implements Serializable {
@Column(name = "SEQ", nullable = false)
private int mySequence;
@Column(name = "STATUS", length = ID_MAX_LENGTH, nullable = false)
@Column(name = "STATE", length = ID_MAX_LENGTH, nullable = false)
@Enumerated(EnumType.STRING)
private WorkChunkStatusEnum myStatus;

View File

@ -206,7 +206,7 @@ public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
@Nonnull
private JobDefinition<? extends IModelJson> buildGatedJobDefinition(String theJobId, IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep, IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> theLastStep) {
return TestJobDefinitionUtils.buildGatedJobDefinition(
return TestJobDefinitionUtils.buildJobDefinition(
theJobId,
theFirstStep,
theLastStep,

View File

@ -31,6 +31,7 @@ import ca.uhn.hapi.fhir.batch2.test.configs.SpyOverrideConfig;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import jakarta.annotation.Nonnull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
@ -97,6 +98,11 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Autowired
public JobDefinitionRegistry myJobDefinitionRegistry;
@AfterEach
public void after() {
myJobDefinitionRegistry.removeJobDefinition(JOB_DEFINITION_ID, JOB_DEF_VER);
}
@Test
public void testDeleteInstance() {
// Setup
@ -383,7 +389,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testStoreAndFetchWorkChunk_NoData() {
JobInstance instance = createInstance(true);
JobInstance instance = createInstance(true, false);
String instanceId = mySvc.storeNewInstance(instance);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null);
@ -465,7 +471,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testStoreAndFetchWorkChunk_WithData() {
JobInstance instance = createInstance(true);
JobInstance instance = createInstance(true, false);
String instanceId = mySvc.storeNewInstance(instance);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
@ -485,7 +491,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testMarkChunkAsCompleted_Success() {
JobInstance instance = createInstance(true);
JobInstance instance = createInstance(true, false);
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA);
assertNotNull(chunkId);
@ -520,43 +526,9 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
});
}
@Test
public void testGatedAdvancementByStatus() {
// Setup
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 0, 0));
boolean canAdvance = mySvc.canAdvanceInstanceToNextStep(instanceId, STEP_CHUNK_ID);
assertTrue(canAdvance);
//Storing a new chunk with QUEUED should prevent advancement.
String newChunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
canAdvance = mySvc.canAdvanceInstanceToNextStep(instanceId, STEP_CHUNK_ID);
assertFalse(canAdvance);
//Toggle it to complete
mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(newChunkId, 50, 0));
canAdvance = mySvc.canAdvanceInstanceToNextStep(instanceId, STEP_CHUNK_ID);
assertTrue(canAdvance);
//Create a new chunk and set it in progress.
String newerChunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
mySvc.onWorkChunkDequeue(newerChunkId);
canAdvance = mySvc.canAdvanceInstanceToNextStep(instanceId, STEP_CHUNK_ID);
assertFalse(canAdvance);
//Toggle IN_PROGRESS to complete
mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(newerChunkId, 50, 0));
canAdvance = mySvc.canAdvanceInstanceToNextStep(instanceId, STEP_CHUNK_ID);
assertTrue(canAdvance);
}
@Test
public void testMarkChunkAsCompleted_Error() {
JobInstance instance = createInstance(true);
JobInstance instance = createInstance(true, false);
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(JOB_DEFINITION_ID, TestJobDefinitionUtils.FIRST_STEP_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
@ -608,7 +580,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testMarkChunkAsCompleted_Fail() {
JobInstance instance = createInstance(true);
JobInstance instance = createInstance(true, false);
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
@ -697,11 +669,11 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
}
private JobInstance createInstance() {
return createInstance(false);
return createInstance(false, false);
}
@Nonnull
private JobInstance createInstance(boolean theCreateJobDefBool) {
private JobInstance createInstance(boolean theCreateJobDefBool, boolean theCreateGatedJob) {
JobInstance instance = new JobInstance();
instance.setJobDefinitionId(JOB_DEFINITION_ID);
instance.setStatus(StatusEnum.QUEUED);
@ -710,19 +682,37 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
instance.setReport("TEST");
if (theCreateJobDefBool) {
JobDefinition<?> jobDef = TestJobDefinitionUtils.buildGatedJobDefinition(
JOB_DEFINITION_ID,
(step, sink) -> {
sink.accept(new FirstStepOutput());
return RunOutcome.SUCCESS;
},
(step, sink) -> {
return RunOutcome.SUCCESS;
},
theDetails -> {
JobDefinition<?> jobDef;
}
);
if (theCreateGatedJob) {
jobDef = TestJobDefinitionUtils.buildGatedJobDefinition(
JOB_DEFINITION_ID,
(step, sink) -> {
sink.accept(new FirstStepOutput());
return RunOutcome.SUCCESS;
},
(step, sink) -> {
return RunOutcome.SUCCESS;
},
theDetails -> {
}
);
} else {
jobDef = TestJobDefinitionUtils.buildJobDefinition(
JOB_DEFINITION_ID,
(step, sink) -> {
sink.accept(new FirstStepOutput());
return RunOutcome.SUCCESS;
},
(step, sink) -> {
return RunOutcome.SUCCESS;
},
theDetails -> {
}
);
}
if (myJobDefinitionRegistry.getJobDefinition(jobDef.getJobDefinitionId(), jobDef.getJobDefinitionVersion()).isEmpty()) {
myJobDefinitionRegistry.addJobDefinition(jobDef);
}

View File

@ -8,6 +8,7 @@ import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.testjob.models.FirstStepOutput;
import ca.uhn.fhir.testjob.models.TestJobParameters;
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestJobDefinitionUtils {
public static final int TEST_JOB_VERSION = 1;
@ -15,24 +16,41 @@ public class TestJobDefinitionUtils {
public static final String LAST_STEP_ID = "last-step";
/**
* Creates a test job definition
* @param theJobId
* @param theFirstStep
* @param theLastStep
* @param theCompletionHandler
* @return
* Creates a test job definition.
* This job will not be gated.
*/
public static JobDefinition<? extends IModelJson> buildJobDefinition(
String theJobId,
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep,
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> theLastStep,
IJobCompletionHandler<TestJobParameters> theCompletionHandler) {
return getJobBuilder(theJobId, theFirstStep, theLastStep, theCompletionHandler).build();
}
/**
* Creates a test job definition.
* This job will be gated.
*/
public static JobDefinition<? extends IModelJson> buildGatedJobDefinition(
String theJobId,
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep,
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> theLastStep,
IJobCompletionHandler<TestJobParameters> theCompletionHandler) {
return getJobBuilder(theJobId, theFirstStep, theLastStep, theCompletionHandler)
.gatedExecution().build();
}
private static JobDefinition.Builder getJobBuilder(
String theJobId,
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep,
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> theLastStep,
IJobCompletionHandler<TestJobParameters> theCompletionHandler
) {
return JobDefinition.newBuilder()
.setJobDefinitionId(theJobId)
.setJobDescription("test job")
.setJobDefinitionVersion(TEST_JOB_VERSION)
.setParametersType(TestJobParameters.class)
.gatedExecution()
.addFirstStep(
FIRST_STEP_ID,
"Test first step",
@ -44,7 +62,6 @@ public class TestJobDefinitionUtils {
"Test last step",
theLastStep
)
.completionHandler(theCompletionHandler)
.build();
.completionHandler(theCompletionHandler);
}
}
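
Reviewer note: a minimal sketch of how the two helpers now differ at a call site, mirroring the createInstance(...) change in JpaJobPersistenceImplTest above. The job id, lambda bodies, and variable names are illustrative only:

// Non-gated test job: with the JobDataSink change below, its chunks are enqueued for
// processing as soon as they are created.
JobDefinition<? extends IModelJson> ungated = TestJobDefinitionUtils.buildJobDefinition(
    "example-job",
    (step, sink) -> {
        sink.accept(new FirstStepOutput());
        return RunOutcome.SUCCESS;
    },
    (step, sink) -> RunOutcome.SUCCESS,
    details -> {});

// Gated test job: same builder plus gatedExecution(); its chunks stay in READY when created
// and are presumably advanced by the maintenance pass rather than enqueued immediately.
JobDefinition<? extends IModelJson> gated = TestJobDefinitionUtils.buildGatedJobDefinition(
    "example-job",
    (step, sink) -> {
        sink.accept(new FirstStepOutput());
        return RunOutcome.SUCCESS;
    },
    (step, sink) -> RunOutcome.SUCCESS,
    details -> {});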

View File

@ -2,5 +2,8 @@ package ca.uhn.fhir.testjob.models;
import ca.uhn.fhir.model.api.IModelJson;
/**
* Sample first step output for test job definitions created in {@link ca.uhn.fhir.testjob.TestJobDefinitionUtils}
*/
public class FirstStepOutput implements IModelJson {
}

View File

@ -2,5 +2,8 @@ package ca.uhn.fhir.testjob.models;
import ca.uhn.fhir.model.api.IModelJson;
/**
* Sample output object for reduction steps of test jobs created in {@link ca.uhn.fhir.testjob.TestJobDefinitionUtils}
*/
public class ReductionStepOutput implements IModelJson {
}

View File

@ -2,5 +2,8 @@ package ca.uhn.fhir.testjob.models;
import ca.uhn.fhir.model.api.IModelJson;
/**
* Sample job parameters; these are used for jobs created in {@link ca.uhn.fhir.testjob.TestJobDefinitionUtils}
*/
public class TestJobParameters implements IModelJson {
}

View File

@ -24,6 +24,26 @@ import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
/**
* This class is used to help set up and verify WorkChunk transitions.
*
* Creating this object requires an instance ID (of a stored instance), a JobDefinition (of a stored job definition),
* and a state string.
*
* State strings are defined as follows:
* "step-name-or-step-index,INITIAL_STATE|step-name-or-step-index,FINAL_STATE"
*
* where "step-name-or-step-index" is the name or index of a step in the provided
* JobDefinition, namely the step that the work chunk should start in.
*
* If no final state/step name is provided, no transition is assumed.
*
* Further, comments can be added to the state string, but they must start with a "#".
*
* E.g.:
* 1,READY|1,QUEUED # will create an initial work chunk in the READY state in step 1.
* # validation will verify that this workchunk has been transitioned to QUEUED.
*/
public class JobMaintenanceStateInformation {
private static final Logger ourLog = LoggerFactory.getLogger(JobMaintenanceStateInformation.class);
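
Reviewer note: a short illustration of the state-string grammar documented above. The one-spec-per-line convention is taken from the example in the javadoc, and the constructor shape is assumed from the description (it may not match the real signature):

// Each line is one work chunk spec: "step,INITIAL_STATE|step,FINAL_STATE"; a '#' starts a comment.
String state = """
    1,READY|1,QUEUED   # chunk created READY in step 1; validation expects it to be QUEUED afterwards
    2,COMPLETED        # chunk created COMPLETED in step 2; no '|' part, so no transition is verified
    """;

// Assumed constructor arguments per the javadoc: the id of a stored instance, a stored
// JobDefinition, and the state string.
JobMaintenanceStateInformation info = new JobMaintenanceStateInformation(instanceId, jobDefinition, state);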

View File

@ -119,10 +119,6 @@ public interface IJobPersistence extends IWorkChunkPersistence {
// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest);
// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
@Deprecated
boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId);
/**
* Returns set of all distinct states for the specified job instance id
* and step id.
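
Reviewer note: with canAdvanceInstanceToNextStep removed from IJobPersistence (and its implementation removed from JpaJobPersistenceImpl above), callers can reproduce the same check through the surviving getDistinctWorkChunkStatesForJobAndStep API. A sketch that mirrors the removed body:

// Equivalent of the removed canAdvanceInstanceToNextStep(instanceId, currentStepId); note the
// removed version also returned false when the instance was no longer running.
Set<WorkChunkStatusEnum> statusesForStep =
    jobPersistence.getDistinctWorkChunkStatesForJobAndStep(instanceId, currentStepId);
boolean canAdvance = statusesForStep.isEmpty()
    || statusesForStep.equals(Set.of(WorkChunkStatusEnum.COMPLETED))
    || statusesForStep.equals(Set.of(WorkChunkStatusEnum.READY));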

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkData;
import ca.uhn.fhir.i18n.Msg;
@ -51,6 +52,8 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
private final AtomicReference<String> myLastChunkId = new AtomicReference<>();
private final IHapiTransactionService myHapiTransactionService;
private final boolean myGatedExecution;
JobDataSink(
@Nonnull BatchJobSender theBatchJobSender,
@Nonnull IJobPersistence theJobPersistence,
@ -65,6 +68,7 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
myJobDefinitionVersion = theDefinition.getJobDefinitionVersion();
myTargetStep = theJobWorkCursor.nextStep;
myHapiTransactionService = theHapiTransactionService;
myGatedExecution = theDefinition.isGatedExecution();
}
@Override
@ -85,6 +89,18 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
.execute(() -> myJobPersistence.onWorkChunkCreate(batchWorkChunk));
myLastChunkId.set(chunkId);
if (!myGatedExecution) {
myJobPersistence.enqueueWorkChunkForProcessing(chunkId, updated -> {
if (updated == 1) {
JobWorkNotification workNotification = new JobWorkNotification(
myJobDefinitionId, myJobDefinitionVersion, instanceId, targetStepId, chunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
} else {
ourLog.error("Expected to have updated 1 workchunk, but instead found {}. Chunk is not sent to queue.", updated);
}
});
}
}
@Override

View File

@ -314,7 +314,13 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
ReductionStepChunkProcessingResponse theResponseObject,
JobWorkCursor<PT, IT, OT> theJobWorkCursor) {
if (theChunk.getStatus() != WorkChunkStatusEnum.READY) {
/*
* Reduction steps are done inline and only on gated jobs.
* As such, all workchunks once they get here should either be:
* 1) READY (7.1 new status)
* 2) COMPLETED (7.0 legacy)
*/
if (theChunk.getStatus() != WorkChunkStatusEnum.READY || theChunk.getStatus() == WorkChunkStatusEnum.COMPLETED) {
// This should never happen since jobs with reduction are required to be gated
ourLog.error(
"Unexpected chunk {} with status {} found while reducing {}. No chunks feeding into a reduction step should be in a state other than READY.",

View File

@ -29,6 +29,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import jakarta.annotation.Nonnull;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
@ -36,6 +37,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -101,6 +105,11 @@ class JobDataSinkTest {
// execute
// Let's test our first step worker by calling run on it:
when(myJobPersistence.onWorkChunkCreate(myBatchWorkChunkCaptor.capture())).thenReturn(CHUNK_ID);
doAnswer(args -> {
Consumer<Integer> consumer = args.getArgument(1);
consumer.accept(1);
return 1;
}).when(myJobPersistence).enqueueWorkChunkForProcessing(anyString(), any());
JobInstance instance = JobInstance.fromInstanceId(JOB_INSTANCE_ID);
StepExecutionDetails<TestJobParameters, VoidModel> details = new StepExecutionDetails<>(new TestJobParameters().setParam1("" + PID_COUNT), null, instance, CHUNK_ID);
JobWorkCursor<TestJobParameters, VoidModel, Step1Output> cursor = new JobWorkCursor<>(job, true, firstStep, lastStep);
@ -113,7 +122,13 @@ class JobDataSinkTest {
// theDataSink.accept(output) called by firstStepWorker above calls two services. Let's validate them both.
verify(myBatchJobSender, never()).sendWorkChannelMessage(any());
verify(myBatchJobSender).sendWorkChannelMessage(myJobWorkNotificationCaptor.capture());
JobWorkNotification notification = myJobWorkNotificationCaptor.getValue();
assertEquals(JOB_DEF_ID, notification.getJobDefinitionId());
assertEquals(JOB_INSTANCE_ID, notification.getInstanceId());
assertEquals(CHUNK_ID, notification.getChunkId());
assertEquals(JOB_DEF_VERSION, notification.getJobDefinitionVersion());
assertEquals(LAST_STEP_ID, notification.getTargetStepId());
WorkChunkCreateEvent batchWorkChunk = myBatchWorkChunkCaptor.getValue();
assertEquals(JOB_DEF_VERSION, batchWorkChunk.jobDefinitionVersion);