waiting step 1

This commit is contained in:
leif stawnyczy 2024-03-22 14:29:20 -04:00
parent 69fa83a866
commit 26d6714a68
11 changed files with 244 additions and 49 deletions

View File

@ -64,6 +64,8 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
@ -73,6 +75,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -344,6 +347,19 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
});
}
/**
 * Parks the given work chunk until its next poll is due.
 * <p>
 * Issues a single repository update that sets the chunk's status to
 * {@link WorkChunkStatusEnum#POLL_WAITING} and stores the next poll time,
 * computed as "now" plus the requested delay. A warning is logged when the
 * update did not affect exactly one row.
 * <p>
 * The method is transactional because the repository call is a modifying
 * query, which needs an active transaction to execute.
 *
 * @param theChunkId the id of the chunk to update
 * @param thePollDelayMs how long to wait, in milliseconds from now, before the chunk is polled again
 */
@Override
@Transactional
public void onWorkChunkPollDelay(String theChunkId, int thePollDelayMs) {
	Date nextPollTime = Date.from(Instant.now().plus(thePollDelayMs, ChronoUnit.MILLIS));
	int updated = myWorkChunkRepository.updateWorkChunkNextPollTime(
			theChunkId, WorkChunkStatusEnum.POLL_WAITING, nextPollTime);
	if (updated != 1) {
		ourLog.warn("Expected to update 1 work chunk's poll delay; but found {}", updated);
	}
}
@Override
public void onWorkChunkFailed(String theChunkId, String theErrorMessage) {
ourLog.info("Marking chunk {} as failed with message: {}", theChunkId, theErrorMessage);
@ -474,6 +490,15 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
myJobInstanceRepository.updateInstanceUpdateTime(theInstanceId, new Date());
}
/**
 * Asks the work chunk repository to move one chunk from {@code theOldStatus}
 * to {@code theNewStatus}, and logs a warning unless exactly one row was
 * reported as updated.
 * <p>
 * Note that the repository call takes the new status before the old status.
 *
 * @param theChunkId the id of the chunk to update
 * @param theOldStatus the status the chunk is moving from
 * @param theNewStatus the status the chunk is moving to
 */
@Override
public void updateWorkChunkToStatus(String theChunkId, WorkChunkStatusEnum theOldStatus, WorkChunkStatusEnum theNewStatus) {
	final int rowsChanged = myWorkChunkRepository.updateChunkStatus(theChunkId, theNewStatus, theOldStatus);
	if (rowsChanged == 1) {
		// the expected outcome; nothing to report
		return;
	}
	ourLog.warn("Expected to update 1 work chunk's status; instead updated {}", rowsChanged);
}
/**
* Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
*/

View File

@ -79,6 +79,12 @@ public interface IBatch2WorkChunkRepository
@Param("status") WorkChunkStatusEnum theInProgress,
@Param("warningMessage") String theWarningMessage);
/**
 * Sets the status and next poll time of a single work chunk and increments
 * its poll attempt counter.
 * <p>
 * The counter is wrapped in {@code COALESCE(..., 0)} because the
 * POLL_ATTEMPTS column is nullable: without it, {@code NULL + 1} evaluates
 * to {@code NULL} and the counter would never start counting for rows that
 * have no value yet.
 *
 * @param theChunkId the id of the chunk to update
 * @param theStatus the status to store on the chunk
 * @param theNextPollTime the time at which the chunk should next be polled
 * @return the number of rows updated
 */
@Modifying
@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myNextPollTime = :nextPollTime, e.myPollAttempts = (COALESCE(e.myPollAttempts, 0) + 1) WHERE e.myId = :id")
int updateWorkChunkNextPollTime(
		@Param("id") String theChunkId,
		@Param("status") WorkChunkStatusEnum theStatus,
		@Param("nextPollTime") Date theNextPollTime);
@Modifying
@Query(
"UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, e.mySerializedData = null, e.myErrorMessage = :em WHERE e.myId IN(:ids)")

View File

