Merge branch 'master' into issue-2995-valuesetcomposeincludeversion-is-not-set-when-uploading-terminology

This commit is contained in:
jmarchionatto 2021-09-17 11:32:18 -04:00 committed by GitHub
commit 115056800c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 265 additions and 64 deletions

View File

@ -0,0 +1,6 @@
---
type: change
issue: 2991
title: "This PR eliminates the search coordinator threadpool, and executes searches synchronously on the HTTP client
thread. The idea of using a separate pool was supposed to help improve server scalability, but ultimately created
false bottlenecks and reduced the utility of monitoring infrastructure so it has been eliminated."

View File

@ -380,17 +380,6 @@ public abstract class BaseConfig {
return new TermConceptMappingSvcImpl(); return new TermConceptMappingSvcImpl();
} }
@Bean
public ThreadPoolTaskExecutor searchCoordinatorThreadFactory() {
final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("search_coord_");
threadPoolTaskExecutor.setCorePoolSize(searchCoordCorePoolSize);
threadPoolTaskExecutor.setMaxPoolSize(searchCoordMaxPoolSize);
threadPoolTaskExecutor.setQueueCapacity(searchCoordQueueCapacity);
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
@Bean @Bean
public TaskScheduler taskScheduler() { public TaskScheduler taskScheduler() {
ConcurrentTaskScheduler retVal = new ConcurrentTaskScheduler(); ConcurrentTaskScheduler retVal = new ConcurrentTaskScheduler();
@ -851,8 +840,8 @@ public abstract class BaseConfig {
} }
@Bean @Bean
public ISearchCoordinatorSvc searchCoordinatorSvc(ThreadPoolTaskExecutor searchCoordinatorThreadFactory) { public ISearchCoordinatorSvc searchCoordinatorSvc() {
return new SearchCoordinatorSvcImpl(searchCoordinatorThreadFactory); return new SearchCoordinatorSvcImpl();
} }
@Bean @Bean

View File

@ -82,7 +82,6 @@ import org.springframework.data.domain.Sort;
import org.springframework.orm.jpa.JpaDialect; import org.springframework.orm.jpa.JpaDialect;
import org.springframework.orm.jpa.JpaTransactionManager; import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.vendor.HibernateJpaDialect; import org.springframework.orm.jpa.vendor.HibernateJpaDialect;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionDefinition;
@ -111,7 +110,6 @@ import java.util.UUID;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
@ -123,7 +121,6 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
public static final Integer INTEGER_0 = 0; public static final Integer INTEGER_0 = 0;
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class); private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);
private final ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>();
private final ExecutorService myExecutor;
@Autowired @Autowired
private FhirContext myContext; private FhirContext myContext;
@Autowired @Autowired
@ -162,8 +159,13 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
* Constructor * Constructor
*/ */
@Autowired @Autowired
public SearchCoordinatorSvcImpl(ThreadPoolTaskExecutor searchCoordinatorThreadFactory) { public SearchCoordinatorSvcImpl() {
myExecutor = searchCoordinatorThreadFactory.getThreadPoolExecutor(); super();
}
@VisibleForTesting
Set<String> getActiveSearchIds() {
return myIdToSearchTask.keySet();
} }
@VisibleForTesting @VisibleForTesting
@ -274,7 +276,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
RequestPartitionId requestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequestForSearchType(theRequestDetails, resourceType, params, null); RequestPartitionId requestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequestForSearchType(theRequestDetails, resourceType, params, null);
SearchContinuationTask task = new SearchContinuationTask(search, resourceDao, params, resourceType, theRequestDetails, requestPartitionId); SearchContinuationTask task = new SearchContinuationTask(search, resourceDao, params, resourceType, theRequestDetails, requestPartitionId);
myIdToSearchTask.put(search.getUuid(), task); myIdToSearchTask.put(search.getUuid(), task);
myExecutor.submit(task); task.call();
} }
} }
@ -406,7 +408,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
SearchTask task = new SearchTask(theSearch, theCallingDao, theParams, theResourceType, theRequestDetails, theRequestPartitionId); SearchTask task = new SearchTask(theSearch, theCallingDao, theParams, theResourceType, theRequestDetails, theRequestPartitionId);
myIdToSearchTask.put(theSearch.getUuid(), task); myIdToSearchTask.put(theSearch.getUuid(), task);
myExecutor.submit(task); task.call();
PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage(theRequestDetails, theSearch, task, theSb); PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage(theRequestDetails, theSearch, task, theSb);

