Split search cache api from search result cache api.

This commit is contained in:
Ken Stevens 2019-08-28 16:24:56 -04:00
parent eab589bcac
commit 3e58962ac1
18 changed files with 412 additions and 354 deletions

View File

@ -14,7 +14,9 @@ import ca.uhn.fhir.jpa.provider.TerminologyUploaderProvider;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.IStaleSearchDeletingSvc;
import ca.uhn.fhir.jpa.search.StaleSearchDeletingSvcImpl;
import ca.uhn.fhir.jpa.search.cache.DatabaseSearchCacheSvcImpl;
import ca.uhn.fhir.jpa.search.cache.DatabaseSearchResultCacheSvcImpl;
import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.search.reindex.ResourceReindexingSvcImpl;
@ -145,6 +147,11 @@ public abstract class BaseConfig implements SchedulingConfigurer {
return new BinaryStorageInterceptor();
}
@Bean
public ISearchCacheSvc searchCacheSvc() {
return new DatabaseSearchCacheSvcImpl();
}
@Bean
public ISearchResultCacheSvc searchResultCacheSvc() {
return new DatabaseSearchResultCacheSvcImpl();

View File

@ -18,6 +18,7 @@ import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
import ca.uhn.fhir.jpa.model.search.StorageProcessingMessage;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider;
import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
import ca.uhn.fhir.jpa.searchparam.ResourceMetaParams;
import ca.uhn.fhir.jpa.searchparam.extractor.LogicalReferenceHelper;
@ -153,6 +154,8 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
@Autowired
private PlatformTransactionManager myPlatformTransactionManager;
@Autowired
private ISearchCacheSvc mySearchCacheSvc;
@Autowired
private ISearchResultCacheSvc mySearchResultCacheSvc;
@Autowired
private ISearchParamPresenceSvc mySearchParamPresenceSvc;
@ -467,7 +470,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
}
}
search = mySearchResultCacheSvc.save(search);
search = mySearchCacheSvc.save(search);
return new PersistedJpaBundleProvider(theRequest, search.getUuid(), this);
}
@ -493,7 +496,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
theProvider.setContext(getContext());
theProvider.setEntityManager(myEntityManager);
theProvider.setPlatformTransactionManager(myPlatformTransactionManager);
theProvider.setSearchResultCacheSvc(mySearchResultCacheSvc);
theProvider.setSearchCacheSvc(mySearchCacheSvc);
theProvider.setSearchCoordinatorSvc(mySearchCoordinatorSvc);
theProvider.setInterceptorBroadcaster(myInterceptorBroadcaster);
}

View File

