From 308167698d6f8a58aaa56b83a69f505645d1c720 Mon Sep 17 00:00:00 2001 From: "justin.mckelvy" Date: Tue, 16 May 2023 14:12:37 -0600 Subject: [PATCH] merge executor bean, alter fork thread pool --- ...readFactory.java => CqlThreadFactory.java} | 14 ++++++----- .../config/BaseClinicalReasoningConfig.java | 25 ++++++------------- .../fhir/cr/dstu3/measure/MeasureService.java | 6 +++-- .../fhir/cr/r4/measure/MeasureService.java | 7 ++++-- 4 files changed, 24 insertions(+), 28 deletions(-) rename hapi-fhir-storage-cr/src/main/java/ca/uhn/fhir/cr/common/{CqlForkJoinWorkerThreadFactory.java => CqlThreadFactory.java} (77%) diff --git a/hapi-fhir-storage-cr/src/main/java/ca/uhn/fhir/cr/common/CqlForkJoinWorkerThreadFactory.java b/hapi-fhir-storage-cr/src/main/java/ca/uhn/fhir/cr/common/CqlThreadFactory.java similarity index 77% rename from hapi-fhir-storage-cr/src/main/java/ca/uhn/fhir/cr/common/CqlForkJoinWorkerThreadFactory.java rename to hapi-fhir-storage-cr/src/main/java/ca/uhn/fhir/cr/common/CqlThreadFactory.java index 230fc762fff..d114480ef5c 100644 --- a/hapi-fhir-storage-cr/src/main/java/ca/uhn/fhir/cr/common/CqlForkJoinWorkerThreadFactory.java +++ b/hapi-fhir-storage-cr/src/main/java/ca/uhn/fhir/cr/common/CqlThreadFactory.java @@ -22,22 +22,24 @@ package ca.uhn.fhir.cr.common; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; import java.util.concurrent.ForkJoinWorkerThread; +import java.util.concurrent.ThreadFactory; /** * This class resolves issues with loading JAXB in a server environment and using CompletableFutures * https://stackoverflow.com/questions/49113207/completablefuture-forkjoinpool-set-class-loader **/ -public class CqlForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { +public class CqlThreadFactory implements ThreadFactory { @Override - public final ForkJoinWorkerThread newThread(ForkJoinPool thePool) { - return new CqlForkJoinWorkerThread(thePool); + public Thread newThread(Runnable r) { + return new CqlThread(r); } - private static class CqlForkJoinWorkerThread extends ForkJoinWorkerThread { + private static class CqlThread extends Thread { - private CqlForkJoinWorkerThread(final ForkJoinPool thePool) { - super(thePool); + private CqlThread(Runnable runnable) { + super(runnable); + //super(thePool); // set the correct classloader here setContextClassLoader(Thread.currentThread().getContextClassLoader()); } diff --git a/hapi-fhir-storage-cr/src/main/java/ca/uhn/fhir/cr/config/BaseClinicalReasoningConfig.java b/hapi-fhir-storage-cr/src/main/java/ca/uhn/fhir/cr/config/BaseClinicalReasoningConfig.java index a7381f59c22..97ec14f981c 100644 --- a/hapi-fhir-storage-cr/src/main/java/ca/uhn/fhir/cr/config/BaseClinicalReasoningConfig.java +++ b/hapi-fhir-storage-cr/src/main/java/ca/uhn/fhir/cr/config/BaseClinicalReasoningConfig.java @@ -24,7 +24,7 @@ import ca.uhn.fhir.context.FhirVersionEnum; import ca.uhn.fhir.context.support.IValidationSupport; import ca.uhn.fhir.cr.common.CodeCacheResourceChangeListener; import ca.uhn.fhir.cr.common.CqlExceptionHandlingInterceptor; -import ca.uhn.fhir.cr.common.CqlForkJoinWorkerThreadFactory; +import ca.uhn.fhir.cr.common.CqlThreadFactory; import ca.uhn.fhir.cr.common.ElmCacheResourceChangeListener; import ca.uhn.fhir.cr.common.HapiFhirDal; import ca.uhn.fhir.cr.common.HapiFhirRetrieveProvider; @@ -72,17 +72,14 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Primary; import org.springframework.context.annotation.Scope; -import org.springframework.security.concurrent.DelegatingSecurityContextExecutor; import org.springframework.security.concurrent.DelegatingSecurityContextExecutorService; -import org.springframework.security.core.context.SecurityContextHolder; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ForkJoinPool; + @Configuration @@ -295,19 +292,11 @@ public abstract class BaseClinicalReasoningConfig { } @Bean - public Executor cqlExecutor() { - CqlForkJoinWorkerThreadFactory factory = new CqlForkJoinWorkerThreadFactory(); - ForkJoinPool myCommonPool = new ForkJoinPool(Math.min(32767, Runtime.getRuntime().availableProcessors()), - factory, - null, false); - - return new DelegatingSecurityContextExecutor(myCommonPool, - SecurityContextHolder.getContext()); - } - - @Bean - public ExecutorService measureExecutor() { - ExecutorService executor = Executors.newFixedThreadPool(CrProperties.MeasureProperties.DEFAULT_THREADS_BATCH_SIZE); + public ExecutorService cqlExecutor() { + CqlThreadFactory factory = new CqlThreadFactory(); + ExecutorService executor = Executors. + newFixedThreadPool(CrProperties.MeasureProperties.DEFAULT_THREADS_BATCH_SIZE + , factory); executor = new DelegatingSecurityContextExecutorService(executor); return executor; diff --git a/hapi-fhir-storage-cr/src/main/java/ca/uhn/fhir/cr/dstu3/measure/MeasureService.java b/hapi-fhir-storage-cr/src/main/java/ca/uhn/fhir/cr/dstu3/measure/MeasureService.java index 73c9d41b1df..05a9b9f07c0 100644 --- a/hapi-fhir-storage-cr/src/main/java/ca/uhn/fhir/cr/dstu3/measure/MeasureService.java +++ b/hapi-fhir-storage-cr/src/main/java/ca/uhn/fhir/cr/dstu3/measure/MeasureService.java @@ -51,6 +51,7 @@ import org.opencds.cqf.cql.evaluator.fhir.util.Clients; import org.opencds.cqf.cql.evaluator.measure.MeasureEvaluationOptions; import org.springframework.beans.factory.annotation.Autowired; +import javax.inject.Named; import java.util.Collections; import java.util.List; import java.util.Map; @@ -132,7 +133,8 @@ public class MeasureService implements IDaoRegistryUser { protected DaoRegistry myDaoRegistry; @Autowired - private ExecutorService myMeasureExecutor; + @Named("cqlExecutor") + private ExecutorService myCqlExecutor; protected RequestDetails myRequestDetails; /** @@ -186,7 +188,7 @@ public class MeasureService implements IDaoRegistryUser { TerminologyProvider terminologyProvider; - myMeasureEvaluationOptions.setMeasureExecutor(myMeasureExecutor); + myMeasureEvaluationOptions.setMeasureExecutor(myCqlExecutor); if (theTerminologyEndpoint != null) { IGenericClient client = Clients.forEndpoint(getFhirContext(), theTerminologyEndpoint); diff --git a/hapi-fhir-storage-cr/src/main/java/ca/uhn/fhir/cr/r4/measure/MeasureService.java b/hapi-fhir-storage-cr/src/main/java/ca/uhn/fhir/cr/r4/measure/MeasureService.java index 7258260b393..fb1cf9a488b 100644 --- a/hapi-fhir-storage-cr/src/main/java/ca/uhn/fhir/cr/r4/measure/MeasureService.java +++ b/hapi-fhir-storage-cr/src/main/java/ca/uhn/fhir/cr/r4/measure/MeasureService.java @@ -59,10 +59,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import javax.inject.Named; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import static ca.uhn.fhir.cr.constant.MeasureReportConstants.COUNTRY_CODING_SYSTEM_CODE; @@ -143,7 +145,8 @@ public class MeasureService implements IDaoRegistryUser { protected DaoRegistry myDaoRegistry; @Autowired - private ExecutorService myMeasureExecutor; + @Named("cqlExecutor") + private ExecutorService myCqlExecutor; protected RequestDetails myRequestDetails; @@ -198,7 +201,7 @@ public class MeasureService implements IDaoRegistryUser { TerminologyProvider terminologyProvider; - myMeasureEvaluationOptions.setMeasureExecutor(myMeasureExecutor); + myMeasureEvaluationOptions.setMeasureExecutor(myCqlExecutor); if (theTerminologyEndpoint != null) { IGenericClient client = Clients.forEndpoint(getFhirContext(), theTerminologyEndpoint);