diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/FulltextSearchSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/FulltextSearchSvcImpl.java index 2c66ca424a7..a7372ef1ac5 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/FulltextSearchSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/FulltextSearchSvcImpl.java @@ -29,11 +29,13 @@ import ca.uhn.fhir.jpa.dao.search.ExtendedLuceneIndexExtractor; import ca.uhn.fhir.jpa.dao.search.ExtendedLuceneResourceProjection; import ca.uhn.fhir.jpa.dao.search.ExtendedLuceneSearchBuilder; import ca.uhn.fhir.jpa.dao.search.LastNOperation; +import ca.uhn.fhir.jpa.dao.search.SearchScrollQueryExecutorAdaptor; import ca.uhn.fhir.jpa.model.entity.ModelConfig; import ca.uhn.fhir.jpa.model.entity.ResourceTable; import ca.uhn.fhir.jpa.model.search.ExtendedLuceneIndexData; import ca.uhn.fhir.jpa.search.autocomplete.ValueSetAutocompleteOptions; import ca.uhn.fhir.jpa.search.autocomplete.ValueSetAutocompleteSearch; +import ca.uhn.fhir.jpa.search.builder.ISearchQueryExecutor; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.extractor.ISearchParamExtractor; import ca.uhn.fhir.jpa.searchparam.extractor.ResourceIndexedSearchParams; @@ -44,6 +46,7 @@ import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; import ca.uhn.fhir.rest.server.util.ResourceSearchParams; import org.hibernate.search.backend.elasticsearch.ElasticsearchExtension; +import org.hibernate.search.engine.search.query.SearchScroll; import org.hibernate.search.mapper.orm.Search; import org.hibernate.search.mapper.orm.session.SearchSession; import org.hibernate.search.mapper.orm.work.SearchIndexingPlan; @@ -61,7 +64,9 @@ import javax.persistence.PersistenceContextType; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Spliterators; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static org.apache.commons.lang3.StringUtils.isNotBlank; @@ -120,14 +125,27 @@ public class FulltextSearchSvcImpl implements IFulltextSearchSvc { plan.addOrUpdate(theEntity); } - private List doSearch(String theResourceType, SearchParameterMap theParams, ResourcePersistentId theReferencingPid) { + @Override + public ISearchQueryExecutor searchAsync(String theResourceName, SearchParameterMap theParams) { + return doSearch(theResourceName, theParams, null); + } + + private ISearchQueryExecutor doSearch(String theResourceType, SearchParameterMap theParams, ResourcePersistentId theReferencingPid) { // keep this in sync with supportsSomeOf(); SearchSession session = getSearchSession(); - List longPids = session.search(ResourceTable.class) - // Selects are replacements for projection and convert more cleanly than the old implementation. + int scrollSize = 50; + if (theParams.getCount()!=null) { + scrollSize = theParams.getCount(); + } + + SearchScroll esResult = session.search(ResourceTable.class) + // The document id is the PK which is pid. We use this instead of _myId to avoid fetching the doc body. .select( - f -> f.field("myId", Long.class) + // adapt the String docRef.id() to the Long that it really is. + f -> f.composite( + docRef -> Long.valueOf(docRef.id()), + f.documentReference()) ) .where( f -> f.bool(b -> { @@ -170,9 +188,9 @@ public class FulltextSearchSvcImpl implements IFulltextSearchSvc { //DROP EARLY HERE IF BOOL IS EMPTY? }) - ).fetchAllHits(); + ).scroll(scrollSize); - return convertLongsToResourcePersistentIds(longPids); + return new SearchScrollQueryExecutorAdaptor(esResult); } @Nonnull @@ -190,7 +208,8 @@ public class FulltextSearchSvcImpl implements IFulltextSearchSvc { public List everything(String theResourceName, SearchParameterMap theParams, ResourcePersistentId theReferencingPid) { - List retVal = doSearch(null, theParams, theReferencingPid); + // wipmb what about max results here? + List retVal = toList(doSearch(null, theParams, theReferencingPid), 10000); if (theReferencingPid != null) { retVal.add(theReferencingPid); } @@ -223,7 +242,17 @@ public class FulltextSearchSvcImpl implements IFulltextSearchSvc { @Transactional() @Override public List search(String theResourceName, SearchParameterMap theParams) { - return doSearch(theResourceName, theParams, null); + return toList(doSearch(theResourceName, theParams, null), 500); + } + + /** + * Adapt our async interface to the legacy concrete List + */ + private List toList(ISearchQueryExecutor theSearchResultStream, long theMaxSize) { + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(theSearchResultStream, 0), false) + .map(ResourcePersistentId::new) + .limit(theMaxSize) + .collect(Collectors.toList()); } @Transactional() @@ -272,7 +301,7 @@ public class FulltextSearchSvcImpl implements IFulltextSearchSvc { List rawResourceDataList = session.search(ResourceTable.class) .select( f -> f.composite( - (pid, forcedId, resource)-> new ExtendedLuceneResourceProjection(pid, forcedId, resource), + ExtendedLuceneResourceProjection::new, f.field("myId", Long.class), f.field("myForcedId", String.class), f.field("myRawResource", String.class)) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/IFulltextSearchSvc.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/IFulltextSearchSvc.java index 02a4cadbdbf..4540fed2dd0 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/IFulltextSearchSvc.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/IFulltextSearchSvc.java @@ -23,6 +23,7 @@ package ca.uhn.fhir.jpa.dao; import ca.uhn.fhir.jpa.model.entity.ResourceTable; import ca.uhn.fhir.jpa.model.search.ExtendedLuceneIndexData; import ca.uhn.fhir.jpa.search.autocomplete.ValueSetAutocompleteOptions; +import ca.uhn.fhir.jpa.search.builder.ISearchQueryExecutor; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.extractor.ResourceIndexedSearchParams; import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; @@ -44,6 +45,16 @@ public interface IFulltextSearchSvc { */ List search(String theResourceName, SearchParameterMap theParams); + /** + * Query the index for a scrollable iterator of results. + * No max size to the result iterator. + * + * @param theResourceName e.g. Patient + * @param theParams The search query + * @return Iterator of result PIDs + */ + ISearchQueryExecutor searchAsync(String theResourceName, SearchParameterMap theParams); + /** * Autocomplete search for NIH $expand contextDirection=existing * @param theOptions operation options diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/search/SearchScrollQueryExecutorAdaptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/search/SearchScrollQueryExecutorAdaptor.java new file mode 100644 index 00000000000..47329eebd68 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/search/SearchScrollQueryExecutorAdaptor.java @@ -0,0 +1,50 @@ +package ca.uhn.fhir.jpa.dao.search; + +import ca.uhn.fhir.jpa.search.builder.ISearchQueryExecutor; +import org.hibernate.search.engine.backend.common.DocumentReference; +import org.hibernate.search.engine.search.query.SearchScroll; +import org.hibernate.search.engine.search.query.SearchScrollResult; + +import java.util.Iterator; + +/** + * Adapt Hibernate Search SearchScroll paging result to our ISearchQueryExecutor + */ +public class SearchScrollQueryExecutorAdaptor implements ISearchQueryExecutor { + private final SearchScroll myScroll; + private Iterator myCurrentIterator; + + public SearchScrollQueryExecutorAdaptor(SearchScroll theScroll) { + myScroll = theScroll; + advanceNextScrollPage(); + } + + /** + * Advance one page (i.e. SearchScrollResult). + * Note: the last page will have 0 hits. + */ + private void advanceNextScrollPage() { + SearchScrollResult scrollResults = myScroll.next(); + myCurrentIterator = scrollResults.hits().iterator(); + } + + @Override + public void close() { + myScroll.close(); + } + + @Override + public boolean hasNext() { + return myCurrentIterator.hasNext(); + } + + @Override + public Long next() { + Long result = myCurrentIterator.next(); + // was this the last in the current scroll page? + if (!myCurrentIterator.hasNext()) { + advanceNextScrollPage(); + } + return result; + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/PersistedJpaSearchFirstPageBundleProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/PersistedJpaSearchFirstPageBundleProvider.java index a55dd708eed..c6b25771310 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/PersistedJpaSearchFirstPageBundleProvider.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/PersistedJpaSearchFirstPageBundleProvider.java @@ -117,9 +117,9 @@ public class PersistedJpaSearchFirstPageBundleProvider extends PersistedJpaBundl @Override public Integer size() { - ourLog.trace("Waiting for initial sync"); + ourLog.trace("size() - Waiting for initial sync"); Integer size = mySearchTask.awaitInitialSync(); - ourLog.trace("Finished waiting for local sync"); + ourLog.trace("size() - Finished waiting for local sync"); SearchCoordinatorSvcImpl.verifySearchHasntFailedOrThrowInternalErrorException(mySearch); if (size != null) { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java index bbacd49a064..5125640c4f7 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java @@ -1068,8 +1068,10 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { doSaveSearch(); + ourLog.trace("saveUnsynced() - pre-commit"); } }); + ourLog.trace("saveUnsynced() - post-commit"); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/SearchBuilder.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/SearchBuilder.java index 5343b8f0b30..c9a803245e6 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/SearchBuilder.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/SearchBuilder.java @@ -93,6 +93,7 @@ import ca.uhn.fhir.util.StringUtil; import ca.uhn.fhir.util.UrlUtil; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import com.google.common.collect.Streams; import com.healthmarketscience.sqlbuilder.Condition; import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.math.NumberUtils; @@ -322,17 +323,28 @@ public class SearchBuilder implements ISearchBuilder { if (checkUseHibernateSearch()) { // we're going to run at least part of the search against the Fulltext service. - List fulltextMatchIds; + + // Ugh - we have two different return types for now + ISearchQueryExecutor fulltextExecutor = null; + List fulltextMatchIds = null; + int resultCount = 0; if (myParams.isLastN()) { fulltextMatchIds = executeLastNAgainstIndex(theMaximumResults); + resultCount = fulltextMatchIds.size(); } else if (myParams.getEverythingMode() != null) { fulltextMatchIds = queryHibernateSearchForEverythingPids(); + resultCount = fulltextMatchIds.size(); } else { - fulltextMatchIds = myFulltextSearchSvc.search(myResourceName, myParams); + fulltextExecutor = myFulltextSearchSvc.searchAsync(myResourceName, myParams); + } + + if (fulltextExecutor == null) { + fulltextExecutor = SearchQueryExecutors.from(fulltextMatchIds); } if (theSearchRuntimeDetails != null) { - theSearchRuntimeDetails.setFoundIndexMatchesCount(fulltextMatchIds.size()); + // wipmb we no longer have full size. + theSearchRuntimeDetails.setFoundIndexMatchesCount(resultCount); HookParams params = new HookParams() .add(RequestDetails.class, theRequest) .addIfMatchesType(ServletRequestDetails.class, theRequest) @@ -340,13 +352,11 @@ public class SearchBuilder implements ISearchBuilder { CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequest, Pointcut.JPA_PERFTRACE_INDEXSEARCH_QUERY_COMPLETE, params); } - List rawPids = ResourcePersistentId.toLongList(fulltextMatchIds); - // wipmb extract // can we skip the database entirely and return the pid list from here? boolean canSkipDatabase = // if we processed an AND clause, and it returned nothing, then nothing can match. - rawPids.isEmpty() || + !fulltextExecutor.hasNext() || // Our hibernate search query doesn't respect partitions yet (!myPartitionSettings.isPartitioningEnabled() && // we don't support _count=0 yet. @@ -363,12 +373,16 @@ public class SearchBuilder implements ISearchBuilder { ); if (canSkipDatabase) { - queries.add(ResolvedSearchQueryExecutor.from(rawPids)); + if (theMaximumResults != null) { + fulltextExecutor = SearchQueryExecutors.limited(fulltextExecutor, theMaximumResults); + } + queries.add(fulltextExecutor); } else { // Finish the query in the database for the rest of the search parameters, sorting, partitioning, etc. // We break the pids into chunks that fit in the 1k limit for jdbc bind params. + // wipmb change chunk to take iterator new QueryChunker() - .chunk(rawPids, t -> doCreateChunkedQueries(theParams, t, theOffset, sort, theCount, theRequest, queries)); + .chunk(Streams.stream(fulltextExecutor).collect(Collectors.toList()), t -> doCreateChunkedQueries(theParams, t, theOffset, sort, theCount, theRequest, queries)); } } else { // do everything in the database. @@ -1384,41 +1398,6 @@ public class SearchBuilder implements ISearchBuilder { myDaoConfig = theDaoConfig; } - /** - * Adapt bare Iterator to our internal query interface. - */ - static class ResolvedSearchQueryExecutor implements ISearchQueryExecutor { - private final Iterator myIterator; - - ResolvedSearchQueryExecutor(Iterable theIterable) { - this(theIterable.iterator()); - } - - ResolvedSearchQueryExecutor(Iterator theIterator) { - myIterator = theIterator; - } - - @Nonnull - public static ResolvedSearchQueryExecutor from(List rawPids) { - return new ResolvedSearchQueryExecutor(rawPids); - } - - @Override - public boolean hasNext() { - return myIterator.hasNext(); - } - - @Override - public Long next() { - return myIterator.next(); - } - - @Override - public void close() { - // empty - } - } - public class IncludesIterator extends BaseIterator implements Iterator { private final RequestDetails myRequest; @@ -1643,6 +1622,7 @@ public class SearchBuilder implements ISearchBuilder { private void initializeIteratorQuery(Integer theOffset, Integer theMaxResultsToFetch) { if (myQueryList.isEmpty()) { + // wipmb what is this? // Capture times for Lucene/Elasticsearch queries as well mySearchRuntimeDetails.setQueryStopwatch(new StopWatch()); myQueryList = createQuery(myParams, mySort, theOffset, theMaxResultsToFetch, false, myRequest, mySearchRuntimeDetails); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/SearchQueryExecutors.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/SearchQueryExecutors.java new file mode 100644 index 00000000000..dbc2f643be4 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/SearchQueryExecutors.java @@ -0,0 +1,107 @@ +package ca.uhn.fhir.jpa.search.builder; + +import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; +import org.apache.commons.lang3.Validate; + +import javax.annotation.Nonnull; +import java.util.Iterator; +import java.util.List; + +public class SearchQueryExecutors { + + public static ISearchQueryExecutor limited(ISearchQueryExecutor theExecutor, long theLimit) { + Validate.isTrue(theLimit >= 0, "limit must be non-negative"); + + return new ISearchQueryExecutor() { + long myCount = 0; + + @Override + public void close() { + theExecutor.close(); + } + + @Override + public boolean hasNext() { + return theExecutor.hasNext() && myCount < theLimit; + } + + @Override + public Long next() { + myCount += 1; + return theExecutor.next(); + } + }; + } + + @Nonnull + public static ISearchQueryExecutor from(List rawPids) { + return new ResolvedSearchQueryExecutor(rawPids); + } + + /** + * Adapt bare Iterator to our internal query interface. + */ + static class ResolvedSearchQueryExecutor implements ISearchQueryExecutor { + private final Iterator myIterator; + + ResolvedSearchQueryExecutor(Iterable theIterable) { + this(theIterable.iterator()); + } + + ResolvedSearchQueryExecutor(Iterator theIterator) { + myIterator = theIterator; + } + + @Nonnull + public static ResolvedSearchQueryExecutor from(List rawPids) { + return new ResolvedSearchQueryExecutor(rawPids); + } + + @Override + public boolean hasNext() { + return myIterator.hasNext(); + } + + @Override + public Long next() { + return myIterator.next(); + } + + @Override + public void close() { + // empty + } + } + + static public ISearchQueryExecutor from(Iterator theIterator) { + return new ResourcePersistentIdQueryAdaptor(theIterator); + } + + static public ISearchQueryExecutor from(Iterable theIterable) { + return new ResourcePersistentIdQueryAdaptor(theIterable.iterator()); + } + + static class ResourcePersistentIdQueryAdaptor implements ISearchQueryExecutor { + final Iterator myIterator; + + ResourcePersistentIdQueryAdaptor(Iterator theIterator) { + myIterator = theIterator; + } + + @Override + public void close() { + } + + @Override + public boolean hasNext() { + return myIterator.hasNext(); + } + + @Override + public Long next() { + ResourcePersistentId next = myIterator.next(); + return next==null?null:next.getIdAsLong(); + } + + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/cache/DatabaseSearchResultCacheSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/cache/DatabaseSearchResultCacheSvcImpl.java index b19a85728a3..68df3c67401 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/cache/DatabaseSearchResultCacheSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/cache/DatabaseSearchResultCacheSvcImpl.java @@ -72,7 +72,7 @@ public class DatabaseSearchResultCacheSvcImpl implements ISearchResultCacheSvc { public void storeResults(Search theSearch, List thePreviouslyStoredResourcePids, List theNewResourcePids) { List resultsToSave = Lists.newArrayList(); - ourLog.trace("Storing {} results with {} previous for search", theNewResourcePids.size(), thePreviouslyStoredResourcePids.size()); + ourLog.debug("Storing {} results with {} previous for search", theNewResourcePids.size(), thePreviouslyStoredResourcePids.size()); int order = thePreviouslyStoredResourcePids.size(); for (ResourcePersistentId nextPid : theNewResourcePids) { diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/builder/SearchQueryExecutorsTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/builder/SearchQueryExecutorsTest.java new file mode 100644 index 00000000000..a99a8cdc202 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/builder/SearchQueryExecutorsTest.java @@ -0,0 +1,57 @@ +package ca.uhn.fhir.jpa.search.builder; + +import com.google.common.collect.Streams; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Spliterators; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.junit.jupiter.api.Assertions.*; + +class SearchQueryExecutorsTest { + + @Test + public void adaptFromLongArrayYieldsAllValues() { + List listWithValues = Arrays.asList(1L,2L,3L,4L,5L); + + ISearchQueryExecutor queryExecutor = SearchQueryExecutors.from(listWithValues); + + assertThat(drain(queryExecutor), contains(1L,2L,3L,4L,5L)); + } + + @Test + public void limitedCountDropsTrailingTest() { + // given + List vals = Arrays.asList(1L,2L,3L,4L,5L); + ISearchQueryExecutor target = SearchQueryExecutors.from(vals); + + ISearchQueryExecutor queryExecutor = SearchQueryExecutors.limited(target, 3); + + assertThat(drain(queryExecutor), contains(1L,2L,3L)); + } + + @Test + public void limitedCountExhaustsBeforeLimitOkTest() { + // given + List vals = Arrays.asList(1L,2L,3L); + ISearchQueryExecutor target = SearchQueryExecutors.from(vals); + + ISearchQueryExecutor queryExecutor = SearchQueryExecutors.limited(target, 5); + + assertThat(drain(queryExecutor), contains(1L,2L,3L)); + } + + + private List drain(ISearchQueryExecutor theQueryExecutor) { + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(theQueryExecutor, 0), false) + .collect(Collectors.toList()); + } + + +}