basic state transition shell work

This commit is contained in:
leif stawnyczy 2024-03-19 14:22:47 -04:00
parent 7067cf4491
commit 3f6c2f984b
16 changed files with 458 additions and 139 deletions

View File

@ -474,6 +474,14 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
myJobInstanceRepository.updateInstanceUpdateTime(theInstanceId, new Date());
}
@Override
public WorkChunk createWorkChunk(WorkChunk theWorkChunk) {
if (theWorkChunk.getId() == null) {
theWorkChunk.setId(UUID.randomUUID().toString());
}
return toChunk(myWorkChunkRepository.save(Batch2WorkChunkEntity.fromWorkChunk(theWorkChunk)));
}
/**
* Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
*/

View File

@ -19,6 +19,7 @@
*/
package ca.uhn.fhir.jpa.entity;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import jakarta.persistence.Basic;
import jakarta.persistence.Column;
@ -162,6 +163,26 @@ public class Batch2WorkChunkEntity implements Serializable {
myWarningMessage = theWarningMessage;
}
public static Batch2WorkChunkEntity fromWorkChunk(WorkChunk theWorkChunk) {
return new Batch2WorkChunkEntity(
theWorkChunk.getId(),
theWorkChunk.getSequence(),
theWorkChunk.getJobDefinitionId(),
theWorkChunk.getJobDefinitionVersion(),
theWorkChunk.getInstanceId(),
theWorkChunk.getTargetStepId(),
theWorkChunk.getStatus(),
theWorkChunk.getCreateTime(),
theWorkChunk.getStartTime(),
theWorkChunk.getUpdateTime(),
theWorkChunk.getEndTime(),
theWorkChunk.getErrorMessage(),
theWorkChunk.getErrorCount(),
theWorkChunk.getRecordsProcessed(),
theWorkChunk.getWarningMessage()
);
}
public int getErrorCount() {
return myErrorCount;
}

View File

@ -22,6 +22,7 @@ import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
import ca.uhn.fhir.jpa.test.config.Batch2FastSchedulerConfig;
import ca.uhn.fhir.testjob.TestJobDefinitionUtils;
import ca.uhn.fhir.testjob.models.FirstStepOutput;
import ca.uhn.fhir.util.JsonUtil;
@ -40,6 +41,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.transaction.PlatformTransactionManager;
import java.time.Instant;
@ -66,6 +68,9 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@TestMethodOrder(MethodOrderer.MethodName.class)
@ContextConfiguration(classes = {
Batch2FastSchedulerConfig.class
})
public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
public static final String JOB_DEFINITION_ID = "definition-id";
@ -346,7 +351,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Override
public void runMaintenancePass() {
myBatch2JobHelper.runMaintenancePass();
myBatch2JobHelper.forceRunMaintenancePass();
}
}

View File

