fixing searches

leif stawnyczy 2023-08-11 14:15:48 -04:00
parent de9a747666
commit b72b3373cf
4 changed files with 261 additions and 39 deletions

View File

@@ -178,6 +178,15 @@ public class SearchBuilder implements ISearchBuilder<JpaPid> {
@PersistenceContext(type = PersistenceContextType.TRANSACTION)
protected EntityManager myEntityManager;
/**
* Contains ids of resources that should be fetched in addition to
* those that the search query itself will return.
*
* This includes:
* * Resource ids specified explicitly via the _id parameter
* * Ids of patients that are not referenced by any other resource,
* specifically when invoking an $everything operation
*/
private List<JpaPid> myAlsoIncludePids;
private CriteriaBuilder myCriteriaBuilder;
private SearchParameterMap myParams;
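For orientation, here is a minimal standalone sketch (hypothetical names and types, not the HAPI classes in this diff) of the two sources the javadoc above describes: ids requested explicitly via the _id parameter, and the subject patient of an $everything call, which may not be referenced by any other resource and which the generated SQL would therefore never emit.

import java.util.ArrayList;
import java.util.List;

// Illustration only: where "also include" pids might come from.
class AlsoIncludePidSources {
    private final List<Long> myExplicitIdPids = new ArrayList<>();        // from the _id parameter
    private final List<Long> myEverythingSubjectPids = new ArrayList<>(); // e.g. the Patient of Patient/123/$everything

    void addExplicitIdPid(Long thePid) {
        myExplicitIdPids.add(thePid);
    }

    void addEverythingSubjectPid(Long thePid) {
        myEverythingSubjectPids.add(thePid);
    }

    /** Ids that must be returned even if the main search query does not produce them. */
    List<Long> buildAlsoIncludeList() {
        List<Long> result = new ArrayList<>(myExplicitIdPids);
        result.addAll(myEverythingSubjectPids);
        return result;
    }
}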
@@ -1925,10 +1934,25 @@ public class SearchBuilder implements ISearchBuilder<JpaPid> {
private final Integer myOffset;
private boolean myFirst = true;
private IncludesIterator myIncludesIterator;
/**
* The JpaPid of the next result in this query.
* Will not be null if fetched using getNext().
*/
private JpaPid myNext;
/**
* ResultsIterator is the QueryExecutor that runs the SQL
* and fetches data from the DB.
*/
private ISearchQueryExecutor myResultsIterator;
private boolean myFetchIncludesForEverythingOperation;
/**
* The count of results that were skipped because they were already found in the cached search
*/
private int mySkipCount = 0;
/**
* The count of resources that are new in this search
* (i.e., not cached in previous searches)
*/
private int myNonSkipCount = 0;
private List<ISearchQueryExecutor> myQueryList = new ArrayList<>();
@@ -1967,26 +1991,35 @@ public class SearchBuilder implements ISearchBuilder<JpaPid> {
}
}
// assigns the results iterator
/*
* Assigns the results iterator.
* Can also assign and populate myAlsoIncludePids,
* specifically in type/$everything mode
* (i.e., /Patient/$everything).
*/
initializeIteratorQuery(myOffset, myMaxResultsToFetch);
// but if it doesn't populate myAlsoIncludePids, we'll set an empty list here
if (myAlsoIncludePids == null) {
myAlsoIncludePids = new ArrayList<>();
}
}
if (myNext == null) {
for (Iterator<JpaPid> myPreResultsIterator = myAlsoIncludePids.iterator();
myPreResultsIterator.hasNext(); ) {
JpaPid next = myPreResultsIterator.next();
// we first consume any alsoIncludePids
for (JpaPid next : myAlsoIncludePids) {
if (next != null)
if (myPidSet.add(next)) {
// mySkipCount++;
myNext = next;
break;
} else {
// myNonSkipCount++;
}
}
if (myNext == null) {
// no next means we need a new query (if one is available)
while (myResultsIterator.hasNext() || !myQueryList.isEmpty()) {
// Update iterator with next chunk if necessary.
if (!myResultsIterator.hasNext()) {
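A hedged, self-contained sketch of the consumption order the new loop above implements (hypothetical names, not the real QueryIterator): pids from the also-include list are handed out first, deduplicated against everything already returned, and only then is the SQL-backed iterator polled.

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;

class MergingPidIterator implements Iterator<Long> {
    private final Iterator<Long> myAlsoInclude;  // e.g. the $everything subject patient
    private final Iterator<Long> myQueryResults; // pids streamed from the SQL query
    private final Set<Long> myReturned = new HashSet<>();
    private Long myNext;

    MergingPidIterator(List<Long> theAlsoIncludePids, Iterator<Long> theQueryResults) {
        myAlsoInclude = theAlsoIncludePids.iterator();
        myQueryResults = theQueryResults;
    }

    @Override
    public boolean hasNext() {
        while (myNext == null) {
            // drain the also-include list before touching the query results
            Iterator<Long> source = myAlsoInclude.hasNext() ? myAlsoInclude : myQueryResults;
            if (!source.hasNext()) {
                return false;
            }
            Long candidate = source.next();
            if (candidate != null && myReturned.add(candidate)) {
                myNext = candidate; // explicit ids first, then query results, never repeated
            }
        }
        return true;
    }

    @Override
    public Long next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        Long result = myNext;
        myNext = null;
        return result;
    }
}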

View File

@@ -334,8 +334,6 @@ public class SearchTask implements Callable<Void> {
if (theResultIter.hasNext() == false) {
int skippedCount = theResultIter.getSkippedCount();
int nonSkippedCount = theResultIter.getNonSkippedCount();
int totalFetched = skippedCount + myCountSavedThisPass + myCountBlockedThisPass;
ourLog.trace(
"MaxToFetch[{}] SkippedCount[{}] CountSavedThisPass[{}] CountSavedThisTotal[{}] AdditionalPrefetchRemaining[{}]",
myMaxResultsToFetch,
@@ -344,16 +342,18 @@ public class SearchTask implements Callable<Void> {
myCountSavedTotal,
myAdditionalPrefetchThresholdsRemaining);
if (nonSkippedCount == 0
|| (myMaxResultsToFetch != null && totalFetched < myMaxResultsToFetch)) {
if (isFinished(theResultIter)) {
// finished
ourLog.trace("Setting search status to FINISHED");
mySearch.setStatus(SearchStatusEnum.FINISHED);
mySearch.setTotalCount(myCountSavedTotal - countBlocked);
} else if (myAdditionalPrefetchThresholdsRemaining) {
// pass complete
ourLog.trace("Setting search status to PASSCMPLET");
mySearch.setStatus(SearchStatusEnum.PASSCMPLET);
mySearch.setSearchParameterMap(myParams);
} else {
// also finished
ourLog.trace("Setting search status to FINISHED");
mySearch.setStatus(SearchStatusEnum.FINISHED);
mySearch.setTotalCount(myCountSavedTotal - countBlocked);
@@ -382,6 +382,37 @@ public class SearchTask implements Callable<Void> {
ourLog.trace("saveUnsynced() - post-commit");
}
private boolean isFinished(final IResultIterator theResultIter) {
int skippedCount = theResultIter.getSkippedCount();
int nonSkippedCount = theResultIter.getNonSkippedCount();
int totalFetched = skippedCount + myCountSavedThisPass + myCountBlockedThisPass;
if (myMaxResultsToFetch != null && totalFetched < myMaxResultsToFetch) {
// total fetched < max results to fetch -> we've exhausted the search
return true;
} else {
if (nonSkippedCount == 0) {
// no new (non-skipped) resources in this query
if (myParams.getCount() == null) {
// no supplied count, either
// if there are no additional thresholds remaining, we're done
return !myAdditionalPrefetchThresholdsRemaining;
} else {
// count supplied
// if the count is > what we've fetched -> we've exhausted the query
return myParams.getCount() > totalFetched;
}
}
// new (non-skipped) resources mean there is more to fetch
return false;
}
}
private boolean hasMoreToFetch(IResultIterator theResultIter) {
return myAdditionalPrefetchThresholdsRemaining
&& !isFinished(theResultIter);
}
public boolean isNotAborted() {
return myAbortRequested == false;
}
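Restated as a pure function, the decision the extracted isFinished() makes is easier to read in isolation. The sketch below mirrors the branches above with hypothetical parameter names (the real method reads SearchTask fields):

class SearchFinishedDecision {
    static boolean isSearchFinished(
            Integer theMaxResultsToFetch,       // prefetch ceiling for this pass, may be null
            int theTotalFetched,                // skipped + saved + blocked this pass
            int theNonSkippedCount,             // results that were new, i.e. not served from the cache
            Integer theRequestedCount,          // the _count parameter, may be null
            boolean theMoreThresholdsRemaining  // whether another prefetch threshold is still pending
    ) {
        if (theMaxResultsToFetch != null && theTotalFetched < theMaxResultsToFetch) {
            return true; // the query produced fewer rows than we asked for: nothing left to fetch
        }
        if (theNonSkippedCount == 0) {
            if (theRequestedCount == null) {
                // only cached results and no _count: done unless another prefetch pass is pending
                return !theMoreThresholdsRemaining;
            }
            // _count supplied: same comparison as the code above
            return theRequestedCount > theTotalFetched;
        }
        return false; // new (non-skipped) results were found, so there may be more to fetch
    }
}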
@@ -531,32 +562,7 @@ public class SearchTask implements Callable<Void> {
: SearchParameterMapCalculator.isWantCount(myStorageSettings.getDefaultTotalMode());
if (myParamWantOnlyCount || myParamOrDefaultWantCount) {
ourLog.trace("Performing count");
ISearchBuilder sb = newSearchBuilder();
/*
* createCountQuery
* NB: (see createQuery below)
* Because FulltextSearchSvcImpl will (internally)
* mutate the myParams (searchmap),
* (specifically removing the _content and _text filters)
* we will have to clone those parameters here so that
* the "correct" params are used in createQuery below
*/
Long count = sb.createCountQuery(myParams.clone(), mySearch.getUuid(), myRequest, myRequestPartitionId);
ourLog.trace("Got count {}", count);
myTxService
.withRequest(myRequest)
.withRequestPartitionId(myRequestPartitionId)
.execute(() -> {
mySearch.setTotalCount(count.intValue());
if (myParamWantOnlyCount) {
mySearch.setStatus(SearchStatusEnum.FINISHED);
}
doSaveSearch();
});
doCountOnlyQuery(myParamWantOnlyCount);
if (myParamWantOnlyCount) {
return;
}
@@ -573,12 +579,22 @@ public class SearchTask implements Callable<Void> {
*/
int currentlyLoaded = defaultIfNull(mySearch.getNumFound(), 0);
int minWanted = 0;
// if no count is provided,
// we only use the values in SearchPreFetchThresholds
// but if there is a count...
if (myParams.getCount() != null) {
minWanted = myParams.getCount() + 1; // Always fetch one past this page, so we know if there is a next page.
minWanted = Math.min(minWanted, myPagingProvider.getMaximumPageSize());
// we want the requested count or the max page size, whichever is smaller
// (+1 below iff the requested count is not larger than the max page size)
minWanted = Math.min(myParams.getCount(), myPagingProvider.getMaximumPageSize());
// Always fetch one past this page size, so we know if there is a next page.
if (minWanted == myParams.getCount()) {
minWanted += 1;
}
minWanted += currentlyLoaded;
}
// iterate through the search thresholds
for (Iterator<Integer> iter =
myStorageSettings.getSearchPreFetchThresholds().iterator();
iter.hasNext(); ) {
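As a worked example of the new minWanted arithmetic above (assuming a hypothetical maximum page size of 50): a _count of 30 yields min(30, 50) = 30, plus the +1 look-ahead because the count was not capped, i.e. 31 before adding what is already loaded; a _count of 200 is capped to 50 and gets no +1. A sketch of the same calculation as a pure function (hypothetical names):

class MinWantedCalculation {
    static int computeMinWanted(Integer theRequestedCount, int theMaximumPageSize, int theCurrentlyLoaded) {
        int minWanted = 0;
        if (theRequestedCount != null) {
            minWanted = Math.min(theRequestedCount, theMaximumPageSize);
            if (minWanted == theRequestedCount) {
                minWanted += 1; // fetch one past the page so a "next" link can be offered
            }
            minWanted += theCurrentlyLoaded;
        }
        return minWanted;
    }
}
// computeMinWanted(30, 50, 0)  -> 31   (below the cap: +1 look-ahead)
// computeMinWanted(50, 50, 0)  -> 51
// computeMinWanted(200, 50, 0) -> 50   (capped to the maximum page size, no +1)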
@@ -633,6 +649,7 @@ public class SearchTask implements Callable<Void> {
*/
try (IResultIterator<JpaPid> resultIterator =
sb.createQuery(myParams, mySearchRuntimeDetails, myRequest, myRequestPartitionId)) {
// resultIterator is SearchBuilder.QueryIterator
assert (resultIterator != null);
/*
@@ -678,4 +695,37 @@ public class SearchTask implements Callable<Void> {
throw new InternalErrorException(Msg.code(1166) + e);
}
}
/**
* Runs the query, but only to obtain the count.
* @param myParamWantOnlyCount whether only a count query is wanted
*/
private void doCountOnlyQuery(boolean myParamWantOnlyCount) {
ourLog.trace("Performing count");
ISearchBuilder sb = newSearchBuilder();
/*
* createCountQuery
* NB: (see createQuery below)
* Because FulltextSearchSvcImpl will (internally)
* mutate the myParams (searchmap),
* (specifically removing the _content and _text filters)
* we will have to clone those parameters here so that
* the "correct" params are used in createQuery below
*/
Long count = sb.createCountQuery(myParams.clone(), mySearch.getUuid(), myRequest, myRequestPartitionId);
ourLog.trace("Got count {}", count);
myTxService
.withRequest(myRequest)
.withRequestPartitionId(myRequestPartitionId)
.execute(() -> {
mySearch.setTotalCount(count.intValue());
if (myParamWantOnlyCount) {
mySearch.setStatus(SearchStatusEnum.FINISHED);
}
doSaveSearch();
});
}
}
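The key constraint the extracted doCountOnlyQuery() preserves is spelled out in its comment: the parameter map is cloned before the count query because the fulltext layer mutates it (stripping the _content and _text filters). A minimal standalone sketch of that defensive-copy pattern (hypothetical types, not the HAPI API):

import java.util.HashMap;
import java.util.Map;

class DefensiveCopyForCount {
    /** Stands in for a count implementation that strips fulltext filters from the map it receives. */
    static long runCount(Map<String, String> theParams) {
        theParams.remove("_content");
        theParams.remove("_text");
        return 0L; // placeholder count
    }

    static void countThenSearch(Map<String, String> theParams) {
        // Hand the count a copy so the original map still carries _content/_text
        // when the real search query is built afterwards.
        long total = runCount(new HashMap<>(theParams));
        System.out.println("count=" + total + ", _text still present=" + theParams.containsKey("_text"));
    }
}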

View File

@@ -554,7 +554,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
});
runInTransaction(() -> {
Search search = mySearchEntityDao.findByUuidAndFetchIncludes(uuid).orElseThrow(() -> new InternalErrorException(""));
assertEquals(50, search.getNumFound());
assertEquals(51, search.getNumFound());
assertEquals(search.getNumFound(), mySearchResultDao.count());
assertEquals(null, search.getTotalCount());
assertEquals(SearchStatusEnum.PASSCMPLET, search.getStatus());
@@ -1172,12 +1172,12 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
myCaptureQueriesListener.logSelectQueriesForCurrentThread();
assertEquals(4, myCaptureQueriesListener.countSelectQueries());
// first prefetch is 50+1
assertEquals(51, myCaptureQueriesListener.logInsertQueries());
assertEquals(52, myCaptureQueriesListener.logInsertQueries());
assertEquals(1, myCaptureQueriesListener.countUpdateQueries());
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
assertEquals(4, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
assertEquals(51, myCaptureQueriesListener.countInsertQueriesForCurrentThread());
assertEquals(52, myCaptureQueriesListener.countInsertQueriesForCurrentThread());
assertEquals(1, myCaptureQueriesListener.countUpdateQueriesForCurrentThread());
assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread());

View File

@@ -0,0 +1,139 @@
package ca.uhn.fhir.jpa.provider.r4;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test;
import ca.uhn.fhir.parser.StrictErrorHandler;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.util.BundleUtil;
import com.google.common.base.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.List;
import static org.hl7.fhir.instance.model.api.IBaseBundle.LINK_NEXT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@SuppressWarnings("Duplicates")
public class PatientEverythingPaginationR4Test extends BaseResourceProviderR4Test {
private int myOriginalServerDefaultPageSize;
@BeforeEach
public void beforeDisableResultReuse() {
myStorageSettings.setReuseCachedSearchResultsForMillis(null);
}
@Override
@BeforeEach
public void before() throws Exception {
super.before();
myFhirContext.setParserErrorHandler(new StrictErrorHandler());
myStorageSettings.setAllowMultipleDelete(true);
myOriginalServerDefaultPageSize = myServer.getDefaultPageSize();
myServer.setDefaultPageSize(50);
}
@Override
@AfterEach
public void after() throws Exception {
super.after();
myStorageSettings.setReuseCachedSearchResultsForMillis(new JpaStorageSettings().getReuseCachedSearchResultsForMillis());
myServer.setDefaultPageSize(myOriginalServerDefaultPageSize);
}
/**
* Built to reproduce <a href="https://gitlab.com/simpatico.ai/cdr/-/issues/4940">this issue</a>
* Notice that the issue is not gateway related; it is a plain server issue.
*/
@Test
@Disabled // not a valid test
public void testEverythingRespectsServerDefaultPageSize() throws IOException {
// setup
for (int i = 0; i < 25; i++) {
Patient patient = new Patient();
patient.addName().setFamily("lastn").addGiven("name");
myPatientDao.create(patient, new SystemRequestDetails()).getId().toUnqualifiedVersionless();
}
// must be larger than myStorageSettings.getSearchPreFetchThresholds()[0] for issue to show up
int originalPagingProviderPageSize = myPagingProvider.getDefaultPageSize();
myPagingProvider.setDefaultPageSize(50);
// execute
Bundle bundle;
try {
bundle = fetchBundle(myServerBase + "/Patient/$everything?_format=json");
} finally {
// restore
myPagingProvider.setDefaultPageSize(originalPagingProviderPageSize);
}
// validate
List<Patient> bundlePatients = BundleUtil.toListOfResourcesOfType(myFhirContext, bundle, Patient.class);
assertEquals(myServer.getDefaultPageSize(), bundlePatients.size());
}
/**
* Built to reproduce <a href="https://gitlab.com/simpatico.ai/cdr/-/issues/4940">this issue</a>
* Notice that the issue is not gateway related; it is a plain server issue.
*/
@Test
public void testEverythingPaginatesThroughAllPatients() throws IOException {
// setup
int totalPatients = 54;
for (int i = 0; i < totalPatients; i++) {
Patient patient = new Patient();
patient.addName().setFamily("lastn").addGiven("name");
myPatientDao.create(patient, new SystemRequestDetails()).getId().toUnqualifiedVersionless();
}
// test
Bundle bundle = fetchBundle(myServerBase + "/Patient/$everything?_format=json");
// verify
List<Patient> patientsFirstPage = BundleUtil.toListOfResourcesOfType(myFhirContext, bundle, Patient.class);
assertEquals(50, patientsFirstPage.size());
String nextUrl = BundleUtil.getLinkUrlOfType(myFhirContext, bundle, LINK_NEXT);
System.out.println(nextUrl);
assertNotNull(nextUrl);
Bundle page2 = fetchBundle(nextUrl);
assertNotNull(page2);
List<Patient> patientsPage2 = BundleUtil.toListOfResourcesOfType(myFhirContext, page2, Patient.class);
assertEquals(4, patientsPage2.size());
}
private Bundle fetchBundle(String theUrl) throws IOException {
Bundle bundle;
HttpGet get = new HttpGet(theUrl);
CloseableHttpResponse resp = ourHttpClient.execute(get);
try {
assertEquals(EncodingEnum.JSON.getResourceContentTypeNonLegacy(), resp.getFirstHeader(Constants.HEADER_CONTENT_TYPE).getValue().replaceAll(";.*", ""));
bundle = EncodingEnum.JSON.newParser(myFhirContext).parseResource(Bundle.class, IOUtils.toString(resp.getEntity().getContent(), Charsets.UTF_8));
} finally {
IOUtils.closeQuietly(resp);
}
return bundle;
}
}