@ -121,6 +121,13 @@ public class Batch2WorkChunkEntity implements Serializable {
@Column(name = "WARNING_MSG", length = WARNING_MSG_MAX_LENGTH, nullable = true)
private String myWarningMessage;
/**
 * Time at which this chunk should next be polled.
 * Stored in the nullable NEXT_POLL_TIME timestamp column.
 */
@Column(name = "NEXT_POLL_TIME", nullable = true)
@Temporal(TemporalType.TIMESTAMP)
private Date myNextPollTime;
/**
 * Number of poll attempts recorded for this chunk.
 * Stored in the nullable POLL_ATTEMPTS column.
 * <p>
 * NOTE(review): the column is nullable but the field is a primitive
 * {@code int}; rows holding NULL presumably cannot be loaded into a
 * primitive - confirm, and consider {@code Integer} together with a
 * null-safe getter.
 */
@Column(name = "POLL_ATTEMPTS", nullable = true)
private int myPollAttempts;
/**
* Default constructor for Hibernate.
*/
@ -144,7 +151,9 @@ public class Batch2WorkChunkEntity implements Serializable {
String theErrorMessage,
int theErrorCount,
Integer theRecordsProcessed,
String theWarningMessage) {
String theWarningMessage,
Date theNextPollTime,
Integer thePollAttempts) {
myId = theId;
mySequence = theSequence;
myJobDefinitionId = theJobDefinitionId;
@ -160,6 +169,8 @@ public class Batch2WorkChunkEntity implements Serializable {
myErrorCount = theErrorCount;
myRecordsProcessed = theRecordsProcessed;
myWarningMessage = theWarningMessage;
myNextPollTime = theNextPollTime;
myPollAttempts = thePollAttempts;
}
public int getErrorCount() {
@ -294,6 +305,22 @@ public class Batch2WorkChunkEntity implements Serializable {
myInstanceId = theInstanceId;
}
/**
 * @return the time at which this chunk should next be polled (may be {@code null})
 */
public Date getNextPollTime() {
	return this.myNextPollTime;
}
/**
 * @param theNextPollTime the time at which this chunk should next be polled
 */
public void setNextPollTime(Date theNextPollTime) {
	this.myNextPollTime = theNextPollTime;
}
/**
 * @return the number of poll attempts recorded for this chunk
 */
public int getPollAttempts() {
	return this.myPollAttempts;
}
/**
 * @param thePollAttempts the number of poll attempts to record for this chunk
 */
public void setPollAttempts(int thePollAttempts) {
	this.myPollAttempts = thePollAttempts;
}
@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
@ -313,6 +340,8 @@ public class Batch2WorkChunkEntity implements Serializable {
.append("status", myStatus)
.append("errorMessage", myErrorMessage)
.append("warningMessage", myWarningMessage)
.append("nextPollTime", myNextPollTime)
.append("pollAttempts", myPollAttempts)
.toString();
}
}

View File

@ -131,15 +131,15 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
Builder.BuilderWithTableName mdmLinkTable = version.onTable("MPI_LINK");
mdmLinkTable
.addIndex("20230911.1", "IDX_EMPI_TGT_MR_LS")
.unique(false)
.online(true)
.withColumns("TARGET_TYPE", "MATCH_RESULT", "LINK_SOURCE");
.addIndex("20230911.1", "IDX_EMPI_TGT_MR_LS")
.unique(false)
.online(true)
.withColumns("TARGET_TYPE", "MATCH_RESULT", "LINK_SOURCE");
mdmLinkTable
.addIndex("20230911.2", "IDX_EMPi_TGT_MR_SCore")
.unique(false)
.online(true)
.withColumns("TARGET_TYPE", "MATCH_RESULT", "SCORE");
.addIndex("20230911.2", "IDX_EMPi_TGT_MR_SCore")
.unique(false)
.online(true)
.withColumns("TARGET_TYPE", "MATCH_RESULT", "SCORE");
// Move forced_id constraints to hfj_resource and the new fhir_id column
// Note: we leave the HFJ_FORCED_ID.IDX_FORCEDID_TYPE_FID index in place to support old writers for a while.
@ -159,20 +159,20 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
hfjResource.dropIndex("20231027.1", "IDX_RES_FHIR_ID");
hfjResource
.addIndex("20231027.2", "IDX_RES_TYPE_FHIR_ID")
.unique(true)
.online(true)
// include res_id and our deleted flag so we can satisfy Observation?_sort=_id from the index on
// platforms that support it.
.includeColumns("RES_ID, RES_DELETED_AT")
.withColumns("RES_TYPE", "FHIR_ID");
.addIndex("20231027.2", "IDX_RES_TYPE_FHIR_ID")
.unique(true)
.online(true)
// include res_id and our deleted flag so we can satisfy Observation?_sort=_id from the index on
// platforms that support it.
.includeColumns("RES_ID, RES_DELETED_AT")
.withColumns("RES_TYPE", "FHIR_ID");
// For resolving references that don't supply the type.
hfjResource
.addIndex("20231027.3", "IDX_RES_FHIR_ID")
.unique(false)
.online(true)
.withColumns("FHIR_ID");
.addIndex("20231027.3", "IDX_RES_FHIR_ID")
.unique(false)
.online(true)
.withColumns("FHIR_ID");
Builder.BuilderWithTableName batch2JobInstanceTable = version.onTable("BT2_JOB_INSTANCE");
@ -182,33 +182,33 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
{
version.executeRawSql(
"20231212.1",
"CREATE INDEX CONCURRENTLY idx_sp_string_hash_nrm_pattern_ops ON hfj_spidx_string USING btree (hash_norm_prefix, sp_value_normalized varchar_pattern_ops, res_id, partition_id)")
.setTransactional(false)
.onlyAppliesToPlatforms(DriverTypeEnum.POSTGRES_9_4)
.onlyIf(
String.format(
QUERY_FOR_COLUMN_COLLATION_TEMPLATE,
"HFJ_SPIDX_STRING".toLowerCase(),
"SP_VALUE_NORMALIZED".toLowerCase()),
"Column HFJ_SPIDX_STRING.SP_VALUE_NORMALIZED already has a collation of 'C' so doing nothing")
.onlyIf(
"SELECT NOT EXISTS(select 1 from pg_indexes where indexname='idx_sp_string_hash_nrm_pattern_ops')",
"Index idx_sp_string_hash_nrm_pattern_ops already exists");
"20231212.1",
"CREATE INDEX CONCURRENTLY idx_sp_string_hash_nrm_pattern_ops ON hfj_spidx_string USING btree (hash_norm_prefix, sp_value_normalized varchar_pattern_ops, res_id, partition_id)")
.setTransactional(false)
.onlyAppliesToPlatforms(DriverTypeEnum.POSTGRES_9_4)
.onlyIf(
String.format(
QUERY_FOR_COLUMN_COLLATION_TEMPLATE,
"HFJ_SPIDX_STRING".toLowerCase(),
"SP_VALUE_NORMALIZED".toLowerCase()),
"Column HFJ_SPIDX_STRING.SP_VALUE_NORMALIZED already has a collation of 'C' so doing nothing")
.onlyIf(
"SELECT NOT EXISTS(select 1 from pg_indexes where indexname='idx_sp_string_hash_nrm_pattern_ops')",
"Index idx_sp_string_hash_nrm_pattern_ops already exists");
version.executeRawSql(
"20231212.2",
"CREATE UNIQUE INDEX CONCURRENTLY idx_sp_uri_hash_identity_pattern_ops ON hfj_spidx_uri USING btree (hash_identity, sp_uri varchar_pattern_ops, res_id, partition_id)")
.setTransactional(false)
.onlyAppliesToPlatforms(DriverTypeEnum.POSTGRES_9_4)
.onlyIf(
String.format(
QUERY_FOR_COLUMN_COLLATION_TEMPLATE,
"HFJ_SPIDX_URI".toLowerCase(),
"SP_URI".toLowerCase()),
"Column HFJ_SPIDX_STRING.SP_VALUE_NORMALIZED already has a collation of 'C' so doing nothing")
.onlyIf(
"SELECT NOT EXISTS(select 1 from pg_indexes where indexname='idx_sp_uri_hash_identity_pattern_ops')",
"Index idx_sp_uri_hash_identity_pattern_ops already exists.");
"20231212.2",
"CREATE UNIQUE INDEX CONCURRENTLY idx_sp_uri_hash_identity_pattern_ops ON hfj_spidx_uri USING btree (hash_identity, sp_uri varchar_pattern_ops, res_id, partition_id)")
.setTransactional(false)
.onlyAppliesToPlatforms(DriverTypeEnum.POSTGRES_9_4)
.onlyIf(
String.format(
QUERY_FOR_COLUMN_COLLATION_TEMPLATE,
"HFJ_SPIDX_URI".toLowerCase(),
"SP_URI".toLowerCase()),
"Column HFJ_SPIDX_STRING.SP_VALUE_NORMALIZED already has a collation of 'C' so doing nothing")
.onlyIf(
"SELECT NOT EXISTS(select 1 from pg_indexes where indexname='idx_sp_uri_hash_identity_pattern_ops')",
"Index idx_sp_uri_hash_identity_pattern_ops already exists.");
}
// This fix was bad for MSSQL, it has been set to do nothing.
@ -216,6 +216,18 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
// This fix will work for MSSQL or Oracle.
version.addTask(new ForceIdMigrationFixTask(version.getRelease(), "20231222.1"));
// add columns to Batch2WorkChunkEntity
Builder.BuilderWithTableName batch2WorkChunkTable = version.onTable("BT2_WORK_CHUNK");
batch2WorkChunkTable
.addColumn("20240315.1", "NEXT_POLL_TIME")
.nullable()
.type(ColumnTypeEnum.DATE_TIMESTAMP);
batch2WorkChunkTable
.addColumn("20240315.2", "POLL_ATTEMPTS")
.nullable()
.type(ColumnTypeEnum.INT);
}
protected void init680() {

View File

@ -10,6 +10,7 @@ import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.ILastJobStepWorker;
import ca.uhn.fhir.batch2.api.IReductionStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RetryChunkLaterException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
@ -45,11 +46,13 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Sort;
import org.springframework.test.context.ContextConfiguration;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -428,10 +431,13 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
}
@Test
public void testJobWithLongPollingStep() {
public void testJobWithLongPollingStep() throws InterruptedException {
// create job definition
int callsToMake = 3;
String jobId = new Exception().getStackTrace()[0].getMethodName();
AtomicInteger pollCounter = new AtomicInteger();
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> first = (step, sink) -> {
myFirstStepLatch.call(1);
return RunOutcome.SUCCESS;
@ -439,9 +445,15 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
// step 2
IJobStepWorker<TestJobParameters, FirstStepOutput, SecondStepOutput> second = (step, sink) -> {
// TODO - poll
// simulate a call
Awaitility.await().atMost(100, TimeUnit.MICROSECONDS);
return RunOutcome.SUCCESS;
// we use Batch2FastSchedulerConfig, so we have a fast scheduler
// that should catch and call repeatedly pretty quickly
if (pollCounter.getAndIncrement() > callsToMake) {
return RunOutcome.SUCCESS;
}
throw new RetryChunkLaterException(300);
};
// step 3
@ -475,7 +487,23 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
.build();
myJobDefinitionRegistry.addJobDefinition(jd);
// test
JobInstanceStartRequest request = buildRequest(jobId);
myFirstStepLatch.setExpectedCount(1);
myLastStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
String instanceId = startResponse.getInstanceId();
// waiting for the job
myBatch2JobHelper.awaitGatedStepId(FIRST_STEP_ID, instanceId);
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.awaitGatedStepId(SECOND_STEP_ID, instanceId);
myLastStepLatch.awaitExpected();
myBatch2JobHelper.awaitJobCompletion(instanceId);
// verify
assertEquals(callsToMake, pollCounter.get());
}
@Test

View File

@ -280,4 +280,12 @@ public interface IJobPersistence extends IWorkChunkPersistence {
return markInstanceAsStatusWhenStatusIn(
theJobInstanceId, StatusEnum.IN_PROGRESS, Collections.singleton(StatusEnum.QUEUED));
}
/**
 * Updates the given work chunk from the given status to the new status.
 * <p>
 * NOTE(review): presumably the chunk is only updated when it is currently
 * in {@code theOldStatus} - confirm against the implementation's query.
 *
 * @param theChunkId the id of the chunk to update
 * @param theOldStatus the status the chunk is moving from
 * @param theNewStatus the status the chunk is moving to
 */
void updateWorkChunkToStatus(String theChunkId, WorkChunkStatusEnum theOldStatus, WorkChunkStatusEnum theNewStatus);
}

View File

@ -84,6 +84,16 @@ public interface IWorkChunkPersistence {
// on impl - @Transactional(propagation = Propagation.REQUIRES_NEW)
WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters);
/**
 * Updates the specified work chunk to set the time of its next poll.
 * It will also:
 * <ul>
 * <li>update the chunk's poll attempts</li>
 * <li>set the work chunk status to {@link WorkChunkStatusEnum#POLL_WAITING} (if it's not already in this state)</li>
 * </ul>
 *
 * @param theChunkId the id of the chunk to update
 * @param thePollDelayMs the amount of time (after now) to wait (in ms)
 */
void onWorkChunkPollDelay(String theChunkId, int thePollDelayMs);
/**
* An unrecoverable error.
* Transition to {@link WorkChunkStatusEnum#FAILED}

View File

@ -0,0 +1,30 @@
package ca.uhn.fhir.batch2.api;
/**
 * Exception that is thrown by a step worker when a polling step is not
 * finished yet and needs to be retried at a later time.
 * <p>
 * The exception carries the delay (in milliseconds) to wait before the next
 * poll call.
 */
public class RetryChunkLaterException extends RuntimeException {
	private static final long serialVersionUID = 1L;

	/** Default delay before the next poll: one minute, in milliseconds. */
	private static final int ONE_MIN = 60 * 1000;

	/**
	 * The delay to wait (in ms) for the next poll call.
	 * Fixed at construction time; defaults to one minute when the
	 * no-argument constructor is used.
	 */
	private final int myNextPollDelayMs;

	/**
	 * Creates an exception requesting a retry after the default delay of one minute.
	 */
	public RetryChunkLaterException() {
		this(ONE_MIN);
	}

	/**
	 * Creates an exception requesting a retry after the given delay.
	 *
	 * @param theMsDelay the delay, in milliseconds, to wait before the next poll call
	 */
	public RetryChunkLaterException(int theMsDelay) {
		// give the exception a message so it is self-describing if it ever surfaces in a log or stack trace
		super("Work chunk is not ready yet; retry after " + theMsDelay + "ms");
		myNextPollDelayMs = theMsDelay;
	}

	/**
	 * @return the delay, in milliseconds, to wait before the next poll call
	 */
	public int getMsPollDelay() {
		return myNextPollDelayMs;
	}
}

View File

@ -23,6 +23,7 @@ import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.JobStepFailedException;
import ca.uhn.fhir.batch2.api.RetryChunkLaterException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
@ -57,6 +58,10 @@ public class StepExecutor {
try {
outcome = theStepWorker.run(theStepExecutionDetails, theDataSink);
Validate.notNull(outcome, "Step theWorker returned null: %s", theStepWorker.getClass());
} catch (RetryChunkLaterException ex) {
ourLog.debug("Polling job encountered; will retry after {}ms", ex.getMsPollDelay());
myJobPersistence.onWorkChunkPollDelay(theStepExecutionDetails.getChunkId(), ex.getMsPollDelay());
return false;
} catch (JobExecutionFailedException e) {
ourLog.error(
"Unrecoverable failure executing job {} step {} chunk {}",

View File

@ -42,6 +42,7 @@ import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@ -101,6 +102,7 @@ public class JobInstanceProcessor {
JobDefinition<? extends IModelJson> jobDefinition =
myJobDefinitionegistry.getJobDefinitionOrThrowException(theInstance);
processPollingChunks(theInstance, jobDefinition);
enqueueReadyChunks(theInstance, jobDefinition, false);
cleanupInstance(theInstance);
triggerGatedExecutions(theInstance, jobDefinition);
@ -416,4 +418,25 @@ public class JobInstanceProcessor {
// we can enqueue them
enqueueReadyChunks(theInstance, theJobDefinition, true);
}
/**
 * Moves every {@link WorkChunkStatusEnum#POLL_WAITING} chunk of the given
 * instance whose poll time has arrived back to
 * {@link WorkChunkStatusEnum#READY}.
 * <p>
 * The chunks are read as a stream inside a transaction started here; the
 * stream is closed before the transaction is committed, and the transaction
 * is rolled back if anything fails while processing.
 *
 * @param theInstance the job instance whose polling chunks are examined
 * @param theJobDefinition the definition of the job (currently unused)
 */
private void processPollingChunks(JobInstance theInstance, JobDefinition<?> theJobDefinition) {
	TransactionStatus transactionStatus = myTransactionManager.getTransaction(new DefaultTransactionDefinition());
	try (Stream<WorkChunk> chunks = myJobPersistence.fetchAllWorkChunksForJobInStates(
			theInstance.getInstanceId(), Set.of(WorkChunkStatusEnum.POLL_WAITING))) {
		// one reference time for the whole pass so every chunk is judged against the same instant
		Date now = new Date();
		chunks.forEach(chunk -> {
			Date pollTime = chunk.getNextPollTime();
			// A chunk is due once its poll time is no longer in the future.
			// A chunk without a poll time is treated as due so it cannot be stuck waiting forever.
			if (pollTime == null || !pollTime.after(now)) {
				/*
				 * We'll update these 1 at a time;
				 * We're unlikely to have many jobs with many steps
				 * in this state. But if we ever do, we can update this area.
				 */
				myJobPersistence.updateWorkChunkToStatus(
						chunk.getId(), WorkChunkStatusEnum.POLL_WAITING, WorkChunkStatusEnum.READY);
			}
		});
	} catch (RuntimeException | Error e) {
		// do not leave the transaction open when processing fails
		myTransactionManager.rollback(transactionStatus);
		throw e;
	}
	myTransactionManager.commit(transactionStatus);
}
}

View File

@ -87,11 +87,20 @@ public class WorkChunk implements IModelJson {
@JsonDeserialize(using = JsonDateDeserializer.class)
private Date myUpdateTime;
/**
 * Timestamp of when the poll step of this work chunk should next be called.
 * Serialized under the JSON property "nextPollTimestamp".
 */
@JsonProperty("nextPollTimestamp")
@JsonSerialize(using = JsonDateSerializer.class)
@JsonDeserialize(using = JsonDateDeserializer.class)
private Date myNextPollTime;
/**
 * Total polling attempts done thus far.
 * Serialized under the JSON property "pollAttempts".
 */
@JsonProperty("pollAttempts")
private int myPollAttempts;
@JsonProperty(value = "recordsProcessed", access = JsonProperty.Access.READ_ONLY)
private Integer myRecordsProcessed;
@ -264,6 +273,14 @@ public class WorkChunk implements IModelJson {
myNextPollTime = theNextPollTime;
}
/**
 * @return the total number of polling attempts done thus far
 */
public int getPollAttempts() {
	return this.myPollAttempts;
}
/**
 * @param thePollAttempts the total number of polling attempts done thus far
 */
public void setPollAttempts(int thePollAttempts) {
	this.myPollAttempts = thePollAttempts;
}
public String getWarningMessage() {
return myWarningMessage;
}
@ -291,6 +308,8 @@ public class WorkChunk implements IModelJson {
b.append("RecordsProcessed", myRecordsProcessed);
if (myNextPollTime != null) {
b.append("NextPollTime", myNextPollTime);
b.append("PollAttempts", myPollAttempts);
}
if (isNotBlank(myErrorMessage)) {
b.append("ErrorMessage", myErrorMessage);