@ -20,15 +20,19 @@
package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.hapi.fhir.batch2.test.models.JobMaintenanceStateInformation;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep2InputType;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep3InputType;
@ -41,16 +45,21 @@ import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertNotNull;
/**
* Specification tests for batch2 storage and event system.
* These tests are abstract, and do not depend on JPA.
* Test setups should use the public batch2 api to create scenarios.
*/
public abstract class AbstractIJobPersistenceSpecificationTest implements IInProgressActionsTests, IInstanceStateTransitions, IWorkChunkStateTransitions, IWorkChunkStorageTests, IWorkChunkErrorActionsTests, WorkChunkTestConstants {
public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMaintenanceActions, IInProgressActionsTests, IInstanceStateTransitions, IWorkChunkStateTransitions, IWorkChunkStorageTests, IWorkChunkErrorActionsTests, WorkChunkTestConstants {
@Autowired
private IJobPersistence mySvc;
@ -61,6 +70,9 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IInPro
@Autowired
private PlatformTransactionManager myTransactionManager;
@Autowired
private IJobMaintenanceService myMaintenanceService;
public PlatformTransactionManager getTransactionManager() {
return myTransactionManager;
}
@ -69,32 +81,38 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IInPro
return mySvc;
}
public JobDefinition<TestJobParameters> withJobDefinition() {
return JobDefinition.newBuilder()
.setJobDefinitionId(JOB_DEFINITION_ID)
public JobDefinition<TestJobParameters> withJobDefinition(boolean theIsGatedBoolean) {
JobDefinition.Builder<TestJobParameters, ?> builder = JobDefinition.newBuilder()
.setJobDefinitionId(theIsGatedBoolean ? GATED_JOB_DEFINITION_ID : JOB_DEFINITION_ID)
.setJobDefinitionVersion(JOB_DEF_VER)
.setJobDescription("A job description")
.setParametersType(TestJobParameters.class)
.addFirstStep(TARGET_STEP_ID, "the first step", TestJobStep2InputType.class, (theStepExecutionDetails, theDataSink) -> new RunOutcome(0))
.addIntermediateStep("2nd-step-id", "the second step", TestJobStep3InputType.class, (theStepExecutionDetails, theDataSink) -> new RunOutcome(0))
.addLastStep("last-step-id", "the final step", (theStepExecutionDetails, theDataSink) -> new RunOutcome(0))
.build();
.addLastStep("last-step-id", "the final step", (theStepExecutionDetails, theDataSink) -> new RunOutcome(0));
if (theIsGatedBoolean) {
builder.gatedExecution();
}
return builder.build();
}
@AfterEach
public void after() {
myJobDefinitionRegistry.removeJobDefinition(JOB_DEFINITION_ID, JOB_DEF_VER);
myMaintenanceService.enableMaintenancePass(true);
}
@Nonnull
public JobInstance createInstance() {
JobDefinition<TestJobParameters> jobDefinition = withJobDefinition();
if (myJobDefinitionRegistry.getJobDefinition(JOB_DEFINITION_ID, JOB_DEF_VER).isEmpty()) {
public JobInstance createInstance(JobDefinition<?> theJobDefinition) {
JobDefinition<?> jobDefinition = theJobDefinition == null ? withJobDefinition(false)
: theJobDefinition;
if (myJobDefinitionRegistry.getJobDefinition(theJobDefinition.getJobDefinitionId(), theJobDefinition.getJobDefinitionVersion()).isEmpty()) {
myJobDefinitionRegistry.addJobDefinition(jobDefinition);
}
JobInstance instance = new JobInstance();
instance.setJobDefinitionId(jobDefinition.getJobDefinitionId());
instance.setJobDefinitionVersion(jobDefinition.getJobDefinitionVersion());
instance.setStatus(StatusEnum.QUEUED);
instance.setJobDefinitionVersion(JOB_DEF_VER);
instance.setParameters(CHUNK_DATA);
@ -148,8 +166,8 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IInPro
await().until(() -> sw.getMillis() > 0);
}
public String createAndStoreJobInstance() {
JobInstance jobInstance = createInstance();
public String createAndStoreJobInstance(JobDefinition<?> theJobDefinition) {
JobInstance jobInstance = createInstance(theJobDefinition);
return mySvc.storeNewInstance(jobInstance);
}
@ -158,4 +176,36 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IInPro
mySvc.onWorkChunkDequeue(chunkId);
return chunkId;
}
public void enableMaintenanceRunner(boolean theToEnable) {
myMaintenanceService.enableMaintenancePass(theToEnable);
}
public void createChunksInStates(JobMaintenanceStateInformation theJobMaintenanceStateInformation) {
Set<String> stepIds = new HashSet<>();
for (WorkChunk workChunk : theJobMaintenanceStateInformation.getInitialWorkChunks()) {
mySvc.createWorkChunk(workChunk);
stepIds.add(workChunk.getTargetStepId());
}
// if it's a gated job, we'll manually set the step id for the instance
JobDefinition<?> jobDef = theJobMaintenanceStateInformation.getJobDefinition();
if (jobDef.isGatedExecution()) {
AtomicReference<String> latestStepId = new AtomicReference<>();
int totalSteps = jobDef.getSteps().size();
for (int i = totalSteps - 1; i >= 0; i--) {
JobDefinitionStep<?, ?, ?> step = jobDef.getSteps().get(i);
if (stepIds.contains(step.getStepId())) {
latestStepId.set(step.getStepId());
break;
}
}
// should def have a value
assertNotNull(latestStepId.get());
String instanceId = theJobMaintenanceStateInformation.getInstanceId();
mySvc.updateInstance(instanceId, instance -> {
instance.setCurrentGatedStepId(latestStepId.get());
return true;
});
}
}
}

View File

@ -12,7 +12,7 @@ public interface IInProgressActionsTests extends IWorkChunkCommon, WorkChunkTest
@Test
default void processingOk_inProgressToSuccess_clearsDataSavesRecordCount() {
String jobId = createAndStoreJobInstance();
String jobId = createAndStoreJobInstance(null);
String myChunkId = createAndDequeueWorkChunk(jobId);
// execution ok
getSvc().onWorkChunkCompletion(new WorkChunkCompletionEvent(myChunkId, 3, 0));
@ -28,7 +28,7 @@ public interface IInProgressActionsTests extends IWorkChunkCommon, WorkChunkTest
@Test
default void processingRetryableError_inProgressToError_bumpsCountRecordsMessage() {
String jobId = createAndStoreJobInstance();
String jobId = createAndStoreJobInstance(null);
String myChunkId = createAndDequeueWorkChunk(jobId);
// execution had a retryable error
getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_A));
@ -42,7 +42,7 @@ public interface IInProgressActionsTests extends IWorkChunkCommon, WorkChunkTest
@Test
default void processingFailure_inProgressToFailed() {
String jobId = createAndStoreJobInstance();
String jobId = createAndStoreJobInstance(null);
String myChunkId = createAndDequeueWorkChunk(jobId);
// execution had a failure
getSvc().onWorkChunkFailed(myChunkId, "some error");

View File

@ -30,7 +30,7 @@ public interface IInstanceStateTransitions extends IWorkChunkCommon, WorkChunkTe
@Test
default void createInstance_createsInQueuedWithChunk() {
// given
JobDefinition<?> jd = withJobDefinition();
JobDefinition<?> jd = withJobDefinition(false);
// when
IJobPersistence.CreateResult createResult =
@ -55,7 +55,7 @@ public interface IInstanceStateTransitions extends IWorkChunkCommon, WorkChunkTe
@Test
default void testCreateInstance_firstChunkDequeued_movesToInProgress() {
// given
JobDefinition<?> jd = withJobDefinition();
JobDefinition<?> jd = withJobDefinition(false);
IJobPersistence.CreateResult createResult = newTxTemplate().execute(status->
getSvc().onCreateWithFirstChunk(jd, "{}"));
assertNotNull(createResult);
@ -82,7 +82,7 @@ public interface IInstanceStateTransitions extends IWorkChunkCommon, WorkChunkTe
String instanceId2 = getSvc().storeNewInstance(normalInstance);
JobDefinitionRegistry jobDefinitionRegistry = new JobDefinitionRegistry();
jobDefinitionRegistry.addJobDefinitionIfNotRegistered(withJobDefinition());
jobDefinitionRegistry.addJobDefinitionIfNotRegistered(withJobDefinition(false));
// when
runInTransaction(()-> {

View File

@ -0,0 +1,164 @@
package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.hapi.fhir.batch2.test.models.JobMaintenanceStateInformation;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestConstants {
Logger ourLog = LoggerFactory.getLogger(IJobMaintenanceActions.class);
void enableMaintenanceRunner(boolean theToEnable);
void createChunksInStates(JobMaintenanceStateInformation theInitialState);
// fixme step 1 is special - only 1 chunk.
// fixme cover step 1 to step 2 and step 2->3
@ParameterizedTest
@ValueSource(strings = {
// """
// 1|COMPLETED
// 2|GATED
// """,
"""
1|COMPLETED
2|QUEUED
""",
"""
1|COMPLETED
2|COMPLETED
2|ERRORED
2|IN_PROGRESS
""",
"""
1|COMPLETED
2|ERRORED # equivalent of QUEUED
2|COMPLETED
""",
// """
// 1|COMPLETED
// 2|READY
// 2|QUEUED
// 2|COMPLETED
// 2|ERRORED
// 2|FAILED
// 2|IN_PROGRESS
// 3|GATED
// 3|GATED
// """,
// """
// 1|COMPLETED
// 2|READY
// 2|QUEUED
// 2|COMPLETED
// 2|ERRORED
// 2|FAILED
// 2|IN_PROGRESS
// 3|QUEUED # a lie
// 3|GATED
// """
})
default void testGatedStep2NotReady_notAdvance(String theChunkState) {
// given
enableMaintenanceRunner(false);
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState);
// setup
createChunksInStates(result);
// TEST run job maintenance - force transition
enableMaintenanceRunner(true);
runMaintenancePass();
// verify
verifyWorkChunkFinalStates(result);
}
@ParameterizedTest
@ValueSource(strings = {
// """
// # new code only
// 1|COMPLETED
// 2|COMPLETED
// 2|COMPLETED
// 3|GATED|READY
// 3|GATED|READY
// """,
// """
// # OLD code only
// 1|COMPLETED
// 2|QUEUED,2|READY
// 2|QUEUED,2|READY
// """,
// """
// # mixed code only
// 1|COMPLETED
// 2|COMPLETED
// 2|COMPLETED
// 3|GATED|READY
// 3|QUEUED|READY
// """
})
default void testGatedStep2ReadyToAdvance_advanceToStep3(String theChunkState) {
JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState);
// setup
enableMaintenanceRunner(false);
createChunksInStates(result);
// TEST run job maintenance - force transition
enableMaintenanceRunner(true);
runMaintenancePass();
// verify
verifyWorkChunkFinalStates(result);
}
private JobMaintenanceStateInformation setupGatedWorkChunkTransitionTest(String theChunkState) {
// get the job def and store the instance
JobDefinition<?> definition = withJobDefinition(true);
String instanceId = createAndStoreJobInstance(definition);
JobMaintenanceStateInformation stateInformation = new JobMaintenanceStateInformation(instanceId, definition);
stateInformation.initialize(theChunkState);
ourLog.info("Starting test case \n {}", theChunkState);
// display comments if there are any
ourLog.info(String.join(", ", stateInformation.getLineComments()));
return stateInformation;
}
private void verifyWorkChunkFinalStates(JobMaintenanceStateInformation theStateInformation) {
assertEquals(theStateInformation.getInitialWorkChunks().size(), theStateInformation.getFinalWorkChunk().size());
HashMap<String, WorkChunk> workchunkMap = new HashMap<>();
for (WorkChunk fs : theStateInformation.getFinalWorkChunk()) {
workchunkMap.put(fs.getId(), fs);
}
// fetch all workchunks
Iterator<WorkChunk> workChunkIterator = getSvc().fetchAllWorkChunksIterator(theStateInformation.getInstanceId(), true);
List<WorkChunk> workchunks = new ArrayList<>();
workChunkIterator.forEachRemaining(workchunks::add);
assertEquals(workchunks.size(), workchunkMap.size());
for (WorkChunk wc : workchunks) {
WorkChunk expected = workchunkMap.get(wc.getId());
// verify status and step id
assertEquals(expected.getTargetStepId(), wc.getTargetStepId());
assertEquals(expected.getStatus(), wc.getStatus());
}
}
}

View File

@ -1,111 +0,0 @@
package ca.uhn.hapi.fhir.batch2.test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.junit.jupiter.api.Assertions.fail;
public interface IJobMaintenenaceActions extends IWorkChunkCommon, WorkChunkTestConstants {
// fixme step 1 is special - only 1 chunk.
// fixme cover step 1 to step 2 and step 2->3
@ParameterizedTest
@ValueSource(strings={
"""
1|COMPLETE
2|GATED
""",
"""
1|COMPLETE
2|QUEUED
""",
"""
1|COMPLETE
2|COMPLETE
2|ERRORED
2|IN_PROGRESS
""",
"""
1|COMPLETE
2|READY
2|QUEUED
2|COMPLETE
2|ERRORED
2|FAILED
2|IN_PROGRESS
3|GATED
3|GATED
""",
"""
1|COMPLETE
2|READY
2|QUEUED
2|COMPLETE
2|ERRORED
2|FAILED
2|IN_PROGRESS
3|QUEUED # a lie
3|GATED
"""
}
)
default void testGatedStep2NotReady_notAdvance(String theChunkState) {
// given
// need job instance definition
// step 1
// step 2
// step 3
// IN STEP 2
// chunks
setupChunksInStates(theChunkState);
// when
// run job maintenance
// then
// step not changed.
fail();
}
@ParameterizedTest
@ValueSource(strings={
"""
# new code only
1|COMPLETE
2|COMPLETE
2|COMPLETE
3|GATED|READY
3|GATED|READY
""",
"""
# OLD code only
1|COMPLETE
2|COMPLETE
2|COMPLETE
3|QUEUED|READY
3|QUEUED|READY
""",
"""
# mixed code only
1|COMPLETE
2|COMPLETE
2|COMPLETE
3|GATED|READY
3|QUEUED|READY
"""
})
default void testGatedStep2ReadyToAdvance_advanceToStep3(String theTestStates) {
// given
// when
// then
fail();
}
void setupChunksInStates(String theChunkState);
}

View File

@ -10,7 +10,7 @@ import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
public interface IWorkChunkCommon extends WorkChunkTestConstants {
String createAndStoreJobInstance();
String createAndStoreJobInstance(JobDefinition<?> theJobDefinition);
String createAndDequeueWorkChunk(String theJobInstanceId);
@ -24,7 +24,7 @@ public interface IWorkChunkCommon extends WorkChunkTestConstants {
public void sleepUntilTimeChanges();
JobDefinition<TestJobParameters> withJobDefinition();
JobDefinition<TestJobParameters> withJobDefinition(boolean theIsGatedJob);
TransactionTemplate newTxTemplate();

View File

@ -18,7 +18,7 @@ public interface IWorkChunkErrorActionsTests extends IWorkChunkCommon, WorkChunk
*/
@Test
default void errorRetry_errorToInProgress() {
String jobId = createAndStoreJobInstance();
String jobId = createAndStoreJobInstance(null);
String myChunkId = createAndDequeueWorkChunk(jobId);
getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, FIRST_ERROR_MESSAGE));
@ -37,7 +37,7 @@ public interface IWorkChunkErrorActionsTests extends IWorkChunkCommon, WorkChunk
@Test
default void errorRetry_repeatError_increasesErrorCount() {
String jobId = createAndStoreJobInstance();
String jobId = createAndStoreJobInstance(null);
String myChunkId = createAndDequeueWorkChunk(jobId);
getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, FIRST_ERROR_MESSAGE));
@ -48,7 +48,6 @@ public interface IWorkChunkErrorActionsTests extends IWorkChunkCommon, WorkChunk
// when another error happens
getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B));
// verify the state, new message, and error count
var workChunkEntity = freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, workChunkEntity.getStatus());
@ -58,7 +57,7 @@ public interface IWorkChunkErrorActionsTests extends IWorkChunkCommon, WorkChunk
@Test
default void errorThenRetryAndComplete_addsErrorCounts() {
String jobId = createAndStoreJobInstance();
String jobId = createAndStoreJobInstance(null);
String myChunkId = createAndDequeueWorkChunk(jobId);
getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, FIRST_ERROR_MESSAGE));
@ -78,11 +77,10 @@ public interface IWorkChunkErrorActionsTests extends IWorkChunkCommon, WorkChunk
@Test
default void errorRetry_maxErrors_movesToFailed() {
// we start with 1 error already
String jobId = createAndStoreJobInstance();
String jobId = createAndStoreJobInstance(null);
String myChunkId = createAndDequeueWorkChunk(jobId);
getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, FIRST_ERROR_MESSAGE));
// 2nd try
getSvc().onWorkChunkDequeue(myChunkId);
getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B));

