From b72b3373cf281b712cca1e970a667651bd9ad1a7 Mon Sep 17 00:00:00 2001 From: leif stawnyczy Date: Fri, 11 Aug 2023 14:15:48 -0400 Subject: [PATCH] fixing searches --- .../jpa/search/builder/SearchBuilder.java | 41 +++++- .../jpa/search/builder/tasks/SearchTask.java | 114 ++++++++++---- .../FhirResourceDaoR4SearchOptimizedTest.java | 6 +- .../r4/PatientEverythingPaginationR4Test.java | 139 ++++++++++++++++++ 4 files changed, 261 insertions(+), 39 deletions(-) create mode 100644 hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/PatientEverythingPaginationR4Test.java diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/SearchBuilder.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/SearchBuilder.java index 5e48a02c6ad..0ed0d288345 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/SearchBuilder.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/SearchBuilder.java @@ -178,6 +178,15 @@ public class SearchBuilder implements ISearchBuilder { @PersistenceContext(type = PersistenceContextType.TRANSACTION) protected EntityManager myEntityManager; + /** + * Contains lists of ids of resources that should be fetched, + * along with those that will be fetched from the search. + * + * This includes: + * * Resource ids specified explicitly by _id parameter + * * Ids of patients that are not referenced by any other resource. Specifically + * when invoking an $everything operation + */ private List myAlsoIncludePids; private CriteriaBuilder myCriteriaBuilder; private SearchParameterMap myParams; @@ -1925,10 +1934,25 @@ public class SearchBuilder implements ISearchBuilder { private final Integer myOffset; private boolean myFirst = true; private IncludesIterator myIncludesIterator; + /** + * The next JpaPid value of the next result in this query. + * Will not be null if fetched using getNext() + */ private JpaPid myNext; + /** + * ResultsIterator is the QueryExecutor that runs the sql + * and fetches data from the db. + */ private ISearchQueryExecutor myResultsIterator; private boolean myFetchIncludesForEverythingOperation; + /** + * The count of resources found in the cached search + */ private int mySkipCount = 0; + /** + * The count of resources that are new in this search + * (ie, not cached in previous searches) + */ private int myNonSkipCount = 0; private List myQueryList = new ArrayList<>(); @@ -1967,26 +1991,35 @@ public class SearchBuilder implements ISearchBuilder { } } - // assigns the results iterator + /* + * assigns the results iterator. + * Can also assign and populate myAlsoIncludePids. + * Specifically in type/$everything mode + * (ie, /Patient/$everything) + */ initializeIteratorQuery(myOffset, myMaxResultsToFetch); + // but if it doesn't, we'll set an empty list here if (myAlsoIncludePids == null) { myAlsoIncludePids = new ArrayList<>(); } } if (myNext == null) { - for (Iterator myPreResultsIterator = myAlsoIncludePids.iterator(); - myPreResultsIterator.hasNext(); ) { - JpaPid next = myPreResultsIterator.next(); + // we first consume any alsoIncludePids + for (JpaPid next : myAlsoIncludePids) { if (next != null) if (myPidSet.add(next)) { +// mySkipCount++; myNext = next; break; + } else { +// myNonSkipCount++; } } if (myNext == null) { + // no next means we need a new query (if one is available) while (myResultsIterator.hasNext() || !myQueryList.isEmpty()) { // Update iterator with next chunk if necessary. if (!myResultsIterator.hasNext()) { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/tasks/SearchTask.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/tasks/SearchTask.java index 64b39e376d4..b0992f4bcc0 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/tasks/SearchTask.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/tasks/SearchTask.java @@ -334,8 +334,6 @@ public class SearchTask implements Callable { if (theResultIter.hasNext() == false) { int skippedCount = theResultIter.getSkippedCount(); - int nonSkippedCount = theResultIter.getNonSkippedCount(); - int totalFetched = skippedCount + myCountSavedThisPass + myCountBlockedThisPass; ourLog.trace( "MaxToFetch[{}] SkippedCount[{}] CountSavedThisPass[{}] CountSavedThisTotal[{}] AdditionalPrefetchRemaining[{}]", myMaxResultsToFetch, @@ -344,16 +342,18 @@ public class SearchTask implements Callable { myCountSavedTotal, myAdditionalPrefetchThresholdsRemaining); - if (nonSkippedCount == 0 - || (myMaxResultsToFetch != null && totalFetched < myMaxResultsToFetch)) { + if (isFinished(theResultIter)) { + // finished ourLog.trace("Setting search status to FINISHED"); mySearch.setStatus(SearchStatusEnum.FINISHED); mySearch.setTotalCount(myCountSavedTotal - countBlocked); } else if (myAdditionalPrefetchThresholdsRemaining) { + // pass complete ourLog.trace("Setting search status to PASSCMPLET"); mySearch.setStatus(SearchStatusEnum.PASSCMPLET); mySearch.setSearchParameterMap(myParams); } else { + // also finished ourLog.trace("Setting search status to FINISHED"); mySearch.setStatus(SearchStatusEnum.FINISHED); mySearch.setTotalCount(myCountSavedTotal - countBlocked); @@ -382,6 +382,37 @@ public class SearchTask implements Callable { ourLog.trace("saveUnsynced() - post-commit"); } + private boolean isFinished(final IResultIterator theResultIter) { + int skippedCount = theResultIter.getSkippedCount(); + int nonSkippedCount = theResultIter.getNonSkippedCount(); + int totalFetched = skippedCount + myCountSavedThisPass + myCountBlockedThisPass; + + if (myMaxResultsToFetch != null && totalFetched < myMaxResultsToFetch) { + // total fetched < max results to fetch -> we've exhausted the search + return true; + } else { + if (nonSkippedCount == 0) { + // no skipped resources in this query + if (myParams.getCount() == null) { + // no supplied count, either + // if there's no additional thresholds remaining, we're done + return !myAdditionalPrefetchThresholdsRemaining; + } else { + // count supplied + // if the count is > what we've fetched -> we've exhausted the query + return myParams.getCount() > totalFetched; + } + } + // skipped resources means we have more to fetch + return false; + } + } + + private boolean hasMoreToFetch(IResultIterator theResultIter) { + return myAdditionalPrefetchThresholdsRemaining + && !isFinished(theResultIter); + } + public boolean isNotAborted() { return myAbortRequested == false; } @@ -531,32 +562,7 @@ public class SearchTask implements Callable { : SearchParameterMapCalculator.isWantCount(myStorageSettings.getDefaultTotalMode()); if (myParamWantOnlyCount || myParamOrDefaultWantCount) { - ourLog.trace("Performing count"); - ISearchBuilder sb = newSearchBuilder(); - - /* - * createCountQuery - * NB: (see createQuery below) - * Because FulltextSearchSvcImpl will (internally) - * mutate the myParams (searchmap), - * (specifically removing the _content and _text filters) - * we will have to clone those parameters here so that - * the "correct" params are used in createQuery below - */ - Long count = sb.createCountQuery(myParams.clone(), mySearch.getUuid(), myRequest, myRequestPartitionId); - - ourLog.trace("Got count {}", count); - - myTxService - .withRequest(myRequest) - .withRequestPartitionId(myRequestPartitionId) - .execute(() -> { - mySearch.setTotalCount(count.intValue()); - if (myParamWantOnlyCount) { - mySearch.setStatus(SearchStatusEnum.FINISHED); - } - doSaveSearch(); - }); + doCountOnlyQuery(myParamWantOnlyCount); if (myParamWantOnlyCount) { return; } @@ -573,12 +579,22 @@ public class SearchTask implements Callable { */ int currentlyLoaded = defaultIfNull(mySearch.getNumFound(), 0); int minWanted = 0; + + // if no count is provided, + // we only use the values in SearchPreFetchThresholds + // but if there is a count... if (myParams.getCount() != null) { - minWanted = myParams.getCount() + 1; // Always fetch one past this page, so we know if there is a next page. - minWanted = Math.min(minWanted, myPagingProvider.getMaximumPageSize()); + // we want either the max page size or the requested count size + // (+1 iff count == max page size) + minWanted = Math.min(myParams.getCount(), myPagingProvider.getMaximumPageSize()); + // Always fetch one past this page size, so we know if there is a next page. + if (minWanted == myParams.getCount()) { + minWanted += 1; + } minWanted += currentlyLoaded; } + // iterate through the search thresholds for (Iterator iter = myStorageSettings.getSearchPreFetchThresholds().iterator(); iter.hasNext(); ) { @@ -633,6 +649,7 @@ public class SearchTask implements Callable { */ try (IResultIterator resultIterator = sb.createQuery(myParams, mySearchRuntimeDetails, myRequest, myRequestPartitionId)) { + // resultIterator is SearchBuilder.QueryIterator assert (resultIterator != null); /* @@ -678,4 +695,37 @@ public class SearchTask implements Callable { throw new InternalErrorException(Msg.code(1166) + e); } } + + /** + * Does the query but only for the count. + * @param myParamWantOnlyCount - if count query is wanted only + */ + private void doCountOnlyQuery(boolean myParamWantOnlyCount) { + ourLog.trace("Performing count"); + ISearchBuilder sb = newSearchBuilder(); + + /* + * createCountQuery + * NB: (see createQuery below) + * Because FulltextSearchSvcImpl will (internally) + * mutate the myParams (searchmap), + * (specifically removing the _content and _text filters) + * we will have to clone those parameters here so that + * the "correct" params are used in createQuery below + */ + Long count = sb.createCountQuery(myParams.clone(), mySearch.getUuid(), myRequest, myRequestPartitionId); + + ourLog.trace("Got count {}", count); + + myTxService + .withRequest(myRequest) + .withRequestPartitionId(myRequestPartitionId) + .execute(() -> { + mySearch.setTotalCount(count.intValue()); + if (myParamWantOnlyCount) { + mySearch.setStatus(SearchStatusEnum.FINISHED); + } + doSaveSearch(); + }); + } } diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchOptimizedTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchOptimizedTest.java index 67fb685cfcd..d3fae2b824c 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchOptimizedTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchOptimizedTest.java @@ -554,7 +554,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test { }); runInTransaction(() -> { Search search = mySearchEntityDao.findByUuidAndFetchIncludes(uuid).orElseThrow(() -> new InternalErrorException("")); - assertEquals(50, search.getNumFound()); + assertEquals(51, search.getNumFound()); assertEquals(search.getNumFound(), mySearchResultDao.count()); assertEquals(null, search.getTotalCount()); assertEquals(SearchStatusEnum.PASSCMPLET, search.getStatus()); @@ -1172,12 +1172,12 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test { myCaptureQueriesListener.logSelectQueriesForCurrentThread(); assertEquals(4, myCaptureQueriesListener.countSelectQueries()); // first prefetch is 50+1 - assertEquals(51, myCaptureQueriesListener.logInsertQueries()); + assertEquals(52, myCaptureQueriesListener.logInsertQueries()); assertEquals(1, myCaptureQueriesListener.countUpdateQueries()); assertEquals(0, myCaptureQueriesListener.countDeleteQueries()); assertEquals(4, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); - assertEquals(51, myCaptureQueriesListener.countInsertQueriesForCurrentThread()); + assertEquals(52, myCaptureQueriesListener.countInsertQueriesForCurrentThread()); assertEquals(1, myCaptureQueriesListener.countUpdateQueriesForCurrentThread()); assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread()); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/PatientEverythingPaginationR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/PatientEverythingPaginationR4Test.java new file mode 100644 index 00000000000..c97b67ab4ff --- /dev/null +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/PatientEverythingPaginationR4Test.java @@ -0,0 +1,139 @@ +package ca.uhn.fhir.jpa.provider.r4; + +import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; +import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test; +import ca.uhn.fhir.parser.StrictErrorHandler; +import ca.uhn.fhir.rest.api.Constants; +import ca.uhn.fhir.rest.api.EncodingEnum; +import ca.uhn.fhir.rest.api.server.SystemRequestDetails; +import ca.uhn.fhir.util.BundleUtil; +import com.google.common.base.Charsets; +import org.apache.commons.io.IOUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.hl7.fhir.r4.model.Bundle; +import org.hl7.fhir.r4.model.Patient; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.List; + +import static org.hl7.fhir.instance.model.api.IBaseBundle.LINK_NEXT; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +@SuppressWarnings("Duplicates") +public class PatientEverythingPaginationR4Test extends BaseResourceProviderR4Test { + + private int myOriginalServerDefaultPageSize; + + @BeforeEach + public void beforeDisableResultReuse() { + myStorageSettings.setReuseCachedSearchResultsForMillis(null); + } + + @Override + @BeforeEach + public void before() throws Exception { + super.before(); + myFhirContext.setParserErrorHandler(new StrictErrorHandler()); + + myStorageSettings.setAllowMultipleDelete(true); + + myOriginalServerDefaultPageSize = myServer.getDefaultPageSize(); + myServer.setDefaultPageSize(50); + + } + + @Override + @AfterEach + public void after() throws Exception { + super.after(); + + myStorageSettings.setReuseCachedSearchResultsForMillis(new JpaStorageSettings().getReuseCachedSearchResultsForMillis()); + myServer.setDefaultPageSize(myOriginalServerDefaultPageSize); + } + + /** + * Built to reproduce this issue + * Notice that the issue is not gateway related. Is a plain server issue. + */ + @Test + @Disabled // not a valid test + public void testEverythingRespectsServerDefaultPageSize() throws IOException { + // setup + for (int i = 0; i < 25; i++) { + Patient patient = new Patient(); + patient.addName().setFamily("lastn").addGiven("name"); + myPatientDao.create(patient, new SystemRequestDetails()).getId().toUnqualifiedVersionless(); + } + + // must be larger than myStorageSettings.getSearchPreFetchThresholds()[0] for issue to show up + int originalPagingProviderPageSize = myPagingProvider.getDefaultPageSize(); + myPagingProvider.setDefaultPageSize(50); + + // execute + Bundle bundle; + try { + bundle = fetchBundle(myServerBase + "/Patient/$everything?_format=json"); + } finally { + // restore + myPagingProvider.setDefaultPageSize(originalPagingProviderPageSize); + } + + // validate + List bundlePatients = BundleUtil.toListOfResourcesOfType(myFhirContext, bundle, Patient.class); + assertEquals(myServer.getDefaultPageSize(), bundlePatients.size()); + } + + /** + * Built to reproduce this issue + * Notice that the issue is not gateway related. Is a plain server issue. + */ + @Test + public void testEverythingPaginatesThroughAllPatients() throws IOException { + // setup + int totalPatients = 54; + for (int i = 0; i < totalPatients; i++) { + Patient patient = new Patient(); + patient.addName().setFamily("lastn").addGiven("name"); + myPatientDao.create(patient, new SystemRequestDetails()).getId().toUnqualifiedVersionless(); + } + + // test + Bundle bundle = fetchBundle(myServerBase + "/Patient/$everything?_format=json"); + + // verify + List patientsFirstPage = BundleUtil.toListOfResourcesOfType(myFhirContext, bundle, Patient.class); + assertEquals(50, patientsFirstPage.size()); + + String nextUrl = BundleUtil.getLinkUrlOfType(myFhirContext, bundle, LINK_NEXT); + System.out.println(nextUrl); + assertNotNull(nextUrl); + Bundle page2 = fetchBundle(nextUrl); + assertNotNull(page2); + List patientsPage2 = BundleUtil.toListOfResourcesOfType(myFhirContext, page2, Patient.class); + + assertEquals(4, patientsPage2.size()); + } + + + private Bundle fetchBundle(String theUrl) throws IOException { + Bundle bundle; + HttpGet get = new HttpGet(theUrl); + CloseableHttpResponse resp = ourHttpClient.execute(get); + try { + assertEquals(EncodingEnum.JSON.getResourceContentTypeNonLegacy(), resp.getFirstHeader(Constants.HEADER_CONTENT_TYPE).getValue().replaceAll(";.*", "")); + bundle = EncodingEnum.JSON.newParser(myFhirContext).parseResource(Bundle.class, IOUtils.toString(resp.getEntity().getContent(), Charsets.UTF_8)); + } finally { + IOUtils.closeQuietly(resp); + } + + return bundle; + } + + +}