merge executor bean, alter fork thread pool

This commit is contained in:
justin.mckelvy 2023-05-16 14:12:37 -06:00
parent 6747111c10
commit 308167698d
4 changed files with 24 additions and 28 deletions

View File

@ -22,22 +22,24 @@ package ca.uhn.fhir.cr.common;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.ThreadFactory;
/**
* This class resolves issues with loading JAXB in a server environment and using CompletableFutures
* https://stackoverflow.com/questions/49113207/completablefuture-forkjoinpool-set-class-loader
**/
public class CqlForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory {
public class CqlThreadFactory implements ThreadFactory {
@Override
public final ForkJoinWorkerThread newThread(ForkJoinPool thePool) {
return new CqlForkJoinWorkerThread(thePool);
public Thread newThread(Runnable r) {
return new CqlThread(r);
}
private static class CqlForkJoinWorkerThread extends ForkJoinWorkerThread {
private static class CqlThread extends Thread {
private CqlForkJoinWorkerThread(final ForkJoinPool thePool) {
super(thePool);
private CqlThread(Runnable runnable) {
super(runnable);
//super(thePool);
// set the correct classloader here
setContextClassLoader(Thread.currentThread().getContextClassLoader());
}

View File

@ -24,7 +24,7 @@ import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.context.support.IValidationSupport;
import ca.uhn.fhir.cr.common.CodeCacheResourceChangeListener;
import ca.uhn.fhir.cr.common.CqlExceptionHandlingInterceptor;
import ca.uhn.fhir.cr.common.CqlForkJoinWorkerThreadFactory;
import ca.uhn.fhir.cr.common.CqlThreadFactory;
import ca.uhn.fhir.cr.common.ElmCacheResourceChangeListener;
import ca.uhn.fhir.cr.common.HapiFhirDal;
import ca.uhn.fhir.cr.common.HapiFhirRetrieveProvider;
@ -72,17 +72,14 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Scope;
import org.springframework.security.concurrent.DelegatingSecurityContextExecutor;
import org.springframework.security.concurrent.DelegatingSecurityContextExecutorService;
import org.springframework.security.core.context.SecurityContextHolder;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
@Configuration
@ -295,19 +292,11 @@ public abstract class BaseClinicalReasoningConfig {
}
@Bean
public Executor cqlExecutor() {
CqlForkJoinWorkerThreadFactory factory = new CqlForkJoinWorkerThreadFactory();
ForkJoinPool myCommonPool = new ForkJoinPool(Math.min(32767, Runtime.getRuntime().availableProcessors()),
factory,
null, false);
return new DelegatingSecurityContextExecutor(myCommonPool,
SecurityContextHolder.getContext());
}
@Bean
public ExecutorService measureExecutor() {
ExecutorService executor = Executors.newFixedThreadPool(CrProperties.MeasureProperties.DEFAULT_THREADS_BATCH_SIZE);
public ExecutorService cqlExecutor() {
CqlThreadFactory factory = new CqlThreadFactory();
ExecutorService executor = Executors.
newFixedThreadPool(CrProperties.MeasureProperties.DEFAULT_THREADS_BATCH_SIZE
, factory);
executor = new DelegatingSecurityContextExecutorService(executor);
return executor;

View File

@ -51,6 +51,7 @@ import org.opencds.cqf.cql.evaluator.fhir.util.Clients;
import org.opencds.cqf.cql.evaluator.measure.MeasureEvaluationOptions;
import org.springframework.beans.factory.annotation.Autowired;
import javax.inject.Named;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -132,7 +133,8 @@ public class MeasureService implements IDaoRegistryUser {
protected DaoRegistry myDaoRegistry;
@Autowired
private ExecutorService myMeasureExecutor;
@Named("cqlExecutor")
private ExecutorService myCqlExecutor;
protected RequestDetails myRequestDetails;
/**
@ -186,7 +188,7 @@ public class MeasureService implements IDaoRegistryUser {
TerminologyProvider terminologyProvider;
myMeasureEvaluationOptions.setMeasureExecutor(myMeasureExecutor);
myMeasureEvaluationOptions.setMeasureExecutor(myCqlExecutor);
if (theTerminologyEndpoint != null) {
IGenericClient client = Clients.forEndpoint(getFhirContext(), theTerminologyEndpoint);

View File

@ -59,10 +59,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.inject.Named;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import static ca.uhn.fhir.cr.constant.MeasureReportConstants.COUNTRY_CODING_SYSTEM_CODE;
@ -143,7 +145,8 @@ public class MeasureService implements IDaoRegistryUser {
protected DaoRegistry myDaoRegistry;
@Autowired
private ExecutorService myMeasureExecutor;
@Named("cqlExecutor")
private ExecutorService myCqlExecutor;
protected RequestDetails myRequestDetails;
@ -198,7 +201,7 @@ public class MeasureService implements IDaoRegistryUser {
TerminologyProvider terminologyProvider;
myMeasureEvaluationOptions.setMeasureExecutor(myMeasureExecutor);
myMeasureEvaluationOptions.setMeasureExecutor(myCqlExecutor);
if (theTerminologyEndpoint != null) {
IGenericClient client = Clients.forEndpoint(getFhirContext(), theTerminologyEndpoint);