From a56c85f780b4c1a28b4e348b2553fca9da451869 Mon Sep 17 00:00:00 2001 From: James Agnew Date: Thu, 16 Sep 2021 12:37:34 -0400 Subject: [PATCH 1/3] Eliminate Search Coordinator ThreadPool (#2991) * Add test * Remove search coordinator thread pool * Add changelog * Add docs * Test fixes * Test fixes * Test fixes * Test fix --- ...iminate-search-coordinator-threadpool.yaml | 6 + .../ca/uhn/fhir/jpa/config/BaseConfig.java | 15 +- .../jpa/search/SearchCoordinatorSvcImpl.java | 16 +- ...rResourceDaoR4LegacySearchBuilderTest.java | 2 +- .../FhirResourceDaoR4SearchLastNAsyncIT.java | 12 +- .../r4/FhirResourceDaoR4SearchNoFtTest.java | 2 +- .../FhirResourceDaoR4SearchOptimizedTest.java | 6 +- .../provider/ResourceProviderDstu2Test.java | 3 - .../r4/BaseResourceProviderR4Test.java | 3 + .../r4/ResourceProviderConcurrencyR4Test.java | 202 ++++++++++++++++++ .../provider/r4/ResourceProviderR4Test.java | 2 + .../search/SearchCoordinatorSvcImplTest.java | 48 ++--- 12 files changed, 255 insertions(+), 62 deletions(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_6_0/2991-eliminate-search-coordinator-threadpool.yaml create mode 100644 hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderConcurrencyR4Test.java diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_6_0/2991-eliminate-search-coordinator-threadpool.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_6_0/2991-eliminate-search-coordinator-threadpool.yaml new file mode 100644 index 00000000000..638ed8c891b --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_6_0/2991-eliminate-search-coordinator-threadpool.yaml @@ -0,0 +1,6 @@ +--- +type: change +issue: 2991 +title: "This PR eliminates the search coordinator threadpool, and executes searches synchronously on the HTTP client + thread. The idea of using a separate pool was supposed to help improve server scalability, but ultimately created + false bottlenecks and reduced the utility of monitoring infrastructure so it has been eliminated." diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java index 717e5c3aa0e..72a2535faef 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java @@ -380,17 +380,6 @@ public abstract class BaseConfig { return new TermConceptMappingSvcImpl(); } - @Bean - public ThreadPoolTaskExecutor searchCoordinatorThreadFactory() { - final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); - threadPoolTaskExecutor.setThreadNamePrefix("search_coord_"); - threadPoolTaskExecutor.setCorePoolSize(searchCoordCorePoolSize); - threadPoolTaskExecutor.setMaxPoolSize(searchCoordMaxPoolSize); - threadPoolTaskExecutor.setQueueCapacity(searchCoordQueueCapacity); - threadPoolTaskExecutor.initialize(); - return threadPoolTaskExecutor; - } - @Bean public TaskScheduler taskScheduler() { ConcurrentTaskScheduler retVal = new ConcurrentTaskScheduler(); @@ -851,8 +840,8 @@ public abstract class BaseConfig { } @Bean - public ISearchCoordinatorSvc searchCoordinatorSvc(ThreadPoolTaskExecutor searchCoordinatorThreadFactory) { - return new SearchCoordinatorSvcImpl(searchCoordinatorThreadFactory); + public ISearchCoordinatorSvc searchCoordinatorSvc() { + return new SearchCoordinatorSvcImpl(); } @Bean diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java index aa67d53c597..a64ba464246 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java @@ -82,7 +82,6 @@ import org.springframework.data.domain.Sort; import org.springframework.orm.jpa.JpaDialect; import org.springframework.orm.jpa.JpaTransactionManager; import org.springframework.orm.jpa.vendor.HibernateJpaDialect; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionDefinition; @@ -111,7 +110,6 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; @@ -123,7 +121,6 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { public static final Integer INTEGER_0 = 0; private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class); private final ConcurrentHashMap myIdToSearchTask = new ConcurrentHashMap<>(); - private final ExecutorService myExecutor; @Autowired private FhirContext myContext; @Autowired @@ -162,8 +159,13 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { * Constructor */ @Autowired - public SearchCoordinatorSvcImpl(ThreadPoolTaskExecutor searchCoordinatorThreadFactory) { - myExecutor = searchCoordinatorThreadFactory.getThreadPoolExecutor(); + public SearchCoordinatorSvcImpl() { + super(); + } + + @VisibleForTesting + Set getActiveSearchIds() { + return myIdToSearchTask.keySet(); } @VisibleForTesting @@ -274,7 +276,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { RequestPartitionId requestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequestForSearchType(theRequestDetails, resourceType, params, null); SearchContinuationTask task = new SearchContinuationTask(search, resourceDao, params, resourceType, theRequestDetails, requestPartitionId); myIdToSearchTask.put(search.getUuid(), task); - myExecutor.submit(task); + task.call(); } } @@ -406,7 +408,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { SearchTask task = new SearchTask(theSearch, theCallingDao, theParams, theResourceType, theRequestDetails, theRequestPartitionId); myIdToSearchTask.put(theSearch.getUuid(), task); - myExecutor.submit(task); + task.call(); PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage(theRequestDetails, theSearch, task, theSb); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4LegacySearchBuilderTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4LegacySearchBuilderTest.java index 5366efa789c..5c33c51e5ab 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4LegacySearchBuilderTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4LegacySearchBuilderTest.java @@ -929,7 +929,7 @@ public class FhirResourceDaoR4LegacySearchBuilderTest extends BaseJpaR4Test { List actual = toUnqualifiedVersionlessIds(resp); myCaptureQueriesListener.logSelectQueriesForCurrentThread(); assertThat(actual, containsInAnyOrder(orgId, medId, patId, moId, patId2)); - assertEquals(1, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size()); + assertEquals(8, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size()); // Specific patient ID with linked stuff request = mock(HttpServletRequest.class); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchLastNAsyncIT.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchLastNAsyncIT.java index 5ff05cac158..37661610e44 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchLastNAsyncIT.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchLastNAsyncIT.java @@ -13,6 +13,8 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.junit.jupiter.SpringExtension; @@ -30,6 +32,7 @@ import static org.mockito.Mockito.when; @ExtendWith(SpringExtension.class) public class FhirResourceDaoR4SearchLastNAsyncIT extends BaseR4SearchLastN { + private static final Logger ourLog = LoggerFactory.getLogger(FhirResourceDaoR4SearchLastNAsyncIT.class); @Autowired protected DaoConfig myDaoConfig; private List originalPreFetchThresholds; @@ -108,8 +111,11 @@ public class FhirResourceDaoR4SearchLastNAsyncIT extends BaseR4SearchLastN { .map(t -> t.getSql(true, false)) .collect(Collectors.toList()); + ourLog.info("Queries:\n * " + String.join("\n * ", queries)); + + // 3 queries to actually perform the search // 1 query to lookup up Search from cache, and 2 chunked queries to retrieve resources by PID. - assertEquals(3, queries.size()); + assertEquals(6, queries.size()); // The first chunked query should have a full complement of PIDs StringBuilder firstQueryPattern = new StringBuilder(".*RES_ID in \\('[0-9]+'"); @@ -117,7 +123,7 @@ public class FhirResourceDaoR4SearchLastNAsyncIT extends BaseR4SearchLastN { firstQueryPattern.append(" , '[0-9]+'"); } firstQueryPattern.append("\\).*"); - assertThat(queries.get(1), matchesPattern(firstQueryPattern.toString())); + assertThat(queries.get(4), matchesPattern(firstQueryPattern.toString())); // the second chunked query should be padded with "-1". StringBuilder secondQueryPattern = new StringBuilder(".*RES_ID in \\('[0-9]+'"); @@ -128,7 +134,7 @@ public class FhirResourceDaoR4SearchLastNAsyncIT extends BaseR4SearchLastN { secondQueryPattern.append(" , '-1'"); } secondQueryPattern.append("\\).*"); - assertThat(queries.get(2), matchesPattern(secondQueryPattern.toString())); + assertThat(queries.get(5), matchesPattern(secondQueryPattern.toString())); } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchNoFtTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchNoFtTest.java index 5ee19c00c8e..2757b30ee0d 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchNoFtTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchNoFtTest.java @@ -781,7 +781,7 @@ public class FhirResourceDaoR4SearchNoFtTest extends BaseJpaR4Test { List actual = toUnqualifiedVersionlessIds(resp); myCaptureQueriesListener.logSelectQueriesForCurrentThread(); assertThat(actual, containsInAnyOrder(orgId, medId, patId, moId, patId2)); - assertEquals(1, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size()); + assertEquals(7, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size()); // Specific patient ID with linked stuff request = mock(HttpServletRequest.class); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchOptimizedTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchOptimizedTest.java index e726c5254da..a9a9352b66d 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchOptimizedTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchOptimizedTest.java @@ -1177,9 +1177,9 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test { assertEquals(1, myCaptureQueriesListener.countUpdateQueries()); assertEquals(0, myCaptureQueriesListener.countDeleteQueries()); - assertEquals(2, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); - assertEquals(0, myCaptureQueriesListener.countInsertQueriesForCurrentThread()); - assertEquals(0, myCaptureQueriesListener.countUpdateQueriesForCurrentThread()); + assertEquals(4, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); + assertEquals(9, myCaptureQueriesListener.countInsertQueriesForCurrentThread()); + assertEquals(1, myCaptureQueriesListener.countUpdateQueriesForCurrentThread()); assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread()); } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/ResourceProviderDstu2Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/ResourceProviderDstu2Test.java index 98e72802634..a961eafc12f 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/ResourceProviderDstu2Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/ResourceProviderDstu2Test.java @@ -1627,9 +1627,6 @@ public class ResourceProviderDstu2Test extends BaseResourceProviderDstu2Test { .execute(); assertEquals(10, response.getEntry().size()); - if (ourConnectionPoolSize > 1) { - assertEquals(null, response.getTotalElement().getValueAsString(), "Total should be null but was " + response.getTotalElement().getValueAsString() + " in " + sw.toString()); - } assertThat(response.getLink("next").getUrl(), not(emptyString())); // Load page 2 diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java index 6081f5019a5..760542e197f 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java @@ -179,9 +179,12 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test { myFhirCtx.getRestfulClientFactory().setSocketTimeout(400000); PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(5000, TimeUnit.MILLISECONDS); + connectionManager.setMaxTotal(10); + connectionManager.setDefaultMaxPerRoute(10); HttpClientBuilder builder = HttpClientBuilder.create(); builder.setConnectionManager(connectionManager); builder.setMaxConnPerRoute(99); + ourHttpClient = builder.build(); ourServer = server; diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderConcurrencyR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderConcurrencyR4Test.java new file mode 100644 index 00000000000..9b0ca550c62 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderConcurrencyR4Test.java @@ -0,0 +1,202 @@ +package ca.uhn.fhir.jpa.provider.r4; + +import ca.uhn.fhir.interceptor.api.Hook; +import ca.uhn.fhir.interceptor.api.Interceptor; +import ca.uhn.fhir.interceptor.api.Pointcut; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; +import org.apache.commons.io.IOUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.hl7.fhir.r4.model.Bundle; +import org.hl7.fhir.r4.model.Patient; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@SuppressWarnings("Duplicates") +public class ResourceProviderConcurrencyR4Test extends BaseResourceProviderR4Test { + + private static final Logger ourLog = LoggerFactory.getLogger(ResourceProviderConcurrencyR4Test.class); + private ExecutorService myExecutor; + private List myReceivedNames = Collections.synchronizedList(new ArrayList<>()); + private List myExceptions = Collections.synchronizedList(new ArrayList<>()); + + @Override + @BeforeEach + public void before() throws Exception { + super.before(); + + myExecutor = Executors.newFixedThreadPool(10); + myReceivedNames.clear(); + myExceptions.clear(); + } + + @Override + @AfterEach + public void after() throws Exception { + super.after(); + + myInterceptorRegistry.unregisterInterceptorsIf(t -> t instanceof SearchBlockingInterceptor); + myExecutor.shutdown(); + + assertThat(myExceptions, empty()); + } + + /** + * This test is intended to verify that we are in fact executing searches in parallel + * when two different searches come in. + * + * We execute two identical searches (which should result in only one actual + * execution that will be reused by both) and one other search. We use an + * interceptor to artifically delay the execution of the first search in order + * to verify that the last search completes before the first one can finish. + */ + @Test + public void testSearchesExecuteConcurrently() { + createPatient(withFamily("FAMILY1")); + createPatient(withFamily("FAMILY2")); + createPatient(withFamily("FAMILY3")); + + SearchBlockingInterceptor searchBlockingInterceptorFamily1 = new SearchBlockingInterceptor("FAMILY1"); + myInterceptorRegistry.registerInterceptor(searchBlockingInterceptorFamily1); + + // Submit search 1 (should block because of interceptor semaphore) + { + String uri = ourServerBase + "/Patient?_format=json&family=FAMILY1"; + ourLog.info("Submitting GET " + uri); + HttpGet get = new HttpGet(uri); + myExecutor.submit(() -> { + try (CloseableHttpResponse outcome = ourHttpClient.execute(get)) { + assertEquals(200, outcome.getStatusLine().getStatusCode()); + String outcomeString = IOUtils.toString(outcome.getEntity().getContent(), StandardCharsets.UTF_8); + Bundle bundle = myFhirCtx.newJsonParser().parseResource(Bundle.class, outcomeString); + assertEquals(1, bundle.getEntry().size()); + Patient pt = (Patient) bundle.getEntry().get(0).getResource(); + String family = pt.getNameFirstRep().getFamily(); + ourLog.info("Received response with family name: {}", family); + myReceivedNames.add(family); + } catch (Exception e) { + ourLog.error("Client failure", e); + myExceptions.add(e); + } + }); + } + + await().until(() -> searchBlockingInterceptorFamily1.getHits(), equalTo(1)); + + // Submit search 2 (should also block because it will reuse the first search - same name being searched) + { + String uri = ourServerBase + "/Patient?_format=json&family=FAMILY1"; + HttpGet get = new HttpGet(uri); + myExecutor.submit(() -> { + ourLog.info("Submitting GET " + uri); + try (CloseableHttpResponse outcome = ourHttpClient.execute(get)) { + assertEquals(200, outcome.getStatusLine().getStatusCode()); + String outcomeString = IOUtils.toString(outcome.getEntity().getContent(), StandardCharsets.UTF_8); + Bundle bundle = myFhirCtx.newJsonParser().parseResource(Bundle.class, outcomeString); + assertEquals(1, bundle.getEntry().size()); + Patient pt = (Patient) bundle.getEntry().get(0).getResource(); + String family = pt.getNameFirstRep().getFamily(); + ourLog.info("Received response with family name: {}", family); + myReceivedNames.add(family); + } catch (Exception e) { + ourLog.error("Client failure", e); + myExceptions.add(e); + } + }); + } + + // Submit search 3 (should not block - different name being searched, so it should actually finish first) + { + String uri = ourServerBase + "/Patient?_format=json&family=FAMILY3"; + HttpGet get = new HttpGet(uri); + myExecutor.submit(() -> { + ourLog.info("Submitting GET " + uri); + try (CloseableHttpResponse outcome = ourHttpClient.execute(get)) { + assertEquals(200, outcome.getStatusLine().getStatusCode()); + String outcomeString = IOUtils.toString(outcome.getEntity().getContent(), StandardCharsets.UTF_8); + Bundle bundle = myFhirCtx.newJsonParser().parseResource(Bundle.class, outcomeString); + assertEquals(1, bundle.getEntry().size()); + Patient pt = (Patient) bundle.getEntry().get(0).getResource(); + String family = pt.getNameFirstRep().getFamily(); + ourLog.info("Received response with family name: {}", family); + myReceivedNames.add(family); + } catch (Exception e) { + ourLog.error("Client failure", e); + myExceptions.add(e); + } + }); + } + + ourLog.info("About to wait for FAMILY3 to complete"); + await().until(() -> myReceivedNames, contains("FAMILY3")); + ourLog.info("Got FAMILY3"); + + searchBlockingInterceptorFamily1.getLatch().countDown(); + + ourLog.info("About to wait for FAMILY1 to complete"); + await().until(() -> myReceivedNames, contains("FAMILY3", "FAMILY1", "FAMILY1")); + ourLog.info("Got FAMILY1"); + + assertEquals(1, searchBlockingInterceptorFamily1.getHits()); + } + + + @Interceptor + public static class SearchBlockingInterceptor { + + private final CountDownLatch myLatch = new CountDownLatch(1); + private final String myFamily; + private AtomicInteger myHits = new AtomicInteger(0); + + SearchBlockingInterceptor(String theFamily) { + myFamily = theFamily; + } + + @Hook(Pointcut.JPA_PERFTRACE_SEARCH_FIRST_RESULT_LOADED) + public void firstResultLoaded(RequestDetails theRequestDetails) { + String family = theRequestDetails.getParameters().get("family")[0]; + ourLog.info("See a hit for: {}", family); + if (family.equals(myFamily)) { + try { + myHits.incrementAndGet(); + if (!myLatch.await(1, TimeUnit.MINUTES)) { + throw new InternalErrorException("Timed out waiting for " + Pointcut.JPA_PERFTRACE_SEARCH_FIRST_RESULT_LOADED); + } + } catch (InterruptedException e) { + throw new InternalErrorException(e); + } + } + } + + public Integer getHits() { + return myHits.get(); + } + + public CountDownLatch getLatch() { + return myLatch; + } + } + + +} diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4Test.java index b939c4b1536..9e9978072e9 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4Test.java @@ -4808,6 +4808,7 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test { } @Test + @Disabled("Not useful with the search coordinator thread pool removed") public void testSearchWithCountNotSet() { mySearchCoordinatorSvcRaw.setSyncSizeForUnitTests(1); mySearchCoordinatorSvcRaw.setLoadingThrottleForUnitTests(200); @@ -4876,6 +4877,7 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test { } @Test + @Disabled("Not useful with the search coordinator thread pool removed") public void testSearchWithCountSearchResultsUpTo5() { mySearchCoordinatorSvcRaw.setSyncSizeForUnitTests(1); mySearchCoordinatorSvcRaw.setLoadingThrottleForUnitTests(200); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImplTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImplTest.java index 830f77ba0f5..c6af09b5281 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImplTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImplTest.java @@ -6,7 +6,6 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; -import ca.uhn.fhir.jpa.config.dstu3.BaseDstu3Config; import ca.uhn.fhir.jpa.dao.IResultIterator; import ca.uhn.fhir.jpa.dao.ISearchBuilder; import ca.uhn.fhir.jpa.dao.LegacySearchBuilder; @@ -29,6 +28,7 @@ import ca.uhn.fhir.rest.param.StringParam; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; import com.google.common.collect.Lists; +import org.hamcrest.Matchers; import org.hl7.fhir.instance.model.api.IBaseResource; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -126,7 +126,7 @@ public class SearchCoordinatorSvcImplTest { myCurrentSearch = null; - mySvc = new SearchCoordinatorSvcImpl(new BaseDstu3Config().searchCoordinatorThreadFactory()); + mySvc = new SearchCoordinatorSvcImpl(); mySvc.setEntityManagerForUnitTest(myEntityManager); mySvc.setTransactionManagerForUnitTest(myTxManager); mySvc.setContextForUnitTest(ourCtx); @@ -174,12 +174,8 @@ public class SearchCoordinatorSvcImplTest { IResultIterator iter = new FailAfterNIterator(new SlowIterator(pids.iterator(), 2), 300); when(mySearchBuilder.createQuery(same(params), any(), any(), nullable(RequestPartitionId.class))).thenReturn(iter); - IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions()); - assertNotNull(result.getUuid()); - assertEquals(null, result.size()); - try { - result.getResources(0, 100000); + mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions()); } catch (InternalErrorException e) { assertThat(e.getMessage(), containsString("FAILED")); assertThat(e.getMessage(), containsString("at ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImplTest")); @@ -220,7 +216,7 @@ public class SearchCoordinatorSvcImplTest { IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions()); assertNotNull(result.getUuid()); - assertEquals(null, result.size()); + assertEquals(790, result.size()); List resources = result.getResources(0, 100000); assertEquals(790, resources.size()); @@ -297,7 +293,7 @@ public class SearchCoordinatorSvcImplTest { IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions()); assertNotNull(result.getUuid()); - assertEquals(null, result.size()); + assertEquals(790, result.size()); List resources; @@ -337,16 +333,19 @@ public class SearchCoordinatorSvcImplTest { SlowIterator iter = new SlowIterator(pids.iterator(), 500); when(mySearchBuilder.createQuery(same(params), any(), any(), nullable(RequestPartitionId.class))).thenReturn(iter); - IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions()); - assertNotNull(result.getUuid()); + ourLog.info("Registering the first search"); + new Thread(() -> mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions())).start(); + await().until(()->iter.getCountReturned(), Matchers.greaterThan(0)); + String searchId = mySvc.getActiveSearchIds().iterator().next(); CountDownLatch completionLatch = new CountDownLatch(1); Runnable taskStarter = () -> { try { + assertNotNull(searchId); ourLog.info("About to pull the first resource"); - List resources = result.getResources(0, 1); + List resources = mySvc.getResources(searchId, 0, 1, null); ourLog.info("Done pulling the first resource"); - assertEquals(1, resources.size()); + assertEquals(1, resources.size()); } finally { completionLatch.countDown(); } @@ -360,10 +359,9 @@ public class SearchCoordinatorSvcImplTest { ourLog.info("Done cancelling all searches"); try { - result.getResources(10, 20); - } catch (InternalErrorException e) { - assertThat(e.getMessage(), containsString("Abort has been requested")); - assertThat(e.getMessage(), containsString("at ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImpl")); + mySvc.getResources(searchId, 0, 1, null); + } catch (ResourceGoneException e) { + // good } completionLatch.await(10, TimeUnit.SECONDS); @@ -391,7 +389,7 @@ public class SearchCoordinatorSvcImplTest { IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions()); assertNotNull(result.getUuid()); - assertEquals(null, result.size()); + assertEquals(790, result.size()); ArgumentCaptor searchCaptor = ArgumentCaptor.forClass(Search.class); verify(mySearchCacheSvc, atLeast(1)).save(searchCaptor.capture()); @@ -402,23 +400,11 @@ public class SearchCoordinatorSvcImplTest { PersistedJpaBundleProvider provider; resources = result.getResources(0, 10); - assertNull(result.size()); + assertEquals(790, result.size()); assertEquals(10, resources.size()); assertEquals("10", resources.get(0).getIdElement().getValueAsString()); assertEquals("19", resources.get(9).getIdElement().getValueAsString()); - when(mySearchCacheSvc.fetchByUuid(eq(result.getUuid()))).thenReturn(Optional.of(search)); - - /* - * Now call from a new bundle provider. This simulates a separate HTTP - * client request coming in. - */ - provider = newPersistedJpaBundleProvider(result.getUuid()); - resources = provider.getResources(10, 20); - assertEquals(10, resources.size()); - assertEquals("20", resources.get(0).getIdElement().getValueAsString()); - assertEquals("29", resources.get(9).getIdElement().getValueAsString()); - myExpectedNumberOfSearchBuildersCreated = 4; } From 756af63993c2539d19049b48b2cf44fc51d6c4aa Mon Sep 17 00:00:00 2001 From: Daron McIntosh Date: Thu, 16 Sep 2021 17:33:04 +0000 Subject: [PATCH 2/3] Add scope to unit test dependency --- hapi-fhir-structures-dstu2.1/pom.xml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hapi-fhir-structures-dstu2.1/pom.xml b/hapi-fhir-structures-dstu2.1/pom.xml index 203d9759eb9..3910f44e72a 100644 --- a/hapi-fhir-structures-dstu2.1/pom.xml +++ b/hapi-fhir-structures-dstu2.1/pom.xml @@ -176,8 +176,9 @@ commons-lang - commons-lang - 2.5 + commons-lang + 2.5 + test net.sf.json-lib From 528690fe6cd8db13f2813b9f18aaa27bf0120e83 Mon Sep 17 00:00:00 2001 From: Daron McIntosh Date: Thu, 16 Sep 2021 18:26:53 +0000 Subject: [PATCH 3/3] add commons-lang dep to hapi-fhir-validation pom --- hapi-fhir-validation/pom.xml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/hapi-fhir-validation/pom.xml b/hapi-fhir-validation/pom.xml index 81b1b0d0aa9..8158acb5ba3 100644 --- a/hapi-fhir-validation/pom.xml +++ b/hapi-fhir-validation/pom.xml @@ -261,6 +261,13 @@ test + + commons-lang + commons-lang + 2.5 + test + + com.helger