fixed intermittently failing test

This commit is contained in:
Ken Stevens 2019-08-28 13:11:44 -04:00
parent 259426b0dd
commit eab589bcac
2 changed files with 184 additions and 151 deletions

View File

@ -79,6 +79,9 @@ import javax.annotation.Nullable;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.persistence.EntityManager; import javax.persistence.EntityManager;
import java.io.IOException; import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
@ -241,7 +244,6 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
@Override @Override
public IBundleProvider registerSearch(final IDao theCallingDao, final SearchParameterMap theParams, String theResourceType, CacheControlDirective theCacheControlDirective, RequestDetails theRequestDetails) { public IBundleProvider registerSearch(final IDao theCallingDao, final SearchParameterMap theParams, String theResourceType, CacheControlDirective theCacheControlDirective, RequestDetails theRequestDetails) {
StopWatch w = new StopWatch();
final String searchUuid = UUID.randomUUID().toString(); final String searchUuid = UUID.randomUUID().toString();
ourLog.debug("Registering new search {}", searchUuid); ourLog.debug("Registering new search {}", searchUuid);
@ -251,6 +253,185 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
sb.setType(resourceTypeClass, theResourceType); sb.setType(resourceTypeClass, theResourceType);
sb.setFetchSize(mySyncSize); sb.setFetchSize(mySyncSize);
final Integer loadSynchronousUpTo = getLoadSynchronousUpToOrNull(theCacheControlDirective);
if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null) {
ourLog.debug("Search {} is loading in synchronous mode", searchUuid);
return executeQuery(theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo);
}
/*
* See if there are any cached searches whose results we can return
* instead
*/
boolean useCache = true;
if (theCacheControlDirective != null && theCacheControlDirective.isNoCache() == true) {
useCache = false;
}
final String queryString = theParams.toNormalizedQueryString(myContext);
if (theParams.getEverythingMode() == null) {
if (myDaoConfig.getReuseCachedSearchResultsForMillis() != null && useCache) {
IBundleProvider foundSearchProvider = findCachedQuery(theCallingDao, theParams, theResourceType, theRequestDetails, queryString);
if (foundSearchProvider != null) {
return foundSearchProvider;
}
}
}
return submitSearch(theCallingDao, theParams, theResourceType, theRequestDetails, searchUuid, sb, queryString);
}
@NotNull
private IBundleProvider submitSearch(IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequestDetails, String theSearchUuid, ISearchBuilder theSb, String theQueryString) {
StopWatch w = new StopWatch();
Search search = new Search();
populateSearchEntity(theParams, theResourceType, theSearchUuid, theQueryString, search);
// Interceptor call: STORAGE_PRESEARCH_REGISTERED
HookParams params = new HookParams()
.add(ICachedSearchDetails.class, search)
.add(RequestDetails.class, theRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
JpaInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PRESEARCH_REGISTERED, params);
SearchTask task = new SearchTask(search, theCallingDao, theParams, theResourceType, theRequestDetails);
myIdToSearchTask.put(search.getUuid(), task);
myExecutor.submit(task);
PersistedJpaSearchFirstPageBundleProvider retVal = new PersistedJpaSearchFirstPageBundleProvider(search, theCallingDao, task, theSb, myManagedTxManager, theRequestDetails);
populateBundleProvider(retVal);
ourLog.debug("Search initial phase completed in {}ms", w.getMillis());
return retVal;
}
@org.jetbrains.annotations.Nullable
private IBundleProvider findCachedQuery(IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequestDetails, String theQueryString) {
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
PersistedJpaBundleProvider foundSearchProvider = txTemplate.execute(t -> {
// Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH
HookParams params = new HookParams()
.add(SearchParameterMap.class, theParams)
.add(RequestDetails.class, theRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
Object outcome = JpaInterceptorBroadcaster.doCallHooksAndReturnObject(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH, params);
if (Boolean.FALSE.equals(outcome)) {
return null;
}
// Check for a search matching the given hash
Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType);
if (searchToUse == null) {
return null;
}
ourLog.debug("Reusing search {} from cache", searchToUse.getUuid());
// Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED
params = new HookParams()
.add(SearchParameterMap.class, theParams)
.add(RequestDetails.class, theRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
JpaInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, params);
mySearchResultCacheSvc.updateSearchLastReturned(searchToUse, new Date());
PersistedJpaBundleProvider retVal = new PersistedJpaBundleProvider(theRequestDetails, searchToUse.getUuid(), theCallingDao);
retVal.setCacheHit(true);
populateBundleProvider(retVal);
return retVal;
});
if (foundSearchProvider != null) {
return foundSearchProvider;
}
return null;
}
@Nullable
private Search findSearchToUseOrNull(String theQueryString, String theResourceType) {
Search searchToUse = null;
// createdCutoff is in recent past
final Instant createdCutoff = Instant.now().minus(myDaoConfig.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS);
Collection<Search> candidates = mySearchResultCacheSvc.findCandidatesForReuse(theResourceType, theQueryString, theQueryString.hashCode(), Date.from(createdCutoff));
for (Search nextCandidateSearch : candidates) {
// We should only reuse our search if it was created within the permitted window
// Date.after() is unreliable. Instant.isAfter() always works.
if (theQueryString.equals(nextCandidateSearch.getSearchQueryString()) && nextCandidateSearch.getCreated().toInstant().isAfter(createdCutoff)) {
searchToUse = nextCandidateSearch;
break;
}
}
return searchToUse;
}
private IBundleProvider executeQuery(SearchParameterMap theParams, RequestDetails theRequestDetails, String theSearchUuid, ISearchBuilder theSb, Integer theLoadSynchronousUpTo) {
SearchRuntimeDetails searchRuntimeDetails = new SearchRuntimeDetails(theRequestDetails, theSearchUuid);
searchRuntimeDetails.setLoadSynchronous(true);
// Execute the query and make sure we return distinct results
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
return txTemplate.execute(t -> {
// Load the results synchronously
final List<Long> pids = new ArrayList<>();
try (IResultIterator resultIter = theSb.createQuery(theParams, searchRuntimeDetails, theRequestDetails)) {
while (resultIter.hasNext()) {
pids.add(resultIter.next());
if (theLoadSynchronousUpTo != null && pids.size() >= theLoadSynchronousUpTo) {
break;
}
if (theParams.getLoadSynchronousUpTo() != null && pids.size() >= theParams.getLoadSynchronousUpTo()) {
break;
}
}
} catch (IOException e) {
ourLog.error("IO failure during database access", e);
throw new InternalErrorException(e);
}
JpaPreResourceAccessDetails accessDetails = new JpaPreResourceAccessDetails(pids, () -> theSb);
HookParams params = new HookParams()
.add(IPreResourceAccessDetails.class, accessDetails)
.add(RequestDetails.class, theRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
JpaInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PREACCESS_RESOURCES, params);
for (int i = pids.size() - 1; i >= 0; i--) {
if (accessDetails.isDontReturnResourceAtIndex(i)) {
pids.remove(i);
}
}
/*
* For synchronous queries, we load all the includes right away
* since we're returning a static bundle with all the results
* pre-loaded. This is ok because syncronous requests are not
* expected to be paged
*
* On the other hand for async queries we load includes/revincludes
* individually for pages as we return them to clients
*/
final Set<Long> includedPids = new HashSet<>();
includedPids.addAll(theSb.loadIncludes(myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated(), "(synchronous)", theRequestDetails));
includedPids.addAll(theSb.loadIncludes(myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated(), "(synchronous)", theRequestDetails));
List<Long> includedPidsList = new ArrayList<>(includedPids);
List<IBaseResource> resources = new ArrayList<>();
theSb.loadResourcesByPid(pids, includedPidsList, resources, false, theRequestDetails);
return new SimpleBundleProvider(resources);
});
}
@org.jetbrains.annotations.Nullable
private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) {
final Integer loadSynchronousUpTo; final Integer loadSynchronousUpTo;
if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) { if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) {
if (theCacheControlDirective.getMaxResults() != null) { if (theCacheControlDirective.getMaxResults() != null) {
@ -264,156 +445,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
} else { } else {
loadSynchronousUpTo = null; loadSynchronousUpTo = null;
} }
return loadSynchronousUpTo;
if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null) {
ourLog.debug("Search {} is loading in synchronous mode", searchUuid);
SearchRuntimeDetails searchRuntimeDetails = new SearchRuntimeDetails(theRequestDetails, searchUuid);
searchRuntimeDetails.setLoadSynchronous(true);
// Execute the query and make sure we return distinct results
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
return txTemplate.execute(t -> {
// Load the results synchronously
final List<Long> pids = new ArrayList<>();
try (IResultIterator resultIter = sb.createQuery(theParams, searchRuntimeDetails, theRequestDetails)) {
while (resultIter.hasNext()) {
pids.add(resultIter.next());
if (loadSynchronousUpTo != null && pids.size() >= loadSynchronousUpTo) {
break;
}
if (theParams.getLoadSynchronousUpTo() != null && pids.size() >= theParams.getLoadSynchronousUpTo()) {
break;
}
}
} catch (IOException e) {
ourLog.error("IO failure during database access", e);
throw new InternalErrorException(e);
}
JpaPreResourceAccessDetails accessDetails = new JpaPreResourceAccessDetails(pids, () -> sb);
HookParams params = new HookParams()
.add(IPreResourceAccessDetails.class, accessDetails)
.add(RequestDetails.class, theRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
JpaInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PREACCESS_RESOURCES, params);
for (int i = pids.size() - 1; i >= 0; i--) {
if (accessDetails.isDontReturnResourceAtIndex(i)) {
pids.remove(i);
}
}
/*
* For synchronous queries, we load all the includes right away
* since we're returning a static bundle with all the results
* pre-loaded. This is ok because syncronous requests are not
* expected to be paged
*
* On the other hand for async queries we load includes/revincludes
* individually for pages as we return them to clients
*/
final Set<Long> includedPids = new HashSet<>();
includedPids.addAll(sb.loadIncludes(myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated(), "(synchronous)", theRequestDetails));
includedPids.addAll(sb.loadIncludes(myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated(), "(synchronous)", theRequestDetails));
List<Long> includedPidsList = new ArrayList<>(includedPids);
List<IBaseResource> resources = new ArrayList<>();
sb.loadResourcesByPid(pids, includedPidsList, resources, false, theRequestDetails);
return new SimpleBundleProvider(resources);
});
}
/*
* See if there are any cached searches whose results we can return
* instead
*/
boolean useCache = true;
if (theCacheControlDirective != null && theCacheControlDirective.isNoCache() == true) {
useCache = false;
}
final String queryString = theParams.toNormalizedQueryString(myContext);
if (theParams.getEverythingMode() == null) {
if (myDaoConfig.getReuseCachedSearchResultsForMillis() != null && useCache) {
final Date createdCutoff = new Date(System.currentTimeMillis() - myDaoConfig.getReuseCachedSearchResultsForMillis());
final String resourceType = theResourceType;
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
PersistedJpaBundleProvider foundSearchProvider = txTemplate.execute(t -> {
Search searchToUse = null;
// Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH
HookParams params = new HookParams()
.add(SearchParameterMap.class, theParams)
.add(RequestDetails.class, theRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
Object outcome = JpaInterceptorBroadcaster.doCallHooksAndReturnObject(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH, params);
if (Boolean.FALSE.equals(outcome)) {
return null;
}
// Check for a search matching the given hash
Collection<Search> candidates = mySearchResultCacheSvc.findCandidatesForReuse(resourceType, queryString, queryString.hashCode(), createdCutoff);
for (Search nextCandidateSearch : candidates) {
if (queryString.equals(nextCandidateSearch.getSearchQueryString()) && nextCandidateSearch.getCreated().after(createdCutoff)) {
searchToUse = nextCandidateSearch;
break;
}
}
PersistedJpaBundleProvider retVal = null;
if (searchToUse != null) {
ourLog.debug("Reusing search {} from cache", searchToUse.getUuid());
// Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED
params = new HookParams()
.add(SearchParameterMap.class, theParams)
.add(RequestDetails.class, theRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
JpaInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, params);
mySearchResultCacheSvc.updateSearchLastReturned(searchToUse, new Date());
retVal = new PersistedJpaBundleProvider(theRequestDetails, searchToUse.getUuid(), theCallingDao);
retVal.setCacheHit(true);
populateBundleProvider(retVal);
}
return retVal;
});
if (foundSearchProvider != null) {
return foundSearchProvider;
}
}
}
Search search = new Search();
populateSearchEntity(theParams, theResourceType, searchUuid, queryString, search);
// Interceptor call: STORAGE_PRESEARCH_REGISTERED
HookParams params = new HookParams()
.add(ICachedSearchDetails.class, search)
.add(RequestDetails.class, theRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
JpaInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PRESEARCH_REGISTERED, params);
SearchTask task = new SearchTask(search, theCallingDao, theParams, theResourceType, theRequestDetails);
myIdToSearchTask.put(search.getUuid(), task);
myExecutor.submit(task);
PersistedJpaSearchFirstPageBundleProvider retVal = new PersistedJpaSearchFirstPageBundleProvider(search, theCallingDao, task, sb, myManagedTxManager, theRequestDetails);
populateBundleProvider(retVal);
ourLog.debug("Search initial phase completed in {}ms", w.getMillis());
return retVal;
} }
private void callInterceptorStoragePreAccessResources(IInterceptorBroadcaster theInterceptorBroadcaster, RequestDetails theRequestDetails, ISearchBuilder theSb, List<Long> thePids) { private void callInterceptorStoragePreAccessResources(IInterceptorBroadcaster theInterceptorBroadcaster, RequestDetails theRequestDetails, ISearchBuilder theSb, List<Long> thePids) {

View File

@ -31,6 +31,7 @@ import static ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImpl.toPage;
public class DatabaseSearchResultCacheSvcImpl extends BaseSearchResultCacheSvcImpl { public class DatabaseSearchResultCacheSvcImpl extends BaseSearchResultCacheSvcImpl {
/* /*
* Be careful increasing this number! We use the number of params here in a * Be careful increasing this number! We use the number of params here in a
* // FIXME KHS
* DELETE FROM foo WHERE params IN (aaaa) * DELETE FROM foo WHERE params IN (aaaa)
* type query and this can fail if we have 1000s of params * type query and this can fail if we have 1000s of params
*/ */