View File

@ -929,7 +929,7 @@ public class FhirResourceDaoR4LegacySearchBuilderTest extends BaseJpaR4Test {
List<IIdType> actual = toUnqualifiedVersionlessIds(resp); List<IIdType> actual = toUnqualifiedVersionlessIds(resp);
myCaptureQueriesListener.logSelectQueriesForCurrentThread(); myCaptureQueriesListener.logSelectQueriesForCurrentThread();
assertThat(actual, containsInAnyOrder(orgId, medId, patId, moId, patId2)); assertThat(actual, containsInAnyOrder(orgId, medId, patId, moId, patId2));
assertEquals(1, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size()); assertEquals(8, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size());
// Specific patient ID with linked stuff // Specific patient ID with linked stuff
request = mock(HttpServletRequest.class); request = mock(HttpServletRequest.class);

View File

@ -13,6 +13,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.test.context.junit.jupiter.SpringExtension;
@ -30,6 +32,7 @@ import static org.mockito.Mockito.when;
@ExtendWith(SpringExtension.class) @ExtendWith(SpringExtension.class)
public class FhirResourceDaoR4SearchLastNAsyncIT extends BaseR4SearchLastN { public class FhirResourceDaoR4SearchLastNAsyncIT extends BaseR4SearchLastN {
private static final Logger ourLog = LoggerFactory.getLogger(FhirResourceDaoR4SearchLastNAsyncIT.class);
@Autowired @Autowired
protected DaoConfig myDaoConfig; protected DaoConfig myDaoConfig;
private List<Integer> originalPreFetchThresholds; private List<Integer> originalPreFetchThresholds;
@ -108,8 +111,11 @@ public class FhirResourceDaoR4SearchLastNAsyncIT extends BaseR4SearchLastN {
.map(t -> t.getSql(true, false)) .map(t -> t.getSql(true, false))
.collect(Collectors.toList()); .collect(Collectors.toList());
ourLog.info("Queries:\n * " + String.join("\n * ", queries));
// 3 queries to actually perform the search
// 1 query to lookup up Search from cache, and 2 chunked queries to retrieve resources by PID. // 1 query to lookup up Search from cache, and 2 chunked queries to retrieve resources by PID.
assertEquals(3, queries.size()); assertEquals(6, queries.size());
// The first chunked query should have a full complement of PIDs // The first chunked query should have a full complement of PIDs
StringBuilder firstQueryPattern = new StringBuilder(".*RES_ID in \\('[0-9]+'"); StringBuilder firstQueryPattern = new StringBuilder(".*RES_ID in \\('[0-9]+'");
@ -117,7 +123,7 @@ public class FhirResourceDaoR4SearchLastNAsyncIT extends BaseR4SearchLastN {
firstQueryPattern.append(" , '[0-9]+'"); firstQueryPattern.append(" , '[0-9]+'");
} }
firstQueryPattern.append("\\).*"); firstQueryPattern.append("\\).*");
assertThat(queries.get(1), matchesPattern(firstQueryPattern.toString())); assertThat(queries.get(4), matchesPattern(firstQueryPattern.toString()));
// the second chunked query should be padded with "-1". // the second chunked query should be padded with "-1".
StringBuilder secondQueryPattern = new StringBuilder(".*RES_ID in \\('[0-9]+'"); StringBuilder secondQueryPattern = new StringBuilder(".*RES_ID in \\('[0-9]+'");
@ -128,7 +134,7 @@ public class FhirResourceDaoR4SearchLastNAsyncIT extends BaseR4SearchLastN {
secondQueryPattern.append(" , '-1'"); secondQueryPattern.append(" , '-1'");
} }
secondQueryPattern.append("\\).*"); secondQueryPattern.append("\\).*");
assertThat(queries.get(2), matchesPattern(secondQueryPattern.toString())); assertThat(queries.get(5), matchesPattern(secondQueryPattern.toString()));
} }

View File

