Eliminate Search Coordinator ThreadPool (#2991)
* Add test * Remove search coordinator thread pool * Add changelog * Add docs * Test fixes * Test fixes * Test fixes * Test fix
This commit is contained in:
parent
b682819f25
commit
a56c85f780
|
@ -0,0 +1,6 @@
|
|||
---
|
||||
type: change
|
||||
issue: 2991
|
||||
title: "This PR eliminates the search coordinator threadpool, and executes searches synchronously on the HTTP client
|
||||
thread. The idea of using a separate pool was supposed to help improve server scalability, but ultimately created
|
||||
false bottlenecks and reduced the utility of monitoring infrastructure so it has been eliminated."
|
|
@ -380,17 +380,6 @@ public abstract class BaseConfig {
|
|||
return new TermConceptMappingSvcImpl();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ThreadPoolTaskExecutor searchCoordinatorThreadFactory() {
|
||||
final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
|
||||
threadPoolTaskExecutor.setThreadNamePrefix("search_coord_");
|
||||
threadPoolTaskExecutor.setCorePoolSize(searchCoordCorePoolSize);
|
||||
threadPoolTaskExecutor.setMaxPoolSize(searchCoordMaxPoolSize);
|
||||
threadPoolTaskExecutor.setQueueCapacity(searchCoordQueueCapacity);
|
||||
threadPoolTaskExecutor.initialize();
|
||||
return threadPoolTaskExecutor;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public TaskScheduler taskScheduler() {
|
||||
ConcurrentTaskScheduler retVal = new ConcurrentTaskScheduler();
|
||||
|
@ -851,8 +840,8 @@ public abstract class BaseConfig {
|
|||
}
|
||||
|
||||
@Bean
|
||||
public ISearchCoordinatorSvc searchCoordinatorSvc(ThreadPoolTaskExecutor searchCoordinatorThreadFactory) {
|
||||
return new SearchCoordinatorSvcImpl(searchCoordinatorThreadFactory);
|
||||
public ISearchCoordinatorSvc searchCoordinatorSvc() {
|
||||
return new SearchCoordinatorSvcImpl();
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
|
|
@ -82,7 +82,6 @@ import org.springframework.data.domain.Sort;
|
|||
import org.springframework.orm.jpa.JpaDialect;
|
||||
import org.springframework.orm.jpa.JpaTransactionManager;
|
||||
import org.springframework.orm.jpa.vendor.HibernateJpaDialect;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.TransactionDefinition;
|
||||
|
@ -111,7 +110,6 @@ import java.util.UUID;
|
|||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
|
||||
|
@ -123,7 +121,6 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
|||
public static final Integer INTEGER_0 = 0;
|
||||
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);
|
||||
private final ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>();
|
||||
private final ExecutorService myExecutor;
|
||||
@Autowired
|
||||
private FhirContext myContext;
|
||||
@Autowired
|
||||
|
@ -162,8 +159,13 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
|||
* Constructor
|
||||
*/
|
||||
@Autowired
|
||||
public SearchCoordinatorSvcImpl(ThreadPoolTaskExecutor searchCoordinatorThreadFactory) {
|
||||
myExecutor = searchCoordinatorThreadFactory.getThreadPoolExecutor();
|
||||
public SearchCoordinatorSvcImpl() {
|
||||
super();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Set<String> getActiveSearchIds() {
|
||||
return myIdToSearchTask.keySet();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -274,7 +276,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
|||
RequestPartitionId requestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequestForSearchType(theRequestDetails, resourceType, params, null);
|
||||
SearchContinuationTask task = new SearchContinuationTask(search, resourceDao, params, resourceType, theRequestDetails, requestPartitionId);
|
||||
myIdToSearchTask.put(search.getUuid(), task);
|
||||
myExecutor.submit(task);
|
||||
task.call();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -406,7 +408,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
|||
|
||||
SearchTask task = new SearchTask(theSearch, theCallingDao, theParams, theResourceType, theRequestDetails, theRequestPartitionId);
|
||||
myIdToSearchTask.put(theSearch.getUuid(), task);
|
||||
myExecutor.submit(task);
|
||||
task.call();
|
||||
|
||||
PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage(theRequestDetails, theSearch, task, theSb);
|
||||
|
||||
|
|
|
@ -929,7 +929,7 @@ public class FhirResourceDaoR4LegacySearchBuilderTest extends BaseJpaR4Test {
|
|||
List<IIdType> actual = toUnqualifiedVersionlessIds(resp);
|
||||
myCaptureQueriesListener.logSelectQueriesForCurrentThread();
|
||||
assertThat(actual, containsInAnyOrder(orgId, medId, patId, moId, patId2));
|
||||
assertEquals(1, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size());
|
||||
assertEquals(8, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size());
|
||||
|
||||
// Specific patient ID with linked stuff
|
||||
request = mock(HttpServletRequest.class);
|
||||
|
|
|
@ -13,6 +13,8 @@ import org.junit.jupiter.api.AfterEach;
|
|||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||
|
||||
|
@ -30,6 +32,7 @@ import static org.mockito.Mockito.when;
|
|||
@ExtendWith(SpringExtension.class)
|
||||
public class FhirResourceDaoR4SearchLastNAsyncIT extends BaseR4SearchLastN {
|
||||
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(FhirResourceDaoR4SearchLastNAsyncIT.class);
|
||||
@Autowired
|
||||
protected DaoConfig myDaoConfig;
|
||||
private List<Integer> originalPreFetchThresholds;
|
||||
|
@ -108,8 +111,11 @@ public class FhirResourceDaoR4SearchLastNAsyncIT extends BaseR4SearchLastN {
|
|||
.map(t -> t.getSql(true, false))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
ourLog.info("Queries:\n * " + String.join("\n * ", queries));
|
||||
|
||||
// 3 queries to actually perform the search
|
||||
// 1 query to lookup up Search from cache, and 2 chunked queries to retrieve resources by PID.
|
||||
assertEquals(3, queries.size());
|
||||
assertEquals(6, queries.size());
|
||||
|
||||
// The first chunked query should have a full complement of PIDs
|
||||
StringBuilder firstQueryPattern = new StringBuilder(".*RES_ID in \\('[0-9]+'");
|
||||
|
@ -117,7 +123,7 @@ public class FhirResourceDaoR4SearchLastNAsyncIT extends BaseR4SearchLastN {
|
|||
firstQueryPattern.append(" , '[0-9]+'");
|
||||
}
|
||||
firstQueryPattern.append("\\).*");
|
||||
assertThat(queries.get(1), matchesPattern(firstQueryPattern.toString()));
|
||||
assertThat(queries.get(4), matchesPattern(firstQueryPattern.toString()));
|
||||
|
||||
// the second chunked query should be padded with "-1".
|
||||
StringBuilder secondQueryPattern = new StringBuilder(".*RES_ID in \\('[0-9]+'");
|
||||
|
@ -128,7 +134,7 @@ public class FhirResourceDaoR4SearchLastNAsyncIT extends BaseR4SearchLastN {
|
|||
secondQueryPattern.append(" , '-1'");
|
||||
}
|
||||
secondQueryPattern.append("\\).*");
|
||||
assertThat(queries.get(2), matchesPattern(secondQueryPattern.toString()));
|
||||
assertThat(queries.get(5), matchesPattern(secondQueryPattern.toString()));
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -781,7 +781,7 @@ public class FhirResourceDaoR4SearchNoFtTest extends BaseJpaR4Test {
|
|||
List<IIdType> actual = toUnqualifiedVersionlessIds(resp);
|
||||
myCaptureQueriesListener.logSelectQueriesForCurrentThread();
|
||||
assertThat(actual, containsInAnyOrder(orgId, medId, patId, moId, patId2));
|
||||
assertEquals(1, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size());
|
||||
assertEquals(7, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size());
|
||||
|
||||
// Specific patient ID with linked stuff
|
||||
request = mock(HttpServletRequest.class);
|
||||
|
|
|
@ -1177,9 +1177,9 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
|
|||
assertEquals(1, myCaptureQueriesListener.countUpdateQueries());
|
||||
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
|
||||
|
||||
assertEquals(2, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
|
||||
assertEquals(0, myCaptureQueriesListener.countInsertQueriesForCurrentThread());
|
||||
assertEquals(0, myCaptureQueriesListener.countUpdateQueriesForCurrentThread());
|
||||
assertEquals(4, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
|
||||
assertEquals(9, myCaptureQueriesListener.countInsertQueriesForCurrentThread());
|
||||
assertEquals(1, myCaptureQueriesListener.countUpdateQueriesForCurrentThread());
|
||||
assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread());
|
||||
|
||||
}
|
||||
|
|
|
@ -1627,9 +1627,6 @@ public class ResourceProviderDstu2Test extends BaseResourceProviderDstu2Test {
|
|||
.execute();
|
||||
|
||||
assertEquals(10, response.getEntry().size());
|
||||
if (ourConnectionPoolSize > 1) {
|
||||
assertEquals(null, response.getTotalElement().getValueAsString(), "Total should be null but was " + response.getTotalElement().getValueAsString() + " in " + sw.toString());
|
||||
}
|
||||
assertThat(response.getLink("next").getUrl(), not(emptyString()));
|
||||
|
||||
// Load page 2
|
||||
|
|
|
@ -179,9 +179,12 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
|
|||
myFhirCtx.getRestfulClientFactory().setSocketTimeout(400000);
|
||||
|
||||
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(5000, TimeUnit.MILLISECONDS);
|
||||
connectionManager.setMaxTotal(10);
|
||||
connectionManager.setDefaultMaxPerRoute(10);
|
||||
HttpClientBuilder builder = HttpClientBuilder.create();
|
||||
builder.setConnectionManager(connectionManager);
|
||||
builder.setMaxConnPerRoute(99);
|
||||
|
||||
ourHttpClient = builder.build();
|
||||
|
||||
ourServer = server;
|
||||
|
|
|
@ -0,0 +1,202 @@
|
|||
package ca.uhn.fhir.jpa.provider.r4;
|
||||
|
||||
import ca.uhn.fhir.interceptor.api.Hook;
|
||||
import ca.uhn.fhir.interceptor.api.Interceptor;
|
||||
import ca.uhn.fhir.interceptor.api.Pointcut;
|
||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.hl7.fhir.r4.model.Bundle;
|
||||
import org.hl7.fhir.r4.model.Patient;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
@SuppressWarnings("Duplicates")
|
||||
public class ResourceProviderConcurrencyR4Test extends BaseResourceProviderR4Test {
|
||||
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(ResourceProviderConcurrencyR4Test.class);
|
||||
private ExecutorService myExecutor;
|
||||
private List<String> myReceivedNames = Collections.synchronizedList(new ArrayList<>());
|
||||
private List<Throwable> myExceptions = Collections.synchronizedList(new ArrayList<>());
|
||||
|
||||
@Override
|
||||
@BeforeEach
|
||||
public void before() throws Exception {
|
||||
super.before();
|
||||
|
||||
myExecutor = Executors.newFixedThreadPool(10);
|
||||
myReceivedNames.clear();
|
||||
myExceptions.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
@AfterEach
|
||||
public void after() throws Exception {
|
||||
super.after();
|
||||
|
||||
myInterceptorRegistry.unregisterInterceptorsIf(t -> t instanceof SearchBlockingInterceptor);
|
||||
myExecutor.shutdown();
|
||||
|
||||
assertThat(myExceptions, empty());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test is intended to verify that we are in fact executing searches in parallel
|
||||
* when two different searches come in.
|
||||
*
|
||||
* We execute two identical searches (which should result in only one actual
|
||||
* execution that will be reused by both) and one other search. We use an
|
||||
* interceptor to artifically delay the execution of the first search in order
|
||||
* to verify that the last search completes before the first one can finish.
|
||||
*/
|
||||
@Test
|
||||
public void testSearchesExecuteConcurrently() {
|
||||
createPatient(withFamily("FAMILY1"));
|
||||
createPatient(withFamily("FAMILY2"));
|
||||
createPatient(withFamily("FAMILY3"));
|
||||
|
||||
SearchBlockingInterceptor searchBlockingInterceptorFamily1 = new SearchBlockingInterceptor("FAMILY1");
|
||||
myInterceptorRegistry.registerInterceptor(searchBlockingInterceptorFamily1);
|
||||
|
||||
// Submit search 1 (should block because of interceptor semaphore)
|
||||
{
|
||||
String uri = ourServerBase + "/Patient?_format=json&family=FAMILY1";
|
||||
ourLog.info("Submitting GET " + uri);
|
||||
HttpGet get = new HttpGet(uri);
|
||||
myExecutor.submit(() -> {
|
||||
try (CloseableHttpResponse outcome = ourHttpClient.execute(get)) {
|
||||
assertEquals(200, outcome.getStatusLine().getStatusCode());
|
||||
String outcomeString = IOUtils.toString(outcome.getEntity().getContent(), StandardCharsets.UTF_8);
|
||||
Bundle bundle = myFhirCtx.newJsonParser().parseResource(Bundle.class, outcomeString);
|
||||
assertEquals(1, bundle.getEntry().size());
|
||||
Patient pt = (Patient) bundle.getEntry().get(0).getResource();
|
||||
String family = pt.getNameFirstRep().getFamily();
|
||||
ourLog.info("Received response with family name: {}", family);
|
||||
myReceivedNames.add(family);
|
||||
} catch (Exception e) {
|
||||
ourLog.error("Client failure", e);
|
||||
myExceptions.add(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
await().until(() -> searchBlockingInterceptorFamily1.getHits(), equalTo(1));
|
||||
|
||||
// Submit search 2 (should also block because it will reuse the first search - same name being searched)
|
||||
{
|
||||
String uri = ourServerBase + "/Patient?_format=json&family=FAMILY1";
|
||||
HttpGet get = new HttpGet(uri);
|
||||
myExecutor.submit(() -> {
|
||||
ourLog.info("Submitting GET " + uri);
|
||||
try (CloseableHttpResponse outcome = ourHttpClient.execute(get)) {
|
||||
assertEquals(200, outcome.getStatusLine().getStatusCode());
|
||||
String outcomeString = IOUtils.toString(outcome.getEntity().getContent(), StandardCharsets.UTF_8);
|
||||
Bundle bundle = myFhirCtx.newJsonParser().parseResource(Bundle.class, outcomeString);
|
||||
assertEquals(1, bundle.getEntry().size());
|
||||
Patient pt = (Patient) bundle.getEntry().get(0).getResource();
|
||||
String family = pt.getNameFirstRep().getFamily();
|
||||
ourLog.info("Received response with family name: {}", family);
|
||||
myReceivedNames.add(family);
|
||||
} catch (Exception e) {
|
||||
ourLog.error("Client failure", e);
|
||||
myExceptions.add(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Submit search 3 (should not block - different name being searched, so it should actually finish first)
|
||||
{
|
||||
String uri = ourServerBase + "/Patient?_format=json&family=FAMILY3";
|
||||
HttpGet get = new HttpGet(uri);
|
||||
myExecutor.submit(() -> {
|
||||
ourLog.info("Submitting GET " + uri);
|
||||
try (CloseableHttpResponse outcome = ourHttpClient.execute(get)) {
|
||||
assertEquals(200, outcome.getStatusLine().getStatusCode());
|
||||
String outcomeString = IOUtils.toString(outcome.getEntity().getContent(), StandardCharsets.UTF_8);
|
||||
Bundle bundle = myFhirCtx.newJsonParser().parseResource(Bundle.class, outcomeString);
|
||||
assertEquals(1, bundle.getEntry().size());
|
||||
Patient pt = (Patient) bundle.getEntry().get(0).getResource();
|
||||
String family = pt.getNameFirstRep().getFamily();
|
||||
ourLog.info("Received response with family name: {}", family);
|
||||
myReceivedNames.add(family);
|
||||
} catch (Exception e) {
|
||||
ourLog.error("Client failure", e);
|
||||
myExceptions.add(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
ourLog.info("About to wait for FAMILY3 to complete");
|
||||
await().until(() -> myReceivedNames, contains("FAMILY3"));
|
||||
ourLog.info("Got FAMILY3");
|
||||
|
||||
searchBlockingInterceptorFamily1.getLatch().countDown();
|
||||
|
||||
ourLog.info("About to wait for FAMILY1 to complete");
|
||||
await().until(() -> myReceivedNames, contains("FAMILY3", "FAMILY1", "FAMILY1"));
|
||||
ourLog.info("Got FAMILY1");
|
||||
|
||||
assertEquals(1, searchBlockingInterceptorFamily1.getHits());
|
||||
}
|
||||
|
||||
|
||||
@Interceptor
|
||||
public static class SearchBlockingInterceptor {
|
||||
|
||||
private final CountDownLatch myLatch = new CountDownLatch(1);
|
||||
private final String myFamily;
|
||||
private AtomicInteger myHits = new AtomicInteger(0);
|
||||
|
||||
SearchBlockingInterceptor(String theFamily) {
|
||||
myFamily = theFamily;
|
||||
}
|
||||
|
||||
@Hook(Pointcut.JPA_PERFTRACE_SEARCH_FIRST_RESULT_LOADED)
|
||||
public void firstResultLoaded(RequestDetails theRequestDetails) {
|
||||
String family = theRequestDetails.getParameters().get("family")[0];
|
||||
ourLog.info("See a hit for: {}", family);
|
||||
if (family.equals(myFamily)) {
|
||||
try {
|
||||
myHits.incrementAndGet();
|
||||
if (!myLatch.await(1, TimeUnit.MINUTES)) {
|
||||
throw new InternalErrorException("Timed out waiting for " + Pointcut.JPA_PERFTRACE_SEARCH_FIRST_RESULT_LOADED);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new InternalErrorException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Integer getHits() {
|
||||
return myHits.get();
|
||||
}
|
||||
|
||||
public CountDownLatch getLatch() {
|
||||
return myLatch;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -4808,6 +4808,7 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
|
|||
}
|
||||
|
||||
@Test
|
||||
@Disabled("Not useful with the search coordinator thread pool removed")
|
||||
public void testSearchWithCountNotSet() {
|
||||
mySearchCoordinatorSvcRaw.setSyncSizeForUnitTests(1);
|
||||
mySearchCoordinatorSvcRaw.setLoadingThrottleForUnitTests(200);
|
||||
|
@ -4876,6 +4877,7 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
|
|||
}
|
||||
|
||||
@Test
|
||||
@Disabled("Not useful with the search coordinator thread pool removed")
|
||||
public void testSearchWithCountSearchResultsUpTo5() {
|
||||
mySearchCoordinatorSvcRaw.setSyncSizeForUnitTests(1);
|
||||
mySearchCoordinatorSvcRaw.setLoadingThrottleForUnitTests(200);
|
||||
|
|
|
@ -6,7 +6,6 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId;
|
|||
import ca.uhn.fhir.jpa.api.config.DaoConfig;
|
||||
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
||||
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
|
||||
import ca.uhn.fhir.jpa.config.dstu3.BaseDstu3Config;
|
||||
import ca.uhn.fhir.jpa.dao.IResultIterator;
|
||||
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
|
||||
import ca.uhn.fhir.jpa.dao.LegacySearchBuilder;
|
||||
|
@ -29,6 +28,7 @@ import ca.uhn.fhir.rest.param.StringParam;
|
|||
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
||||
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
@ -126,7 +126,7 @@ public class SearchCoordinatorSvcImplTest {
|
|||
|
||||
myCurrentSearch = null;
|
||||
|
||||
mySvc = new SearchCoordinatorSvcImpl(new BaseDstu3Config().searchCoordinatorThreadFactory());
|
||||
mySvc = new SearchCoordinatorSvcImpl();
|
||||
mySvc.setEntityManagerForUnitTest(myEntityManager);
|
||||
mySvc.setTransactionManagerForUnitTest(myTxManager);
|
||||
mySvc.setContextForUnitTest(ourCtx);
|
||||
|
@ -174,12 +174,8 @@ public class SearchCoordinatorSvcImplTest {
|
|||
IResultIterator iter = new FailAfterNIterator(new SlowIterator(pids.iterator(), 2), 300);
|
||||
when(mySearchBuilder.createQuery(same(params), any(), any(), nullable(RequestPartitionId.class))).thenReturn(iter);
|
||||
|
||||
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
|
||||
assertNotNull(result.getUuid());
|
||||
assertEquals(null, result.size());
|
||||
|
||||
try {
|
||||
result.getResources(0, 100000);
|
||||
mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
|
||||
} catch (InternalErrorException e) {
|
||||
assertThat(e.getMessage(), containsString("FAILED"));
|
||||
assertThat(e.getMessage(), containsString("at ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImplTest"));
|
||||
|
@ -220,7 +216,7 @@ public class SearchCoordinatorSvcImplTest {
|
|||
|
||||
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
|
||||
assertNotNull(result.getUuid());
|
||||
assertEquals(null, result.size());
|
||||
assertEquals(790, result.size());
|
||||
|
||||
List<IBaseResource> resources = result.getResources(0, 100000);
|
||||
assertEquals(790, resources.size());
|
||||
|
@ -297,7 +293,7 @@ public class SearchCoordinatorSvcImplTest {
|
|||
|
||||
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
|
||||
assertNotNull(result.getUuid());
|
||||
assertEquals(null, result.size());
|
||||
assertEquals(790, result.size());
|
||||
|
||||
List<IBaseResource> resources;
|
||||
|
||||
|
@ -337,14 +333,17 @@ public class SearchCoordinatorSvcImplTest {
|
|||
SlowIterator iter = new SlowIterator(pids.iterator(), 500);
|
||||
when(mySearchBuilder.createQuery(same(params), any(), any(), nullable(RequestPartitionId.class))).thenReturn(iter);
|
||||
|
||||
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
|
||||
assertNotNull(result.getUuid());
|
||||
ourLog.info("Registering the first search");
|
||||
new Thread(() -> mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions())).start();
|
||||
await().until(()->iter.getCountReturned(), Matchers.greaterThan(0));
|
||||
|
||||
String searchId = mySvc.getActiveSearchIds().iterator().next();
|
||||
CountDownLatch completionLatch = new CountDownLatch(1);
|
||||
Runnable taskStarter = () -> {
|
||||
try {
|
||||
assertNotNull(searchId);
|
||||
ourLog.info("About to pull the first resource");
|
||||
List<IBaseResource> resources = result.getResources(0, 1);
|
||||
List<ResourcePersistentId> resources = mySvc.getResources(searchId, 0, 1, null);
|
||||
ourLog.info("Done pulling the first resource");
|
||||
assertEquals(1, resources.size());
|
||||
} finally {
|
||||
|
@ -360,10 +359,9 @@ public class SearchCoordinatorSvcImplTest {
|
|||
ourLog.info("Done cancelling all searches");
|
||||
|
||||
try {
|
||||
result.getResources(10, 20);
|
||||
} catch (InternalErrorException e) {
|
||||
assertThat(e.getMessage(), containsString("Abort has been requested"));
|
||||
assertThat(e.getMessage(), containsString("at ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImpl"));
|
||||
mySvc.getResources(searchId, 0, 1, null);
|
||||
} catch (ResourceGoneException e) {
|
||||
// good
|
||||
}
|
||||
|
||||
completionLatch.await(10, TimeUnit.SECONDS);
|
||||
|
@ -391,7 +389,7 @@ public class SearchCoordinatorSvcImplTest {
|
|||
|
||||
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
|
||||
assertNotNull(result.getUuid());
|
||||
assertEquals(null, result.size());
|
||||
assertEquals(790, result.size());
|
||||
|
||||
ArgumentCaptor<Search> searchCaptor = ArgumentCaptor.forClass(Search.class);
|
||||
verify(mySearchCacheSvc, atLeast(1)).save(searchCaptor.capture());
|
||||
|
@ -402,23 +400,11 @@ public class SearchCoordinatorSvcImplTest {
|
|||
PersistedJpaBundleProvider provider;
|
||||
|
||||
resources = result.getResources(0, 10);
|
||||
assertNull(result.size());
|
||||
assertEquals(790, result.size());
|
||||
assertEquals(10, resources.size());
|
||||
assertEquals("10", resources.get(0).getIdElement().getValueAsString());
|
||||
assertEquals("19", resources.get(9).getIdElement().getValueAsString());
|
||||
|
||||
when(mySearchCacheSvc.fetchByUuid(eq(result.getUuid()))).thenReturn(Optional.of(search));
|
||||
|
||||
/*
|
||||
* Now call from a new bundle provider. This simulates a separate HTTP
|
||||
* client request coming in.
|
||||
*/
|
||||
provider = newPersistedJpaBundleProvider(result.getUuid());
|
||||
resources = provider.getResources(10, 20);
|
||||
assertEquals(10, resources.size());
|
||||
assertEquals("20", resources.get(0).getIdElement().getValueAsString());
|
||||
assertEquals("29", resources.get(9).getIdElement().getValueAsString());
|
||||
|
||||
myExpectedNumberOfSearchBuildersCreated = 4;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue