Add async to the job launcher, fix test to reflect this

This commit is contained in:
Tadgh 2020-06-26 11:51:01 -07:00
parent 0b622597b4
commit b31b6652a9
3 changed files with 34 additions and 1 deletions

View File

@ -76,12 +76,14 @@ import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Scope;
import org.springframework.core.env.Environment;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.dao.annotation.PersistenceExceptionTranslationPostProcessor;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
import org.springframework.scheduling.concurrent.ScheduledExecutorFactoryBean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import javax.annotation.Nullable;
@ -259,6 +261,18 @@ public abstract class BaseConfig {
return retVal;
}
@Bean
public TaskExecutor jobLaunchingTaskExecutor() {
ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
asyncTaskExecutor.setCorePoolSize(5);
asyncTaskExecutor.setMaxPoolSize(10);
asyncTaskExecutor.setQueueCapacity(500);
asyncTaskExecutor.setThreadNamePrefix("JobLauncher-");
asyncTaskExecutor.initialize();
return asyncTaskExecutor;
}
@Bean
public IResourceReindexingSvc resourceReindexingSvc() {
return new ResourceReindexingSvcImpl();

View File

@ -339,7 +339,10 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
//Note that if the job is generated, and doesnt rely on an existed persisted BulkExportJobEntity, it will need to
//create one itself, which means that its jobUUID isnt known until it starts. to get around this, we move
public void awaitJobCompletion(JobExecution theJobExecution) throws InterruptedException {
await().until(() -> theJobExecution.getStatus() == BatchStatus.COMPLETED);
await().until(() -> {
JobExecution jobExecution = myJobExplorer.getJobExecution(theJobExecution.getId());
return jobExecution.getStatus() == BatchStatus.COMPLETED;
});
}
@Test

View File

@ -23,10 +23,13 @@ package ca.uhn.fhir.jpa.batch.config;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.MapJobExplorerFactoryBean;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.task.TaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
@ -35,6 +38,10 @@ public class NonPersistedBatchConfigurer extends DefaultBatchConfigurer {
@Qualifier("hapiTransactionManager")
private PlatformTransactionManager myHapiPlatformTransactionManager;
@Autowired
@Qualifier("jobLaunchingTaskExecutor")
private TaskExecutor myTaskExecutor;
private MapJobRepositoryFactoryBean myJobRepositoryFactory;
@Override
@ -58,4 +65,13 @@ public class NonPersistedBatchConfigurer extends DefaultBatchConfigurer {
jobExplorerFactoryBean.afterPropertiesSet();
return jobExplorerFactoryBean.getObject();
}
@Override
protected JobLauncher createJobLauncher() throws Exception {
SimpleJobLauncher launcher = new SimpleJobLauncher();
launcher.setTaskExecutor(myTaskExecutor);
launcher.setJobRepository(getJobRepository());
launcher.afterPropertiesSet();
return launcher;
}
}