Batch2 fastracking configurable (#4453)

* licenses

* hapi-fhir side done

* cdr side

* add assert

* add test

* bump hapi version

Co-authored-by: Ken Stevens <ken@smilecdr.com>
This commit is contained in:
Ken Stevens 2023-01-20 19:03:13 -05:00 committed by GitHub
parent 278d5cc481
commit bf53a08b83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
91 changed files with 625 additions and 118 deletions

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -110,6 +110,9 @@ public final class HapiSystemProperties {
public static void enableUnitTestMode() {
System.setProperty(UNIT_TEST_MODE, Boolean.TRUE.toString());
}
public static void disableUnitTestMode() {
System.setProperty(UNIT_TEST_MODE, Boolean.FALSE.toString());
}
public static boolean isUnitTestModeEnabled() {
return Boolean.parseBoolean(System.getProperty(UNIT_TEST_MODE));

View File

@ -4,14 +4,14 @@
<modelVersion>4.0.0</modelVersion>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-bom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<packaging>pom</packaging>
<name>HAPI FHIR BOM</name>
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-cli</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -0,0 +1,8 @@
---
type: change
issue: 4065
title: "A new DaoConfig configuration setting has been added called JobFastTrackingEnabled, default false.
If this setting is enabled, then gated batch jobs that produce only one chunk will immediately trigger a batch
maintenance job. This may be useful for testing, but is not recommended for production use. Prior to this change,
fasttracking was always enabled which meant if the server was not busy, small batch jobs would be processed quickly.
However this lead do instability on high-volume servers, so this feature is now disabled by default."

View File

@ -11,7 +11,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.config;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.packages.loader.PackageLoaderSvc;
import ca.uhn.fhir.jpa.packages.loader.PackageResourceParsingSvc;

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.packages.loader;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.hl7.fhir.utilities.npm.NpmPackage;
import java.io.InputStream;

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.packages.loader;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.packages.PackageInstallationSpec;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.packages.loader;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.packages.util;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import com.google.common.collect.Lists;
import java.util.Collections;

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.provider.r4;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.hl7.fhir.instance.model.api.IBaseResource;
import java.util.function.Consumer;

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -174,7 +174,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> firstStep = (step, sink) -> callLatch(myFirstStepLatch, step);
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> fail();
String jobId = "test-job-1";
String jobId = new Exception().getStackTrace()[0].getMethodName();
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
@ -326,12 +326,12 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
};
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> callLatch(myLastStepLatch, step);
String jobId = "test-job-5";
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobId, firstStep, lastStep);
String jobDefId = new Exception().getStackTrace()[0].getMethodName();
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobDefId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
JobInstanceStartRequest request = buildRequest(jobId);
JobInstanceStartRequest request = buildRequest(jobDefId);
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
@ -355,12 +355,12 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
};
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> fail();
String jobId = "test-job-3";
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobId, firstStep, lastStep);
String jobDefId = new Exception().getStackTrace()[0].getMethodName();
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobDefId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
JobInstanceStartRequest request = buildRequest(jobId);
JobInstanceStartRequest request = buildRequest(jobDefId);
// execute
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
@ -379,12 +379,12 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
};
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> fail();
String jobId = "test-job-4";
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobId, firstStep, lastStep);
String jobDefId = new Exception().getStackTrace()[0].getMethodName();
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobDefId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
JobInstanceStartRequest request = buildRequest(jobId);
JobInstanceStartRequest request = buildRequest(jobDefId);
// execute
myFirstStepLatch.setExpectedCount(1);
@ -418,9 +418,9 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
return RunOutcome.SUCCESS;
};
// job definition
String jobId = new Exception().getStackTrace()[0].getMethodName();
String jobDefId = new Exception().getStackTrace()[0].getMethodName();
JobDefinition<? extends IModelJson> jd = JobDefinition.newBuilder()
.setJobDefinitionId(jobId)
.setJobDefinitionId(jobDefId)
.setJobDescription("test job")
.setJobDefinitionVersion(TEST_JOB_VERSION)
.setParametersType(TestJobParameters.class)
@ -439,7 +439,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
.build();
myJobDefinitionRegistry.addJobDefinition(jd);
// test
JobInstanceStartRequest request = buildRequest(jobId);
JobInstanceStartRequest request = buildRequest(jobDefId);
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse response = myJobCoordinator.startInstance(request);
JobInstance instance = myBatch2JobHelper.awaitJobHasStatus(response.getJobId(),

View File

@ -0,0 +1,253 @@
package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.test.utilities.UnregisterScheduledProcessor;
import ca.uhn.test.concurrency.PointcutLatch;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import static ca.uhn.fhir.batch2.config.BaseBatch2Config.CHANNEL_NAME;
import static org.junit.jupiter.api.Assertions.assertTrue;
@TestPropertySource(properties = {
UnregisterScheduledProcessor.SCHEDULING_DISABLED_EQUALS_FALSE
})
@ContextConfiguration(classes = {Batch2JobMaintenanceIT.SpringConfig.class})
public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(Batch2JobMaintenanceIT.class);
public static final int TEST_JOB_VERSION = 1;
public static final String FIRST_STEP_ID = "first-step";
public static final String LAST_STEP_ID = "last-step";
@Autowired
JobDefinitionRegistry myJobDefinitionRegistry;
@Autowired
IJobCoordinator myJobCoordinator;
@Autowired
IJobMaintenanceService myJobMaintenanceService;
@Autowired
Batch2JobHelper myBatch2JobHelper;
@Autowired
private IChannelFactory myChannelFactory;
@Autowired
IJobPersistence myJobPersistence;
private final PointcutLatch myFirstStepLatch = new PointcutLatch("First Step");
private final PointcutLatch myLastStepLatch = new PointcutLatch("Last Step");
private IJobCompletionHandler<TestJobParameters> myCompletionHandler;
private LinkedBlockingChannel myWorkChannel;
private final List<StackTraceElement[]> myStackTraceElements = new ArrayList<>();
private static RunOutcome callLatch(PointcutLatch theLatch, StepExecutionDetails<?, ?> theStep) {
theLatch.call(theStep);
return RunOutcome.SUCCESS;
}
@BeforeEach
public void before() {
myCompletionHandler = details -> {};
myWorkChannel = (LinkedBlockingChannel) myChannelFactory.getOrCreateReceiver(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, new ChannelConsumerSettings());
JobMaintenanceServiceImpl jobMaintenanceService = (JobMaintenanceServiceImpl) myJobMaintenanceService;
jobMaintenanceService.setMaintenanceJobStartedCallback(() -> {
ourLog.info("Batch maintenance job started");
myStackTraceElements.add(Thread.currentThread().getStackTrace());
});
}
@AfterEach
public void after() {
myWorkChannel.clearInterceptorsForUnitTest();
myDaoConfig.setJobFastTrackingEnabled(true);
JobMaintenanceServiceImpl jobMaintenanceService = (JobMaintenanceServiceImpl) myJobMaintenanceService;
jobMaintenanceService.setMaintenanceJobStartedCallback(() -> {});
}
@Test
public void testFirstStepToSecondStep_singleChunkFasttracks() throws InterruptedException {
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> firstStep = (step, sink) -> {
sink.accept(new FirstStepOutput());
callLatch(myFirstStepLatch, step);
return RunOutcome.SUCCESS;
};
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> callLatch(myLastStepLatch, step);
String jobDefId = new Exception().getStackTrace()[0].getMethodName();
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobDefId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
JobInstanceStartRequest request = buildRequest(jobDefId);
myFirstStepLatch.setExpectedCount(1);
myLastStepLatch.setExpectedCount(1);
String batchJobId = myJobCoordinator.startInstance(request).getJobId();
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.assertFastTracking(batchJobId);
// Since there was only one chunk, the job should proceed without requiring a maintenance pass
myBatch2JobHelper.awaitJobCompletion(batchJobId);
myBatch2JobHelper.assertFastTracking(batchJobId);
myLastStepLatch.awaitExpected();
myBatch2JobHelper.assertFastTracking(batchJobId);
assertJobMaintenanceCalledByQuartzThread();
assertJobMaintenanceCalledAtLeast(2);
}
private void assertJobMaintenanceCalledAtLeast(int theSize) {
assertTrue(myStackTraceElements.size() >= theSize, "Expected at least " + theSize + " calls to job maintenance but got " + myStackTraceElements.size());
}
private void assertJobMaintenanceCalledByQuartzThread() {
StackTraceElement[] stackTrace = myStackTraceElements.get(0);
boolean found = false;
for (StackTraceElement stackTraceElement : stackTrace) {
if (stackTraceElement.getClassName().equals("org.quartz.core.JobRunShell")) {
found = true;
break;
}
}
assertTrue(found, "Job maintenance should be called by Quartz thread");
}
@Test
public void testFirstStepToSecondStepFasttrackingDisabled_singleChunkDoesNotFasttrack() throws InterruptedException {
myDaoConfig.setJobFastTrackingEnabled(false);
IJobStepWorker<Batch2JobMaintenanceIT.TestJobParameters, VoidModel, Batch2JobMaintenanceIT.FirstStepOutput> firstStep = (step, sink) -> {
sink.accept(new Batch2JobMaintenanceIT.FirstStepOutput());
callLatch(myFirstStepLatch, step);
return RunOutcome.SUCCESS;
};
IJobStepWorker<Batch2JobMaintenanceIT.TestJobParameters, Batch2JobMaintenanceIT.FirstStepOutput, VoidModel> lastStep = (step, sink) -> callLatch(myLastStepLatch, step);
String jobDefId = new Exception().getStackTrace()[0].getMethodName();
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobDefId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
JobInstanceStartRequest request = buildRequest(jobDefId);
myFirstStepLatch.setExpectedCount(1);
myLastStepLatch.setExpectedCount(1);
String batchJobId = myJobCoordinator.startInstance(request).getJobId();
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.assertFastTracking(batchJobId);
// Since there was only one chunk, the job should request fasttracking
myBatch2JobHelper.awaitJobCompletionWithoutMaintenancePass(batchJobId);
// However since we disabled fasttracking, the job should not have fasttracked
myBatch2JobHelper.assertNotFastTracking(batchJobId);
myLastStepLatch.awaitExpected();
myBatch2JobHelper.assertNotFastTracking(batchJobId);
assertJobMaintenanceCalledByQuartzThread();
assertJobMaintenanceCalledAtLeast(2);
}
@Nonnull
private JobInstanceStartRequest buildRequest(String jobId) {
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(jobId);
TestJobParameters parameters = new TestJobParameters();
request.setParameters(parameters);
return request;
}
@Nonnull
private JobDefinition<? extends IModelJson> buildGatedJobDefinition(String theJobId, IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep, IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> theLastStep) {
return JobDefinition.newBuilder()
.setJobDefinitionId(theJobId)
.setJobDescription("test job")
.setJobDefinitionVersion(TEST_JOB_VERSION)
.setParametersType(TestJobParameters.class)
.gatedExecution()
.addFirstStep(
FIRST_STEP_ID,
"Test first step",
FirstStepOutput.class,
theFirstStep
)
.addLastStep(
LAST_STEP_ID,
"Test last step",
theLastStep
)
.completionHandler(myCompletionHandler)
.build();
}
static class TestJobParameters implements IModelJson {
TestJobParameters() {
}
}
static class FirstStepOutput implements IModelJson {
FirstStepOutput() {
}
}
static class SecondStepOutput implements IModelJson {
@JsonProperty("test")
private String myTestValue;
SecondStepOutput() {
}
public void setValue(String theV) {
myTestValue = theV;
}
}
static class ReductionStepOutput implements IModelJson {
@JsonProperty("result")
private List<?> myResult;
ReductionStepOutput(List<?> theResult) {
myResult = theResult;
}
}
static class SpringConfig {
@Autowired
IJobMaintenanceService myJobMaintenanceService;
@PostConstruct
void fastScheduler() {
((JobMaintenanceServiceImpl)myJobMaintenanceService).setScheduledJobFrequencyMillis(200);
}
}
}

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -63,54 +63,82 @@ public class Batch2JobHelper {
return awaitJobCompletion(theStartResponse.getJobId());
}
public JobInstance awaitJobCompletion(String theId) {
return awaitJobHasStatus(theId, StatusEnum.COMPLETED);
public JobInstance awaitJobCompletion(String theBatchJobId) {
return awaitJobHasStatus(theBatchJobId, StatusEnum.COMPLETED);
}
public JobInstance awaitJobCancelled(String theId) {
return awaitJobHasStatus(theId, StatusEnum.CANCELLED);
public JobInstance awaitJobCompletionWithoutMaintenancePass(String theBatchJobId) {
return awaitJobHasStatusWithoutMaintenancePass(theBatchJobId, StatusEnum.COMPLETED);
}
public JobInstance awaitJobCompletion(String theId, int theSecondsToWait) {
return awaitJobHasStatus(theId, theSecondsToWait, StatusEnum.COMPLETED);
public JobInstance awaitJobCancelled(String theBatchJobId) {
return awaitJobHasStatus(theBatchJobId, StatusEnum.CANCELLED);
}
public JobInstance awaitJobHasStatus(String theId, StatusEnum... theExpectedStatus) {
return awaitJobHasStatus(theId, 10, theExpectedStatus);
public JobInstance awaitJobCompletion(String theBatchJobId, int theSecondsToWait) {
return awaitJobHasStatus(theBatchJobId, theSecondsToWait, StatusEnum.COMPLETED);
}
public JobInstance awaitJobHasStatus(String theId, int theSecondsToWait, StatusEnum... theExpectedStatus) {
public JobInstance awaitJobHasStatus(String theBatchJobId, StatusEnum... theExpectedStatus) {
return awaitJobHasStatus(theBatchJobId, 10, theExpectedStatus);
}
public JobInstance awaitJobHasStatusWithoutMaintenancePass(String theBatchJobId, StatusEnum... theExpectedStatus) {
return awaitJobawaitJobHasStatusWithoutMaintenancePass(theBatchJobId, 10, theExpectedStatus);
}
public JobInstance awaitJobHasStatus(String theBatchJobId, int theSecondsToWait, StatusEnum... theExpectedStatus) {
assert !TransactionSynchronizationManager.isActualTransactionActive();
try {
await()
.atMost(theSecondsToWait, TimeUnit.SECONDS)
.until(() -> checkStatusWithMaintenancePass(theId, theExpectedStatus));
.until(() -> checkStatusWithMaintenancePass(theBatchJobId, theExpectedStatus));
} catch (ConditionTimeoutException e) {
String statuses = myJobPersistence.fetchInstances(100, 0)
.stream()
.map(t -> t.getJobDefinitionId() + "/" + t.getStatus().name())
.collect(Collectors.joining("\n"));
String currentStatus = myJobCoordinator.getInstance(theId).getStatus().name();
String currentStatus = myJobCoordinator.getInstance(theBatchJobId).getStatus().name();
fail("Job still has status " + currentStatus + " - All statuses:\n" + statuses);
}
return myJobCoordinator.getInstance(theId);
return myJobCoordinator.getInstance(theBatchJobId);
}
private boolean checkStatusWithMaintenancePass(String theId, StatusEnum... theExpectedStatuses) {
if (hasStatus(theId, theExpectedStatuses)) {
public JobInstance awaitJobawaitJobHasStatusWithoutMaintenancePass(String theBatchJobId, int theSecondsToWait, StatusEnum... theExpectedStatus) {
assert !TransactionSynchronizationManager.isActualTransactionActive();
try {
await()
.atMost(theSecondsToWait, TimeUnit.SECONDS)
.until(() -> hasStatus(theBatchJobId, theExpectedStatus));
} catch (ConditionTimeoutException e) {
String statuses = myJobPersistence.fetchInstances(100, 0)
.stream()
.map(t -> t.getJobDefinitionId() + "/" + t.getStatus().name())
.collect(Collectors.joining("\n"));
String currentStatus = myJobCoordinator.getInstance(theBatchJobId).getStatus().name();
fail("Job still has status " + currentStatus + " - All statuses:\n" + statuses);
}
return myJobCoordinator.getInstance(theBatchJobId);
}
private boolean checkStatusWithMaintenancePass(String theBatchJobId, StatusEnum... theExpectedStatuses) {
if (hasStatus(theBatchJobId, theExpectedStatuses)) {
return true;
}
myJobMaintenanceService.runMaintenancePass();
return hasStatus(theId, theExpectedStatuses);
return hasStatus(theBatchJobId, theExpectedStatuses);
}
private boolean hasStatus(String theId, StatusEnum[] theExpectedStatuses) {
return ArrayUtils.contains(theExpectedStatuses, getStatus(theId));
private boolean hasStatus(String theBatchJobId, StatusEnum[] theExpectedStatuses) {
return ArrayUtils.contains(theExpectedStatuses, getStatus(theBatchJobId));
}
private StatusEnum getStatus(String theId) {
return myJobCoordinator.getInstance(theId).getStatus();
private StatusEnum getStatus(String theBatchJobId) {
return myJobCoordinator.getInstance(theBatchJobId).getStatus();
}
public JobInstance awaitJobFailure(Batch2JobStartResponse theStartResponse) {
@ -121,8 +149,8 @@ public class Batch2JobHelper {
return awaitJobHasStatus(theJobId, StatusEnum.ERRORED, StatusEnum.FAILED);
}
public void awaitJobInProgress(String theId) {
await().until(() -> checkStatusWithMaintenancePass(theId, StatusEnum.IN_PROGRESS));
public void awaitJobInProgress(String theBatchJobId) {
await().until(() -> checkStatusWithMaintenancePass(theBatchJobId, StatusEnum.IN_PROGRESS));
}
public void assertNotFastTracking(String theInstanceId) {
@ -214,4 +242,5 @@ public class Batch2JobHelper {
public void runMaintenancePass() {
myJobMaintenanceService.runMaintenancePass();
}
}

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -7,7 +7,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>hapi-fhir-serviceloaders</artifactId>
<groupId>ca.uhn.hapi.fhir</groupId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>hapi-fhir-serviceloaders</artifactId>
<groupId>ca.uhn.hapi.fhir</groupId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@ -20,7 +20,7 @@
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-caching-api</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>hapi-fhir-serviceloaders</artifactId>
<groupId>ca.uhn.hapi.fhir</groupId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>hapi-fhir</artifactId>
<groupId>ca.uhn.hapi.fhir</groupId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>hapi-fhir</artifactId>
<groupId>ca.uhn.hapi.fhir</groupId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-client-apache</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-client-okhttp</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-server-jersey</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.migrate.taskdef;
/*-
* #%L
* HAPI FHIR Server - SQL Migration
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.migrate.DriverTypeEnum;

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -29,6 +29,7 @@ import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
@ -81,11 +82,13 @@ public abstract class BaseBatch2Config {
@Bean
public IJobMaintenanceService batch2JobMaintenanceService(ISchedulerService theSchedulerService,
JobDefinitionRegistry theJobDefinitionRegistry,
DaoConfig theDaoConfig,
BatchJobSender theBatchJobSender,
WorkChunkProcessor theExecutor
) {
return new JobMaintenanceServiceImpl(theSchedulerService,
myPersistence,
theDaoConfig,
theJobDefinitionRegistry,
theBatchJobSender,
theExecutor

View File

@ -26,8 +26,6 @@ import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.jpa.batch.log.Logs;
@ -36,8 +34,6 @@ import org.slf4j.Logger;
import javax.annotation.Nonnull;
import java.util.Date;
import java.util.Optional;
import java.util.Date;
public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@ -98,7 +94,11 @@ public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT ex
private void handleFastTracking(BaseDataSink<PT, IT, OT> theDataSink) {
if (theDataSink.getWorkChunkCount() <= 1) {
ourLog.debug("Gated job {} step {} produced exactly one chunk: Triggering a maintenance pass.", myDefinition.getJobDefinitionId(), myCursor.currentStep.getStepId());
myJobMaintenanceService.triggerMaintenancePass();
boolean success = myJobMaintenanceService.triggerMaintenancePass();
if (!success) {
myInstance.setFastTracking(false);
myJobPersistence.updateInstance(myInstance);
}
} else {
ourLog.debug("Gated job {} step {} produced {} chunks: Disabling fast tracking.", myDefinition.getJobDefinitionId(), myCursor.currentStep.getStepId(), theDataSink.getWorkChunkCount());
myInstance.setFastTracking(false);

View File

@ -27,6 +27,7 @@ import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
@ -84,21 +85,28 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
private final IJobPersistence myJobPersistence;
private final ISchedulerService mySchedulerService;
private final DaoConfig myDaoConfig;
private final JobDefinitionRegistry myJobDefinitionRegistry;
private final BatchJobSender myBatchJobSender;
private final WorkChunkProcessor myJobExecutorSvc;
private final Semaphore myRunMaintenanceSemaphore = new Semaphore(1);
private long myScheduledJobFrequencyMillis = DateUtils.MILLIS_PER_MINUTE;
private Runnable myMaintenanceJobStartedCallback = () -> {};
private Runnable myMaintenanceJobFinishedCallback = () -> {};
/**
* Constructor
*/
public JobMaintenanceServiceImpl(@Nonnull ISchedulerService theSchedulerService,
@Nonnull IJobPersistence theJobPersistence,
DaoConfig theDaoConfig,
@Nonnull JobDefinitionRegistry theJobDefinitionRegistry,
@Nonnull BatchJobSender theBatchJobSender,
@Nonnull WorkChunkProcessor theExecutor
) {
myDaoConfig = theDaoConfig;
Validate.notNull(theSchedulerService);
Validate.notNull(theJobPersistence);
Validate.notNull(theJobDefinitionRegistry);
@ -113,7 +121,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
@Override
public void scheduleJobs(ISchedulerService theSchedulerService) {
mySchedulerService.scheduleClusteredJob(DateUtils.MILLIS_PER_MINUTE, buildJobDefinition());
mySchedulerService.scheduleClusteredJob(myScheduledJobFrequencyMillis, buildJobDefinition());
}
@Nonnull
@ -124,11 +132,18 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
return jobDefinition;
}
public void setScheduledJobFrequencyMillis(long theScheduledJobFrequencyMillis) {
myScheduledJobFrequencyMillis = theScheduledJobFrequencyMillis;
}
/**
* @return true if a request to run a maintance pass was submitted
*/
@Override
public boolean triggerMaintenancePass() {
if (!myDaoConfig.isJobFastTrackingEnabled()) {
return false;
}
if (mySchedulerService.isClusteredSchedulingEnabled()) {
mySchedulerService.triggerClusteredJobImmediately(buildJobDefinition());
return true;
@ -178,6 +193,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
}
private void doMaintenancePass() {
myMaintenanceJobStartedCallback.run();
Set<String> processedInstanceIds = new HashSet<>();
JobChunkProgressAccumulator progressAccumulator = new JobChunkProgressAccumulator();
for (int page = 0; ; page++) {
@ -196,6 +212,15 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
break;
}
}
myMaintenanceJobFinishedCallback.run();
}
public void setMaintenanceJobStartedCallback(Runnable theMaintenanceJobStartedCallback) {
myMaintenanceJobStartedCallback = theMaintenanceJobStartedCallback;
}
public void setMaintenanceJobFinishedCallback(Runnable theMaintenanceJobFinishedCallback) {
myMaintenanceJobFinishedCallback = theMaintenanceJobFinishedCallback;
}
public static class JobMaintenanceScheduledJob implements HapiJob {

View File

@ -14,6 +14,7 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import com.google.common.collect.Lists;
@ -25,6 +26,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.messaging.Message;
@ -68,6 +70,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
private IJobPersistence myJobPersistence;
@Mock
private WorkChunkProcessor myJobExecutorSvc;
@Spy
private DaoConfig myDaoConfig = new DaoConfig();
private JobMaintenanceServiceImpl mySvc;
@Captor
private ArgumentCaptor<JobInstance> myInstanceCaptor;
@ -87,10 +91,12 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
BatchJobSender batchJobSender = new BatchJobSender(myWorkChannelProducer);
mySvc = new JobMaintenanceServiceImpl(mySchedulerService,
myJobPersistence,
myDaoConfig,
myJobDefinitionRegistry,
batchJobSender,
myJobExecutorSvc
);
myDaoConfig.setJobFastTrackingEnabled(true);
}
@Test
@ -183,10 +189,13 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
);
when (myJobPersistence.canAdvanceInstanceToNextStep(any(), any())).thenReturn(true);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition(JobDefinition.Builder::gatedExecution));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false)))
.thenReturn(chunks.iterator());
when(myJobPersistence.fetchallchunkidsforstepWithStatus(eq(INSTANCE_ID), eq(STEP_2), eq(StatusEnum.QUEUED)))
.thenReturn(chunks.stream().map(chunk -> chunk.getId()).collect(Collectors.toList()));
JobInstance instance1 = createInstance();
instance1.setCurrentGatedStepId(STEP_1);
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance1));
@ -196,6 +205,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
// Verify
verify(myWorkChannelProducer, times(2)).send(myMessageCaptor.capture());
verify(myJobPersistence, times(2)).updateInstance(myInstanceCaptor.capture());
JobWorkNotification payload0 = myMessageCaptor.getAllValues().get(0).getPayload();
assertEquals(STEP_2, payload0.getTargetStepId());
assertEquals(CHUNK_ID, payload0.getChunkId());
@ -347,6 +357,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
mySvc.runMaintenancePass();
// Verify
verify(myJobPersistence, times(2)).updateInstance(myInstanceCaptor.capture());
assertEquals(StatusEnum.CANCELLED, instance1.getStatus());
assertTrue(instance1.getErrorMessage().startsWith("Job instance cancelled"));
}
@ -392,6 +403,12 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
verify(myJobPersistence, times(1)).fetchInstances(anyInt(), eq(0));
}
@Test
void triggerMaintenancePassDisabled_noneInProgress_doesNotRunMaintenace() {
myDaoConfig.setJobFastTrackingEnabled(false);
mySvc.triggerMaintenancePass();
verifyNoMoreInteractions(myJobPersistence);
}
@Test
void triggerMaintenancePass_twoSimultaneousRequests_onlyCallOnce() throws InterruptedException, ExecutionException {

View File

@ -7,7 +7,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -334,6 +334,10 @@ public class DaoConfig {
* Since 6.2.0
*/
private int myBulkExportFileMaximumCapacity = 1_000;
/**
* Since 6.4.0
*/
private boolean myJobFastTrackingEnabled = false;
/**
* Constructor
@ -355,6 +359,10 @@ public class DaoConfig {
ourLog.info("Status based reindexing is DISABLED");
setStatusBasedReindexingDisabled(true);
}
if (HapiSystemProperties.isUnitTestModeEnabled()) {
setJobFastTrackingEnabled(true);
}
}
/**
@ -2985,6 +2993,26 @@ public class DaoConfig {
myBulkExportFileMaximumCapacity = theBulkExportFileMaximumCapacity;
}
/**
* If this setting is enabled, then gated batch jobs that produce only one chunk will immediately trigger a batch
* maintenance job. This may be useful for testing, but is not recommended for production use.
*
* @since 6.4.0
*/
public boolean isJobFastTrackingEnabled() {
return myJobFastTrackingEnabled;
}
/**
* If this setting is enabled, then gated batch jobs that produce only one chunk will immediately trigger a batch
* maintenance job. This may be useful for testing, but is not recommended for production use.
*
* @since 6.4.0
*/
public void setJobFastTrackingEnabled(boolean theJobFastTrackingEnabled) {
myJobFastTrackingEnabled = theJobFastTrackingEnabled;
}
public enum StoreMetaSourceInformationEnum {
NONE(false, false),
SOURCE_URI(true, false),

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -37,6 +37,7 @@ public class UnregisterScheduledProcessor implements BeanFactoryPostProcessor {
public static final String SCHEDULING_DISABLED = "scheduling_disabled";
public static final String SCHEDULING_DISABLED_EQUALS_TRUE = "scheduling_disabled=true";
public static final String SCHEDULING_DISABLED_EQUALS_FALSE = "scheduling_disabled=false";
private final Environment myEnvironment;

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<packaging>pom</packaging>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<name>HAPI-FHIR</name>
<description>An open-source implementation of the FHIR specification in Java.</description>
<url>https://hapifhir.io</url>
@ -2128,7 +2128,7 @@
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-checkstyle</artifactId>
<!-- Remember to bump this when you upgrade the version -->
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
</dependency>
</dependencies>
</plugin>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.10-SNAPSHOT</version>
<version>6.3.11-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>