Add warm cache module

This commit is contained in:
James Agnew 2018-10-14 09:32:07 -04:00
parent 51a69f0dc9
commit 8b46257423
20 changed files with 1291 additions and 908 deletions

View File

@ -25,6 +25,8 @@ import ca.uhn.fhir.i18n.HapiLocalizer;
import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.provider.SubscriptionRetriggeringProvider;
import ca.uhn.fhir.jpa.search.*;
import ca.uhn.fhir.jpa.search.warm.CacheWarmingSvcImpl;
import ca.uhn.fhir.jpa.search.warm.ICacheWarmingSvc;
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
import ca.uhn.fhir.jpa.sp.SearchParamPresenceSvcImpl;
import ca.uhn.fhir.jpa.subscription.email.SubscriptionEmailInterceptor;
@ -111,6 +113,11 @@ public abstract class BaseConfig implements SchedulingConfigurer {
public abstract FhirContext fhirContext();
@Bean
public ICacheWarmingSvc cacheWarmingSvc() {
return new CacheWarmingSvcImpl();
}
@Bean
public HibernateExceptionTranslator hibernateExceptionTranslator() {
return new HibernateExceptionTranslator();

View File

@ -511,9 +511,13 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@Override
@Transactional(propagation = Propagation.NEVER)
public ExpungeOutcome expunge(IIdType theId, ExpungeOptions theExpungeOptions) {
BaseHasResource entity = readEntity(theId);
TransactionTemplate txTemplate = new TransactionTemplate(myPlatformTransactionManager);
BaseHasResource entity = txTemplate.execute(t->readEntity(theId));
if (theId.hasVersionIdPart()) {
BaseHasResource currentVersion = readEntity(theId.toVersionless());
BaseHasResource currentVersion;
currentVersion = txTemplate.execute(t->readEntity(theId.toVersionless()));
if (entity.getVersion() == currentVersion.getVersion()) {
throw new PreconditionFailedException("Can not perform version-specific expunge of resource " + theId.toUnqualified().getValue() + " as this is the current version");
}

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.dao;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -24,9 +24,9 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.jpa.search.JpaRuntimeSearchParam;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.util.SearchParameterUtil;
import ca.uhn.fhir.util.StopWatch;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
@ -37,6 +37,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.PostConstruct;
import java.util.*;
@ -58,7 +60,8 @@ public abstract class BaseSearchParamRegistry<SP extends IBaseResource> implemen
private DaoConfig myDaoConfig;
private volatile long myLastRefresh;
private ApplicationContext myApplicationContext;
@Autowired
private PlatformTransactionManager myTxManager;
public BaseSearchParamRegistry() {
super();
}
@ -197,7 +200,7 @@ public abstract class BaseSearchParamRegistry<SP extends IBaseResource> implemen
});
for (String nextBase : next.getBase()) {
if (!activeParamNamesToUniqueSearchParams.containsKey(nextBase)) {
activeParamNamesToUniqueSearchParams.put(nextBase, new HashMap<Set<String>, List<JpaRuntimeSearchParam>>());
activeParamNamesToUniqueSearchParams.put(nextBase, new HashMap<>());
}
if (!activeParamNamesToUniqueSearchParams.get(nextBase).containsKey(paramNames)) {
activeParamNamesToUniqueSearchParams.get(nextBase).put(paramNames, new ArrayList<JpaRuntimeSearchParam>());
@ -242,86 +245,94 @@ public abstract class BaseSearchParamRegistry<SP extends IBaseResource> implemen
long refreshInterval = 60 * DateUtils.MILLIS_PER_MINUTE;
if (System.currentTimeMillis() - refreshInterval > myLastRefresh) {
synchronized (this) {
if (System.currentTimeMillis() - refreshInterval > myLastRefresh) {
StopWatch sw = new StopWatch();
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.execute(t->{
doRefresh(refreshInterval);
return null;
});
}
}
}
Map<String, Map<String, RuntimeSearchParam>> searchParams = new HashMap<>();
for (Map.Entry<String, Map<String, RuntimeSearchParam>> nextBuiltInEntry : getBuiltInSearchParams().entrySet()) {
for (RuntimeSearchParam nextParam : nextBuiltInEntry.getValue().values()) {
String nextResourceName = nextBuiltInEntry.getKey();
getSearchParamMap(searchParams, nextResourceName).put(nextParam.getName(), nextParam);
}
}
private void doRefresh(long theRefreshInterval) {
if (System.currentTimeMillis() - theRefreshInterval > myLastRefresh) {
StopWatch sw = new StopWatch();
SearchParameterMap params = new SearchParameterMap();
params.setLoadSynchronousUpTo(MAX_MANAGED_PARAM_COUNT);
IBundleProvider allSearchParamsBp = getSearchParameterDao().search(params);
int size = allSearchParamsBp.size();
// Just in case..
if (size >= MAX_MANAGED_PARAM_COUNT) {
ourLog.warn("Unable to support >" + MAX_MANAGED_PARAM_COUNT + " search params!");
size = MAX_MANAGED_PARAM_COUNT;
}
List<IBaseResource> allSearchParams = allSearchParamsBp.getResources(0, size);
for (IBaseResource nextResource : allSearchParams) {
SP nextSp = (SP) nextResource;
if (nextSp == null) {
continue;
}
RuntimeSearchParam runtimeSp = toRuntimeSp(nextSp);
if (runtimeSp == null) {
continue;
}
for (String nextBaseName : SearchParameterUtil.getBaseAsStrings(myCtx, nextSp)) {
if (isBlank(nextBaseName)) {
continue;
}
Map<String, RuntimeSearchParam> searchParamMap = getSearchParamMap(searchParams, nextBaseName);
String name = runtimeSp.getName();
if (myDaoConfig.isDefaultSearchParamsCanBeOverridden() || !searchParamMap.containsKey(name)) {
searchParamMap.put(name, runtimeSp);
}
}
}
Map<String, Map<String, RuntimeSearchParam>> activeSearchParams = new HashMap<>();
for (Map.Entry<String, Map<String, RuntimeSearchParam>> nextEntry : searchParams.entrySet()) {
for (RuntimeSearchParam nextSp : nextEntry.getValue().values()) {
String nextName = nextSp.getName();
if (nextSp.getStatus() != RuntimeSearchParam.RuntimeSearchParamStatusEnum.ACTIVE) {
nextSp = null;
}
if (!activeSearchParams.containsKey(nextEntry.getKey())) {
activeSearchParams.put(nextEntry.getKey(), new HashMap<>());
}
if (activeSearchParams.containsKey(nextEntry.getKey())) {
ourLog.debug("Replacing existing/built in search param {}:{} with new one", nextEntry.getKey(), nextName);
}
if (nextSp != null) {
activeSearchParams.get(nextEntry.getKey()).put(nextName, nextSp);
} else {
activeSearchParams.get(nextEntry.getKey()).remove(nextName);
}
}
}
myActiveSearchParams = activeSearchParams;
populateActiveSearchParams(activeSearchParams);
myLastRefresh = System.currentTimeMillis();
ourLog.info("Refreshed search parameter cache in {}ms", sw.getMillis());
Map<String, Map<String, RuntimeSearchParam>> searchParams = new HashMap<>();
for (Map.Entry<String, Map<String, RuntimeSearchParam>> nextBuiltInEntry : getBuiltInSearchParams().entrySet()) {
for (RuntimeSearchParam nextParam : nextBuiltInEntry.getValue().values()) {
String nextResourceName = nextBuiltInEntry.getKey();
getSearchParamMap(searchParams, nextResourceName).put(nextParam.getName(), nextParam);
}
}
SearchParameterMap params = new SearchParameterMap();
params.setLoadSynchronousUpTo(MAX_MANAGED_PARAM_COUNT);
IBundleProvider allSearchParamsBp = getSearchParameterDao().search(params);
int size = allSearchParamsBp.size();
// Just in case..
if (size >= MAX_MANAGED_PARAM_COUNT) {
ourLog.warn("Unable to support >" + MAX_MANAGED_PARAM_COUNT + " search params!");
size = MAX_MANAGED_PARAM_COUNT;
}
List<IBaseResource> allSearchParams = allSearchParamsBp.getResources(0, size);
for (IBaseResource nextResource : allSearchParams) {
SP nextSp = (SP) nextResource;
if (nextSp == null) {
continue;
}
RuntimeSearchParam runtimeSp = toRuntimeSp(nextSp);
if (runtimeSp == null) {
continue;
}
for (String nextBaseName : SearchParameterUtil.getBaseAsStrings(myCtx, nextSp)) {
if (isBlank(nextBaseName)) {
continue;
}
Map<String, RuntimeSearchParam> searchParamMap = getSearchParamMap(searchParams, nextBaseName);
String name = runtimeSp.getName();
if (myDaoConfig.isDefaultSearchParamsCanBeOverridden() || !searchParamMap.containsKey(name)) {
searchParamMap.put(name, runtimeSp);
}
}
}
Map<String, Map<String, RuntimeSearchParam>> activeSearchParams = new HashMap<>();
for (Map.Entry<String, Map<String, RuntimeSearchParam>> nextEntry : searchParams.entrySet()) {
for (RuntimeSearchParam nextSp : nextEntry.getValue().values()) {
String nextName = nextSp.getName();
if (nextSp.getStatus() != RuntimeSearchParam.RuntimeSearchParamStatusEnum.ACTIVE) {
nextSp = null;
}
if (!activeSearchParams.containsKey(nextEntry.getKey())) {
activeSearchParams.put(nextEntry.getKey(), new HashMap<>());
}
if (activeSearchParams.containsKey(nextEntry.getKey())) {
ourLog.debug("Replacing existing/built in search param {}:{} with new one", nextEntry.getKey(), nextName);
}
if (nextSp != null) {
activeSearchParams.get(nextEntry.getKey()).put(nextName, nextSp);
} else {
activeSearchParams.get(nextEntry.getKey()).remove(nextName);
}
}
}
myActiveSearchParams = activeSearchParams;
populateActiveSearchParams(activeSearchParams);
myLastRefresh = System.currentTimeMillis();
ourLog.info("Refreshed search parameter cache in {}ms", sw.getMillis());
}
}

View File

@ -1,6 +1,7 @@
package ca.uhn.fhir.jpa.dao;
import ca.uhn.fhir.jpa.entity.ResourceEncodingEnum;
import ca.uhn.fhir.jpa.search.warm.WarmCacheEntry;
import ca.uhn.fhir.jpa.util.JpaConstants;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor;
import com.google.common.collect.Sets;
@ -22,9 +23,9 @@ import java.util.*;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -153,6 +154,7 @@ public class DaoConfig {
private Set<String> myBundleTypesAllowedForStorage;
private boolean myValidateSearchParameterExpressionsOnSave = true;
private List<Integer> myPreFetchThresholds = Arrays.asList(500, 2000, -1);
private List<WarmCacheEntry> myWarmCacheEntries = new ArrayList<>();
/**
* Constructor
@ -171,6 +173,22 @@ public class DaoConfig {
}
}
/**
* Returns a set of searches that should be kept "warm", meaning that
* searches will periodically be performed in the background to
* keep results ready for this search
*/
public List<WarmCacheEntry> getWarmCacheEntries() {
if (myWarmCacheEntries == null) {
myWarmCacheEntries = new ArrayList<>();
}
return myWarmCacheEntries;
}
public void setWarmCacheEntries(List<WarmCacheEntry> theWarmCacheEntries) {
myWarmCacheEntries = theWarmCacheEntries;
}
/**
* If set to <code>true</code> (default is false), the reindexing of search parameters
* using a query on the HFJ_RESOURCE.SP_INDEX_STATUS column will be disabled completely.

View File

@ -1359,6 +1359,8 @@ public class SearchBuilder implements ISearchBuilder {
}
private TypedQuery<Long> createQuery(SortSpec sort, Integer theMaximumResults, boolean theCount) {
myPredicates = new ArrayList<>();
CriteriaQuery<Long> outerQuery;
/*
* Sort
@ -1369,30 +1371,48 @@ public class SearchBuilder implements ISearchBuilder {
if (sort != null) {
assert !theCount;
// outerQuery = myBuilder.createQuery(Long.class);
// Root<ResourceTable> outerQueryFrom = outerQuery.from(ResourceTable.class);
//
// List<Order> orders = Lists.newArrayList();
// List<Predicate> predicates = Lists.newArrayList();
//
// createSort(myBuilder, outerQueryFrom, sort, orders, predicates);
// if (orders.size() > 0) {
// outerQuery.orderBy(orders);
// }
//
// Subquery<Long> subQ = outerQuery.subquery(Long.class);
// Root<ResourceTable> subQfrom = subQ.from(ResourceTable.class);
//
// myResourceTableQuery = subQ;
// myResourceTableRoot = subQfrom;
//
// Expression<Long> selectExpr = subQfrom.get("myId").as(Long.class);
// subQ.select(selectExpr);
//
// predicates.add(0, myBuilder.in(outerQueryFrom.get("myId").as(Long.class)).value(subQ));
//
// outerQuery.multiselect(outerQueryFrom.get("myId").as(Long.class));
// outerQuery.where(predicates.toArray(new Predicate[0]));
outerQuery = myBuilder.createQuery(Long.class);
Root<ResourceTable> outerQueryFrom = outerQuery.from(ResourceTable.class);
myResourceTableQuery = outerQuery;
myResourceTableRoot = myResourceTableQuery.from(ResourceTable.class);
if (theCount) {
outerQuery.multiselect(myBuilder.countDistinct(myResourceTableRoot));
} else {
outerQuery.multiselect(myResourceTableRoot.get("myId").as(Long.class));
}
List<Order> orders = Lists.newArrayList();
List<Predicate> predicates = Lists.newArrayList();
List<Predicate> predicates = myPredicates; // Lists.newArrayList();
createSort(myBuilder, outerQueryFrom, sort, orders, predicates);
createSort(myBuilder, myResourceTableRoot, sort, orders, predicates);
if (orders.size() > 0) {
outerQuery.orderBy(orders);
}
Subquery<Long> subQ = outerQuery.subquery(Long.class);
Root<ResourceTable> subQfrom = subQ.from(ResourceTable.class);
myResourceTableQuery = subQ;
myResourceTableRoot = subQfrom;
Expression<Long> selectExpr = subQfrom.get("myId").as(Long.class);
subQ.select(selectExpr);
predicates.add(0, myBuilder.in(outerQueryFrom.get("myId").as(Long.class)).value(subQ));
outerQuery.multiselect(outerQueryFrom.get("myId").as(Long.class));
outerQuery.where(predicates.toArray(new Predicate[0]));
} else {
@ -1407,8 +1427,6 @@ public class SearchBuilder implements ISearchBuilder {
}
myPredicates = new ArrayList<>();
if (myParams.getEverythingMode() != null) {
Join<ResourceTable, ResourceLink> join = myResourceTableRoot.join("myResourceLinks", JoinType.LEFT);
@ -1590,7 +1608,8 @@ public class SearchBuilder implements ISearchBuilder {
if (param.getParamType() == RestSearchParameterTypeEnum.REFERENCE) {
thePredicates.add(join.get("mySourcePath").as(String.class).in(param.getPathsSplit()));
} else {
Predicate joinParam1 = theBuilder.equal(join.get("myParamName"), theSort.getParamName());
Long hashIdentity = BaseResourceIndexedSearchParam.calculateHashIdentity(myResourceName, theSort.getParamName());
Predicate joinParam1 = theBuilder.equal(join.get("myHashIdentity"), hashIdentity);
thePredicates.add(joinParam1);
}
} else {
@ -1940,11 +1959,11 @@ public class SearchBuilder implements ISearchBuilder {
return;
}
if (theParamName.equals(BaseResource.SP_RES_ID)) {
if (theParamName.equals(IAnyResource.SP_RES_ID)) {
addPredicateResourceId(theAndOrParams);
} else if (theParamName.equals(BaseResource.SP_RES_LANGUAGE)) {
} else if (theParamName.equals(IAnyResource.SP_RES_LANGUAGE)) {
addPredicateLanguage(theAndOrParams);

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.entity;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -44,6 +44,10 @@ import static org.apache.commons.lang3.StringUtils.left;
* do not reuse these names:
* IDX_SP_STRING
*/
// This one us used only for sorting
@Index(name = "IDX_SP_STRING_HASH_IDENT", columnList = "HASH_IDENTITY"),
@Index(name = "IDX_SP_STRING_HASH_NRM", columnList = "HASH_NORM_PREFIX,SP_VALUE_NORMALIZED"),
@Index(name = "IDX_SP_STRING_HASH_EXCT", columnList = "HASH_EXACT"),
@ -130,6 +134,11 @@ public class ResourceIndexedSearchParamString extends BaseResourceIndexedSearchP
*/
@Column(name = "HASH_NORM_PREFIX", nullable = true)
private Long myHashNormalizedPrefix;
/**
* @since 3.6.0 - At some point this should be made not-null
*/
@Column(name = "HASH_IDENTITY", nullable = true)
private Long myHashIdentity;
/**
* @since 3.4.0 - At some point this should be made not-null
*/
@ -137,12 +146,10 @@ public class ResourceIndexedSearchParamString extends BaseResourceIndexedSearchP
private Long myHashExact;
@Transient
private transient DaoConfig myDaoConfig;
public ResourceIndexedSearchParamString() {
super();
}
public ResourceIndexedSearchParamString(DaoConfig theDaoConfig, String theName, String theValueNormalized, String theValueExact) {
setDaoConfig(theDaoConfig);
setParamName(theName);
@ -150,6 +157,10 @@ public class ResourceIndexedSearchParamString extends BaseResourceIndexedSearchP
setValueExact(theValueExact);
}
public void setHashIdentity(Long theHashIdentity) {
myHashIdentity = theHashIdentity;
}
@PrePersist
public void calculateHashes() {
if (myHashNormalizedPrefix == null && myDaoConfig != null) {
@ -159,6 +170,7 @@ public class ResourceIndexedSearchParamString extends BaseResourceIndexedSearchP
String valueExact = getValueExact();
setHashNormalizedPrefix(calculateHashNormalized(myDaoConfig, resourceType, paramName, valueNormalized));
setHashExact(calculateHashExact(resourceType, paramName, valueExact));
setHashIdentity(calculateHashIdentity(resourceType, paramName));
}
}

View File

@ -5,8 +5,6 @@ import ca.uhn.fhir.model.api.Include;
import ca.uhn.fhir.rest.param.DateRangeParam;
import org.apache.commons.lang3.SerializationUtils;
import org.hibernate.annotations.OptimisticLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.persistence.*;
import javax.validation.constraints.NotNull;
@ -150,13 +148,6 @@ public class Search implements Serializable {
myFailureMessage = left(theFailureMessage, FAILURE_MESSAGE_LENGTH);
}
// FIXME: remove this
private static final Logger ourLog = LoggerFactory.getLogger(Search.class);
@PrePersist
public void prePersist() {
ourLog.info("*** SAVING WITH VERSION {} TOTAL {}", myVersion, myTotalCount);
}
public Long getId() {
return myId;
}

View File

@ -166,8 +166,7 @@ public class PersistedJpaBundleProvider implements IBundleProvider {
return false;
}
// FIXME: remove
ourLog.info("** Retrieved search with version {} and total {}", mySearchEntity.getVersion(), mySearchEntity.getTotalCount());
ourLog.trace("Retrieved search with version {} and total {}", mySearchEntity.getVersion(), mySearchEntity.getTotalCount());
// Load the includes now so that they are available outside of this transaction
mySearchEntity.getIncludes().size();

View File

@ -46,9 +46,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.*;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
@ -363,7 +361,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
PersistedJpaSearchFirstPageBundleProvider retVal = new PersistedJpaSearchFirstPageBundleProvider(search, theCallingDao, task, sb, myManagedTxManager);
populateBundleProvider(retVal);
ourLog.info("Search initial phase completed in {}ms", w.getMillis());
ourLog.debug("Search initial phase completed in {}ms", w.getMillis());
return retVal;
}
@ -675,25 +673,21 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
private void doSaveSearch() {
// FIXME: remove
Integer totalCount = mySearch.getTotalCount();
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
ourLog.info("Have flushed save with total {}", totalCount);
}
});
ourLog.info("** Saving search version {} count {}", mySearch.getVersion(), totalCount);
Search newSearch;
if (mySearch.getId() == null) {
mySearch = mySearchDao.saveAndFlush(mySearch);
newSearch = mySearchDao.save(mySearch);
for (SearchInclude next : mySearch.getIncludes()) {
mySearchIncludeDao.save(next);
}
} else {
mySearch = mySearchDao.saveAndFlush(mySearch);
newSearch = mySearchDao.save(mySearch);
}
// mySearchDao.save is not supposed to return null, but in unit tests
// it can if the mock search dao isn't set up to handle that
if (newSearch != null) {
mySearch = newSearch;
}
}
/**
@ -712,14 +706,13 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
boolean wantCount = myParams.getSummaryMode().contains(SummaryEnum.COUNT);
boolean wantOnlyCount = wantCount && myParams.getSummaryMode().size() == 1;
if (wantCount) {
ourLog.info("** performing count");
ourLog.trace("Performing count");
ISearchBuilder sb = newSearchBuilder();
Iterator<Long> countIterator = sb.createCountQuery(myParams, mySearch.getUuid());
Long count = countIterator.next();
ourLog.info("** got count {}", count);
ourLog.trace("Got count {}", count);
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
// txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
@ -728,6 +721,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
mySearch.setStatus(SearchStatusEnum.FINISHED);
}
doSaveSearch();
mySearchDao.flush();
}
});
if (wantOnlyCount) {
@ -735,7 +729,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
}
}
ourLog.info("** done count");
ourLog.trace("Done count");
ISearchBuilder sb = newSearchBuilder();
/*
@ -957,13 +951,33 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
int pageIndex = theFromIndex / pageSize;
Pageable page = new PageRequest(pageIndex, pageSize) {
Pageable page = new AbstractPageRequest(pageIndex, pageSize) {
private static final long serialVersionUID = 1L;
@Override
public long getOffset() {
return theFromIndex;
}
@Override
public Sort getSort() {
return Sort.unsorted();
}
@Override
public Pageable next() {
return null;
}
@Override
public Pageable previous() {
return null;
}
@Override
public Pageable first() {
return null;
}
};
return page;

View File

@ -0,0 +1,96 @@
package ca.uhn.fhir.jpa.search.warm;
import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.jpa.dao.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
public class CacheWarmingSvcImpl implements ICacheWarmingSvc {
@Autowired
private DaoConfig myDaoConfig;
private Map<WarmCacheEntry, Long> myCacheEntryToNextRefresh = new LinkedHashMap<>();
@Autowired
private FhirContext myCtx;
@Autowired
private DaoRegistry myDaoRegistry;
@Override
@Scheduled(fixedDelay = 1000)
public synchronized void performWarmingPass() {
for (WarmCacheEntry nextCacheEntry : new ArrayList<>(myCacheEntryToNextRefresh.keySet())) {
long nextRefresh = myCacheEntryToNextRefresh.get(nextCacheEntry);
if (nextRefresh < System.currentTimeMillis()) {
// Perform the search
refreshNow(nextCacheEntry);
// Set the next time to warm this search
nextRefresh = nextCacheEntry.getPeriodMillis() + System.currentTimeMillis();
myCacheEntryToNextRefresh.put(nextCacheEntry, nextRefresh);
}
}
}
private void refreshNow(WarmCacheEntry theCacheEntry) {
String nextUrl = theCacheEntry.getUrl();
RuntimeResourceDefinition resourceDef = parseWarmUrlResourceType(nextUrl);
IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceDef.getName());
String queryPart = parseWarmUrlParamPart(nextUrl);
SearchParameterMap responseCriteriaUrl = BaseHapiFhirDao.translateMatchUrl(callingDao, myCtx, queryPart, resourceDef);
callingDao.search(responseCriteriaUrl);
}
private String parseWarmUrlParamPart(String theNextUrl) {
int paramIndex = theNextUrl.indexOf('?');
if (paramIndex == -1) {
throw new ConfigurationException("Invalid warm cache URL (must have ? character)");
}
return theNextUrl.substring(paramIndex);
}
private RuntimeResourceDefinition parseWarmUrlResourceType(String theNextUrl) {
int paramIndex = theNextUrl.indexOf('?');
String resourceName = theNextUrl.substring(0, paramIndex);
if (resourceName.contains("/")) {
resourceName = resourceName.substring(resourceName.lastIndexOf('/') + 1);
}
RuntimeResourceDefinition resourceDef = myCtx.getResourceDefinition(resourceName);
return resourceDef;
}
@PostConstruct
public void start() {
initCacheMap();
}
public synchronized void initCacheMap() {
myCacheEntryToNextRefresh.clear();
List<WarmCacheEntry> warmCacheEntries = myDaoConfig.getWarmCacheEntries();
for (WarmCacheEntry next : warmCacheEntries) {
// Validate
parseWarmUrlParamPart(next.getUrl());
parseWarmUrlResourceType(next.getUrl());
myCacheEntryToNextRefresh.put(next, 0L);
}
}
}

View File

@ -0,0 +1,8 @@
package ca.uhn.fhir.jpa.search.warm;
import org.springframework.scheduling.annotation.Scheduled;
public interface ICacheWarmingSvc {
@Scheduled(fixedDelay = 1000)
void performWarmingPass();
}

View File

@ -0,0 +1,61 @@
package ca.uhn.fhir.jpa.search.warm;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
/**
* Denotes a search that should be performed in the background
* periodically in order to keep a fresh copy in the query cache.
* This improves performance for searches by keeping a copy
* loaded in the background.
*/
public class WarmCacheEntry {
private long myPeriodMillis;
private String myUrl;
@Override
public boolean equals(Object theO) {
if (this == theO) {
return true;
}
if (theO == null || getClass() != theO.getClass()) {
return false;
}
WarmCacheEntry that = (WarmCacheEntry) theO;
return new EqualsBuilder()
.append(myPeriodMillis, that.myPeriodMillis)
.append(myUrl, that.myUrl)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(myPeriodMillis)
.append(myUrl)
.toHashCode();
}
public long getPeriodMillis() {
return myPeriodMillis;
}
public WarmCacheEntry setPeriodMillis(long thePeriodMillis) {
myPeriodMillis = thePeriodMillis;
return this;
}
public String getUrl() {
return myUrl;
}
public WarmCacheEntry setUrl(String theUrl) {
myUrl = theUrl;
return this;
}
}

View File

@ -6,9 +6,6 @@ import ca.uhn.fhir.validation.ResultSeverityEnum;
import net.ttddyy.dsproxy.listener.ThreadQueryCountHolder;
import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder;
import org.apache.commons.dbcp2.BasicDataSource;
import org.hibernate.cfg.AvailableSettings;
import org.hibernate.query.criteria.LiteralHandlingMode;
import org.hibernate.resource.jdbc.spi.PhysicalConnectionHandlingMode;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

View File

@ -11,6 +11,7 @@ import ca.uhn.fhir.jpa.provider.r4.JpaSystemProviderR4;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.search.IStaleSearchDeletingSvc;
import ca.uhn.fhir.jpa.search.warm.ICacheWarmingSvc;
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
import ca.uhn.fhir.jpa.term.BaseHapiTerminologySvcImpl;
import ca.uhn.fhir.jpa.term.IHapiTerminologySvc;
@ -51,7 +52,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import static org.junit.Assert.*;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
@RunWith(SpringJUnit4ClassRunner.class)
@ -260,6 +261,8 @@ public abstract class BaseJpaR4Test extends BaseJpaTest {
@Autowired
protected ITermConceptMapGroupElementTargetDao myTermConceptMapGroupElementTargetDao;
@Autowired
protected ICacheWarmingSvc myCacheWarmingSvc;
@Autowired
private JpaValidationSupportChainR4 myJpaValidationSupportChainR4;
@After()

View File

@ -0,0 +1,112 @@
package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider;
import ca.uhn.fhir.jpa.search.warm.CacheWarmingSvcImpl;
import ca.uhn.fhir.jpa.search.warm.WarmCacheEntry;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.util.TestUtil;
import org.hl7.fhir.r4.model.Patient;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class FhirResourceDaoR4CacheWarmingTest extends BaseJpaR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(FhirResourceDaoR4CacheWarmingTest.class);
@After
public void afterResetDao() {
myDaoConfig.setResourceServerIdStrategy(new DaoConfig().getResourceServerIdStrategy());
myDaoConfig.setWarmCacheEntries(new ArrayList<>());
CacheWarmingSvcImpl cacheWarmingSvc = (CacheWarmingSvcImpl) myCacheWarmingSvc;
cacheWarmingSvc.initCacheMap();
}
@Test
public void testInvalidCacheEntries() {
CacheWarmingSvcImpl cacheWarmingSvc = (CacheWarmingSvcImpl) myCacheWarmingSvc;
myDaoConfig.setWarmCacheEntries(new ArrayList<>());
myDaoConfig.getWarmCacheEntries().add(
new WarmCacheEntry()
.setPeriodMillis(10)
.setUrl("BadResource?name=smith")
);
try {
cacheWarmingSvc.initCacheMap();
fail();
} catch (DataFormatException e) {
assertEquals("Unknown resource name \"BadResource\" (this name is not known in FHIR version \"R4\")", e.getMessage());
}
myDaoConfig.setWarmCacheEntries(new ArrayList<>());
myDaoConfig.getWarmCacheEntries().add(
new WarmCacheEntry()
.setPeriodMillis(10)
.setUrl("foo/Patient")
);
try {
cacheWarmingSvc.initCacheMap();
fail();
} catch (ConfigurationException e) {
assertEquals("Invalid warm cache URL (must have ? character)", e.getMessage());
}
}
@Test
public void testKeepCacheWarm() throws InterruptedException {
myDaoConfig.setWarmCacheEntries(new ArrayList<>());
myDaoConfig.getWarmCacheEntries().add(
new WarmCacheEntry()
.setPeriodMillis(10)
.setUrl("Patient?name=smith")
);
CacheWarmingSvcImpl cacheWarmingSvc = (CacheWarmingSvcImpl) myCacheWarmingSvc;
cacheWarmingSvc.initCacheMap();
Patient p1 = new Patient();
p1.setId("p1");
p1.setActive(true);
myPatientDao.update(p1);
Patient p2 = new Patient();
p2.setId("p2");
p2.setActive(true);
p2.addName().setFamily("Smith");
myPatientDao.update(p2);
Thread.sleep(2000);
SearchParameterMap params = new SearchParameterMap();
params.add("name", new StringParam("smith"));
IBundleProvider result = myPatientDao.search(params);
assertEquals(PersistedJpaBundleProvider.class, result.getClass());
PersistedJpaBundleProvider resultCasted = (PersistedJpaBundleProvider) result;
assertTrue(resultCasted.isCacheHit());
}
@AfterClass
public static void afterClassClearContext() {
TestUtil.clearAllStaticFieldsForUnitTest();
}
}

View File

@ -70,6 +70,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
params.setSummaryMode(Sets.newHashSet(SummaryEnum.COUNT));
IBundleProvider results = myPatientDao.search(params);
String uuid = results.getUuid();
ourLog.info("** Search returned UUID: {}", uuid);
assertEquals(200, results.size().intValue());
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
assertThat(ids, empty());
@ -98,6 +99,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
params.setSummaryMode(Sets.newHashSet(SummaryEnum.COUNT));
IBundleProvider results = myPatientDao.search(params);
String uuid = results.getUuid();
ourLog.info("** Search returned UUID: {}", uuid);
assertEquals(201, results.size().intValue());
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
assertThat(ids, empty());
@ -109,6 +111,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
params.setSummaryMode(Sets.newHashSet(SummaryEnum.COUNT, SummaryEnum.DATA));
results = myPatientDao.search(params);
uuid = results.getUuid();
ourLog.info("** Search returned UUID: {}", uuid);
assertEquals(201, results.size().intValue());
ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
assertThat(ids, hasSize(10));
@ -120,6 +123,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
params.setSummaryMode(Sets.newHashSet(SummaryEnum.COUNT));
results = myPatientDao.search(params);
uuid = results.getUuid();
ourLog.info("** Search returned UUID: {}", uuid);
assertEquals(201, results.size().intValue());
ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
assertThat(ids, empty());
@ -144,12 +148,15 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
// assertEquals(200, myDatabaseBackedPagingProvider.retrieveResultList(uuid).size().intValue());
assertEquals(200, results.size().intValue());
ourLog.info("** Asking for results");
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 5, true);
assertEquals("Patient/PT00000", ids.get(0));
assertEquals("Patient/PT00004", ids.get(4));
ourLog.info("** About to make new query for search with UUID: {}", uuid);
assertEquals(200, myDatabaseBackedPagingProvider.retrieveResultList(uuid).size().intValue());
IBundleProvider search2 = myDatabaseBackedPagingProvider.retrieveResultList(uuid);
Integer search2Size = search2.size();
assertEquals(200, search2Size.intValue());
}
@Test
@ -162,6 +169,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
params.setSummaryMode(Sets.newHashSet(SummaryEnum.COUNT, SummaryEnum.DATA));
IBundleProvider results = myPatientDao.search(params);
String uuid = results.getUuid();
ourLog.info("** Search returned UUID: {}", uuid);
assertEquals(200, results.size().intValue());
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
assertEquals("Patient/PT00000", ids.get(0));
@ -176,6 +184,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
params.setSummaryMode(Sets.newHashSet(SummaryEnum.COUNT, SummaryEnum.DATA));
results = myPatientDao.search(params);
uuid = results.getUuid();
ourLog.info("** Search returned UUID: {}", uuid);
assertEquals(200, results.size().intValue());
ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
assertEquals("Patient/PT00000", ids.get(0));
@ -197,6 +206,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
params.setSort(new SortSpec(Patient.SP_NAME));
IBundleProvider results = myPatientDao.search(params);
String uuid = results.getUuid();
ourLog.info("** Search returned UUID: {}", uuid);
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 200, true);
assertEquals("Patient/PT00000", ids.get(0));
assertEquals("Patient/PT00199", ids.get(199));
@ -250,6 +260,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
params.setSort(new SortSpec(Patient.SP_NAME));
IBundleProvider results = myPatientDao.search(params);
String uuid = results.getUuid();
ourLog.info("** Search returned UUID: {}", uuid);
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
assertEquals("Patient/PT00000", ids.get(0));
assertEquals("Patient/PT00009", ids.get(9));
@ -375,6 +386,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
params.setCount(50);
IBundleProvider results = myPatientDao.search(params);
String uuid = results.getUuid();
ourLog.info("** Search returned UUID: {}", uuid);
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 50, true);
assertEquals("Patient/PT00000", ids.get(0));
assertEquals("Patient/PT00049", ids.get(49));
@ -408,6 +420,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
params.setSort(new SortSpec(Patient.SP_NAME));
IBundleProvider results = myPatientDao.search(params);
String uuid = results.getUuid();
ourLog.info("** Search returned UUID: {}", uuid);
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
assertEquals("Patient/PT00000", ids.get(0));
assertEquals("Patient/PT00009", ids.get(9));
@ -465,6 +478,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
params.setSort(new SortSpec(Patient.SP_NAME));
final IBundleProvider results = myPatientDao.search(params);
String uuid = results.getUuid();
ourLog.info("** Search returned UUID: {}", uuid);
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
assertEquals("Patient/PT00000", ids.get(0));
assertEquals("Patient/PT00009", ids.get(9));
@ -529,6 +543,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
params.add(Patient.SP_RES_ID, new TokenParam("PT00000"));
IBundleProvider results = myPatientDao.search(params);
String uuid = results.getUuid();
ourLog.info("** Search returned UUID: {}", uuid);
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
assertEquals("Patient/PT00000", ids.get(0));
assertEquals(1, ids.size());