@ -781,7 +781,7 @@ public class FhirResourceDaoR4SearchNoFtTest extends BaseJpaR4Test {
List<IIdType> actual = toUnqualifiedVersionlessIds(resp); List<IIdType> actual = toUnqualifiedVersionlessIds(resp);
myCaptureQueriesListener.logSelectQueriesForCurrentThread(); myCaptureQueriesListener.logSelectQueriesForCurrentThread();
assertThat(actual, containsInAnyOrder(orgId, medId, patId, moId, patId2)); assertThat(actual, containsInAnyOrder(orgId, medId, patId, moId, patId2));
assertEquals(1, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size()); assertEquals(7, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size());
// Specific patient ID with linked stuff // Specific patient ID with linked stuff
request = mock(HttpServletRequest.class); request = mock(HttpServletRequest.class);

View File

@ -1177,9 +1177,9 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
assertEquals(1, myCaptureQueriesListener.countUpdateQueries()); assertEquals(1, myCaptureQueriesListener.countUpdateQueries());
assertEquals(0, myCaptureQueriesListener.countDeleteQueries()); assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
assertEquals(2, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); assertEquals(4, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
assertEquals(0, myCaptureQueriesListener.countInsertQueriesForCurrentThread()); assertEquals(9, myCaptureQueriesListener.countInsertQueriesForCurrentThread());
assertEquals(0, myCaptureQueriesListener.countUpdateQueriesForCurrentThread()); assertEquals(1, myCaptureQueriesListener.countUpdateQueriesForCurrentThread());
assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread()); assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread());
} }

View File