@ -37,10 +37,10 @@ import java.util.Set;
public interface ISearchResultDao extends JpaRepository<SearchResult, Long> {
@Query(value="SELECT r.myResourcePid FROM SearchResult r WHERE r.mySearchPid = :search ORDER BY r.myOrder ASC")
Slice<Long> findWithSearchUuid(@Param("search") Long theSearch, Pageable thePage);
Slice<Long> findWithSearchPid(@Param("search") Long theSearchPid, Pageable thePage);
@Query(value="SELECT r.myResourcePid FROM SearchResult r WHERE r.mySearchPid = :search")
List<Long> findWithSearchUuidOrderIndependent(@Param("search") Long theSearch);
List<Long> findWithSearchPidOrderIndependent(@Param("search") Long theSearchPid);
@Query(value="SELECT r.myId FROM SearchResult r WHERE r.mySearchPid = :search")
Slice<Long> findForSearch(Pageable thePage, @Param("search") Long theSearchPid);

View File

@ -30,6 +30,7 @@ import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.entity.SearchTypeEnum;
import ca.uhn.fhir.jpa.model.entity.BaseHasResource;
import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
import ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster;
import ca.uhn.fhir.model.primitive.InstantDt;
@ -63,7 +64,7 @@ public class PersistedJpaBundleProvider implements IBundleProvider {
private EntityManager myEntityManager;
private PlatformTransactionManager myPlatformTransactionManager;
private ISearchCoordinatorSvc mySearchCoordinatorSvc;
private ISearchResultCacheSvc mySearchResultCacheSvc;
private ISearchCacheSvc mySearchCacheSvc;
private Search mySearchEntity;
private String myUuid;
private boolean myCacheHit;
@ -191,7 +192,7 @@ public class PersistedJpaBundleProvider implements IBundleProvider {
if (mySearchEntity == null) {
ensureDependenciesInjected();
Optional<Search> search = mySearchResultCacheSvc.fetchByUuid(myUuid);
Optional<Search> search = mySearchCacheSvc.fetchByUuid(myUuid);
if (!search.isPresent()) {
return false;
}
@ -338,7 +339,7 @@ public class PersistedJpaBundleProvider implements IBundleProvider {
myInterceptorBroadcaster = theInterceptorBroadcaster;
}
public void setSearchResultCacheSvc(ISearchResultCacheSvc theSearchResultCacheSvc) {
mySearchResultCacheSvc = theSearchResultCacheSvc;
public void setSearchCacheSvc(ISearchCacheSvc theSearchCacheSvc) {
mySearchCacheSvc = theSearchCacheSvc;
}
}

View File

@ -31,6 +31,7 @@ import ca.uhn.fhir.jpa.entity.SearchTypeEnum;
import ca.uhn.fhir.jpa.interceptor.JpaPreResourceAccessDetails;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster;
@ -81,7 +82,6 @@ import javax.persistence.EntityManager;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.*;
import java.util.concurrent.*;
@ -108,6 +108,8 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
@Autowired
private PlatformTransactionManager myManagedTxManager;
@Autowired
private ISearchCacheSvc mySearchCacheSvc;
@Autowired
private ISearchResultCacheSvc mySearchResultCacheSvc;
@Autowired
private DaoRegistry myDaoRegistry;
@ -129,7 +131,8 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
}
@VisibleForTesting
public void setSearchResultCacheSvcForUnitTest(ISearchResultCacheSvc theSearchResultCacheSvc) {
public void setSearchCacheServicesForUnitTest(ISearchCacheSvc theSearchCacheSvc, ISearchResultCacheSvc theSearchResultCacheSvc) {
mySearchCacheSvc = theSearchCacheSvc;
mySearchResultCacheSvc = theSearchResultCacheSvc;
}
@ -183,7 +186,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
}
}
search = mySearchResultCacheSvc
search = mySearchCacheSvc
.fetchByUuid(theUuid)
.orElseThrow(() -> {
ourLog.debug("Client requested unknown paging ID[{}]", theUuid);
@ -209,7 +212,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
// If the search was saved in "pass complete mode" it's probably time to
// start a new pass
if (search.getStatus() == SearchStatusEnum.PASSCMPLET) {
Optional<Search> newSearch = mySearchResultCacheSvc.tryToMarkSearchAsInProgress(search);
Optional<Search> newSearch = mySearchCacheSvc.tryToMarkSearchAsInProgress(search);
if (newSearch.isPresent()) {
search = newSearch.get();
String resourceType = search.getResourceType();
@ -237,7 +240,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
theRetVal.setContext(myContext);
theRetVal.setEntityManager(myEntityManager);
theRetVal.setPlatformTransactionManager(myManagedTxManager);
theRetVal.setSearchResultCacheSvc(mySearchResultCacheSvc);
theRetVal.setSearchCacheSvc(mySearchCacheSvc);
theRetVal.setSearchCoordinatorSvc(this);
theRetVal.setInterceptorBroadcaster(myInterceptorBroadcaster);
}
@ -336,7 +339,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
JpaInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, params);
mySearchResultCacheSvc.updateSearchLastReturned(searchToUse, new Date());
mySearchCacheSvc.updateSearchLastReturned(searchToUse, new Date());
PersistedJpaBundleProvider retVal = new PersistedJpaBundleProvider(theRequestDetails, searchToUse.getUuid(), theCallingDao);
retVal.setCacheHit(true);
@ -357,7 +360,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
// createdCutoff is in recent past
final Instant createdCutoff = Instant.now().minus(myDaoConfig.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS);
Collection<Search> candidates = mySearchResultCacheSvc.findCandidatesForReuse(theResourceType, theQueryString, theQueryString.hashCode(), Date.from(createdCutoff));
Collection<Search> candidates = mySearchCacheSvc.findCandidatesForReuse(theResourceType, theQueryString, theQueryString.hashCode(), Date.from(createdCutoff));
for (Search nextCandidateSearch : candidates) {
// We should only reuse our search if it was created within the permitted window
@ -841,7 +844,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
private void doSaveSearch() {
Search newSearch = mySearchResultCacheSvc.save(mySearch);
Search newSearch = mySearchCacheSvc.save(mySearch);
// mySearchDao.save is not supposed to return null, but in unit tests
// it can if the mock search dao isn't set up to handle that

View File

@ -21,13 +21,14 @@ package ca.uhn.fhir.jpa.search;
*/
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import static ca.uhn.fhir.jpa.search.cache.DatabaseSearchResultCacheSvcImpl.DEFAULT_CUTOFF_SLACK;
import static ca.uhn.fhir.jpa.search.cache.DatabaseSearchCacheSvcImpl.DEFAULT_CUTOFF_SLACK;
/**
* Deletes old searches
@ -42,12 +43,12 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc {
@Autowired
private DaoConfig myDaoConfig;
@Autowired
private ISearchResultCacheSvc mySearchResultCacheSvc;
private ISearchCacheSvc mySearchCacheSvc;
@Override
@Transactional(propagation = Propagation.NEVER)
public void pollForStaleSearchesAndDeleteThem() {
mySearchResultCacheSvc.pollForStaleSearchesAndDeleteThem();
mySearchCacheSvc.pollForStaleSearchesAndDeleteThem();
}
@Scheduled(fixedDelay = DEFAULT_CUTOFF_SLACK)

View File

@ -12,7 +12,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public abstract class BaseSearchResultCacheSvcImpl implements ISearchResultCacheSvc {
public abstract class BaseSearchCacheSvcImpl implements ISearchCacheSvc {
@Autowired
private PlatformTransactionManager myTxManager;
@ -24,7 +24,6 @@ public abstract class BaseSearchResultCacheSvcImpl implements ISearchResultCache
myUnsyncedLastUpdated.put(theSearch.getId(), theDate);
}
@Override
@Scheduled(fixedDelay = 10 * DateUtils.MILLIS_PER_SECOND)
public void flushLastUpdated() {

View File

@ -0,0 +1,242 @@
package ca.uhn.fhir.jpa.search.cache;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao;
import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.entity.SearchInclude;
import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.dstu3.model.InstantType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Slice;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;
import javax.transaction.Transactional;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Optional;
public class DatabaseSearchCacheSvcImpl extends BaseSearchCacheSvcImpl {
private static final Logger ourLog = LoggerFactory.getLogger(DatabaseSearchCacheSvcImpl.class);
/*
* Be careful increasing this number! We use the number of params here in a
* // FIXME KHS
* DELETE FROM foo WHERE params IN (aaaa)
* type query and this can fail if we have 1000s of params
*/
public static final int DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT = 500;
public static final int DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS = 20000;
public static final long DEFAULT_CUTOFF_SLACK = 10 * DateUtils.MILLIS_PER_SECOND;
private static int ourMaximumResultsToDeleteInOneStatement = DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT;
private static int ourMaximumResultsToDeleteInOnePass = DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS;
private static Long ourNowForUnitTests;
/*
* We give a bit of extra leeway just to avoid race conditions where a query result
* is being reused (because a new client request came in with the same params) right before
* the result is to be deleted
*/
private long myCutoffSlack = DEFAULT_CUTOFF_SLACK;
@Autowired
private ISearchDao mySearchDao;
@Autowired
private ISearchResultDao mySearchResultDao;
@Autowired
private ISearchIncludeDao mySearchIncludeDao;
@Autowired
private PlatformTransactionManager myTxManager;
@Autowired
private DaoConfig myDaoConfig;
@VisibleForTesting
public void setCutoffSlackForUnitTest(long theCutoffSlack) {
myCutoffSlack = theCutoffSlack;
}
@Transactional(Transactional.TxType.REQUIRED)
@Override
public Search save(Search theSearch) {
Search newSearch;
if (theSearch.getId() == null) {
newSearch = mySearchDao.save(theSearch);
for (SearchInclude next : theSearch.getIncludes()) {
mySearchIncludeDao.save(next);
}
} else {
newSearch = mySearchDao.save(theSearch);
}
return newSearch;
}
@Override
@Transactional(Transactional.TxType.REQUIRED)
public Optional<Search> fetchByUuid(String theUuid) {
Validate.notBlank(theUuid);
return mySearchDao.findByUuidAndFetchIncludes(theUuid);
}
@Override
@Transactional(Transactional.TxType.NEVER)
public Optional<Search> tryToMarkSearchAsInProgress(Search theSearch) {
ourLog.trace("Going to try to change search status from {} to {}", theSearch.getStatus(), SearchStatusEnum.LOADING);
try {
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
txTemplate.afterPropertiesSet();
return txTemplate.execute(t -> {
Search search = mySearchDao.findById(theSearch.getId()).orElse(theSearch);
if (search.getStatus() != SearchStatusEnum.PASSCMPLET) {
throw new IllegalStateException("Can't change to LOADING because state is " + theSearch.getStatus());
}
search.setStatus(SearchStatusEnum.LOADING);
Search newSearch = mySearchDao.save(search);
return Optional.of(newSearch);
});
} catch (Exception e) {
ourLog.warn("Failed to activate search: {}", e.toString());
ourLog.trace("Failed to activate search", e);
return Optional.empty();
}
}
@Override
public Collection<Search> findCandidatesForReuse(String theResourceType, String theQueryString, int theQueryStringHash, Date theCreatedAfter) {
int hashCode = theQueryString.hashCode();
return mySearchDao.find(theResourceType, hashCode, theCreatedAfter);
}
@Override
protected void flushLastUpdated(Long theSearchId, Date theLastUpdated) {
mySearchDao.updateSearchLastReturned(theSearchId, theLastUpdated);
}
@Transactional(Transactional.TxType.NEVER)
@Override
public void pollForStaleSearchesAndDeleteThem() {
if (!myDaoConfig.isExpireSearchResults()) {
return;
}
long cutoffMillis = myDaoConfig.getExpireSearchResultsAfterMillis();
if (myDaoConfig.getReuseCachedSearchResultsForMillis() != null) {
cutoffMillis = Math.max(cutoffMillis, myDaoConfig.getReuseCachedSearchResultsForMillis());
}
final Date cutoff = new Date((now() - cutoffMillis) - myCutoffSlack);
if (ourNowForUnitTests != null) {
ourLog.info("Searching for searches which are before {} - now is {}", new InstantType(cutoff), new InstantType(new Date(now())));
}
ourLog.debug("Searching for searches which are before {}", cutoff);
TransactionTemplate tt = new TransactionTemplate(myTxManager);
final Slice<Long> toDelete = tt.execute(theStatus ->
mySearchDao.findWhereLastReturnedBefore(cutoff, PageRequest.of(0, 2000))
);
for (final Long nextSearchToDelete : toDelete) {
ourLog.debug("Deleting search with PID {}", nextSearchToDelete);
tt.execute(t -> {
mySearchDao.updateDeleted(nextSearchToDelete, true);
return null;
});
tt.execute(t -> {
deleteSearch(nextSearchToDelete);
return null;
});
}
int count = toDelete.getContent().size();
if (count > 0) {
if (ourLog.isDebugEnabled()) {
long total = tt.execute(t -> mySearchDao.count());
ourLog.debug("Deleted {} searches, {} remaining", count, total);
}
}
}
@VisibleForTesting
void setSearchDaoForUnitTest(ISearchDao theSearchDao) {
mySearchDao = theSearchDao;
}
@VisibleForTesting
void setSearchDaoIncludeForUnitTest(ISearchIncludeDao theSearchIncludeDao) {
mySearchIncludeDao = theSearchIncludeDao;
}
private void deleteSearch(final Long theSearchPid) {
mySearchDao.findById(theSearchPid).ifPresent(searchToDelete -> {
mySearchIncludeDao.deleteForSearch(searchToDelete.getId());
/*
* Note, we're only deleting up to 500 results in an individual search here. This
* is to prevent really long running transactions in cases where there are
* huge searches with tons of results in them. By the time we've gotten here
* we have marked the parent Search entity as deleted, so it's not such a
* huge deal to be only partially deleting search results. They'll get deleted
* eventually
*/
int max = ourMaximumResultsToDeleteInOnePass;
Slice<Long> resultPids = mySearchResultDao.findForSearch(PageRequest.of(0, max), searchToDelete.getId());
if (resultPids.hasContent()) {
List<List<Long>> partitions = Lists.partition(resultPids.getContent(), ourMaximumResultsToDeleteInOneStatement);
for (List<Long> nextPartition : partitions) {
mySearchResultDao.deleteByIds(nextPartition);
}
}
// Only delete if we don't have results left in this search
if (resultPids.getNumberOfElements() < max) {
ourLog.debug("Deleting search {}/{} - Created[{}] -- Last returned[{}]", searchToDelete.getId(), searchToDelete.getUuid(), new InstantType(searchToDelete.getCreated()), new InstantType(searchToDelete.getSearchLastReturned()));
mySearchDao.deleteByPid(searchToDelete.getId());
} else {
ourLog.debug("Purged {} search results for deleted search {}/{}", resultPids.getSize(), searchToDelete.getId(), searchToDelete.getUuid());
}
});
}
@VisibleForTesting
public static void setMaximumResultsToDeleteInOnePassForUnitTest(int theMaximumResultsToDeleteInOnePass) {
ourMaximumResultsToDeleteInOnePass = theMaximumResultsToDeleteInOnePass;
}
@VisibleForTesting
public static void setMaximumResultsToDeleteForUnitTest(int theMaximumResultsToDelete) {
ourMaximumResultsToDeleteInOneStatement = theMaximumResultsToDelete;
}
/**
* This is for unit tests only, do not call otherwise
*/
@VisibleForTesting
public static void setNowForUnitTests(Long theNowForUnitTests) {
ourNowForUnitTests = theNowForUnitTests;
}
private static long now() {
if (ourNowForUnitTests != null) {
return ourNowForUnitTests;
}
return System.currentTimeMillis();
}
}

View File

@ -1,91 +1,25 @@
package ca.uhn.fhir.jpa.search.cache;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao;
import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.entity.SearchInclude;
import ca.uhn.fhir.jpa.entity.SearchResult;
import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.dstu3.model.InstantType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;
import javax.transaction.Transactional;
import java.util.*;
import static ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImpl.toPage;
public class DatabaseSearchResultCacheSvcImpl extends BaseSearchResultCacheSvcImpl {
/*
* Be careful increasing this number! We use the number of params here in a
* // FIXME KHS
* DELETE FROM foo WHERE params IN (aaaa)
* type query and this can fail if we have 1000s of params
*/
public static final int DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT = 500;
public static final int DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS = 20000;
public static final long DEFAULT_CUTOFF_SLACK = 10 * DateUtils.MILLIS_PER_SECOND;
public class DatabaseSearchResultCacheSvcImpl implements ISearchResultCacheSvc {
private static final Logger ourLog = LoggerFactory.getLogger(DatabaseSearchResultCacheSvcImpl.class);
private static int ourMaximumResultsToDeleteInOneStatement = DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT;
private static int ourMaximumResultsToDeleteInOnePass = DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS;
private static Long ourNowForUnitTests;
/*
* We give a bit of extra leeway just to avoid race conditions where a query result
* is being reused (because a new client request came in with the same params) right before
* the result is to be deleted
*/
private long myCutoffSlack = DEFAULT_CUTOFF_SLACK;
@Autowired
private ISearchDao mySearchDao;
@Autowired
private ISearchIncludeDao mySearchIncludeDao;
@Autowired
private ISearchResultDao mySearchResultDao;
@Autowired
private PlatformTransactionManager myTxManager;
@Autowired
private DaoConfig myDaoConfig;
@VisibleForTesting
public void setCutoffSlackForUnitTest(long theCutoffSlack) {
myCutoffSlack = theCutoffSlack;
}
@Transactional(Transactional.TxType.REQUIRED)
@Override
public Search save(Search theSearch) {
Search newSearch;
if (theSearch.getId() == null) {
newSearch = mySearchDao.save(theSearch);
for (SearchInclude next : theSearch.getIncludes()) {
mySearchIncludeDao.save(next);
}
} else {
newSearch = mySearchDao.save(theSearch);
}
return newSearch;
}
@Override
@Transactional(Transactional.TxType.REQUIRED)
public Optional<Search> fetchByUuid(String theUuid) {
Validate.notBlank(theUuid);
return mySearchDao.findByUuidAndFetchIncludes(theUuid);
}
@Override
@Transactional(Transactional.TxType.REQUIRED)
@ -96,7 +30,7 @@ public class DatabaseSearchResultCacheSvcImpl extends BaseSearchResultCacheSvcIm
}
List<Long> retVal = mySearchResultDao
.findWithSearchUuid(theSearch.getId(), page)
.findWithSearchPid(theSearch.getId(), page)
.getContent();
ourLog.trace("fetchResultPids for range {}-{} returned {} pids", theFrom, theTo, retVal.size());
@ -107,88 +41,11 @@ public class DatabaseSearchResultCacheSvcImpl extends BaseSearchResultCacheSvcIm
@Override
@Transactional(Transactional.TxType.REQUIRED)
public List<Long> fetchAllResultPids(Search theSearch) {
List<Long> retVal = mySearchResultDao.findWithSearchUuidOrderIndependent(theSearch.getId());
List<Long> retVal = mySearchResultDao.findWithSearchPidOrderIndependent(theSearch.getId());
ourLog.trace("fetchAllResultPids returned {} pids", retVal.size());
return retVal;
}
@Override
@Transactional(Transactional.TxType.NEVER)
public Optional<Search> tryToMarkSearchAsInProgress(Search theSearch) {
ourLog.trace("Going to try to change search status from {} to {}", theSearch.getStatus(), SearchStatusEnum.LOADING);
try {
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
txTemplate.afterPropertiesSet();
return txTemplate.execute(t -> {
Search search = mySearchDao.findById(theSearch.getId()).orElse(theSearch);
if (search.getStatus() != SearchStatusEnum.PASSCMPLET) {
throw new IllegalStateException("Can't change to LOADING because state is " + theSearch.getStatus());
}
search.setStatus(SearchStatusEnum.LOADING);
Search newSearch = mySearchDao.save(search);
return Optional.of(newSearch);
});
} catch (Exception e) {
ourLog.warn("Failed to activate search: {}", e.toString());
ourLog.trace("Failed to activate search", e);
return Optional.empty();
}
}
@Override
public Collection<Search> findCandidatesForReuse(String theResourceType, String theQueryString, int theQueryStringHash, Date theCreatedAfter) {
int hashCode = theQueryString.hashCode();
return mySearchDao.find(theResourceType, hashCode, theCreatedAfter);
}
@Transactional(Transactional.TxType.NEVER)
@Override
public void pollForStaleSearchesAndDeleteThem() {
if (!myDaoConfig.isExpireSearchResults()) {
return;
}
long cutoffMillis = myDaoConfig.getExpireSearchResultsAfterMillis();
if (myDaoConfig.getReuseCachedSearchResultsForMillis() != null) {
cutoffMillis = Math.max(cutoffMillis, myDaoConfig.getReuseCachedSearchResultsForMillis());
}
final Date cutoff = new Date((now() - cutoffMillis) - myCutoffSlack);
if (ourNowForUnitTests != null) {
ourLog.info("Searching for searches which are before {} - now is {}", new InstantType(cutoff), new InstantType(new Date(now())));
}
ourLog.debug("Searching for searches which are before {}", cutoff);
TransactionTemplate tt = new TransactionTemplate(myTxManager);
final Slice<Long> toDelete = tt.execute(theStatus ->
mySearchDao.findWhereLastReturnedBefore(cutoff, PageRequest.of(0, 2000))
);
for (final Long nextSearchToDelete : toDelete) {
ourLog.debug("Deleting search with PID {}", nextSearchToDelete);
tt.execute(t -> {
mySearchDao.updateDeleted(nextSearchToDelete, true);
return null;
});
tt.execute(t -> {
deleteSearch(nextSearchToDelete);
return null;
});
}
int count = toDelete.getContent().size();
if (count > 0) {
if (ourLog.isDebugEnabled()) {
long total = tt.execute(t -> mySearchDao.count());
ourLog.debug("Deleted {} searches, {} remaining", count, total);
}
}
}
@Override
@Transactional(Transactional.TxType.REQUIRED)
public void storeResults(Search theSearch, List<Long> thePreviouslyStoredResourcePids, List<Long> theNewResourcePids) {
@ -210,80 +67,10 @@ public class DatabaseSearchResultCacheSvcImpl extends BaseSearchResultCacheSvcIm
mySearchResultDao.saveAll(resultsToSave);
}
@Override
protected void flushLastUpdated(Long theSearchId, Date theLastUpdated) {
mySearchDao.updateSearchLastReturned(theSearchId, theLastUpdated);
}
@VisibleForTesting
void setSearchDaoForUnitTest(ISearchDao theSearchDao) {
mySearchDao = theSearchDao;
}
@VisibleForTesting
void setSearchDaoIncludeForUnitTest(ISearchIncludeDao theSearchIncludeDao) {
mySearchIncludeDao = theSearchIncludeDao;
}
@VisibleForTesting
void setSearchDaoResultForUnitTest(ISearchResultDao theSearchResultDao) {
mySearchResultDao = theSearchResultDao;
}
private void deleteSearch(final Long theSearchPid) {
mySearchDao.findById(theSearchPid).ifPresent(searchToDelete -> {
mySearchIncludeDao.deleteForSearch(searchToDelete.getId());
/*
* Note, we're only deleting up to 500 results in an individual search here. This
* is to prevent really long running transactions in cases where there are
* huge searches with tons of results in them. By the time we've gotten here
* we have marked the parent Search entity as deleted, so it's not such a
* huge deal to be only partially deleting search results. They'll get deleted
* eventually
*/
int max = ourMaximumResultsToDeleteInOnePass;
Slice<Long> resultPids = mySearchResultDao.findForSearch(PageRequest.of(0, max), searchToDelete.getId());
if (resultPids.hasContent()) {
List<List<Long>> partitions = Lists.partition(resultPids.getContent(), ourMaximumResultsToDeleteInOneStatement);
for (List<Long> nextPartition : partitions) {
mySearchResultDao.deleteByIds(nextPartition);
}
}
// Only delete if we don't have results left in this search
if (resultPids.getNumberOfElements() < max) {
ourLog.debug("Deleting search {}/{} - Created[{}] -- Last returned[{}]", searchToDelete.getId(), searchToDelete.getUuid(), new InstantType(searchToDelete.getCreated()), new InstantType(searchToDelete.getSearchLastReturned()));
mySearchDao.deleteByPid(searchToDelete.getId());
} else {
ourLog.debug("Purged {} search results for deleted search {}/{}", resultPids.getSize(), searchToDelete.getId(), searchToDelete.getUuid());
}
});
}
@VisibleForTesting
public static void setMaximumResultsToDeleteInOnePassForUnitTest(int theMaximumResultsToDeleteInOnePass) {
ourMaximumResultsToDeleteInOnePass = theMaximumResultsToDeleteInOnePass;
}
@VisibleForTesting
public static void setMaximumResultsToDeleteForUnitTest(int theMaximumResultsToDelete) {
ourMaximumResultsToDeleteInOneStatement = theMaximumResultsToDelete;
}
/**
* This is for unit tests only, do not call otherwise
*/
@VisibleForTesting
public static void setNowForUnitTests(Long theNowForUnitTests) {
ourNowForUnitTests = theNowForUnitTests;
}
private static long now() {
if (ourNowForUnitTests != null) {
return ourNowForUnitTests;
}
return System.currentTimeMillis();
}
}

View File

@ -0,0 +1,81 @@
package ca.uhn.fhir.jpa.search.cache;
import ca.uhn.fhir.jpa.entity.Search;
import java.util.Collection;
import java.util.Date;
import java.util.Optional;
public interface ISearchCacheSvc {
/**
* Places a new search of some sort in the cache, or updates an existing search. The search passed in is guaranteed to have
* a {@link Search#getUuid() UUID} so that is a good candidate for consistent identification.
*
* @param theSearch The search to store
* @return Returns a copy of the search as it was saved. Callers should use the returned Search object for any further processing.
*/
Search save(Search theSearch);
/**
* Fetch a search using its UUID. The search should be fully loaded when it is returned (i.e. includes are fetched, so that access to its
* fields will not cause database errors if the current tranaction scope ends.
*
* @param theUuid The search UUID
* @return The search if it exists
*/
Optional<Search> fetchByUuid(String theUuid);
/**
* TODO: this is perhaps an inappropriate responsibility for this service
*
* <p>
* This method marks a search as in progress, but should only allow exactly one call to do so across the cluster. This
* is done so that if two client threads request the next page at the exact same time (which is unlikely but not
* impossible) only one will actually proceed to load the next results and the other will just wait for them
* to arrive.
*
* @param theSearch The search to mark
* @return This method should return an empty optional if the search was not marked (meaning that another thread
* succeeded in marking it). If the search doesn't exist or some other error occurred, an exception will be thrown
* instead of {@link Optional#empty()}
*/
Optional<Search> tryToMarkSearchAsInProgress(Search theSearch);
/**
* Look for any existing searches matching the given resource type and query string.
* <p>
* This method is allowed to perofrm a "best effort" return, so it can return searches that don't match the query string exactly, or
* which have a created timestamp before <code>theCreatedAfter</code> date. The caller is responsible for removing
* any inappropriate Searches and picking the most relevant one.
* </p>
*
* @param theResourceType The resource type of the search. Results MUST match this type
* @param theQueryString The query string. Results SHOULD match this type
* @param theQueryStringHash The query string hash. Results SHOULD match this type
* @param theCreatedAfter Results SHOULD not include any searches created before this cutoff timestamp
* @return A collection of candidate searches
*/
Collection<Search> findCandidatesForReuse(String theResourceType, String theQueryString, int theQueryStringHash, Date theCreatedAfter);
/**
* Mark a search as having been "last used" at the given time. This method may (and probably should) be implemented
* to work asynchronously in order to avoid hammering the database if the search gets reused many times.
*
* @param theSearch The search
* @param theDate The "last returned" timestamp
*/
void updateSearchLastReturned(Search theSearch, Date theDate);
/**
* This is mostly public for unit tests
*/
void flushLastUpdated();
/**
* This method will be called periodically to delete stale searches. Implementations are not required to do anything
* if they have some other mechanism for expiring stale results other than manually looking for them
* and deleting them.
*/
void pollForStaleSearchesAndDeleteThem();
}

View File

@ -8,78 +8,6 @@ import java.util.List;
import java.util.Optional;
public interface ISearchResultCacheSvc {
/**
* Places a new search of some sort in the cache, or updates an existing search. The search passed in is guaranteed to have
* a {@link Search#getUuid() UUID} so that is a good candidate for consistent identification.
*
* @param theSearch The search to store
* @return Returns a copy of the search as it was saved. Callers should use the returned Search object for any further processing.
*/
Search save(Search theSearch);
/**
* Fetch a search using its UUID. The search should be fully loaded when it is returned (i.e. includes are fetched, so that access to its
* fields will not cause database errors if the current tranaction scope ends.
*
* @param theUuid The search UUID
* @return The search if it exists
*/
Optional<Search> fetchByUuid(String theUuid);
/**
* TODO: this is perhaps an inappropriate responsibility for this service
*
* <p>
* This method marks a search as in progress, but should only allow exactly one call to do so across the cluster. This
* is done so that if two client threads request the next page at the exact same time (which is unlikely but not
* impossible) only one will actually proceed to load the next results and the other will just wait for them
* to arrive.
*
* @param theSearch The search to mark
* @return This method should return an empty optional if the search was not marked (meaning that another thread
* succeeded in marking it). If the search doesn't exist or some other error occurred, an exception will be thrown
* instead of {@link Optional#empty()}
*/
Optional<Search> tryToMarkSearchAsInProgress(Search theSearch);
/**
* Look for any existing searches matching the given resource type and query string.
* <p>
* This method is allowed to perofrm a "best effort" return, so it can return searches that don't match the query string exactly, or
* which have a created timestamp before <code>theCreatedAfter</code> date. The caller is responsible for removing
* any inappropriate Searches and picking the most relevant one.
* </p>
*
* @param theResourceType The resource type of the search. Results MUST match this type
* @param theQueryString The query string. Results SHOULD match this type
* @param theQueryStringHash The query string hash. Results SHOULD match this type
* @param theCreatedAfter Results SHOULD not include any searches created before this cutoff timestamp
* @return A collection of candidate searches
*/
Collection<Search> findCandidatesForReuse(String theResourceType, String theQueryString, int theQueryStringHash, Date theCreatedAfter);
/**
* Mark a search as having been "last used" at the given time. This method may (and probably should) be implemented
* to work asynchronously in order to avoid hammering the database if the search gets reused many times.
*
* @param theSearch The search
* @param theDate The "last returned" timestamp
*/
void updateSearchLastReturned(Search theSearch, Date theDate);
/**
* This is mostly public for unit tests
*/
void flushLastUpdated();
/**
* This method will be called periodically to delete stale searches. Implementations are not required to do anything
* if they have some other mechanism for expiring stale results other than manually looking for them
* and deleting them.
*/
void pollForStaleSearchesAndDeleteThem();
/**
* @param theSearch The search - This method is not required to persist any chances to the Search object, it is only provided here for identification
* @param thePreviouslyStoredResourcePids A list of resource PIDs that have previously been saved to this search

View File

@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.provider.SystemProviderDstu2Test;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider;
import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
@ -94,6 +95,8 @@ public abstract class BaseJpaTest {
protected CircularQueueCaptureQueriesListener myCaptureQueriesListener;
@Autowired
protected ISearchResultCacheSvc mySearchResultCacheSvc;
@Autowired
protected ISearchCacheSvc mySearchCacheSvc;
@After
public void afterPerformCleanup() {

View File

@ -4,6 +4,7 @@ import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
import ca.uhn.fhir.jpa.search.cache.DatabaseSearchCacheSvcImpl;
import ca.uhn.fhir.jpa.search.cache.DatabaseSearchResultCacheSvcImpl;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.TestUtil;
@ -32,7 +33,7 @@ import javax.annotation.Nullable;
import java.util.Date;
import java.util.concurrent.atomic.AtomicLong;
import static ca.uhn.fhir.jpa.search.cache.DatabaseSearchResultCacheSvcImpl.DEFAULT_CUTOFF_SLACK;
import static ca.uhn.fhir.jpa.search.cache.DatabaseSearchCacheSvcImpl.DEFAULT_CUTOFF_SLACK;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.*;
@ -45,14 +46,14 @@ public class FhirResourceDaoR4SearchPageExpiryTest extends BaseJpaR4Test {
@After()
public void after() {
DatabaseSearchResultCacheSvcImpl staleSearchDeletingSvc = AopTestUtils.getTargetObject(mySearchResultCacheSvc);
DatabaseSearchCacheSvcImpl staleSearchDeletingSvc = AopTestUtils.getTargetObject(mySearchCacheSvc);
staleSearchDeletingSvc.setCutoffSlackForUnitTest(DEFAULT_CUTOFF_SLACK);
DatabaseSearchResultCacheSvcImpl.setNowForUnitTests(null);
DatabaseSearchCacheSvcImpl.setNowForUnitTests(null);
}
@Before
public void before() {
DatabaseSearchResultCacheSvcImpl staleSearchDeletingSvc = AopTestUtils.getTargetObject(mySearchResultCacheSvc);
DatabaseSearchCacheSvcImpl staleSearchDeletingSvc = AopTestUtils.getTargetObject(mySearchCacheSvc);
staleSearchDeletingSvc.setCutoffSlackForUnitTest(0);
myDaoConfig.setCountSearchResultsUpTo(new DaoConfig().getCountSearchResultsUpTo());
}
@ -83,7 +84,7 @@ public class FhirResourceDaoR4SearchPageExpiryTest extends BaseJpaR4Test {
myDaoConfig.setExpireSearchResultsAfterMillis(1000L);
myDaoConfig.setReuseCachedSearchResultsForMillis(500L);
long start = System.currentTimeMillis();
DatabaseSearchResultCacheSvcImpl.setNowForUnitTests(start);
DatabaseSearchCacheSvcImpl.setNowForUnitTests(start);
final String searchUuid1;
{
@ -123,7 +124,7 @@ public class FhirResourceDaoR4SearchPageExpiryTest extends BaseJpaR4Test {
// Search just got used so it shouldn't be deleted
DatabaseSearchResultCacheSvcImpl.setNowForUnitTests(start + 500);
DatabaseSearchCacheSvcImpl.setNowForUnitTests(start + 500);
final AtomicLong search3timestamp = new AtomicLong();
newTxTemplate().execute(new TransactionCallbackWithoutResult() {
@Override
@ -136,7 +137,7 @@ public class FhirResourceDaoR4SearchPageExpiryTest extends BaseJpaR4Test {
}
});
DatabaseSearchResultCacheSvcImpl.setNowForUnitTests(search3timestamp.get() + 800);
DatabaseSearchCacheSvcImpl.setNowForUnitTests(search3timestamp.get() + 800);
myStaleSearchDeletingSvc.pollForStaleSearchesAndDeleteThem();
newTxTemplate().execute(new TransactionCallbackWithoutResult() {
@Override
@ -151,7 +152,7 @@ public class FhirResourceDaoR4SearchPageExpiryTest extends BaseJpaR4Test {
}
});
DatabaseSearchResultCacheSvcImpl.setNowForUnitTests(search3timestamp.get() + 1100);
DatabaseSearchCacheSvcImpl.setNowForUnitTests(search3timestamp.get() + 1100);
myStaleSearchDeletingSvc.pollForStaleSearchesAndDeleteThem();
newTxTemplate().execute(new TransactionCallbackWithoutResult() {
@ -162,7 +163,7 @@ public class FhirResourceDaoR4SearchPageExpiryTest extends BaseJpaR4Test {
}
});
DatabaseSearchResultCacheSvcImpl.setNowForUnitTests(search3timestamp.get() + 2100);
DatabaseSearchCacheSvcImpl.setNowForUnitTests(search3timestamp.get() + 2100);
myStaleSearchDeletingSvc.pollForStaleSearchesAndDeleteThem();
newTxTemplate().execute(new TransactionCallbackWithoutResult() {
@ -217,7 +218,7 @@ public class FhirResourceDaoR4SearchPageExpiryTest extends BaseJpaR4Test {
myDaoConfig.setExpireSearchResultsAfterMillis(500);
myDaoConfig.setReuseCachedSearchResultsForMillis(500L);
DatabaseSearchResultCacheSvcImpl.setNowForUnitTests(start.get() + 499);
DatabaseSearchCacheSvcImpl.setNowForUnitTests(start.get() + 499);
myStaleSearchDeletingSvc.pollForStaleSearchesAndDeleteThem();
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
@ -226,7 +227,7 @@ public class FhirResourceDaoR4SearchPageExpiryTest extends BaseJpaR4Test {
}
});
DatabaseSearchResultCacheSvcImpl.setNowForUnitTests(start.get() + 600);
DatabaseSearchCacheSvcImpl.setNowForUnitTests(start.get() + 600);
myStaleSearchDeletingSvc.pollForStaleSearchesAndDeleteThem();
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
@ -308,7 +309,7 @@ public class FhirResourceDaoR4SearchPageExpiryTest extends BaseJpaR4Test {
}
});
DatabaseSearchResultCacheSvcImpl.setNowForUnitTests(search3timestamp.get() + 800);
DatabaseSearchCacheSvcImpl.setNowForUnitTests(search3timestamp.get() + 800);
myStaleSearchDeletingSvc.pollForStaleSearchesAndDeleteThem();
newTxTemplate().execute(new TransactionCallbackWithoutResult() {
@ -324,7 +325,7 @@ public class FhirResourceDaoR4SearchPageExpiryTest extends BaseJpaR4Test {
}
});
DatabaseSearchResultCacheSvcImpl.setNowForUnitTests(search3timestamp.get() + 1100);
DatabaseSearchCacheSvcImpl.setNowForUnitTests(search3timestamp.get() + 1100);
myStaleSearchDeletingSvc.pollForStaleSearchesAndDeleteThem();
newTxTemplate().execute(new TransactionCallbackWithoutResult() {
@ -373,7 +374,7 @@ public class FhirResourceDaoR4SearchPageExpiryTest extends BaseJpaR4Test {
myDaoConfig.setExpireSearchResults(false);
DatabaseSearchResultCacheSvcImpl.setNowForUnitTests(System.currentTimeMillis() + DateUtils.MILLIS_PER_DAY);
DatabaseSearchCacheSvcImpl.setNowForUnitTests(System.currentTimeMillis() + DateUtils.MILLIS_PER_DAY);
myStaleSearchDeletingSvc.pollForStaleSearchesAndDeleteThem();
newTxTemplate().execute(new TransactionCallbackWithoutResult() {

View File

@ -3862,7 +3862,7 @@ public class FhirResourceDaoR4Test extends BaseJpaR4Test {
search.setStatus(SearchStatusEnum.FAILED);
search.setFailureCode(500);
search.setFailureMessage("FOO");
mySearchResultCacheSvc.save(search);
mySearchCacheSvc.save(search);
});
IBundleProvider results = myEncounterDao.search(map);

View File

@ -2973,7 +2973,7 @@ public class ResourceProviderDstu3Test extends BaseResourceProviderDstu3Test {
.count(5)
.returnBundle(Bundle.class)
.execute();
mySearchResultCacheSvc.flushLastUpdated();
mySearchCacheSvc.flushLastUpdated();
final String uuid1 = toSearchUuidFromLinkNext(result1);
Search search1 = newTxTemplate().execute(new TransactionCallback<Search>() {
@ -2991,7 +2991,7 @@ public class ResourceProviderDstu3Test extends BaseResourceProviderDstu3Test {
.count(5)
.returnBundle(Bundle.class)
.execute();
mySearchResultCacheSvc.flushLastUpdated();
mySearchCacheSvc.flushLastUpdated();
final String uuid2 = toSearchUuidFromLinkNext(result2);
Search search2 = newTxTemplate().execute(new TransactionCallback<Search>() {
@ -3037,7 +3037,7 @@ public class ResourceProviderDstu3Test extends BaseResourceProviderDstu3Test {
.forResource("Organization")
.returnBundle(Bundle.class)
.execute();
mySearchResultCacheSvc.flushLastUpdated();
mySearchCacheSvc.flushLastUpdated();
final String uuid1 = toSearchUuidFromLinkNext(result1);
Search search1 = newTxTemplate().execute(theStatus -> mySearchEntityDao.findByUuidAndFetchIncludes(uuid1).orElseThrow(() -> new InternalErrorException("")));
@ -3048,7 +3048,7 @@ public class ResourceProviderDstu3Test extends BaseResourceProviderDstu3Test {
.forResource("Organization")
.returnBundle(Bundle.class)
.execute();
mySearchResultCacheSvc.flushLastUpdated();
mySearchCacheSvc.flushLastUpdated();
final String uuid2 = toSearchUuidFromLinkNext(result2);
Search search2 = newTxTemplate().execute(theStatus -> mySearchEntityDao.findByUuidAndFetchIncludes(uuid2).orElseThrow(() -> new InternalErrorException("")));

View File

@ -3938,7 +3938,7 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
.count(5)
.returnBundle(Bundle.class)
.execute();
mySearchResultCacheSvc.flushLastUpdated();
mySearchCacheSvc.flushLastUpdated();
final String uuid1 = toSearchUuidFromLinkNext(result1);
Search search1 = newTxTemplate().execute(theStatus -> mySearchEntityDao.findByUuidAndFetchIncludes(uuid1).orElseThrow(()->new InternalErrorException("")));
@ -3951,7 +3951,7 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
.count(5)
.returnBundle(Bundle.class)
.execute();
mySearchResultCacheSvc.flushLastUpdated();
mySearchCacheSvc.flushLastUpdated();
final String uuid2 = toSearchUuidFromLinkNext(result2);
Search search2 = newTxTemplate().execute(theStatus -> mySearchEntityDao.findByUuidAndFetchIncludes(uuid2).orElseThrow(()->new InternalErrorException("")));
@ -3968,7 +3968,7 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
.count(5)
.returnBundle(Bundle.class)
.execute();
mySearchResultCacheSvc.flushLastUpdated();
mySearchCacheSvc.flushLastUpdated();
String uuid3 = toSearchUuidFromLinkNext(result3);
@ -3993,7 +3993,7 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
.forResource("Organization")
.returnBundle(Bundle.class)
.execute();
mySearchResultCacheSvc.flushLastUpdated();
mySearchCacheSvc.flushLastUpdated();
final String uuid1 = toSearchUuidFromLinkNext(result1);
Search search1 = newTxTemplate().execute(theStatus -> mySearchEntityDao.findByUuidAndFetchIncludes(uuid1).orElseThrow(()->new InternalErrorException("")));
@ -4006,7 +4006,7 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
.forResource("Organization")
.returnBundle(Bundle.class)
.execute();
mySearchResultCacheSvc.flushLastUpdated();
mySearchCacheSvc.flushLastUpdated();
final String uuid2 = toSearchUuidFromLinkNext(result2);
Search search2 = newTxTemplate().execute(theStatus -> mySearchEntityDao.findByUuidAndFetchIncludes(uuid2).orElseThrow(()->new InternalErrorException("")));

View File

@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.entity.SearchResult;
import ca.uhn.fhir.jpa.entity.SearchTypeEnum;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
import ca.uhn.fhir.jpa.search.cache.DatabaseSearchCacheSvcImpl;
import ca.uhn.fhir.jpa.search.cache.DatabaseSearchResultCacheSvcImpl;
import ca.uhn.fhir.rest.gclient.IClientExecutable;
import ca.uhn.fhir.rest.gclient.IQuery;
@ -45,17 +46,17 @@ public class StaleSearchDeletingSvcR4Test extends BaseResourceProviderR4Test {
@After()
public void after() throws Exception {
super.after();
DatabaseSearchResultCacheSvcImpl staleSearchDeletingSvc = AopTestUtils.getTargetObject(mySearchResultCacheSvc);
staleSearchDeletingSvc.setCutoffSlackForUnitTest(DatabaseSearchResultCacheSvcImpl.DEFAULT_CUTOFF_SLACK);
DatabaseSearchResultCacheSvcImpl.setMaximumResultsToDeleteForUnitTest(DatabaseSearchResultCacheSvcImpl.DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT);
DatabaseSearchResultCacheSvcImpl.setMaximumResultsToDeleteInOnePassForUnitTest(DatabaseSearchResultCacheSvcImpl.DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS);
DatabaseSearchCacheSvcImpl staleSearchDeletingSvc = AopTestUtils.getTargetObject(mySearchCacheSvc);
staleSearchDeletingSvc.setCutoffSlackForUnitTest(DatabaseSearchCacheSvcImpl.DEFAULT_CUTOFF_SLACK);
DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteForUnitTest(DatabaseSearchCacheSvcImpl.DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT);
DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteInOnePassForUnitTest(DatabaseSearchCacheSvcImpl.DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS);
}
@Override
@Before
public void before() throws Exception {
super.before();
DatabaseSearchResultCacheSvcImpl staleSearchDeletingSvc = AopTestUtils.getTargetObject(mySearchResultCacheSvc);
DatabaseSearchCacheSvcImpl staleSearchDeletingSvc = AopTestUtils.getTargetObject(mySearchCacheSvc);
staleSearchDeletingSvc.setCutoffSlackForUnitTest(0);
}
@ -109,8 +110,8 @@ public class StaleSearchDeletingSvcR4Test extends BaseResourceProviderR4Test {
@Test
public void testDeleteVeryLargeSearch() {
DatabaseSearchResultCacheSvcImpl.setMaximumResultsToDeleteForUnitTest(10);
DatabaseSearchResultCacheSvcImpl.setMaximumResultsToDeleteInOnePassForUnitTest(10);
DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteForUnitTest(10);
DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteInOnePassForUnitTest(10);
runInTransaction(() -> {
Search search = new Search();
@ -152,7 +153,7 @@ public class StaleSearchDeletingSvcR4Test extends BaseResourceProviderR4Test {
@Test
public void testDeleteVerySmallSearch() {
DatabaseSearchResultCacheSvcImpl.setMaximumResultsToDeleteForUnitTest(10);
DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteForUnitTest(10);
runInTransaction(() -> {
Search search = new Search();
@ -162,8 +163,7 @@ public class StaleSearchDeletingSvcR4Test extends BaseResourceProviderR4Test {
search.setSearchType(SearchTypeEnum.SEARCH);
search.setResourceType("Patient");
search.setSearchLastReturned(DateUtils.addDays(new Date(), -10000));
search = mySearchEntityDao.save(search);
mySearchEntityDao.save(search);
});
// It should take one pass to delete the search fully

View File

@ -6,6 +6,7 @@ import ca.uhn.fhir.jpa.dao.*;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.entity.SearchTypeEnum;
import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.BaseIterator;
@ -16,7 +17,6 @@ import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.TestUtil;
import com.google.common.collect.Lists;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.junit.After;
import org.junit.AfterClass;
@ -58,6 +58,8 @@ public class SearchCoordinatorSvcImplTest {
@Mock
private ISearchBuilder mySearchBuilder;
@Mock
private ISearchCacheSvc mySearchCacheSvc;
@Mock
private ISearchResultCacheSvc mySearchResultCacheSvc;
private SearchCoordinatorSvcImpl mySvc;
@Mock
@ -82,7 +84,7 @@ public class SearchCoordinatorSvcImplTest {
mySvc.setEntityManagerForUnitTest(myEntityManager);
mySvc.setTransactionManagerForUnitTest(myTxManager);
mySvc.setContextForUnitTest(ourCtx);
mySvc.setSearchResultCacheSvcForUnitTest(mySearchResultCacheSvc);
mySvc.setSearchCacheServicesForUnitTest(mySearchCacheSvc, mySearchResultCacheSvc);
mySvc.setDaoRegistryForUnitTest(myDaoRegistry);
mySvc.setInterceptorBroadcasterForUnitTest(myInterceptorBroadcaster);
@ -97,7 +99,7 @@ public class SearchCoordinatorSvcImplTest {
PersistedJpaBundleProvider provider = (PersistedJpaBundleProvider) theInvocation.getArguments()[0];
provider.setSearchCoordinatorSvc(mySvc);
provider.setPlatformTransactionManager(myTxManager);
provider.setSearchResultCacheSvc(mySearchResultCacheSvc);
provider.setSearchCacheSvc(mySearchCacheSvc);
provider.setEntityManager(myEntityManager);
provider.setContext(ourCtx);
provider.setInterceptorBroadcaster(myInterceptorBroadcaster);
@ -180,7 +182,7 @@ public class SearchCoordinatorSvcImplTest {
when(mySearchResultCacheSvc.fetchAllResultPids(any())).thenReturn(allResults);
when(mySearchResultCacheSvc.tryToMarkSearchAsInProgress(any())).thenAnswer(t->{
when(mySearchCacheSvc.tryToMarkSearchAsInProgress(any())).thenAnswer(t->{
Search search = t.getArgument(0, Search.class);
assertEquals(SearchStatusEnum.PASSCMPLET, search.getStatus());
search.setStatus(SearchStatusEnum.LOADING);
@ -193,12 +195,12 @@ public class SearchCoordinatorSvcImplTest {
List<IBaseResource> resources;
when(mySearchResultCacheSvc.save(any())).thenAnswer(t -> {
when(mySearchCacheSvc.save(any())).thenAnswer(t -> {
Search search = (Search) t.getArguments()[0];
myCurrentSearch = search;
return search;
});
when(mySearchResultCacheSvc.fetchByUuid(any())).thenAnswer(t -> {
when(mySearchCacheSvc.fetchByUuid(any())).thenAnswer(t -> {
return Optional.ofNullable(myCurrentSearch);
});
IFhirResourceDao dao = myCallingDao;
@ -210,7 +212,7 @@ public class SearchCoordinatorSvcImplTest {
assertEquals("799", resources.get(789).getIdElement().getValueAsString());
ArgumentCaptor<Search> searchCaptor = ArgumentCaptor.forClass(Search.class);
verify(mySearchResultCacheSvc, atLeastOnce()).save(searchCaptor.capture());
verify(mySearchCacheSvc, atLeastOnce()).save(searchCaptor.capture());
assertEquals(790, allResults.size());
assertEquals(10, allResults.get(0).longValue());
@ -255,7 +257,7 @@ public class SearchCoordinatorSvcImplTest {
List<Long> pids = createPidSequence(10, 800);
IResultIterator iter = new SlowIterator(pids.iterator(), 2);
when(mySearchBuilder.createQuery(same(params), any(), any())).thenReturn(iter);
when(mySearchResultCacheSvc.save(any())).thenAnswer(t -> t.getArguments()[0]);
when(mySearchCacheSvc.save(any())).thenAnswer(t -> t.getArguments()[0]);
doAnswer(loadPids()).when(mySearchBuilder).loadResourcesByPid(any(Collection.class), any(Collection.class), any(List.class), anyBoolean(), any());
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null);
@ -263,7 +265,7 @@ public class SearchCoordinatorSvcImplTest {
assertEquals(null, result.size());
ArgumentCaptor<Search> searchCaptor = ArgumentCaptor.forClass(Search.class);
verify(mySearchResultCacheSvc, atLeast(1)).save(searchCaptor.capture());
verify(mySearchCacheSvc, atLeast(1)).save(searchCaptor.capture());
Search search = searchCaptor.getValue();
assertEquals(SearchTypeEnum.SEARCH, search.getSearchType());
@ -276,7 +278,7 @@ public class SearchCoordinatorSvcImplTest {
assertEquals("10", resources.get(0).getIdElement().getValueAsString());
assertEquals("19", resources.get(9).getIdElement().getValueAsString());
when(mySearchResultCacheSvc.fetchByUuid(eq(result.getUuid()))).thenReturn(Optional.of(search));
when(mySearchCacheSvc.fetchByUuid(eq(result.getUuid()))).thenReturn(Optional.of(search));
/*
* Now call from a new bundle provider. This simulates a separate HTTP
@ -328,7 +330,7 @@ public class SearchCoordinatorSvcImplTest {
search.setSearchType(SearchTypeEnum.SEARCH);
search.setResourceType("Patient");
when(mySearchResultCacheSvc.fetchByUuid(eq(uuid))).thenReturn(Optional.of(search));
when(mySearchCacheSvc.fetchByUuid(eq(uuid))).thenReturn(Optional.of(search));
doAnswer(loadPids()).when(mySearchBuilder).loadResourcesByPid(any(Collection.class), any(Collection.class), any(List.class), anyBoolean(), any());
PersistedJpaBundleProvider provider;