View File

@ -10,7 +10,7 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT
@Test
default void chunkCreation_isQueued() {
String jobInstanceId = createAndStoreJobInstance();
String jobInstanceId = createAndStoreJobInstance(null);
String myChunkId = createChunk(jobInstanceId);
WorkChunk fetchedWorkChunk = freshFetchWorkChunk(myChunkId);
@ -19,7 +19,7 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT
@Test
default void chunkReceived_queuedToInProgress() {
String jobInstanceId = createAndStoreJobInstance();
String jobInstanceId = createAndStoreJobInstance(null);
String myChunkId = createChunk(jobInstanceId);
runMaintenancePass();

View File

@ -2,6 +2,9 @@ package ca.uhn.hapi.fhir.batch2.test;
public interface WorkChunkTestConstants {
public static final String JOB_DEFINITION_ID = "definition-id";
// we use a separate id for gated jobs because these job definitions might not
// be cleaned up after any given test run
String GATED_JOB_DEFINITION_ID = "gated_job_def_id";
public static final String TARGET_STEP_ID = "step-id";
public static final String DEF_CHUNK_ID = "definition-chunkId";
public static final int JOB_DEF_VER = 1;

View File

@ -0,0 +1,159 @@
package ca.uhn.hapi.fhir.batch2.test.models;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.commons.lang3.StringUtils.isEmpty;
public class JobMaintenanceStateInformation {
private static final Logger ourLog = LoggerFactory.getLogger(JobMaintenanceStateInformation.class);
private static final String COMMENT_PATTERN = "(#.*)$";
private final List<String> myLineComments = new ArrayList<>();
private final List<WorkChunk> myInitialWorkChunks = new ArrayList<>();
private final List<WorkChunk> myFinalWorkChunk = new ArrayList<>();
private final JobDefinition<?> myJobDefinition;
private final String myInstanceId;
public JobMaintenanceStateInformation(String theInstanceId, JobDefinition<?> theJobDefinition) {
myInstanceId = theInstanceId;
myJobDefinition = theJobDefinition;
}
public List<String> getLineComments() {
return myLineComments;
}
public List<WorkChunk> getInitialWorkChunks() {
return myInitialWorkChunks;
}
public List<WorkChunk> getFinalWorkChunk() {
return myFinalWorkChunk;
}
public JobDefinition<?> getJobDefinition() {
return myJobDefinition;
}
public String getInstanceId() {
return myInstanceId;
}
public void initialize(String theState) {
String[] chunkLines = theState.split("\n");
Pattern pattern = Pattern.compile(COMMENT_PATTERN);
for (String chunkLine : chunkLines) {
String line = chunkLine.trim();
Matcher matcher = pattern.matcher(line);
if (matcher.find()) {
String comment = matcher.group(0);
line = line.replaceAll(comment, "");
if (isEmpty(line)) {
myLineComments.add(line);
continue;
}
// else - inline comment: eg: 1|Complete # comment
}
addWorkChunkStates(line);
}
}
/**
* Parses the line according to:
* (work chunk step id)|(workchunk initial state)|optionally:(work chunk final state)
*/
private void addWorkChunkStates(String theLine) {
if (theLine.contains(",")) {
// has final state
String[] states = theLine.split(",");
int len = states.length;
if (len != 2) {
throw new RuntimeException("Unexpected number of state transitions. Expected 2, found " + states.length);
}
addWorkChunkBasedOnState(states[0], chunk -> {
myInitialWorkChunks.add(chunk);
});
addWorkChunkBasedOnState(states[1], chunk -> {
myFinalWorkChunk.add(chunk);
});
} else {
// does not have final state; no change
addWorkChunkBasedOnState(theLine, chunk -> {
myInitialWorkChunks.add(chunk);
myFinalWorkChunk.add(chunk);
});
}
}
private void addWorkChunkBasedOnState(String theLine, Consumer<WorkChunk> theAdder) {
String[] parts = theLine.split("\\|");
int len = parts.length;
if (len < 2) {
throw new RuntimeException("Unable to parse line " + theLine + " into initial and final states");
}
String stepId = getJobStepId(parts[0]);
WorkChunkStatusEnum initialStatus = WorkChunkStatusEnum.valueOf(parts[1].trim());
WorkChunk initial = createBaseWorkChunk();
initial.setStatus(initialStatus);
initial.setTargetStepId(stepId);
theAdder.accept(initial);
}
private String getJobStepId(String theIndexId) {
try {
int index = Integer.parseInt(theIndexId.trim());
if (index >= myJobDefinition.getSteps().size()) {
throw new RuntimeException("Unable to find step with index " + index);
}
int counter = 0;
for (JobDefinitionStep<?, ?, ?> step : myJobDefinition.getSteps()) {
if (counter == index) {
return step.getStepId();
}
counter++;
}
// will never happen
throw new RuntimeException("Could not find step for index " + theIndexId);
} catch (NumberFormatException ex) {
ourLog.info("Encountered non-number {}; This will be treated as the step id itself", theIndexId);
return theIndexId;
}
}
private WorkChunk createBaseWorkChunk() {
WorkChunk chunk = new WorkChunk();
chunk.setId(UUID.randomUUID().toString());
chunk.setJobDefinitionId(myJobDefinition.getJobDefinitionId());
chunk.setInstanceId(myInstanceId);
chunk.setJobDefinitionVersion(myJobDefinition.getJobDefinitionVersion());
chunk.setCreateTime(new Date());
return chunk;
}
}

View File

@ -36,4 +36,11 @@ public interface IJobMaintenanceService {
*/
@VisibleForTesting
void forceMaintenancePass();
/**
* This is only to be called in a testing environment
* to ensure state changes are controlled.
*/
@VisibleForTesting
void enableMaintenancePass(boolean thetoEnable);
}

View File

@ -27,6 +27,7 @@ import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.slf4j.Logger;
@ -280,4 +281,7 @@ public interface IJobPersistence extends IWorkChunkPersistence {
return markInstanceAsStatusWhenStatusIn(
theJobInstanceId, StatusEnum.IN_PROGRESS, Collections.singleton(StatusEnum.QUEUED));
}
@VisibleForTesting
WorkChunk createWorkChunk(WorkChunk theWorkChunk);
}

View File

@ -99,6 +99,8 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
private Runnable myMaintenanceJobFinishedCallback = () -> {};
private final IReductionStepExecutorService myReductionStepExecutorService;
private boolean myEnabledBool = true;
/**
* Constructor
*/
@ -200,8 +202,17 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
doMaintenancePass();
}
@Override
public void enableMaintenancePass(boolean theToEnable) {
myEnabledBool = theToEnable;
}
@Override
public void runMaintenancePass() {
if (!myEnabledBool) {
ourLog.error("Maintenance job is disabled! This will affect all batch2 jobs!");
}
if (!myRunMaintenanceSemaphore.tryAcquire()) {
ourLog.debug("Another maintenance pass is already in progress. Ignoring request.");
return;