@ -1627,9 +1627,6 @@ public class ResourceProviderDstu2Test extends BaseResourceProviderDstu2Test {
.execute(); .execute();
assertEquals(10, response.getEntry().size()); assertEquals(10, response.getEntry().size());
if (ourConnectionPoolSize > 1) {
assertEquals(null, response.getTotalElement().getValueAsString(), "Total should be null but was " + response.getTotalElement().getValueAsString() + " in " + sw.toString());
}
assertThat(response.getLink("next").getUrl(), not(emptyString())); assertThat(response.getLink("next").getUrl(), not(emptyString()));
// Load page 2 // Load page 2

View File

@ -179,9 +179,12 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
myFhirCtx.getRestfulClientFactory().setSocketTimeout(400000); myFhirCtx.getRestfulClientFactory().setSocketTimeout(400000);
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(5000, TimeUnit.MILLISECONDS); PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(5000, TimeUnit.MILLISECONDS);
connectionManager.setMaxTotal(10);
connectionManager.setDefaultMaxPerRoute(10);
HttpClientBuilder builder = HttpClientBuilder.create(); HttpClientBuilder builder = HttpClientBuilder.create();
builder.setConnectionManager(connectionManager); builder.setConnectionManager(connectionManager);
builder.setMaxConnPerRoute(99); builder.setMaxConnPerRoute(99);
ourHttpClient = builder.build(); ourHttpClient = builder.build();
ourServer = server; ourServer = server;

View File

@ -0,0 +1,202 @@
package ca.uhn.fhir.jpa.provider.r4;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
@SuppressWarnings("Duplicates")
public class ResourceProviderConcurrencyR4Test extends BaseResourceProviderR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(ResourceProviderConcurrencyR4Test.class);
private ExecutorService myExecutor;
private List<String> myReceivedNames = Collections.synchronizedList(new ArrayList<>());
private List<Throwable> myExceptions = Collections.synchronizedList(new ArrayList<>());
@Override
@BeforeEach
public void before() throws Exception {
super.before();
myExecutor = Executors.newFixedThreadPool(10);
myReceivedNames.clear();
myExceptions.clear();
}
@Override
@AfterEach
public void after() throws Exception {
super.after();
myInterceptorRegistry.unregisterInterceptorsIf(t -> t instanceof SearchBlockingInterceptor);
myExecutor.shutdown();
assertThat(myExceptions, empty());
}
/**
* This test is intended to verify that we are in fact executing searches in parallel
* when two different searches come in.
*
* We execute two identical searches (which should result in only one actual
* execution that will be reused by both) and one other search. We use an
* interceptor to artifically delay the execution of the first search in order
* to verify that the last search completes before the first one can finish.
*/
@Test
public void testSearchesExecuteConcurrently() {
createPatient(withFamily("FAMILY1"));
createPatient(withFamily("FAMILY2"));
createPatient(withFamily("FAMILY3"));
SearchBlockingInterceptor searchBlockingInterceptorFamily1 = new SearchBlockingInterceptor("FAMILY1");
myInterceptorRegistry.registerInterceptor(searchBlockingInterceptorFamily1);
// Submit search 1 (should block because of interceptor semaphore)
{
String uri = ourServerBase + "/Patient?_format=json&family=FAMILY1";
ourLog.info("Submitting GET " + uri);
HttpGet get = new HttpGet(uri);
myExecutor.submit(() -> {
try (CloseableHttpResponse outcome = ourHttpClient.execute(get)) {
assertEquals(200, outcome.getStatusLine().getStatusCode());
String outcomeString = IOUtils.toString(outcome.getEntity().getContent(), StandardCharsets.UTF_8);
Bundle bundle = myFhirCtx.newJsonParser().parseResource(Bundle.class, outcomeString);
assertEquals(1, bundle.getEntry().size());
Patient pt = (Patient) bundle.getEntry().get(0).getResource();
String family = pt.getNameFirstRep().getFamily();
ourLog.info("Received response with family name: {}", family);
myReceivedNames.add(family);
} catch (Exception e) {
ourLog.error("Client failure", e);
myExceptions.add(e);
}
});
}
await().until(() -> searchBlockingInterceptorFamily1.getHits(), equalTo(1));
// Submit search 2 (should also block because it will reuse the first search - same name being searched)
{
String uri = ourServerBase + "/Patient?_format=json&family=FAMILY1";
HttpGet get = new HttpGet(uri);
myExecutor.submit(() -> {
ourLog.info("Submitting GET " + uri);
try (CloseableHttpResponse outcome = ourHttpClient.execute(get)) {
assertEquals(200, outcome.getStatusLine().getStatusCode());
String outcomeString = IOUtils.toString(outcome.getEntity().getContent(), StandardCharsets.UTF_8);
Bundle bundle = myFhirCtx.newJsonParser().parseResource(Bundle.class, outcomeString);
assertEquals(1, bundle.getEntry().size());
Patient pt = (Patient) bundle.getEntry().get(0).getResource();
String family = pt.getNameFirstRep().getFamily();
ourLog.info("Received response with family name: {}", family);
myReceivedNames.add(family);
} catch (Exception e) {
ourLog.error("Client failure", e);
myExceptions.add(e);
}
});
}
// Submit search 3 (should not block - different name being searched, so it should actually finish first)
{
String uri = ourServerBase + "/Patient?_format=json&family=FAMILY3";
HttpGet get = new HttpGet(uri);
myExecutor.submit(() -> {
ourLog.info("Submitting GET " + uri);
try (CloseableHttpResponse outcome = ourHttpClient.execute(get)) {
assertEquals(200, outcome.getStatusLine().getStatusCode());
String outcomeString = IOUtils.toString(outcome.getEntity().getContent(), StandardCharsets.UTF_8);
Bundle bundle = myFhirCtx.newJsonParser().parseResource(Bundle.class, outcomeString);
assertEquals(1, bundle.getEntry().size());
Patient pt = (Patient) bundle.getEntry().get(0).getResource();
String family = pt.getNameFirstRep().getFamily();
ourLog.info("Received response with family name: {}", family);
myReceivedNames.add(family);
} catch (Exception e) {
ourLog.error("Client failure", e);
myExceptions.add(e);
}
});
}
ourLog.info("About to wait for FAMILY3 to complete");
await().until(() -> myReceivedNames, contains("FAMILY3"));
ourLog.info("Got FAMILY3");
searchBlockingInterceptorFamily1.getLatch().countDown();
ourLog.info("About to wait for FAMILY1 to complete");
await().until(() -> myReceivedNames, contains("FAMILY3", "FAMILY1", "FAMILY1"));
ourLog.info("Got FAMILY1");
assertEquals(1, searchBlockingInterceptorFamily1.getHits());
}
@Interceptor
public static class SearchBlockingInterceptor {
private final CountDownLatch myLatch = new CountDownLatch(1);
private final String myFamily;
private AtomicInteger myHits = new AtomicInteger(0);
SearchBlockingInterceptor(String theFamily) {
myFamily = theFamily;
}
@Hook(Pointcut.JPA_PERFTRACE_SEARCH_FIRST_RESULT_LOADED)
public void firstResultLoaded(RequestDetails theRequestDetails) {
String family = theRequestDetails.getParameters().get("family")[0];
ourLog.info("See a hit for: {}", family);
if (family.equals(myFamily)) {
try {
myHits.incrementAndGet();
if (!myLatch.await(1, TimeUnit.MINUTES)) {
throw new InternalErrorException("Timed out waiting for " + Pointcut.JPA_PERFTRACE_SEARCH_FIRST_RESULT_LOADED);
}
} catch (InterruptedException e) {
throw new InternalErrorException(e);
}
}
}
public Integer getHits() {
return myHits.get();
}
public CountDownLatch getLatch() {
return myLatch;
}
}
}

