diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_2_0/5671-tx-tangle.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_2_0/5671-tx-tangle.yaml new file mode 100644 index 00000000000..0224e092a74 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_2_0/5671-tx-tangle.yaml @@ -0,0 +1,4 @@ +--- +type: fix +issue: 5671 +title: "Avoid lock contention by refreshing SearchParameter cache in a new transaction." diff --git a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/cache/IResourceChangeListenerCacheRefresher.java b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/cache/IResourceChangeListenerCacheRefresher.java index aa9c72739f8..97cf3f7ea95 100644 --- a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/cache/IResourceChangeListenerCacheRefresher.java +++ b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/cache/IResourceChangeListenerCacheRefresher.java @@ -19,6 +19,10 @@ */ package ca.uhn.fhir.jpa.cache; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; + /** * This is an internal service and is not intended to be used outside this package. Implementers should only directly * call the {@link IResourceChangeListenerRegistry}. @@ -40,4 +44,10 @@ public interface IResourceChangeListenerCacheRefresher { * @return the number of resources that have been created, updated and deleted since the last time the cache was refreshed */ ResourceChangeResult refreshCacheAndNotifyListener(IResourceChangeListenerCache theEntry); + + @EventListener(ContextRefreshedEvent.class) + public void start(); + + @EventListener(ContextClosedEvent.class) + public void shutdown(); } diff --git a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/cache/ResourceChangeListenerCacheRefresherImpl.java b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/cache/ResourceChangeListenerCacheRefresherImpl.java index 5c8bbbe031e..38ceb2b24b8 100644 --- a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/cache/ResourceChangeListenerCacheRefresherImpl.java +++ b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/cache/ResourceChangeListenerCacheRefresherImpl.java @@ -25,6 +25,7 @@ import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import com.google.common.annotations.VisibleForTesting; +import jakarta.transaction.Transactional; import org.apache.commons.lang3.time.DateUtils; import org.hl7.fhir.instance.model.api.IIdType; import org.quartz.JobExecutionContext; @@ -57,7 +58,7 @@ public class ResourceChangeListenerCacheRefresherImpl /** * All cache entries are checked at this interval to see if they need to be refreshed */ - static long LOCAL_REFRESH_INTERVAL_MS = 10 * DateUtils.MILLIS_PER_SECOND; + static final long LOCAL_REFRESH_INTERVAL_MS = 10 * DateUtils.MILLIS_PER_SECOND; @Autowired private IResourceVersionSvc myResourceVersionSvc; @@ -133,8 +134,12 @@ public class ResourceChangeListenerCacheRefresherImpl } @Override + // Suspend any current transaction while we sync with the db. + // This avoids lock conflicts while reading the resource versions. + @Transactional(Transactional.TxType.NOT_SUPPORTED) public ResourceChangeResult refreshCacheAndNotifyListener(IResourceChangeListenerCache theCache) { ResourceChangeResult retVal = new ResourceChangeResult(); + if (isStopping()) { ourLog.info("Context is stopping, aborting cache refresh"); return retVal; @@ -146,6 +151,7 @@ public class ResourceChangeListenerCacheRefresherImpl SearchParameterMap searchParamMap = theCache.getSearchParameterMap(); ResourceVersionMap newResourceVersionMap = myResourceVersionSvc.getVersionMap(theCache.getResourceName(), searchParamMap); + retVal = retVal.plus(notifyListener(theCache, newResourceVersionMap)); return retVal; @@ -154,7 +160,7 @@ public class ResourceChangeListenerCacheRefresherImpl /** * Notify a listener with all matching resources if it hasn't been initialized yet, otherwise only notify it if * any resources have changed - * @param theCache + * @param theCache the target * @param theNewResourceVersionMap the measured new resources * @return the list of created, updated and deleted ids */ diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/cache/ResourceChangeListenerRegistryImplIT.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/cache/ResourceChangeListenerRegistryImplIT.java index 842e00c3a9a..6c4ff5e7730 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/cache/ResourceChangeListenerRegistryImplIT.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/cache/ResourceChangeListenerRegistryImplIT.java @@ -7,6 +7,7 @@ import ca.uhn.fhir.jpa.test.BaseJpaR4Test; import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.rest.param.DateRangeParam; import ca.uhn.fhir.rest.param.TokenParam; +import ca.uhn.fhir.test.utilities.ProxyUtil; import ca.uhn.test.concurrency.IPointcutLatch; import ca.uhn.test.concurrency.PointcutLatch; import org.apache.commons.lang3.time.DateUtils; @@ -36,7 +37,7 @@ public class ResourceChangeListenerRegistryImplIT extends BaseJpaR4Test { @Autowired ResourceChangeListenerRegistryImpl myResourceChangeListenerRegistry; @Autowired - ResourceChangeListenerCacheRefresherImpl myResourceChangeListenerCacheRefresher; + IResourceChangeListenerCacheRefresher myResourceChangeListenerCacheRefresher; private final static String RESOURCE_NAME = "Patient"; private TestCallback myMaleTestCallback = new TestCallback("MALE"); @@ -130,9 +131,10 @@ public class ResourceChangeListenerRegistryImplIT extends BaseJpaR4Test { return patient; } - private IdDt createPatientAndRefreshCache(Patient thePatient, TestCallback theTestCallback, long theExpectedCount) throws InterruptedException { + private IdDt createPatientAndRefreshCache(Patient thePatient, TestCallback theTestCallback, long theExpectedCount) { IIdType retval = myPatientDao.create(thePatient).getId(); - ResourceChangeResult result = myResourceChangeListenerCacheRefresher.forceRefreshAllCachesForUnitTest(); + ResourceChangeResult result = ProxyUtil.getSingletonTarget(myResourceChangeListenerCacheRefresher, ResourceChangeListenerCacheRefresherImpl.class) + .forceRefreshAllCachesForUnitTest(); assertResult(result, theExpectedCount, 0, 0); return new IdDt(retval); } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java index 247eb8d0232..ac592d245e3 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java @@ -247,6 +247,7 @@ public class HapiTransactionService implements IHapiTransactionService { ourRequestPartitionThreadLocal.set(requestPartitionId); } + ourLog.trace("Starting doExecute for RequestPartitionId {}", requestPartitionId); if (!myPartitionSettings.isPartitioningEnabled() || Objects.equals(previousRequestPartitionId, requestPartitionId)) { if (ourExistingTransaction.get() == this && canReuseExistingTransaction(theExecutionBuilder)) { @@ -281,6 +282,7 @@ public class HapiTransactionService implements IHapiTransactionService { TransactionCallback theCallback, RequestPartitionId requestPartitionId, RequestPartitionId previousRequestPartitionId) { + ourLog.trace("executeInNewTransactionForPartitionChange"); theExecutionBuilder.myPropagation = myTransactionPropagationWhenChangingPartitions; return doExecuteInTransaction(theExecutionBuilder, theCallback, requestPartitionId, previousRequestPartitionId); } @@ -310,6 +312,7 @@ public class HapiTransactionService implements IHapiTransactionService { TransactionCallback theCallback, RequestPartitionId requestPartitionId, RequestPartitionId previousRequestPartitionId) { + ourLog.trace("doExecuteInTransaction"); try { for (int i = 0; ; i++) { try { @@ -569,6 +572,7 @@ public class HapiTransactionService implements IHapiTransactionService { @Nullable private static T executeInExistingTransaction(@Nonnull TransactionCallback theCallback) { + ourLog.trace("executeInExistingTransaction"); // TODO we could probably track the TransactionStatus we need as a thread local like we do our partition id. return theCallback.doInTransaction(null); } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/searchparam/submit/interceptor/SearchParamValidatingInterceptor.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/searchparam/submit/interceptor/SearchParamValidatingInterceptor.java index d06edb7d983..5f0a72d5171 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/searchparam/submit/interceptor/SearchParamValidatingInterceptor.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/searchparam/submit/interceptor/SearchParamValidatingInterceptor.java @@ -55,6 +55,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; public class SearchParamValidatingInterceptor { public static final String SEARCH_PARAM = "SearchParameter"; + public static final String SKIP_VALIDATION = SearchParamValidatingInterceptor.class.getName() + ".SKIP_VALIDATION"; private FhirContext myFhirContext; @@ -79,6 +80,14 @@ public class SearchParamValidatingInterceptor { if (isNotSearchParameterResource(theResource)) { return; } + + // avoid a loop when loading our hard-coded core FhirContext SearchParameters + boolean isStartup = theRequestDetails != null + && Boolean.TRUE == theRequestDetails.getUserData().get(SKIP_VALIDATION); + if (isStartup) { + return; + } + RuntimeSearchParam runtimeSearchParam = mySearchParameterCanonicalizer.canonicalizeSearchParameter(theResource); if (runtimeSearchParam == null) { return;