SearchCoordinatorSvcImpl gets ExecutorService from injected ThreadPoolTaskExecutor (#2083)

* Get ExecutorService from injected ThreadPoolTaskExecutor which enables support of TaskDecorators (e.g. to add/clear ThreadLocal logging context (MDC) variables)

* Properly initialize ThreadPoolTaskExecutor
This commit is contained in:
Tue Toft Nørgård 2020-09-16 15:05:28 +02:00 committed by GitHub
parent 6ab7a0bcfe
commit c884f29112
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 15 additions and 7 deletions

View File

@ -262,6 +262,14 @@ public abstract class BaseConfig {
return new DatabaseSearchResultCacheSvcImpl(); return new DatabaseSearchResultCacheSvcImpl();
} }
@Bean
public ThreadPoolTaskExecutor searchCoordinatorThreadFactory() {
final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("search_coord_");
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
@Bean @Bean
public TaskScheduler taskScheduler() { public TaskScheduler taskScheduler() {
ConcurrentTaskScheduler retVal = new ConcurrentTaskScheduler(); ConcurrentTaskScheduler retVal = new ConcurrentTaskScheduler();

View File

@ -37,7 +37,6 @@ import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.entity.SearchInclude; import ca.uhn.fhir.jpa.entity.SearchInclude;
import ca.uhn.fhir.jpa.entity.SearchTypeEnum; import ca.uhn.fhir.jpa.entity.SearchTypeEnum;
import ca.uhn.fhir.jpa.interceptor.JpaPreResourceAccessDetails; import ca.uhn.fhir.jpa.interceptor.JpaPreResourceAccessDetails;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.model.search.SearchStatusEnum; import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
@ -54,6 +53,7 @@ import ca.uhn.fhir.rest.api.SummaryEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails; import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails;
import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.IPagingProvider; import ca.uhn.fhir.rest.server.IPagingProvider;
import ca.uhn.fhir.rest.server.SimpleBundleProvider; import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
@ -80,7 +80,7 @@ import org.springframework.data.domain.Sort;
import org.springframework.orm.jpa.JpaDialect; import org.springframework.orm.jpa.JpaDialect;
import org.springframework.orm.jpa.JpaTransactionManager; import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.vendor.HibernateJpaDialect; import org.springframework.orm.jpa.vendor.HibernateJpaDialect;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionDefinition;
@ -111,7 +111,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
@ -161,9 +160,9 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
/** /**
* Constructor * Constructor
*/ */
public SearchCoordinatorSvcImpl() { @Autowired
CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("search_coord_"); public SearchCoordinatorSvcImpl(ThreadPoolTaskExecutor searchCoordinatorThreadFactory) {
myExecutor = Executors.newCachedThreadPool(threadFactory); myExecutor = searchCoordinatorThreadFactory.getThreadPoolExecutor();
} }
@VisibleForTesting @VisibleForTesting

View File

@ -7,6 +7,7 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.config.dstu3.BaseDstu3Config;
import ca.uhn.fhir.jpa.dao.IResultIterator; import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder; import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilder; import ca.uhn.fhir.jpa.dao.SearchBuilder;
@ -124,7 +125,7 @@ public class SearchCoordinatorSvcImplTest {
myCurrentSearch = null; myCurrentSearch = null;
mySvc = new SearchCoordinatorSvcImpl(); mySvc = new SearchCoordinatorSvcImpl(new BaseDstu3Config().searchCoordinatorThreadFactory());
mySvc.setEntityManagerForUnitTest(myEntityManager); mySvc.setEntityManagerForUnitTest(myEntityManager);
mySvc.setTransactionManagerForUnitTest(myTxManager); mySvc.setTransactionManagerForUnitTest(myTxManager);
mySvc.setContextForUnitTest(ourCtx); mySvc.setContextForUnitTest(ourCtx);