View File

@ -4808,6 +4808,7 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
} }
@Test @Test
@Disabled("Not useful with the search coordinator thread pool removed")
public void testSearchWithCountNotSet() { public void testSearchWithCountNotSet() {
mySearchCoordinatorSvcRaw.setSyncSizeForUnitTests(1); mySearchCoordinatorSvcRaw.setSyncSizeForUnitTests(1);
mySearchCoordinatorSvcRaw.setLoadingThrottleForUnitTests(200); mySearchCoordinatorSvcRaw.setLoadingThrottleForUnitTests(200);
@ -4876,6 +4877,7 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
} }
@Test @Test
@Disabled("Not useful with the search coordinator thread pool removed")
public void testSearchWithCountSearchResultsUpTo5() { public void testSearchWithCountSearchResultsUpTo5() {
mySearchCoordinatorSvcRaw.setSyncSizeForUnitTests(1); mySearchCoordinatorSvcRaw.setSyncSizeForUnitTests(1);
mySearchCoordinatorSvcRaw.setLoadingThrottleForUnitTests(200); mySearchCoordinatorSvcRaw.setLoadingThrottleForUnitTests(200);

View File

@ -6,7 +6,6 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.config.dstu3.BaseDstu3Config;
import ca.uhn.fhir.jpa.dao.IResultIterator; import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder; import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.LegacySearchBuilder; import ca.uhn.fhir.jpa.dao.LegacySearchBuilder;
@ -29,6 +28,7 @@ import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.hamcrest.Matchers;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@ -126,7 +126,7 @@ public class SearchCoordinatorSvcImplTest {
myCurrentSearch = null; myCurrentSearch = null;
mySvc = new SearchCoordinatorSvcImpl(new BaseDstu3Config().searchCoordinatorThreadFactory()); mySvc = new SearchCoordinatorSvcImpl();
mySvc.setEntityManagerForUnitTest(myEntityManager); mySvc.setEntityManagerForUnitTest(myEntityManager);
mySvc.setTransactionManagerForUnitTest(myTxManager); mySvc.setTransactionManagerForUnitTest(myTxManager);
mySvc.setContextForUnitTest(ourCtx); mySvc.setContextForUnitTest(ourCtx);
@ -174,12 +174,8 @@ public class SearchCoordinatorSvcImplTest {
IResultIterator iter = new FailAfterNIterator(new SlowIterator(pids.iterator(), 2), 300); IResultIterator iter = new FailAfterNIterator(new SlowIterator(pids.iterator(), 2), 300);
when(mySearchBuilder.createQuery(same(params), any(), any(), nullable(RequestPartitionId.class))).thenReturn(iter); when(mySearchBuilder.createQuery(same(params), any(), any(), nullable(RequestPartitionId.class))).thenReturn(iter);
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
assertNotNull(result.getUuid());
assertEquals(null, result.size());
try { try {
result.getResources(0, 100000); mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
} catch (InternalErrorException e) { } catch (InternalErrorException e) {
assertThat(e.getMessage(), containsString("FAILED")); assertThat(e.getMessage(), containsString("FAILED"));
assertThat(e.getMessage(), containsString("at ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImplTest")); assertThat(e.getMessage(), containsString("at ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImplTest"));
@ -220,7 +216,7 @@ public class SearchCoordinatorSvcImplTest {
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions()); IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
assertNotNull(result.getUuid()); assertNotNull(result.getUuid());
assertEquals(null, result.size()); assertEquals(790, result.size());
List<IBaseResource> resources = result.getResources(0, 100000); List<IBaseResource> resources = result.getResources(0, 100000);
assertEquals(790, resources.size()); assertEquals(790, resources.size());
@ -297,7 +293,7 @@ public class SearchCoordinatorSvcImplTest {
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions()); IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
assertNotNull(result.getUuid()); assertNotNull(result.getUuid());
assertEquals(null, result.size()); assertEquals(790, result.size());
List<IBaseResource> resources; List<IBaseResource> resources;
@ -337,16 +333,19 @@ public class SearchCoordinatorSvcImplTest {
SlowIterator iter = new SlowIterator(pids.iterator(), 500); SlowIterator iter = new SlowIterator(pids.iterator(), 500);
when(mySearchBuilder.createQuery(same(params), any(), any(), nullable(RequestPartitionId.class))).thenReturn(iter); when(mySearchBuilder.createQuery(same(params), any(), any(), nullable(RequestPartitionId.class))).thenReturn(iter);
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions()); ourLog.info("Registering the first search");
assertNotNull(result.getUuid()); new Thread(() -> mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions())).start();
await().until(()->iter.getCountReturned(), Matchers.greaterThan(0));
String searchId = mySvc.getActiveSearchIds().iterator().next();
CountDownLatch completionLatch = new CountDownLatch(1); CountDownLatch completionLatch = new CountDownLatch(1);
Runnable taskStarter = () -> { Runnable taskStarter = () -> {
try { try {
assertNotNull(searchId);
ourLog.info("About to pull the first resource"); ourLog.info("About to pull the first resource");
List<IBaseResource> resources = result.getResources(0, 1); List<ResourcePersistentId> resources = mySvc.getResources(searchId, 0, 1, null);
ourLog.info("Done pulling the first resource"); ourLog.info("Done pulling the first resource");
assertEquals(1, resources.size()); assertEquals(1, resources.size());
} finally { } finally {
completionLatch.countDown(); completionLatch.countDown();
} }
@ -360,10 +359,9 @@ public class SearchCoordinatorSvcImplTest {
ourLog.info("Done cancelling all searches"); ourLog.info("Done cancelling all searches");
try { try {
result.getResources(10, 20); mySvc.getResources(searchId, 0, 1, null);
} catch (InternalErrorException e) { } catch (ResourceGoneException e) {
assertThat(e.getMessage(), containsString("Abort has been requested")); // good
assertThat(e.getMessage(), containsString("at ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImpl"));
} }
completionLatch.await(10, TimeUnit.SECONDS); completionLatch.await(10, TimeUnit.SECONDS);
@ -391,7 +389,7 @@ public class SearchCoordinatorSvcImplTest {
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions()); IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
assertNotNull(result.getUuid()); assertNotNull(result.getUuid());
assertEquals(null, result.size()); assertEquals(790, result.size());
ArgumentCaptor<Search> searchCaptor = ArgumentCaptor.forClass(Search.class); ArgumentCaptor<Search> searchCaptor = ArgumentCaptor.forClass(Search.class);
verify(mySearchCacheSvc, atLeast(1)).save(searchCaptor.capture()); verify(mySearchCacheSvc, atLeast(1)).save(searchCaptor.capture());
@ -402,23 +400,11 @@ public class SearchCoordinatorSvcImplTest {
PersistedJpaBundleProvider provider; PersistedJpaBundleProvider provider;
resources = result.getResources(0, 10); resources = result.getResources(0, 10);
assertNull(result.size()); assertEquals(790, result.size());
assertEquals(10, resources.size()); assertEquals(10, resources.size());
assertEquals("10", resources.get(0).getIdElement().getValueAsString()); assertEquals("10", resources.get(0).getIdElement().getValueAsString());
assertEquals("19", resources.get(9).getIdElement().getValueAsString()); assertEquals("19", resources.get(9).getIdElement().getValueAsString());
when(mySearchCacheSvc.fetchByUuid(eq(result.getUuid()))).thenReturn(Optional.of(search));
/*
* Now call from a new bundle provider. This simulates a separate HTTP
* client request coming in.
*/
provider = newPersistedJpaBundleProvider(result.getUuid());
resources = provider.getResources(10, 20);
assertEquals(10, resources.size());
assertEquals("20", resources.get(0).getIdElement().getValueAsString());
assertEquals("29", resources.get(9).getIdElement().getValueAsString());
myExpectedNumberOfSearchBuildersCreated = 4; myExpectedNumberOfSearchBuildersCreated = 4;
} }

View File

@ -176,8 +176,9 @@
</dependency> </dependency>
<dependency> <dependency>
<artifactId>commons-lang</artifactId> <artifactId>commons-lang</artifactId>
<groupId>commons-lang</groupId> <groupId>commons-lang</groupId>
<version>2.5</version> <version>2.5</version>
<scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>net.sf.json-lib</groupId> <groupId>net.sf.json-lib</groupId>

View File

@ -261,6 +261,13 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<artifactId>commons-lang</artifactId>
<groupId>commons-lang</groupId>
<version>2.5</version>
<scope>test</scope>
</dependency>
<!-- Dependencies for Schematron --> <!-- Dependencies for Schematron -->
<dependency> <dependency>
<groupId>com.helger</groupId> <groupId>com.helger</groupId>