Extend default batch job deadline, and fail if exceeded

This commit is contained in:
Michael Buckley 2024-08-15 13:25:38 -04:00
parent c4b34708b5
commit 3b5dfb54eb

View File

@ -25,6 +25,7 @@ import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import com.google.common.annotations.VisibleForTesting;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.slf4j.Logger;
@ -50,6 +51,8 @@ public class Batch2JobHelper {
private static final Logger ourLog = LoggerFactory.getLogger(Batch2JobHelper.class);
private static final int BATCH_SIZE = 100;
public static final int DEFAULT_WAIT_DEADLINE = 30;
public static final Duration DEFAULT_WAIT_DURATION = Duration.of(DEFAULT_WAIT_DEADLINE, ChronoUnit.SECONDS);
private final IJobMaintenanceService myJobMaintenanceService;
private final IJobCoordinator myJobCoordinator;
@ -82,11 +85,11 @@ public class Batch2JobHelper {
}
public JobInstance awaitJobHasStatus(String theInstanceId, StatusEnum... theExpectedStatus) {
return awaitJobHasStatus(theInstanceId, 10, theExpectedStatus);
return awaitJobHasStatus(theInstanceId, DEFAULT_WAIT_DEADLINE, theExpectedStatus);
}
public JobInstance awaitJobHasStatusWithoutMaintenancePass(String theInstanceId, StatusEnum... theExpectedStatus) {
return awaitJobawaitJobHasStatusWithoutMaintenancePass(theInstanceId, 10, theExpectedStatus);
return awaitJobawaitJobHasStatusWithoutMaintenancePass(theInstanceId, DEFAULT_WAIT_DEADLINE, theExpectedStatus);
}
public JobInstance awaitJobHasStatus(String theInstanceId, int theSecondsToWait, StatusEnum... theExpectedStatus) {
@ -144,7 +147,7 @@ public class Batch2JobHelper {
return myJobCoordinator.getInstance(theBatchJobId);
}
private boolean checkStatusWithMaintenancePass(String theInstanceId, StatusEnum... theExpectedStatuses) throws InterruptedException {
private boolean checkStatusWithMaintenancePass(String theInstanceId, StatusEnum... theExpectedStatuses) {
if (hasStatus(theInstanceId, theExpectedStatuses)) {
return true;
}
@ -174,7 +177,7 @@ public class Batch2JobHelper {
AtomicInteger counter = new AtomicInteger();
try {
await()
.atMost(Duration.of(10, ChronoUnit.SECONDS))
.atMost(DEFAULT_WAIT_DURATION)
.until(() -> {
counter.getAndIncrement();
forceRunMaintenancePass();
@ -182,19 +185,20 @@ public class Batch2JobHelper {
});
} catch (ConditionTimeoutException ex) {
StatusEnum status = getStatus(theInstanceId);
String msg = String.format(
"Job %s has state %s after 10s timeout and %d checks",
throw new RuntimeException(String.format(
"Job %s has state %s after %s timeout and %d checks",
theInstanceId,
status.name(),
DEFAULT_WAIT_DURATION,
counter.get()
);
), ex);
}
}
public void awaitJobInProgress(String theInstanceId) {
try {
await()
.atMost(Duration.of(10, ChronoUnit.SECONDS))
.atMost(DEFAULT_WAIT_DURATION)
.until(() -> checkStatusWithMaintenancePass(theInstanceId, StatusEnum.IN_PROGRESS));
} catch (ConditionTimeoutException ex) {
StatusEnum statusEnum = getStatus(theInstanceId);
@ -291,12 +295,7 @@ public class Batch2JobHelper {
}
if (!map.isEmpty()) {
ourLog.error(
"Found Running Jobs "
+ map.keySet().stream()
.map(k -> k + " : " + map.get(k))
.collect(Collectors.joining("\n"))
);
ourLog.error("Found Running Jobs {}",map.keySet().stream().map(k -> k + " : " + map.get(k)).collect(Collectors.joining("\n")));
return true;
}
@ -305,7 +304,7 @@ public class Batch2JobHelper {
public void awaitNoJobsRunning(boolean theExpectAtLeastOneJobToExist) {
HashMap<String, String> map = new HashMap<>();
Awaitility.await().atMost(10, TimeUnit.SECONDS)
Awaitility.await().atMost(DEFAULT_WAIT_DURATION)
.until(() -> {
myJobMaintenanceService.runMaintenancePass();
@ -335,6 +334,7 @@ public class Batch2JobHelper {
myJobMaintenanceService.runMaintenancePass();
}
@VisibleForTesting
public void enableMaintenanceRunner(boolean theEnabled) {
myJobMaintenanceService.enableMaintenancePass(theEnabled);
}