diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java index a22d854bd89..5ea9c657a18 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java @@ -79,6 +79,9 @@ import javax.annotation.Nullable; import javax.annotation.PostConstruct; import javax.persistence.EntityManager; import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalUnit; import java.util.*; import java.util.concurrent.*; @@ -241,7 +244,6 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { @Override public IBundleProvider registerSearch(final IDao theCallingDao, final SearchParameterMap theParams, String theResourceType, CacheControlDirective theCacheControlDirective, RequestDetails theRequestDetails) { - StopWatch w = new StopWatch(); final String searchUuid = UUID.randomUUID().toString(); ourLog.debug("Registering new search {}", searchUuid); @@ -251,6 +253,185 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { sb.setType(resourceTypeClass, theResourceType); sb.setFetchSize(mySyncSize); + final Integer loadSynchronousUpTo = getLoadSynchronousUpToOrNull(theCacheControlDirective); + + if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null) { + ourLog.debug("Search {} is loading in synchronous mode", searchUuid); + return executeQuery(theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo); + } + + /* + * See if there are any cached searches whose results we can return + * instead + */ + boolean useCache = true; + if (theCacheControlDirective != null && theCacheControlDirective.isNoCache() == true) { + useCache = false; + } + + final String queryString = theParams.toNormalizedQueryString(myContext); + if (theParams.getEverythingMode() == null) { + if (myDaoConfig.getReuseCachedSearchResultsForMillis() != null && useCache) { + IBundleProvider foundSearchProvider = findCachedQuery(theCallingDao, theParams, theResourceType, theRequestDetails, queryString); + if (foundSearchProvider != null) { + return foundSearchProvider; + } + } + } + + return submitSearch(theCallingDao, theParams, theResourceType, theRequestDetails, searchUuid, sb, queryString); + + } + + @NotNull + private IBundleProvider submitSearch(IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequestDetails, String theSearchUuid, ISearchBuilder theSb, String theQueryString) { + StopWatch w = new StopWatch(); + Search search = new Search(); + populateSearchEntity(theParams, theResourceType, theSearchUuid, theQueryString, search); + + // Interceptor call: STORAGE_PRESEARCH_REGISTERED + HookParams params = new HookParams() + .add(ICachedSearchDetails.class, search) + .add(RequestDetails.class, theRequestDetails) + .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); + JpaInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PRESEARCH_REGISTERED, params); + + SearchTask task = new SearchTask(search, theCallingDao, theParams, theResourceType, theRequestDetails); + myIdToSearchTask.put(search.getUuid(), task); + myExecutor.submit(task); + + PersistedJpaSearchFirstPageBundleProvider retVal = new PersistedJpaSearchFirstPageBundleProvider(search, theCallingDao, task, theSb, myManagedTxManager, theRequestDetails); + populateBundleProvider(retVal); + + ourLog.debug("Search initial phase completed in {}ms", w.getMillis()); + return retVal; + } + + @org.jetbrains.annotations.Nullable + private IBundleProvider findCachedQuery(IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequestDetails, String theQueryString) { + TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager); + PersistedJpaBundleProvider foundSearchProvider = txTemplate.execute(t -> { + + // Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH + HookParams params = new HookParams() + .add(SearchParameterMap.class, theParams) + .add(RequestDetails.class, theRequestDetails) + .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); + Object outcome = JpaInterceptorBroadcaster.doCallHooksAndReturnObject(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH, params); + if (Boolean.FALSE.equals(outcome)) { + return null; + } + + // Check for a search matching the given hash + Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType); + if (searchToUse == null) { + return null; + } + + ourLog.debug("Reusing search {} from cache", searchToUse.getUuid()); + // Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED + params = new HookParams() + .add(SearchParameterMap.class, theParams) + .add(RequestDetails.class, theRequestDetails) + .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); + JpaInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, params); + + mySearchResultCacheSvc.updateSearchLastReturned(searchToUse, new Date()); + + PersistedJpaBundleProvider retVal = new PersistedJpaBundleProvider(theRequestDetails, searchToUse.getUuid(), theCallingDao); + retVal.setCacheHit(true); + populateBundleProvider(retVal); + + return retVal; + }); + + if (foundSearchProvider != null) { + return foundSearchProvider; + } + return null; + } + + @Nullable + private Search findSearchToUseOrNull(String theQueryString, String theResourceType) { + Search searchToUse = null; + + // createdCutoff is in recent past + final Instant createdCutoff = Instant.now().minus(myDaoConfig.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS); + Collection candidates = mySearchResultCacheSvc.findCandidatesForReuse(theResourceType, theQueryString, theQueryString.hashCode(), Date.from(createdCutoff)); + + for (Search nextCandidateSearch : candidates) { + // We should only reuse our search if it was created within the permitted window + // Date.after() is unreliable. Instant.isAfter() always works. + if (theQueryString.equals(nextCandidateSearch.getSearchQueryString()) && nextCandidateSearch.getCreated().toInstant().isAfter(createdCutoff)) { + searchToUse = nextCandidateSearch; + break; + } + } + return searchToUse; + } + + private IBundleProvider executeQuery(SearchParameterMap theParams, RequestDetails theRequestDetails, String theSearchUuid, ISearchBuilder theSb, Integer theLoadSynchronousUpTo) { + SearchRuntimeDetails searchRuntimeDetails = new SearchRuntimeDetails(theRequestDetails, theSearchUuid); + searchRuntimeDetails.setLoadSynchronous(true); + + // Execute the query and make sure we return distinct results + TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager); + txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); + return txTemplate.execute(t -> { + + // Load the results synchronously + final List pids = new ArrayList<>(); + + try (IResultIterator resultIter = theSb.createQuery(theParams, searchRuntimeDetails, theRequestDetails)) { + while (resultIter.hasNext()) { + pids.add(resultIter.next()); + if (theLoadSynchronousUpTo != null && pids.size() >= theLoadSynchronousUpTo) { + break; + } + if (theParams.getLoadSynchronousUpTo() != null && pids.size() >= theParams.getLoadSynchronousUpTo()) { + break; + } + } + } catch (IOException e) { + ourLog.error("IO failure during database access", e); + throw new InternalErrorException(e); + } + + JpaPreResourceAccessDetails accessDetails = new JpaPreResourceAccessDetails(pids, () -> theSb); + HookParams params = new HookParams() + .add(IPreResourceAccessDetails.class, accessDetails) + .add(RequestDetails.class, theRequestDetails) + .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); + JpaInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PREACCESS_RESOURCES, params); + + for (int i = pids.size() - 1; i >= 0; i--) { + if (accessDetails.isDontReturnResourceAtIndex(i)) { + pids.remove(i); + } + } + + /* + * For synchronous queries, we load all the includes right away + * since we're returning a static bundle with all the results + * pre-loaded. This is ok because syncronous requests are not + * expected to be paged + * + * On the other hand for async queries we load includes/revincludes + * individually for pages as we return them to clients + */ + final Set includedPids = new HashSet<>(); + includedPids.addAll(theSb.loadIncludes(myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated(), "(synchronous)", theRequestDetails)); + includedPids.addAll(theSb.loadIncludes(myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated(), "(synchronous)", theRequestDetails)); + List includedPidsList = new ArrayList<>(includedPids); + + List resources = new ArrayList<>(); + theSb.loadResourcesByPid(pids, includedPidsList, resources, false, theRequestDetails); + return new SimpleBundleProvider(resources); + }); + } + + @org.jetbrains.annotations.Nullable + private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) { final Integer loadSynchronousUpTo; if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) { if (theCacheControlDirective.getMaxResults() != null) { @@ -264,156 +445,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { } else { loadSynchronousUpTo = null; } - - if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null) { - - ourLog.debug("Search {} is loading in synchronous mode", searchUuid); - SearchRuntimeDetails searchRuntimeDetails = new SearchRuntimeDetails(theRequestDetails, searchUuid); - searchRuntimeDetails.setLoadSynchronous(true); - - // Execute the query and make sure we return distinct results - TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager); - txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); - return txTemplate.execute(t -> { - - // Load the results synchronously - final List pids = new ArrayList<>(); - - try (IResultIterator resultIter = sb.createQuery(theParams, searchRuntimeDetails, theRequestDetails)) { - while (resultIter.hasNext()) { - pids.add(resultIter.next()); - if (loadSynchronousUpTo != null && pids.size() >= loadSynchronousUpTo) { - break; - } - if (theParams.getLoadSynchronousUpTo() != null && pids.size() >= theParams.getLoadSynchronousUpTo()) { - break; - } - } - } catch (IOException e) { - ourLog.error("IO failure during database access", e); - throw new InternalErrorException(e); - } - - JpaPreResourceAccessDetails accessDetails = new JpaPreResourceAccessDetails(pids, () -> sb); - HookParams params = new HookParams() - .add(IPreResourceAccessDetails.class, accessDetails) - .add(RequestDetails.class, theRequestDetails) - .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); - JpaInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PREACCESS_RESOURCES, params); - - for (int i = pids.size() - 1; i >= 0; i--) { - if (accessDetails.isDontReturnResourceAtIndex(i)) { - pids.remove(i); - } - } - - /* - * For synchronous queries, we load all the includes right away - * since we're returning a static bundle with all the results - * pre-loaded. This is ok because syncronous requests are not - * expected to be paged - * - * On the other hand for async queries we load includes/revincludes - * individually for pages as we return them to clients - */ - final Set includedPids = new HashSet<>(); - includedPids.addAll(sb.loadIncludes(myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated(), "(synchronous)", theRequestDetails)); - includedPids.addAll(sb.loadIncludes(myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated(), "(synchronous)", theRequestDetails)); - List includedPidsList = new ArrayList<>(includedPids); - - List resources = new ArrayList<>(); - sb.loadResourcesByPid(pids, includedPidsList, resources, false, theRequestDetails); - return new SimpleBundleProvider(resources); - }); - } - - /* - * See if there are any cached searches whose results we can return - * instead - */ - boolean useCache = true; - if (theCacheControlDirective != null && theCacheControlDirective.isNoCache() == true) { - useCache = false; - } - final String queryString = theParams.toNormalizedQueryString(myContext); - if (theParams.getEverythingMode() == null) { - if (myDaoConfig.getReuseCachedSearchResultsForMillis() != null && useCache) { - - final Date createdCutoff = new Date(System.currentTimeMillis() - myDaoConfig.getReuseCachedSearchResultsForMillis()); - final String resourceType = theResourceType; - - TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager); - PersistedJpaBundleProvider foundSearchProvider = txTemplate.execute(t -> { - Search searchToUse = null; - - // Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH - HookParams params = new HookParams() - .add(SearchParameterMap.class, theParams) - .add(RequestDetails.class, theRequestDetails) - .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); - Object outcome = JpaInterceptorBroadcaster.doCallHooksAndReturnObject(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH, params); - if (Boolean.FALSE.equals(outcome)) { - return null; - } - - // Check for a search matching the given hash - Collection candidates = mySearchResultCacheSvc.findCandidatesForReuse(resourceType, queryString, queryString.hashCode(), createdCutoff); - for (Search nextCandidateSearch : candidates) { - if (queryString.equals(nextCandidateSearch.getSearchQueryString()) && nextCandidateSearch.getCreated().after(createdCutoff)) { - searchToUse = nextCandidateSearch; - break; - } - } - - PersistedJpaBundleProvider retVal = null; - if (searchToUse != null) { - ourLog.debug("Reusing search {} from cache", searchToUse.getUuid()); - - // Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED - params = new HookParams() - .add(SearchParameterMap.class, theParams) - .add(RequestDetails.class, theRequestDetails) - .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); - JpaInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, params); - - mySearchResultCacheSvc.updateSearchLastReturned(searchToUse, new Date()); - - retVal = new PersistedJpaBundleProvider(theRequestDetails, searchToUse.getUuid(), theCallingDao); - retVal.setCacheHit(true); - - populateBundleProvider(retVal); - } - - return retVal; - }); - - if (foundSearchProvider != null) { - return foundSearchProvider; - } - - } - } - - Search search = new Search(); - populateSearchEntity(theParams, theResourceType, searchUuid, queryString, search); - - // Interceptor call: STORAGE_PRESEARCH_REGISTERED - HookParams params = new HookParams() - .add(ICachedSearchDetails.class, search) - .add(RequestDetails.class, theRequestDetails) - .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); - JpaInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PRESEARCH_REGISTERED, params); - - SearchTask task = new SearchTask(search, theCallingDao, theParams, theResourceType, theRequestDetails); - myIdToSearchTask.put(search.getUuid(), task); - myExecutor.submit(task); - - PersistedJpaSearchFirstPageBundleProvider retVal = new PersistedJpaSearchFirstPageBundleProvider(search, theCallingDao, task, sb, myManagedTxManager, theRequestDetails); - populateBundleProvider(retVal); - - ourLog.debug("Search initial phase completed in {}ms", w.getMillis()); - return retVal; - + return loadSynchronousUpTo; } private void callInterceptorStoragePreAccessResources(IInterceptorBroadcaster theInterceptorBroadcaster, RequestDetails theRequestDetails, ISearchBuilder theSb, List thePids) { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/cache/DatabaseSearchResultCacheSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/cache/DatabaseSearchResultCacheSvcImpl.java index ec2c672c1fb..e67f1bf1aac 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/cache/DatabaseSearchResultCacheSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/cache/DatabaseSearchResultCacheSvcImpl.java @@ -31,6 +31,7 @@ import static ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImpl.toPage; public class DatabaseSearchResultCacheSvcImpl extends BaseSearchResultCacheSvcImpl { /* * Be careful increasing this number! We use the number of params here in a + * // FIXME KHS * DELETE FROM foo WHERE params IN (aaaa) * type query and this can fail if we have 1000s of params */