View File

@ -1,5 +1,6 @@
package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.SortOrderEnum;
@ -206,31 +207,38 @@ public class FhirResourceDaoR4SortTest extends BaseJpaR4Test {
@SuppressWarnings("unused")
@Test
public void testSortOnSparselyPopulatedFields() {
// myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.DISABLED);
IIdType pid1, pid2, pid3, pid4, pid5, pid6;
{
Patient p = new Patient();
p.setId("pid1");
p.setActive(true);
pid1 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
pid1 = myPatientDao.update(p, mySrd).getId().toUnqualifiedVersionless();
}
{
Patient p = new Patient();
p.setId("pid2");
p.addName().setFamily("A");
pid2 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
pid2 = myPatientDao.update(p, mySrd).getId().toUnqualifiedVersionless();
}
{
Patient p = new Patient();
p.setId("pid3");
p.addName().setFamily("B");
pid3 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
pid3 = myPatientDao.update(p, mySrd).getId().toUnqualifiedVersionless();
}
{
Patient p = new Patient();
p.setId("pid4");
p.addName().setFamily("B").addGiven("A");
pid4 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
pid4 = myPatientDao.update(p, mySrd).getId().toUnqualifiedVersionless();
}
{
Patient p = new Patient();
p.setId("pid5");
p.addName().setFamily("B").addGiven("B");
pid5 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
pid5 = myPatientDao.update(p, mySrd).getId().toUnqualifiedVersionless();
}
SearchParameterMap map;
@ -239,6 +247,7 @@ public class FhirResourceDaoR4SortTest extends BaseJpaR4Test {
map = new SearchParameterMap();
map.setSort(new SortSpec(Patient.SP_FAMILY, SortOrderEnum.ASC).setChain(new SortSpec(Patient.SP_GIVEN, SortOrderEnum.ASC)));
ids = toUnqualifiedVersionlessIds(myPatientDao.search(map));
ourLog.info("** Got IDs: {}", ids);
assertThat(ids, contains(pid2, pid4, pid5, pid3, pid1));
assertEquals(5, ids.size());

View File

@ -298,7 +298,7 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
"where HFJ_RES_PARAM_PRESENT.HASH_PRESENCE is null";
consolidateSearchParamPresenceIndexesTask.addQuery(sql, ArbitrarySqlTask.QueryModeEnum.BATCH_UNTIL_NO_MORE, t -> {
Long pid = (Long) t.get("PID");
Boolean present = (Boolean) t.get("HASH_PRESENCE");
Boolean present = (Boolean) t.get("SP_PRESENT");
String resType = (String) t.get("RES_TYPE");
String paramName = (String) t.get("PARAM_NAME");
Long hash = SearchParamPresent.calculateHashPresence(resType, paramName, present);

View File

@ -71,6 +71,11 @@
timezones where the date that could apply. This makes the search slightly more
inclusive, which errs on the side of caution.
</action>
<action type="fix">
A bug was fixed in the JPA server $expunge operation where a database connection
could sometimes be opened and not returned to the pool immediately, leading to
pool starvation if the operation was called many times in a row.
</action>
</release>
<release version="3.5.0" date="2018-09-17">