cherry-picked fix for 5671 searchparameter locks

This commit is contained in:
Michael Buckley 2024-02-08 14:01:39 -05:00 committed by Long Ma
parent 538acc6fc5
commit 8c71fca161
6 changed files with 40 additions and 5 deletions

View File

@ -0,0 +1,4 @@
---
type: fix
issue: 5671
title: "Avoid lock contention by refreshing SearchParameter cache in a new transaction."

View File

@ -19,6 +19,10 @@
*/
package ca.uhn.fhir.jpa.cache;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
/**
* This is an internal service and is not intended to be used outside this package. Implementers should only directly
* call the {@link IResourceChangeListenerRegistry}.
@ -40,4 +44,10 @@ public interface IResourceChangeListenerCacheRefresher {
* @return the number of resources that have been created, updated and deleted since the last time the cache was refreshed
*/
ResourceChangeResult refreshCacheAndNotifyListener(IResourceChangeListenerCache theEntry);
@EventListener(ContextRefreshedEvent.class)
public void start();
@EventListener(ContextClosedEvent.class)
public void shutdown();
}

View File

@ -25,6 +25,7 @@ import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import com.google.common.annotations.VisibleForTesting;
import jakarta.transaction.Transactional;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IIdType;
import org.quartz.JobExecutionContext;
@ -57,7 +58,7 @@ public class ResourceChangeListenerCacheRefresherImpl
/**
* All cache entries are checked at this interval to see if they need to be refreshed
*/
static long LOCAL_REFRESH_INTERVAL_MS = 10 * DateUtils.MILLIS_PER_SECOND;
static final long LOCAL_REFRESH_INTERVAL_MS = 10 * DateUtils.MILLIS_PER_SECOND;
@Autowired
private IResourceVersionSvc myResourceVersionSvc;
@ -133,8 +134,12 @@ public class ResourceChangeListenerCacheRefresherImpl
}
@Override
// Suspend any current transaction while we sync with the db.
// This avoids lock conflicts while reading the resource versions.
@Transactional(Transactional.TxType.NOT_SUPPORTED)
public ResourceChangeResult refreshCacheAndNotifyListener(IResourceChangeListenerCache theCache) {
ResourceChangeResult retVal = new ResourceChangeResult();
if (isStopping()) {
ourLog.info("Context is stopping, aborting cache refresh");
return retVal;
@ -146,6 +151,7 @@ public class ResourceChangeListenerCacheRefresherImpl
SearchParameterMap searchParamMap = theCache.getSearchParameterMap();
ResourceVersionMap newResourceVersionMap =
myResourceVersionSvc.getVersionMap(theCache.getResourceName(), searchParamMap);
retVal = retVal.plus(notifyListener(theCache, newResourceVersionMap));
return retVal;
@ -154,7 +160,7 @@ public class ResourceChangeListenerCacheRefresherImpl
/**
* Notify a listener with all matching resources if it hasn't been initialized yet, otherwise only notify it if
* any resources have changed
* @param theCache
* @param theCache the target
* @param theNewResourceVersionMap the measured new resources
* @return the list of created, updated and deleted ids
*/

View File

@ -7,6 +7,7 @@ import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.test.utilities.ProxyUtil;
import ca.uhn.test.concurrency.IPointcutLatch;
import ca.uhn.test.concurrency.PointcutLatch;
import org.apache.commons.lang3.time.DateUtils;
@ -36,7 +37,7 @@ public class ResourceChangeListenerRegistryImplIT extends BaseJpaR4Test {
@Autowired
ResourceChangeListenerRegistryImpl myResourceChangeListenerRegistry;
@Autowired
ResourceChangeListenerCacheRefresherImpl myResourceChangeListenerCacheRefresher;
IResourceChangeListenerCacheRefresher myResourceChangeListenerCacheRefresher;
private final static String RESOURCE_NAME = "Patient";
private TestCallback myMaleTestCallback = new TestCallback("MALE");
@ -130,9 +131,10 @@ public class ResourceChangeListenerRegistryImplIT extends BaseJpaR4Test {
return patient;
}
private IdDt createPatientAndRefreshCache(Patient thePatient, TestCallback theTestCallback, long theExpectedCount) throws InterruptedException {
private IdDt createPatientAndRefreshCache(Patient thePatient, TestCallback theTestCallback, long theExpectedCount) {
IIdType retval = myPatientDao.create(thePatient).getId();
ResourceChangeResult result = myResourceChangeListenerCacheRefresher.forceRefreshAllCachesForUnitTest();
ResourceChangeResult result = ProxyUtil.getSingletonTarget(myResourceChangeListenerCacheRefresher, ResourceChangeListenerCacheRefresherImpl.class)
.forceRefreshAllCachesForUnitTest();
assertResult(result, theExpectedCount, 0, 0);
return new IdDt(retval);
}

View File

@ -247,6 +247,7 @@ public class HapiTransactionService implements IHapiTransactionService {
ourRequestPartitionThreadLocal.set(requestPartitionId);
}
ourLog.trace("Starting doExecute for RequestPartitionId {}", requestPartitionId);
if (!myPartitionSettings.isPartitioningEnabled()
|| Objects.equals(previousRequestPartitionId, requestPartitionId)) {
if (ourExistingTransaction.get() == this && canReuseExistingTransaction(theExecutionBuilder)) {
@ -281,6 +282,7 @@ public class HapiTransactionService implements IHapiTransactionService {
TransactionCallback<T> theCallback,
RequestPartitionId requestPartitionId,
RequestPartitionId previousRequestPartitionId) {
ourLog.trace("executeInNewTransactionForPartitionChange");
theExecutionBuilder.myPropagation = myTransactionPropagationWhenChangingPartitions;
return doExecuteInTransaction(theExecutionBuilder, theCallback, requestPartitionId, previousRequestPartitionId);
}
@ -310,6 +312,7 @@ public class HapiTransactionService implements IHapiTransactionService {
TransactionCallback<T> theCallback,
RequestPartitionId requestPartitionId,
RequestPartitionId previousRequestPartitionId) {
ourLog.trace("doExecuteInTransaction");
try {
for (int i = 0; ; i++) {
try {
@ -569,6 +572,7 @@ public class HapiTransactionService implements IHapiTransactionService {
@Nullable
private static <T> T executeInExistingTransaction(@Nonnull TransactionCallback<T> theCallback) {
ourLog.trace("executeInExistingTransaction");
// TODO we could probably track the TransactionStatus we need as a thread local like we do our partition id.
return theCallback.doInTransaction(null);
}

View File

@ -55,6 +55,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class SearchParamValidatingInterceptor {
public static final String SEARCH_PARAM = "SearchParameter";
public static final String SKIP_VALIDATION = SearchParamValidatingInterceptor.class.getName() + ".SKIP_VALIDATION";
private FhirContext myFhirContext;
@ -79,6 +80,14 @@ public class SearchParamValidatingInterceptor {
if (isNotSearchParameterResource(theResource)) {
return;
}
// avoid a loop when loading our hard-coded core FhirContext SearchParameters
boolean isStartup = theRequestDetails != null
&& Boolean.TRUE == theRequestDetails.getUserData().get(SKIP_VALIDATION);
if (isStartup) {
return;
}
RuntimeSearchParam runtimeSearchParam = mySearchParameterCanonicalizer.canonicalizeSearchParameter(theResource);
if (runtimeSearchParam == null) {
return;