Make Hibernate Search Path asynchronous to speed up first bundle (#3535)

* logging prep

* Start async path

* start async scroll

* Limit size to avoid saving too many results

* Allow unbounded results when no limit set.

* Larger default scroll page size

* Cleanup
This commit is contained in:
michaelabuckley 2022-04-18 10:06:46 -04:00 committed by GitHub
parent e0cc325294
commit 51f38eae98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 291 additions and 55 deletions

View File

@ -29,11 +29,13 @@ import ca.uhn.fhir.jpa.dao.search.ExtendedLuceneIndexExtractor;
import ca.uhn.fhir.jpa.dao.search.ExtendedLuceneResourceProjection;
import ca.uhn.fhir.jpa.dao.search.ExtendedLuceneSearchBuilder;
import ca.uhn.fhir.jpa.dao.search.LastNOperation;
import ca.uhn.fhir.jpa.dao.search.SearchScrollQueryExecutorAdaptor;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.search.ExtendedLuceneIndexData;
import ca.uhn.fhir.jpa.search.autocomplete.ValueSetAutocompleteOptions;
import ca.uhn.fhir.jpa.search.autocomplete.ValueSetAutocompleteSearch;
import ca.uhn.fhir.jpa.search.builder.ISearchQueryExecutor;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.extractor.ISearchParamExtractor;
import ca.uhn.fhir.jpa.searchparam.extractor.ResourceIndexedSearchParams;
@ -44,6 +46,7 @@ import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import ca.uhn.fhir.rest.server.util.ResourceSearchParams;
import org.hibernate.search.backend.elasticsearch.ElasticsearchExtension;
import org.hibernate.search.engine.search.query.SearchScroll;
import org.hibernate.search.mapper.orm.Search;
import org.hibernate.search.mapper.orm.session.SearchSession;
import org.hibernate.search.mapper.orm.work.SearchIndexingPlan;
@ -61,7 +64,9 @@ import javax.persistence.PersistenceContextType;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@ -120,14 +125,27 @@ public class FulltextSearchSvcImpl implements IFulltextSearchSvc {
plan.addOrUpdate(theEntity);
}
private List<ResourcePersistentId> doSearch(String theResourceType, SearchParameterMap theParams, ResourcePersistentId theReferencingPid) {
/**
 * Run the fulltext query and hand back the result executor as-is,
 * without materializing the matching pids into a list.
 *
 * @param theResourceName the resource type to search, e.g. Patient
 * @param theParams the search query
 * @return an executor iterating over the matching pids
 */
@Override
public ISearchQueryExecutor searchAsync(String theResourceName, SearchParameterMap theParams) {
	// this entry point never supplies a referencing pid
	final ResourcePersistentId noReferencingPid = null;
	ISearchQueryExecutor results = doSearch(theResourceName, theParams, noReferencingPid);
	return results;
}
private ISearchQueryExecutor doSearch(String theResourceType, SearchParameterMap theParams, ResourcePersistentId theReferencingPid) {
// keep this in sync with supportsSomeOf();
SearchSession session = getSearchSession();
List<Long> longPids = session.search(ResourceTable.class)
// Selects are replacements for projection and convert more cleanly than the old implementation.
int scrollSize = 50;
if (theParams.getCount()!=null) {
scrollSize = theParams.getCount();
}
SearchScroll<Long> esResult = session.search(ResourceTable.class)
// The document id is the PK which is pid. We use this instead of _myId to avoid fetching the doc body.
.select(
f -> f.field("myId", Long.class)
// adapt the String docRef.id() to the Long that it really is.
f -> f.composite(
docRef -> Long.valueOf(docRef.id()),
f.documentReference())
)
.where(
f -> f.bool(b -> {
@ -170,9 +188,9 @@ public class FulltextSearchSvcImpl implements IFulltextSearchSvc {
//DROP EARLY HERE IF BOOL IS EMPTY?
})
).fetchAllHits();
).scroll(scrollSize);
return convertLongsToResourcePersistentIds(longPids);
return new SearchScrollQueryExecutorAdaptor(esResult);
}
@Nonnull
@ -190,7 +208,8 @@ public class FulltextSearchSvcImpl implements IFulltextSearchSvc {
public List<ResourcePersistentId> everything(String theResourceName, SearchParameterMap theParams, ResourcePersistentId theReferencingPid) {
List<ResourcePersistentId> retVal = doSearch(null, theParams, theReferencingPid);
// wipmb what about max results here?
List<ResourcePersistentId> retVal = toList(doSearch(null, theParams, theReferencingPid), 10000);
if (theReferencingPid != null) {
retVal.add(theReferencingPid);
}
@ -223,7 +242,17 @@ public class FulltextSearchSvcImpl implements IFulltextSearchSvc {
@Transactional()
@Override
public List<ResourcePersistentId> search(String theResourceName, SearchParameterMap theParams) {
return doSearch(theResourceName, theParams, null);
return toList(doSearch(theResourceName, theParams, null), 500);
}
/**
 * Adapt our async interface to the legacy concrete List.
 * <p>
 * The executor is drained up to {@code theMaxSize} entries and then closed, whether or not
 * it was exhausted, so that any resources held behind it (e.g. an open search scroll)
 * are released instead of leaking.
 *
 * @param theSearchResultStream the executor to drain; it is closed before this method returns
 * @param theMaxSize the maximum number of pids to copy into the list
 * @return a list holding at most {@code theMaxSize} pids, in iteration order
 */
private List<ResourcePersistentId> toList(ISearchQueryExecutor theSearchResultStream, long theMaxSize) {
	try {
		return StreamSupport.stream(Spliterators.spliteratorUnknownSize(theSearchResultStream, 0), false)
			.map(ResourcePersistentId::new)
			.limit(theMaxSize)
			.collect(Collectors.toList());
	} finally {
		// The executor is owned by this method: callers pass it in directly from doSearch()
		// and never see it again, so nobody else would close it.
		theSearchResultStream.close();
	}
}
@Transactional()
@ -272,7 +301,7 @@ public class FulltextSearchSvcImpl implements IFulltextSearchSvc {
List<ExtendedLuceneResourceProjection> rawResourceDataList = session.search(ResourceTable.class)
.select(
f -> f.composite(
(pid, forcedId, resource)-> new ExtendedLuceneResourceProjection(pid, forcedId, resource),
ExtendedLuceneResourceProjection::new,
f.field("myId", Long.class),
f.field("myForcedId", String.class),
f.field("myRawResource", String.class))

View File

@ -23,6 +23,7 @@ package ca.uhn.fhir.jpa.dao;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.search.ExtendedLuceneIndexData;
import ca.uhn.fhir.jpa.search.autocomplete.ValueSetAutocompleteOptions;
import ca.uhn.fhir.jpa.search.builder.ISearchQueryExecutor;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.extractor.ResourceIndexedSearchParams;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
@ -44,6 +45,16 @@ public interface IFulltextSearchSvc {
*/
List<ResourcePersistentId> search(String theResourceName, SearchParameterMap theParams);
/**
 * Query the index for a scrollable iterator of results.
 * No max size is applied to the result iterator; callers that need a bound
 * must limit the iteration themselves.
 * <p>
 * The returned executor exposes {@code close()}; callers should close it once they
 * are done iterating, since an implementation may keep an open scroll behind it.
 *
 * @param theResourceName e.g. Patient
 * @param theParams The search query
 * @return Iterator of result PIDs
 */
ISearchQueryExecutor searchAsync(String theResourceName, SearchParameterMap theParams);
/**
* Autocomplete search for NIH $expand contextDirection=existing
* @param theOptions operation options

View File

@ -0,0 +1,50 @@
package ca.uhn.fhir.jpa.dao.search;
import ca.uhn.fhir.jpa.search.builder.ISearchQueryExecutor;
import org.hibernate.search.engine.backend.common.DocumentReference;
import org.hibernate.search.engine.search.query.SearchScroll;
import org.hibernate.search.engine.search.query.SearchScrollResult;
import java.util.Iterator;
/**
 * Presents a Hibernate Search {@link SearchScroll} of pids as an ISearchQueryExecutor,
 * walking the scroll one page at a time.
 */
public class SearchScrollQueryExecutorAdaptor implements ISearchQueryExecutor {
	private final SearchScroll<Long> myScroll;
	private Iterator<Long> myCurrentIterator;

	/**
	 * Wrap the scroll and immediately load its first page.
	 *
	 * @param theScroll the scroll to read pids from
	 */
	public SearchScrollQueryExecutorAdaptor(SearchScroll<Long> theScroll) {
		myScroll = theScroll;
		myCurrentIterator = fetchNextPage();
	}

	/**
	 * Pull the next page (i.e. SearchScrollResult) from the scroll.
	 * Note: the last page will have 0 hits.
	 *
	 * @return an iterator over the hits of the page just fetched
	 */
	private Iterator<Long> fetchNextPage() {
		return myScroll.next().hits().iterator();
	}

	@Override
	public boolean hasNext() {
		return myCurrentIterator.hasNext();
	}

	@Override
	public Long next() {
		Long pid = myCurrentIterator.next();
		if (myCurrentIterator.hasNext()) {
			return pid;
		}
		// the current page is used up - load the following one before returning
		myCurrentIterator = fetchNextPage();
		return pid;
	}

	@Override
	public void close() {
		myScroll.close();
	}
}

View File

@ -117,9 +117,9 @@ public class PersistedJpaSearchFirstPageBundleProvider extends PersistedJpaBundl
@Override
public Integer size() {
ourLog.trace("Waiting for initial sync");
ourLog.trace("size() - Waiting for initial sync");
Integer size = mySearchTask.awaitInitialSync();
ourLog.trace("Finished waiting for local sync");
ourLog.trace("size() - Finished waiting for local sync");
SearchCoordinatorSvcImpl.verifySearchHasntFailedOrThrowInternalErrorException(mySearch);
if (size != null) {

View File

@ -1068,8 +1068,10 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
doSaveSearch();
ourLog.trace("saveUnsynced() - pre-commit");
}
});
ourLog.trace("saveUnsynced() - post-commit");
}

View File

@ -93,6 +93,7 @@ import ca.uhn.fhir.util.StringUtil;
import ca.uhn.fhir.util.UrlUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Streams;
import com.healthmarketscience.sqlbuilder.Condition;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.math.NumberUtils;
@ -322,17 +323,28 @@ public class SearchBuilder implements ISearchBuilder {
if (checkUseHibernateSearch()) {
// we're going to run at least part of the search against the Fulltext service.
List<ResourcePersistentId> fulltextMatchIds;
// Ugh - we have two different return types for now
ISearchQueryExecutor fulltextExecutor = null;
List<ResourcePersistentId> fulltextMatchIds = null;
int resultCount = 0;
if (myParams.isLastN()) {
fulltextMatchIds = executeLastNAgainstIndex(theMaximumResults);
resultCount = fulltextMatchIds.size();
} else if (myParams.getEverythingMode() != null) {
fulltextMatchIds = queryHibernateSearchForEverythingPids();
resultCount = fulltextMatchIds.size();
} else {
fulltextMatchIds = myFulltextSearchSvc.search(myResourceName, myParams);
fulltextExecutor = myFulltextSearchSvc.searchAsync(myResourceName, myParams);
}
if (fulltextExecutor == null) {
fulltextExecutor = SearchQueryExecutors.from(fulltextMatchIds);
}
if (theSearchRuntimeDetails != null) {
theSearchRuntimeDetails.setFoundIndexMatchesCount(fulltextMatchIds.size());
// wipmb we no longer have full size.
theSearchRuntimeDetails.setFoundIndexMatchesCount(resultCount);
HookParams params = new HookParams()
.add(RequestDetails.class, theRequest)
.addIfMatchesType(ServletRequestDetails.class, theRequest)
@ -340,13 +352,11 @@ public class SearchBuilder implements ISearchBuilder {
CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequest, Pointcut.JPA_PERFTRACE_INDEXSEARCH_QUERY_COMPLETE, params);
}
List<Long> rawPids = ResourcePersistentId.toLongList(fulltextMatchIds);
// wipmb extract
// can we skip the database entirely and return the pid list from here?
boolean canSkipDatabase =
// if we processed an AND clause, and it returned nothing, then nothing can match.
rawPids.isEmpty() ||
!fulltextExecutor.hasNext() ||
// Our hibernate search query doesn't respect partitions yet
(!myPartitionSettings.isPartitioningEnabled() &&
// we don't support _count=0 yet.
@ -363,12 +373,16 @@ public class SearchBuilder implements ISearchBuilder {
);
if (canSkipDatabase) {
queries.add(ResolvedSearchQueryExecutor.from(rawPids));
if (theMaximumResults != null) {
fulltextExecutor = SearchQueryExecutors.limited(fulltextExecutor, theMaximumResults);
}
queries.add(fulltextExecutor);
} else {
// Finish the query in the database for the rest of the search parameters, sorting, partitioning, etc.
// We break the pids into chunks that fit in the 1k limit for jdbc bind params.
// wipmb change chunk to take iterator
new QueryChunker<Long>()
.chunk(rawPids, t -> doCreateChunkedQueries(theParams, t, theOffset, sort, theCount, theRequest, queries));
.chunk(Streams.stream(fulltextExecutor).collect(Collectors.toList()), t -> doCreateChunkedQueries(theParams, t, theOffset, sort, theCount, theRequest, queries));
}
} else {
// do everything in the database.
@ -1384,41 +1398,6 @@ public class SearchBuilder implements ISearchBuilder {
myDaoConfig = theDaoConfig;
}
/**
 * Exposes an already-resolved sequence of pids through our internal query interface.
 */
static class ResolvedSearchQueryExecutor implements ISearchQueryExecutor {
	private final Iterator<Long> myPidIterator;

	ResolvedSearchQueryExecutor(Iterator<Long> theIterator) {
		myPidIterator = theIterator;
	}

	ResolvedSearchQueryExecutor(Iterable<Long> theIterable) {
		this(theIterable.iterator());
	}

	/**
	 * @param rawPids the pids to iterate over
	 * @return an executor yielding the given pids in list order
	 */
	@Nonnull
	public static ResolvedSearchQueryExecutor from(List<Long> rawPids) {
		Iterator<Long> pids = rawPids.iterator();
		return new ResolvedSearchQueryExecutor(pids);
	}

	@Override
	public void close() {
		// nothing to release
	}

	@Override
	public Long next() {
		return myPidIterator.next();
	}

	@Override
	public boolean hasNext() {
		return myPidIterator.hasNext();
	}
}
public class IncludesIterator extends BaseIterator<ResourcePersistentId> implements Iterator<ResourcePersistentId> {
private final RequestDetails myRequest;
@ -1643,6 +1622,7 @@ public class SearchBuilder implements ISearchBuilder {
private void initializeIteratorQuery(Integer theOffset, Integer theMaxResultsToFetch) {
if (myQueryList.isEmpty()) {
// wipmb what is this?
// Capture times for Lucene/Elasticsearch queries as well
mySearchRuntimeDetails.setQueryStopwatch(new StopWatch());
myQueryList = createQuery(myParams, mySort, theOffset, theMaxResultsToFetch, false, myRequest, mySearchRuntimeDetails);

View File

@ -0,0 +1,107 @@
package ca.uhn.fhir.jpa.search.builder;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.apache.commons.lang3.Validate;
import javax.annotation.Nonnull;
import java.util.Iterator;
import java.util.List;
/**
 * Factory and decorator methods for building {@link ISearchQueryExecutor} instances
 * from in-memory pid collections, and for bounding an existing executor.
 */
public class SearchQueryExecutors {

	/**
	 * Decorate an executor so that it yields at most {@code theLimit} pids.
	 * Closing the returned executor closes the wrapped one.
	 *
	 * @param theExecutor the executor to read from
	 * @param theLimit the maximum number of pids to yield; must be non-negative
	 * @return an executor that stops after {@code theLimit} results, or earlier if the source runs dry
	 * @throws IllegalArgumentException if {@code theLimit} is negative
	 */
	public static ISearchQueryExecutor limited(ISearchQueryExecutor theExecutor, long theLimit) {
		Validate.isTrue(theLimit >= 0, "limit must be non-negative");

		return new ISearchQueryExecutor() {
			long myCount = 0;

			@Override
			public void close() {
				theExecutor.close();
			}

			@Override
			public boolean hasNext() {
				// Test the counter first: once the limit is spent there is no reason to poll the source.
				return myCount < theLimit && theExecutor.hasNext();
			}

			@Override
			public Long next() {
				// Honour the Iterator contract: a caller that skips hasNext() must not read past the limit.
				if (myCount >= theLimit) {
					throw new java.util.NoSuchElementException("limit of " + theLimit + " results reached");
				}
				Long result = theExecutor.next();
				// Count only after the source actually delivered an element.
				myCount += 1;
				return result;
			}
		};
	}

	/**
	 * @param rawPids the pids to iterate over
	 * @return an executor yielding the given pids in list order
	 */
	@Nonnull
	public static ISearchQueryExecutor from(List<Long> rawPids) {
		return new ResolvedSearchQueryExecutor(rawPids);
	}

	/**
	 * Adapt bare Iterator to our internal query interface.
	 */
	static class ResolvedSearchQueryExecutor implements ISearchQueryExecutor {
		private final Iterator<Long> myIterator;

		ResolvedSearchQueryExecutor(Iterable<Long> theIterable) {
			this(theIterable.iterator());
		}

		ResolvedSearchQueryExecutor(Iterator<Long> theIterator) {
			myIterator = theIterator;
		}

		@Nonnull
		public static ResolvedSearchQueryExecutor from(List<Long> rawPids) {
			return new ResolvedSearchQueryExecutor(rawPids);
		}

		@Override
		public boolean hasNext() {
			return myIterator.hasNext();
		}

		@Override
		public Long next() {
			return myIterator.next();
		}

		@Override
		public void close() {
			// nothing to release for an in-memory iterator
		}
	}

	/**
	 * @param theIterator source of ResourcePersistentId values
	 * @return an executor yielding the Long id of each element
	 */
	public static ISearchQueryExecutor from(Iterator<ResourcePersistentId> theIterator) {
		return new ResourcePersistentIdQueryAdaptor(theIterator);
	}

	/**
	 * @param theIterable source of ResourcePersistentId values
	 * @return an executor yielding the Long id of each element
	 */
	public static ISearchQueryExecutor from(Iterable<ResourcePersistentId> theIterable) {
		return new ResourcePersistentIdQueryAdaptor(theIterable.iterator());
	}

	/**
	 * Adapt an Iterator of ResourcePersistentId to our internal query interface
	 * by unwrapping each element to its Long id.
	 */
	static class ResourcePersistentIdQueryAdaptor implements ISearchQueryExecutor {
		final Iterator<ResourcePersistentId> myIterator;

		ResourcePersistentIdQueryAdaptor(Iterator<ResourcePersistentId> theIterator) {
			myIterator = theIterator;
		}

		@Override
		public void close() {
			// nothing to release for an in-memory iterator
		}

		@Override
		public boolean hasNext() {
			return myIterator.hasNext();
		}

		@Override
		public Long next() {
			ResourcePersistentId next = myIterator.next();
			// a null element is passed through as a null pid
			return next == null ? null : next.getIdAsLong();
		}
	}
}

View File

@ -72,7 +72,7 @@ public class DatabaseSearchResultCacheSvcImpl implements ISearchResultCacheSvc {
public void storeResults(Search theSearch, List<ResourcePersistentId> thePreviouslyStoredResourcePids, List<ResourcePersistentId> theNewResourcePids) {
List<SearchResult> resultsToSave = Lists.newArrayList();
ourLog.trace("Storing {} results with {} previous for search", theNewResourcePids.size(), thePreviouslyStoredResourcePids.size());
ourLog.debug("Storing {} results with {} previous for search", theNewResourcePids.size(), thePreviouslyStoredResourcePids.size());
int order = thePreviouslyStoredResourcePids.size();
for (ResourcePersistentId nextPid : theNewResourcePids) {

View File

@ -0,0 +1,57 @@
package ca.uhn.fhir.jpa.search.builder;
import com.google.common.collect.Streams;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.junit.jupiter.api.Assertions.*;
/**
 * Checks the list-backed and limited executors produced by SearchQueryExecutors.
 */
class SearchQueryExecutorsTest {

	@Test
	public void adaptFromLongArrayYieldsAllValues() {
		List<Long> pids = Arrays.asList(1L, 2L, 3L, 4L, 5L);

		ISearchQueryExecutor executor = SearchQueryExecutors.from(pids);
		List<Long> drained = drain(executor);

		assertThat(drained, contains(1L, 2L, 3L, 4L, 5L));
	}

	@Test
	public void limitedCountDropsTrailingTest() {
		// five values available, but only three allowed through
		List<Long> pids = Arrays.asList(1L, 2L, 3L, 4L, 5L);
		ISearchQueryExecutor source = SearchQueryExecutors.from(pids);

		ISearchQueryExecutor capped = SearchQueryExecutors.limited(source, 3);
		List<Long> drained = drain(capped);

		assertThat(drained, contains(1L, 2L, 3L));
	}

	@Test
	public void limitedCountExhaustsBeforeLimitOkTest() {
		// fewer values available than the limit permits
		List<Long> pids = Arrays.asList(1L, 2L, 3L);
		ISearchQueryExecutor source = SearchQueryExecutors.from(pids);

		ISearchQueryExecutor capped = SearchQueryExecutors.limited(source, 5);
		List<Long> drained = drain(capped);

		assertThat(drained, contains(1L, 2L, 3L));
	}

	/**
	 * Read every remaining value of the executor into a list, in iteration order.
	 */
	private List<Long> drain(ISearchQueryExecutor theQueryExecutor) {
		Iterable<Long> remaining = () -> theQueryExecutor;
		return StreamSupport.stream(remaining.spliterator(), false)
			.collect(Collectors.toList());
	}
}