Add transaction write semaphore interceptor (#2743)

* Add transaction write semaphore interceptor

* Add changelog

* Test fix
This commit is contained in:
James Agnew 2021-06-20 12:55:49 -04:00 committed by GitHub
parent 0b05c730fb
commit 6ea1438f1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 600 additions and 153 deletions

View File

@ -1612,6 +1612,56 @@ public enum Pointcut implements IPointcut {
),
/**
* <b>Storage Hook:</b>
* Invoked during a FHIR transaction, immediately before processing all write operations (i.e. immediately
* before a database transaction will be opened)
* <p>
* Hooks may accept the following parameters:
* </p>
* <ul>
* <li>
* ca.uhn.fhir.interceptor.model.TransactionWriteOperationsDetails - Contains details about the transaction that is about to start
* </li>
* <li>
* ca.uhn.fhir.rest.api.server.storage.TransactionDetails - The outer transaction details object (since 5.0.0)
* </li>
* </ul>
* <p>
* Hooks should return <code>void</code>.
* </p>
*/
STORAGE_TRANSACTION_WRITE_OPERATIONS_PRE(void.class,
"ca.uhn.fhir.interceptor.model.TransactionWriteOperationsDetails",
"ca.uhn.fhir.rest.api.server.storage.TransactionDetails"
),
/**
* <b>Storage Hook:</b>
* Invoked during a FHIR transaction, immediately after processing all write operations (i.e. immediately
* after the transaction has been committed or rolled back). This hook will always be called if
* {@link #STORAGE_TRANSACTION_WRITE_OPERATIONS_PRE} has been called, regardless of whether the operation
* succeeded or failed.
* <p>
* Hooks may accept the following parameters:
* </p>
* <ul>
* <li>
* ca.uhn.fhir.interceptor.model.TransactionWriteOperationsDetails - Contains details about the transaction that is about to start
* </li>
* <li>
* ca.uhn.fhir.rest.api.server.storage.TransactionDetails - The outer transaction details object (since 5.0.0)
* </li>
* </ul>
* <p>
* Hooks should return <code>void</code>.
* </p>
*/
STORAGE_TRANSACTION_WRITE_OPERATIONS_POST(void.class,
"ca.uhn.fhir.interceptor.model.TransactionWriteOperationsDetails",
"ca.uhn.fhir.rest.api.server.storage.TransactionDetails"
),
/**
* <b>Storage Hook:</b>
* Invoked when a resource delete operation is about to fail due to referential integrity checks. Intended for use with {@literal ca.uhn.fhir.jpa.interceptor.CascadingDeleteInterceptor}.

View File

@ -0,0 +1,29 @@
package ca.uhn.fhir.interceptor.model;
import java.util.List;
/**
* This is an experimental API, use with caution
*/
public class TransactionWriteOperationsDetails {
private List<String> myConditionalCreateRequestUrls;
private List<String> myUpdateRequestUrls;
public List<String> getConditionalCreateRequestUrls() {
return myConditionalCreateRequestUrls;
}
public void setConditionalCreateRequestUrls(List<String> theConditionalCreateRequestUrls) {
myConditionalCreateRequestUrls = theConditionalCreateRequestUrls;
}
public List<String> getUpdateRequestUrls() {
return myUpdateRequestUrls;
}
public void setUpdateRequestUrls(List<String> theUpdateRequestUrls) {
myUpdateRequestUrls = theUpdateRequestUrls;
}
}

View File

@ -0,0 +1,6 @@
---
type: add
issue: 2743
title: "A new interceptor has been added for JPA servers that uses semaphores to avoid multiple concurrent
FHIR transactions from trying to create/update the same resource at the same time. This can improve
overall performance when writing many concurrent transactions since it avoids the need for retries."

View File

@ -117,7 +117,8 @@ public class HapiFhirHibernateJpaDialect extends HibernateJpaDialect {
}
}
return super.convertHibernateAccessException(theException);
DataAccessException retVal = super.convertHibernateAccessException(theException);
return retVal;
}
}

View File

@ -183,10 +183,10 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
public static final String OO_SEVERITY_INFO = "information";
public static final String OO_SEVERITY_WARN = "warning";
public static final String XACT_USERDATA_KEY_RESOLVED_TAG_DEFINITIONS = BaseHapiFhirDao.class.getName() + "_RESOLVED_TAG_DEFINITIONS";
public static final String XACT_USERDATA_KEY_EXISTING_SEARCH_PARAMS = BaseHapiFhirDao.class.getName() + "_EXISTING_SEARCH_PARAMS";
private static final Logger ourLog = LoggerFactory.getLogger(BaseHapiFhirDao.class);
private static final Map<FhirVersionEnum, FhirContext> ourRetrievalContexts = new HashMap<>();
private static final String PROCESSING_SUB_REQUEST = "BaseHapiFhirDao.processingSubRequest";
private static final String TRANSACTION_DETAILS_CACHE_KEY_EXISTING_SEARCH_PARAMS = BaseHapiFhirDao.class.getName() + "_EXISTING_SEARCH_PARAMS";
private static boolean ourValidationDisabledForUnitTest;
private static boolean ourDisableIncrementOnUpdateForUnitTest = false;
@ -1160,7 +1160,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
// CREATE or UPDATE
IdentityHashMap<ResourceTable, ResourceIndexedSearchParams> existingSearchParams = theTransactionDetails.getOrCreateUserData(TRANSACTION_DETAILS_CACHE_KEY_EXISTING_SEARCH_PARAMS, () -> new IdentityHashMap<>());
IdentityHashMap<ResourceTable, ResourceIndexedSearchParams> existingSearchParams = theTransactionDetails.getOrCreateUserData(XACT_USERDATA_KEY_EXISTING_SEARCH_PARAMS, () -> new IdentityHashMap<>());
existingParams = existingSearchParams.get(entity);
if (existingParams == null) {
existingParams = new ResourceIndexedSearchParams(entity);

View File

@ -211,7 +211,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@Override
public DaoMethodOutcome create(T theResource, String theIfNoneExist, boolean thePerformIndexing, @Nonnull TransactionDetails theTransactionDetails, RequestDetails theRequestDetails) {
return myTransactionService.execute(theRequestDetails, tx -> doCreateForPost(theResource, theIfNoneExist, thePerformIndexing, theTransactionDetails, theRequestDetails));
return myTransactionService.execute(theRequestDetails, theTransactionDetails, tx -> doCreateForPost(theResource, theIfNoneExist, thePerformIndexing, theTransactionDetails, theRequestDetails));
}
@VisibleForTesting
@ -420,10 +420,12 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@Override
public DaoMethodOutcome delete(IIdType theId, RequestDetails theRequestDetails) {
TransactionDetails transactionDetails = new TransactionDetails();
validateIdPresentForDelete(theId);
validateDeleteEnabled();
return myTransactionService.execute(theRequestDetails, tx -> {
return myTransactionService.execute(theRequestDetails, transactionDetails, tx -> {
DeleteConflictList deleteConflicts = new DeleteConflictList();
if (isNotBlank(theId.getValue())) {
deleteConflicts.setResourceIdMarkedForDeletion(theId);
@ -431,7 +433,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
StopWatch w = new StopWatch();
DaoMethodOutcome retVal = delete(theId, deleteConflicts, theRequestDetails, new TransactionDetails());
DaoMethodOutcome retVal = delete(theId, deleteConflicts, theRequestDetails, transactionDetails);
DeleteConflictService.validateDeleteConflictsEmptyOrThrowException(getContext(), deleteConflicts);
@ -521,13 +523,15 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@Override
public DeleteMethodOutcome deleteByUrl(String theUrl, RequestDetails theRequest) {
validateDeleteEnabled();
TransactionDetails transactionDetails = new TransactionDetails();
ResourceSearch resourceSearch = myMatchUrlService.getResourceSearch(theUrl);
if (resourceSearch.isDeleteExpunge()) {
return deleteExpunge(theUrl, theRequest);
}
return myTransactionService.execute(theRequest, tx -> {
return myTransactionService.execute(theRequest, transactionDetails, tx -> {
DeleteConflictList deleteConflicts = new DeleteConflictList();
DeleteMethodOutcome outcome = deleteByUrl(theUrl, deleteConflicts, theRequest);
DeleteConflictService.validateDeleteConflictsEmptyOrThrowException(getContext(), deleteConflicts);
@ -542,8 +546,9 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@Override
public DeleteMethodOutcome deleteByUrl(String theUrl, DeleteConflictList deleteConflicts, RequestDetails theRequestDetails) {
validateDeleteEnabled();
TransactionDetails transactionDetails = new TransactionDetails();
return myTransactionService.execute(theRequestDetails, tx -> doDeleteByUrl(theUrl, deleteConflicts, theRequestDetails));
return myTransactionService.execute(theRequestDetails, transactionDetails, tx -> doDeleteByUrl(theUrl, deleteConflicts, theRequestDetails));
}
@Nonnull
@ -1016,7 +1021,8 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@Override
public DaoMethodOutcome patch(IIdType theId, String theConditionalUrl, PatchTypeEnum thePatchType, String thePatchBody, IBaseParameters theFhirPatchBody, RequestDetails theRequest) {
return myTransactionService.execute(theRequest, tx -> doPatch(theId, theConditionalUrl, thePatchType, thePatchBody, theFhirPatchBody, theRequest, new TransactionDetails()));
TransactionDetails transactionDetails = new TransactionDetails();
return myTransactionService.execute(theRequest, transactionDetails, tx -> doPatch(theId, theConditionalUrl, thePatchType, thePatchBody, theFhirPatchBody, theRequest, transactionDetails));
}
private DaoMethodOutcome doPatch(IIdType theId, String theConditionalUrl, PatchTypeEnum thePatchType, String thePatchBody, IBaseParameters theFhirPatchBody, RequestDetails theRequest, TransactionDetails theTransactionDetails) {
@ -1132,8 +1138,9 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@Override
public T read(IIdType theId, RequestDetails theRequest, boolean theDeletedOk) {
validateResourceTypeAndThrowInvalidRequestException(theId);
TransactionDetails transactionDetails = new TransactionDetails();
return myTransactionService.execute(theRequest, tx -> doRead(theId, theRequest, theDeletedOk));
return myTransactionService.execute(theRequest, transactionDetails, tx -> doRead(theId, theRequest, theDeletedOk));
}
public T doRead(IIdType theId, RequestDetails theRequest, boolean theDeletedOk) {
@ -1459,7 +1466,9 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@Override
public Set<ResourcePersistentId> searchForIds(SearchParameterMap theParams, RequestDetails theRequest) {
return myTransactionService.execute(theRequest, tx -> {
TransactionDetails transactionDetails = new TransactionDetails();
return myTransactionService.execute(theRequest, transactionDetails, tx -> {
if (theParams.getLoadSynchronousUpTo() != null) {
theParams.setLoadSynchronousUpTo(Math.min(getConfig().getInternalSynchronousSearchSize(), theParams.getLoadSynchronousUpTo()));
@ -1566,8 +1575,8 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
String id = theResource.getIdElement().getValue();
Runnable onRollback = () -> theResource.getIdElement().setValue(id);
// Execute the update in a retriable transaction
return myTransactionService.execute(theRequest, tx -> doUpdate(theResource, theMatchUrl, thePerformIndexing, theForceUpdateVersion, theRequest, theTransactionDetails), onRollback);
// Execute the update in a retryable transaction
return myTransactionService.execute(theRequest, theTransactionDetails, tx -> doUpdate(theResource, theMatchUrl, thePerformIndexing, theForceUpdateVersion, theRequest, theTransactionDetails), onRollback);
}
private DaoMethodOutcome doUpdate(T theResource, String theMatchUrl, boolean thePerformIndexing, boolean theForceUpdateVersion, RequestDetails theRequest, TransactionDetails theTransactionDetails) {

View File

@ -25,6 +25,7 @@ import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.TransactionWriteOperationsDetails;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
@ -446,33 +447,7 @@ public abstract class BaseTransactionProcessor {
}
entries.sort(new TransactionSorter(placeholderIds));
/*
* All of the write operations in the transaction (PUT, POST, etc.. basically anything
* except GET) are performed in their own database transaction before we do the reads.
* We do this because the reads (specifically the searches) often spawn their own
* secondary database transaction and if we allow that within the primary
* database transaction we can end up with deadlocks if the server is under
* heavy load with lots of concurrent transactions using all available
* database connections.
*/
TransactionCallback<Map<IBase, IIdType>> txCallback = status -> {
final Set<IIdType> allIds = new LinkedHashSet<>();
final Map<IIdType, IIdType> idSubstitutions = new HashMap<>();
final Map<IIdType, DaoMethodOutcome> idToPersistedOutcome = new HashMap<>();
Map<IBase, IIdType> retVal = doTransactionWriteOperations(theRequestDetails, theActionName, transactionDetails, allIds, idSubstitutions, idToPersistedOutcome, response, originalRequestOrder, entries, transactionStopWatch);
transactionStopWatch.startTask("Commit writes to database");
return retVal;
};
Map<IBase, IIdType> entriesToProcess = myHapiTransactionService.execute(theRequestDetails, txCallback);
transactionStopWatch.endCurrentTask();
for (Map.Entry<IBase, IIdType> nextEntry : entriesToProcess.entrySet()) {
String responseLocation = nextEntry.getValue().toUnqualified().getValue();
String responseEtag = nextEntry.getValue().getVersionIdPart();
myVersionAdapter.setResponseLocation(nextEntry.getKey(), responseLocation);
myVersionAdapter.setResponseETag(nextEntry.getKey(), responseEtag);
}
doTransactionWriteOperations(theRequestDetails, theActionName, transactionDetails, transactionStopWatch, response, originalRequestOrder, entries);
/*
* Loop through the request and process any entries of type GET
@ -555,6 +530,79 @@ public abstract class BaseTransactionProcessor {
return response;
}
/**
* All of the write operations in the transaction (PUT, POST, etc.. basically anything
* except GET) are performed in their own database transaction before we do the reads.
* We do this because the reads (specifically the searches) often spawn their own
* secondary database transaction and if we allow that within the primary
* database transaction we can end up with deadlocks if the server is under
* heavy load with lots of concurrent transactions using all available
* database connections.
*/
private void doTransactionWriteOperations(RequestDetails theRequestDetails, String theActionName, TransactionDetails theTransactionDetails, StopWatch theTransactionStopWatch, IBaseBundle theResponse, IdentityHashMap<IBase, Integer> theOriginalRequestOrder, List<IBase> theEntries) {
TransactionWriteOperationsDetails writeOperationsDetails = null;
if (CompositeInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_PRE, myInterceptorBroadcaster, theRequestDetails) ||
CompositeInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_POST, myInterceptorBroadcaster, theRequestDetails)) {
List<String> updateRequestUrls = new ArrayList<>();
List<String> conditionalCreateRequestUrls = new ArrayList<>();
for (IBase nextEntry : theEntries) {
String method = myVersionAdapter.getEntryRequestVerb(myContext, nextEntry);
if ("PUT".equals(method)) {
String requestUrl = myVersionAdapter.getEntryRequestUrl(nextEntry);
if (isNotBlank(requestUrl)) {
updateRequestUrls.add(requestUrl);
}
} else if ("POST".equals(method)) {
String requestUrl = myVersionAdapter.getEntryRequestIfNoneExist(nextEntry);
if (isNotBlank(requestUrl) && requestUrl.contains("?")) {
conditionalCreateRequestUrls.add(requestUrl);
}
}
}
writeOperationsDetails = new TransactionWriteOperationsDetails();
writeOperationsDetails.setUpdateRequestUrls(updateRequestUrls);
writeOperationsDetails.setConditionalCreateRequestUrls(conditionalCreateRequestUrls);
HookParams params = new HookParams()
.add(TransactionDetails.class, theTransactionDetails)
.add(TransactionWriteOperationsDetails.class, writeOperationsDetails);
CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_PRE, params);
}
TransactionCallback<Map<IBase, IIdType>> txCallback = status -> {
final Set<IIdType> allIds = new LinkedHashSet<>();
final Map<IIdType, IIdType> idSubstitutions = new HashMap<>();
final Map<IIdType, DaoMethodOutcome> idToPersistedOutcome = new HashMap<>();
Map<IBase, IIdType> retVal = doTransactionWriteOperations(theRequestDetails, theActionName, theTransactionDetails, allIds, idSubstitutions, idToPersistedOutcome, theResponse, theOriginalRequestOrder, theEntries, theTransactionStopWatch);
theTransactionStopWatch.startTask("Commit writes to database");
return retVal;
};
Map<IBase, IIdType> entriesToProcess;
try {
entriesToProcess = myHapiTransactionService.execute(theRequestDetails, theTransactionDetails, txCallback);
} finally {
if (writeOperationsDetails != null) {
HookParams params = new HookParams()
.add(TransactionDetails.class, theTransactionDetails)
.add(TransactionWriteOperationsDetails.class, writeOperationsDetails);
CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_POST, params);
}
}
theTransactionStopWatch.endCurrentTask();
for (Map.Entry<IBase, IIdType> nextEntry : entriesToProcess.entrySet()) {
String responseLocation = nextEntry.getValue().toUnqualified().getValue();
String responseEtag = nextEntry.getValue().getVersionIdPart();
myVersionAdapter.setResponseLocation(nextEntry.getKey(), responseLocation);
myVersionAdapter.setResponseETag(nextEntry.getKey(), responseEtag);
}
}
private boolean isValidVerb(String theVerb) {
try {
return org.hl7.fhir.r4.model.Bundle.HTTPVerb.fromCode(theVerb) != null;
@ -891,7 +939,6 @@ public abstract class BaseTransactionProcessor {
theTransactionStopWatch.endCurrentTask();
}
/*
* Make sure that there are no conflicts from deletions. E.g. we can't delete something
* if something else has a reference to it.. Unless the thing that has a reference to it
@ -1094,6 +1141,8 @@ public abstract class BaseTransactionProcessor {
}
if (newId != null) {
ourLog.debug(" * Replacing resource ref {} with {}", nextId, newId);
addRollbackReferenceRestore(theTransactionDetails, resourceReference);
if (theReferencesToAutoVersion.contains(resourceReference)) {
resourceReference.setReference(newId.getValue());
resourceReference.setResource(null);
@ -1108,6 +1157,7 @@ public abstract class BaseTransactionProcessor {
if (theReferencesToAutoVersion.contains(resourceReference)) {
DaoMethodOutcome outcome = theIdToPersistedOutcome.get(nextId);
if (!outcome.isNop() && !Boolean.TRUE.equals(outcome.getCreated())) {
addRollbackReferenceRestore(theTransactionDetails, resourceReference);
resourceReference.setReference(nextId.getValue());
resourceReference.setResource(null);
}
@ -1126,6 +1176,10 @@ public abstract class BaseTransactionProcessor {
if (theIdSubstitutions.containsKey(nextUriString)) {
IIdType newId = theIdSubstitutions.get(nextUriString);
ourLog.debug(" * Replacing resource ref {} with {}", nextUriString, newId);
String existingValue = nextRef.getValueAsString();
theTransactionDetails.addRollbackUndoAction(() -> nextRef.setValueAsString(existingValue));
nextRef.setValueAsString(newId.toVersionless().getValue());
} else {
ourLog.debug(" * Reference [{}] does not exist in bundle", nextUriString);
@ -1180,6 +1234,11 @@ public abstract class BaseTransactionProcessor {
}
}
private void addRollbackReferenceRestore(TransactionDetails theTransactionDetails, IBaseReference resourceReference) {
String existingValue = resourceReference.getReferenceElement().getValue();
theTransactionDetails.addRollbackUndoAction(() -> resourceReference.setReference(existingValue));
}
private void validateNoDuplicates(RequestDetails theRequest, String theActionName, Map<String, Class<? extends IBaseResource>> conditionalRequestUrls, Collection<DaoMethodOutcome> thePersistedOutcomes) {
IdentityHashMap<IBaseResource, ResourceIndexedSearchParams> resourceToIndexedParams = new IdentityHashMap<>(thePersistedOutcomes.size());

View File

@ -28,6 +28,8 @@ import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.searchparam.extractor.ResourceIndexedSearchParams;
import ca.uhn.fhir.jpa.util.AddRemoveCount;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -41,6 +43,7 @@ import java.util.List;
@Service
public class DaoSearchParamSynchronizer {
private static final Logger ourLog = LoggerFactory.getLogger(DaoSearchParamSynchronizer.class);
@PersistenceContext(type = PersistenceContextType.TRANSACTION)
protected EntityManager myEntityManager;
@Autowired

View File

@ -24,6 +24,8 @@ import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.model.ResourceVersionConflictResolutionStrategy;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
@ -65,11 +67,11 @@ public class HapiTransactionService {
myTxTemplate = new TransactionTemplate(myTransactionManager);
}
public <T> T execute(RequestDetails theRequestDetails, TransactionCallback<T> theCallback) {
return execute(theRequestDetails, theCallback, null);
public <T> T execute(RequestDetails theRequestDetails, TransactionDetails theTransactionDetails, TransactionCallback<T> theCallback) {
return execute(theRequestDetails, theTransactionDetails, theCallback, null);
}
public <T> T execute(RequestDetails theRequestDetails, TransactionCallback<T> theCallback, Runnable theOnRollback) {
public <T> T execute(RequestDetails theRequestDetails, TransactionDetails theTransactionDetails, TransactionCallback<T> theCallback, Runnable theOnRollback) {
for (int i = 0; ; i++) {
try {
@ -108,7 +110,14 @@ public class HapiTransactionService {
}
if (i < maxRetries) {
theTransactionDetails.getRollbackUndoActions().forEach(t->t.run());
theTransactionDetails.clearRollbackUndoActions();
theTransactionDetails.clearResolvedItems();
theTransactionDetails.clearUserData(BaseHapiFhirDao.XACT_USERDATA_KEY_RESOLVED_TAG_DEFINITIONS);
theTransactionDetails.clearUserData(BaseHapiFhirDao.XACT_USERDATA_KEY_EXISTING_SEARCH_PARAMS);
sleepAtLeast(250, false);
ourLog.info("About to start a transaction retry due to conflict or constraint error");
continue;
}

View File

@ -0,0 +1,104 @@
package ca.uhn.fhir.jpa.interceptor;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.TransactionWriteOperationsDetails;
import ca.uhn.fhir.jpa.util.MemoryCacheService;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* This interceptor uses semaphores to avoid multiple concurrent FHIR transaction
* bundles from processing the same records at the same time, avoiding concurrency
* issues.
*/
@Interceptor
public class TransactionConcurrencySemaphoreInterceptor {
private static final Logger ourLog = LoggerFactory.getLogger(TransactionConcurrencySemaphoreInterceptor.class);
private static final String HELD_SEMAPHORES = TransactionConcurrencySemaphoreInterceptor.class.getName() + "_HELD_SEMAPHORES";
private final Cache<String, Semaphore> mySemaphoreCache;
private final MemoryCacheService myMemoryCacheService;
/**
* Constructor
*/
public TransactionConcurrencySemaphoreInterceptor(MemoryCacheService theMemoryCacheService) {
myMemoryCacheService = theMemoryCacheService;
mySemaphoreCache = Caffeine
.newBuilder()
.expireAfterAccess(1, TimeUnit.MINUTES)
.build();
}
@Hook(Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_PRE)
public void pre(TransactionDetails theTransactionDetails, TransactionWriteOperationsDetails theWriteOperationsDetails) {
List<Semaphore> heldSemaphores = new ArrayList<>();
acquireSemaphoresForUrlList(heldSemaphores, theWriteOperationsDetails.getUpdateRequestUrls(), false);
acquireSemaphoresForUrlList(heldSemaphores, theWriteOperationsDetails.getConditionalCreateRequestUrls(), true);
theTransactionDetails.putUserData(HELD_SEMAPHORES, heldSemaphores);
}
private void acquireSemaphoresForUrlList(List<Semaphore> heldSemaphores, List<String> urls, boolean isConditionalCreates) {
for (String next : urls) {
if (isConditionalCreates) {
if (myMemoryCacheService.getIfPresent(MemoryCacheService.CacheEnum.MATCH_URL, next) != null) {
continue;
}
}
Semaphore semaphore = mySemaphoreCache.get(next, t -> new Semaphore(1));
if (heldSemaphores.contains(semaphore)) {
continue;
}
assert semaphore != null;
try {
if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
ourLog.warn("Timed out waiting for semaphore on request URL: {}", next);
} else {
heldSemaphores.add(semaphore);
}
} catch (InterruptedException e) {
ourLog.warn("Interrupted during semaphore acquisition");
}
}
}
@Hook(Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_POST)
public void post(TransactionDetails theTransactionDetails) {
List<Semaphore> heldSemaphores = theTransactionDetails.getUserData(HELD_SEMAPHORES);
for (Semaphore next : heldSemaphores) {
next.release();
}
}
/**
* Clear all semaphors from the list. This is really mostly intended for testing scenarios.
*/
public void clearSemaphores() {
mySemaphoreCache.invalidateAll();
}
/**
* Returns a count of all semaphores currently in the cache (incuding held and unheld semaphores)
*/
public long countSemaphores() {
return mySemaphoreCache.estimatedSize();
}
}

View File

@ -29,6 +29,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import javax.persistence.EntityManager;
@ -38,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ExtendWith(SpringExtension.class)
@ -74,9 +76,9 @@ public class TransactionProcessorTest {
@BeforeEach
public void before() {
when(myHapiTransactionService.execute(any(), any())).thenAnswer(t -> {
TransactionCallback callback = t.getArgument(1, TransactionCallback.class);
return callback.doInTransaction(null);
when(myHapiTransactionService.execute(any(), any(), any())).thenAnswer(t -> {
TransactionCallback<?> callback = t.getArgument(2, TransactionCallback.class);
return callback.doInTransaction(mock(TransactionStatus.class));
});
myTransactionProcessor.setEntityManagerForUnitTest(myEntityManager);

View File

@ -1,6 +1,7 @@
package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.interceptor.executor.InterceptorService;
import ca.uhn.fhir.jpa.interceptor.TransactionConcurrencySemaphoreInterceptor;
import ca.uhn.fhir.jpa.interceptor.UserRequestRetryVersionConflictsInterceptor;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
@ -33,6 +34,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.test.annotation.DirtiesContext;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -61,12 +63,14 @@ public class FhirResourceDaoR4ConcurrentWriteTest extends BaseJpaR4Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(FhirResourceDaoR4ConcurrentWriteTest.class);
private ExecutorService myExecutor;
private UserRequestRetryVersionConflictsInterceptor myRetryInterceptor;
private TransactionConcurrencySemaphoreInterceptor myConcurrencySemaphoreInterceptor;
@BeforeEach
public void before() {
myExecutor = Executors.newFixedThreadPool(10);
myRetryInterceptor = new UserRequestRetryVersionConflictsInterceptor();
myConcurrencySemaphoreInterceptor = new TransactionConcurrencySemaphoreInterceptor(myMemoryCacheService);
RestfulServer server = new RestfulServer(myFhirCtx);
when(mySrd.getServer()).thenReturn(server);
@ -77,57 +81,19 @@ public class FhirResourceDaoR4ConcurrentWriteTest extends BaseJpaR4Test {
public void after() {
myExecutor.shutdown();
myInterceptorRegistry.unregisterInterceptor(myRetryInterceptor);
myInterceptorRegistry.unregisterInterceptor(myConcurrencySemaphoreInterceptor);
}
@Test
public void testConcurrentTransactionCreates() {
public void testTransactionCreates_NoGuard() {
myDaoConfig.setMatchUrlCache(true);
AtomicInteger counter = new AtomicInteger(0);
Runnable creator = () -> {
BundleBuilder bb = new BundleBuilder(myFhirCtx);
String patientId = "Patient/PT" + counter.get();
IdType practitionerId = IdType.newRandomUuid();
IdType practitionerId2 = IdType.newRandomUuid();
ExplanationOfBenefit eob = new ExplanationOfBenefit();
eob.addIdentifier().setSystem("foo").setValue("" + counter.get());
eob.getPatient().setReference(patientId);
eob.addCareTeam().getProvider().setReference(practitionerId.getValue());
eob.addCareTeam().getProvider().setReference(practitionerId2.getValue());
bb.addTransactionUpdateEntry(eob).conditional("ExplanationOfBenefit?identifier=foo|" + counter.get());
Patient pt = new Patient();
pt.setId(patientId);
pt.setActive(true);
bb.addTransactionUpdateEntry(pt);
Coverage coverage = new Coverage();
coverage.addIdentifier().setSystem("foo").setValue("" + counter.get());
coverage.getBeneficiary().setReference(patientId);
bb.addTransactionUpdateEntry(coverage).conditional("Coverage?identifier=foo|" + counter.get());
Practitioner practitioner = new Practitioner();
practitioner.setId(practitionerId);
practitioner.addIdentifier().setSystem("foo").setValue("" + counter.get());
bb.addTransactionCreateEntry(practitioner).conditional("Practitioner?identifier=foo|" + counter.get());
Practitioner practitioner2 = new Practitioner();
practitioner2.setId(practitionerId2);
practitioner2.addIdentifier().setSystem("foo2").setValue("" + counter.get());
bb.addTransactionCreateEntry(practitioner2).conditional("Practitioner?identifier=foo2|" + counter.get());
Observation obs = new Observation();
obs.setId("Observation/OBS" + counter);
obs.getSubject().setReference(pt.getId());
bb.addTransactionUpdateEntry(obs);
Bundle input = (Bundle) bb.getBundle();
mySystemDao.transaction(new SystemRequestDetails(), input);
};
AtomicInteger passCounter = new AtomicInteger(0);
AtomicInteger fuzzCounter = new AtomicInteger(0);
Runnable creator = newTransactionTaskWithUpdatesAndConditionalUpdates(passCounter, fuzzCounter);
for (int i = 0; i < 10; i++) {
counter.set(i);
passCounter.set(i);
ourLog.info("*********************************************************************************");
ourLog.info("Starting pass {}", i);
ourLog.info("*********************************************************************************");
@ -149,17 +115,7 @@ public class FhirResourceDaoR4ConcurrentWriteTest extends BaseJpaR4Test {
}
runInTransaction(() -> {
Map<String, Integer> counts = new TreeMap<>();
myResourceTableDao
.findAll()
.stream()
.forEach(t -> {
counts.putIfAbsent(t.getResourceType(), 0);
int value = counts.get(t.getResourceType());
value++;
counts.put(t.getResourceType(), value);
});
ourLog.info("Counts: {}", counts);
Map<String, Integer> counts = getResourceCountMap();
assertEquals(10, counts.get("Patient"), counts.toString());
});
@ -173,43 +129,19 @@ public class FhirResourceDaoR4ConcurrentWriteTest extends BaseJpaR4Test {
* retry succeeds and that the data ultimately gets written.
*/
@Test
public void testConcurrentTransactionConditionalUpdates() throws ExecutionException, InterruptedException {
public void testTransactionCreates_WithRetry() throws ExecutionException, InterruptedException {
myInterceptorRegistry.registerInterceptor(myRetryInterceptor);
Patient pt = new Patient();
pt.setId("Patient/A");
pt.addIdentifier().setSystem("http://foo").setValue("pt1");
myPatientDao.update(pt);
AtomicInteger setCounter = new AtomicInteger(0);
AtomicInteger fuzzCounter = new AtomicInteger(0);
Runnable creator = newTransactionTaskWithUpdatesAndConditionalUpdates(setCounter, fuzzCounter);
Observation obs = new Observation();
obs.setId("Observation/O");
obs.addIdentifier().setSystem("http://foo").setValue("obs1");
myObservationDao.update(obs);
for (int set = 0; set < 3; set++) {
AtomicInteger counter = new AtomicInteger(0);
Runnable creator = () -> {
BundleBuilder bb = new BundleBuilder(myFhirCtx);
Patient patient = new Patient();
patient.setId(IdType.newRandomUuid());
patient.addIdentifier().setSystem("http://foo").setValue("pt1");
patient.addName().setFamily("fam-" + counter.incrementAndGet());
bb.addTransactionUpdateEntry(patient).conditional("Patient?identifier=http://foo|pt1");
Observation observation = new Observation();
observation.setId(IdType.newRandomUuid());
observation.addIdentifier().setSystem("http://foo").setValue("obs1");
observation.getCode().setText("obs-" + counter.incrementAndGet());
observation.getSubject().setReference(patient.getId());
bb.addTransactionUpdateEntry(observation).conditional("Observation?identifier=http://foo|obs1");
Bundle input = (Bundle) bb.getBundle();
input.getEntry().get(0).getResource().setId("Patient/A");
input.getEntry().get(1).getResource().setId("Observation/O");
SystemRequestDetails requestDetails = new SystemRequestDetails();
UserRequestRetryVersionConflictsInterceptor.addRetryHeader(requestDetails, 20);
mySystemDao.transaction(requestDetails, input);
};
ourLog.info("*********************************************************************************");
ourLog.info("Starting pass {}", set);
ourLog.info("*********************************************************************************");
fuzzCounter.set(set);
List<Future<?>> futures = new ArrayList<>();
for (int j = 0; j < 10; j++) {
@ -220,7 +152,111 @@ public class FhirResourceDaoR4ConcurrentWriteTest extends BaseJpaR4Test {
next.get();
}
}
runInTransaction(() -> {
Map<String, Integer> counts = getResourceCountMap();
assertEquals(1, counts.get("Patient"), counts.toString());
assertEquals(1, counts.get("Observation"), counts.toString());
logAllResourceLinks();
assertEquals(6, myResourceLinkDao.count());
assertEquals(6, myResourceTableDao.count());
assertEquals(14, myResourceHistoryTableDao.count());
});
}
@Test
public void testTransactionCreates_WithConcurrencySemaphore() throws ExecutionException, InterruptedException {
myInterceptorRegistry.registerInterceptor(myConcurrencySemaphoreInterceptor);
AtomicInteger setCounter = new AtomicInteger(0);
AtomicInteger fuzzCounter = new AtomicInteger(0);
Runnable creator = newTransactionTaskWithUpdatesAndConditionalUpdates(setCounter, fuzzCounter);
for (int set = 0; set < 3; set++) {
ourLog.info("*********************************************************************************");
ourLog.info("Starting pass {}", set);
ourLog.info("*********************************************************************************");
fuzzCounter.set(set);
List<Future<?>> futures = new ArrayList<>();
for (int j = 0; j < 10; j++) {
futures.add(myExecutor.submit(creator));
}
for (Future<?> next : futures) {
next.get();
}
}
runInTransaction(() -> {
Map<String, Integer> counts = getResourceCountMap();
assertEquals(1, counts.get("Patient"), counts.toString());
assertEquals(1, counts.get("Observation"), counts.toString());
logAllResourceLinks();
assertEquals(6, myResourceLinkDao.count());
assertEquals(6, myResourceTableDao.count());
assertEquals(14, myResourceHistoryTableDao.count());
});
assertEquals(6, myConcurrencySemaphoreInterceptor.countSemaphores());
}
@Test
public void testTransactionCreates_WithConcurrencySemaphore_DontLockOnCachedMatchUrlsForConditionalCreate() throws ExecutionException, InterruptedException {
myDaoConfig.setMatchUrlCacheEnabled(true);
myInterceptorRegistry.registerInterceptor(myConcurrencySemaphoreInterceptor);
Runnable creator = ()->{
BundleBuilder bb = new BundleBuilder(myFhirCtx);
Patient patient1 = new Patient();
patient1.addIdentifier().setSystem("http://foo").setValue("1");
bb.addTransactionCreateEntry(patient1).conditional("Patient?identifier=http://foo|1");
Patient patient2 = new Patient();
patient2.addIdentifier().setSystem("http://foo").setValue("2");
bb.addTransactionCreateEntry(patient2).conditional("Patient?identifier=http://foo|2");
Bundle input = (Bundle) bb.getBundle();
SystemRequestDetails requestDetails = new SystemRequestDetails();
mySystemDao.transaction(requestDetails, input);
};
for (int set = 0; set < 3; set++) {
myConcurrencySemaphoreInterceptor.clearSemaphores();
List<Future<?>> futures = new ArrayList<>();
for (int j = 0; j < 10; j++) {
futures.add(myExecutor.submit(creator));
}
for (Future<?> next : futures) {
next.get();
}
if (set == 0) {
assertEquals(2, myConcurrencySemaphoreInterceptor.countSemaphores());
} else {
assertEquals(0, myConcurrencySemaphoreInterceptor.countSemaphores());
}
}
runInTransaction(() -> {
Map<String, Integer> counts = getResourceCountMap();
assertEquals(2, counts.get("Patient"), counts.toString());
});
}
@Nonnull
private Map<String, Integer> getResourceCountMap() {
Map<String, Integer> counts = new TreeMap<>();
myResourceTableDao
.findAll()
@ -232,14 +268,62 @@ public class FhirResourceDaoR4ConcurrentWriteTest extends BaseJpaR4Test {
counts.put(t.getResourceType(), value);
});
ourLog.info("Counts: {}", counts);
return counts;
}
assertEquals(1, counts.get("Patient"), counts.toString());
assertEquals(1, counts.get("Observation"), counts.toString());
logAllResourceLinks();
assertEquals(2, myResourceLinkDao.count());
assertEquals(22, myResourceHistoryTableDao.count());
});
@Nonnull
private Runnable newTransactionTaskWithUpdatesAndConditionalUpdates(AtomicInteger theSetCounter, AtomicInteger theFuzzCounter) {
Runnable creator = () -> {
BundleBuilder bb = new BundleBuilder(myFhirCtx);
String patientId = "Patient/PT" + theSetCounter.get();
IdType practitionerId = IdType.newRandomUuid();
IdType practitionerId2 = IdType.newRandomUuid();
ExplanationOfBenefit eob = new ExplanationOfBenefit();
eob.addIdentifier().setSystem("foo").setValue("" + theSetCounter.get());
eob.getPatient().setReference(patientId);
eob.addCareTeam().getProvider().setReference(practitionerId.getValue());
eob.addCareTeam().getProvider().setReference(practitionerId2.getValue());
eob.getFormCode().setText("EOB " + theFuzzCounter.get());
bb.addTransactionUpdateEntry(eob).conditional("ExplanationOfBenefit?identifier=foo|" + theSetCounter.get());
Patient pt = new Patient();
pt.setId(patientId);
pt.setActive(true);
pt.addName().setFamily("FAMILY " + theFuzzCounter.get());
bb.addTransactionUpdateEntry(pt);
Coverage coverage = new Coverage();
coverage.addIdentifier().setSystem("foo").setValue("" + theSetCounter.get());
coverage.getBeneficiary().setReference(patientId);
coverage.setDependent("DEP " + theFuzzCounter.get());
bb.addTransactionUpdateEntry(coverage).conditional("Coverage?identifier=foo|" + theSetCounter.get());
Practitioner practitioner = new Practitioner();
practitioner.setId(practitionerId);
practitioner.addIdentifier().setSystem("foo").setValue("" + theSetCounter.get());
practitioner.addName().setFamily("SET " + theFuzzCounter.get());
bb.addTransactionCreateEntry(practitioner).conditional("Practitioner?identifier=foo|" + theSetCounter.get());
Practitioner practitioner2 = new Practitioner();
practitioner2.setId(practitionerId2);
practitioner2.addIdentifier().setSystem("foo2").setValue("" + theSetCounter.get());
practitioner2.addName().setFamily("SET " + theFuzzCounter.get());
bb.addTransactionCreateEntry(practitioner2).conditional("Practitioner?identifier=foo2|" + theSetCounter.get());
Observation obs = new Observation();
obs.setId("Observation/OBS" + theSetCounter);
obs.getSubject().setReference(pt.getId());
obs.getCode().setText("SET " + theFuzzCounter.get());
bb.addTransactionUpdateEntry(obs);
Bundle input = (Bundle) bb.getBundle();
SystemRequestDetails requestDetails = new SystemRequestDetails();
UserRequestRetryVersionConflictsInterceptor.addRetryHeader(requestDetails, 20);
mySystemDao.transaction(requestDetails, input);
};
return creator;
}

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.mdm.svc.candidate;
/*-
* #%L
* HAPI FHIR JPA Server - Master Data Management
* %%
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.mdm.svc.MdmSearchParamSvc;

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.mdm.svc.candidate;
/*-
* #%L
* HAPI FHIR JPA Server - Master Data Management
* %%
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
public class TooManyCandidatesException extends RuntimeException {
public TooManyCandidatesException(String theMessage) {
super(theMessage);

View File

@ -30,6 +30,7 @@ import org.hl7.fhir.instance.model.api.IIdType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
@ -54,6 +55,7 @@ public class TransactionDetails {
public static final ResourcePersistentId NOT_FOUND = new ResourcePersistentId(-1L);
private final Date myTransactionDate;
private List<Runnable> myRollbackUndoActions = Collections.emptyList();
private Map<String, ResourcePersistentId> myResolvedResourceIds = Collections.emptyMap();
private Map<String, ResourcePersistentId> myResolvedMatchUrls = Collections.emptyMap();
private Map<String, Object> myUserData;
@ -75,6 +77,39 @@ public class TransactionDetails {
myTransactionDate = theTransactionDate;
}
/**
* Get the actions that should be executed if the transaction is rolled back
*
* @since 5.5.0
*/
public List<Runnable> getRollbackUndoActions() {
return Collections.unmodifiableList(myRollbackUndoActions);
}
/**
* Add an action that should be executed if the transaction is rolled back
*
* @since 5.5.0
*/
public void addRollbackUndoAction(@Nonnull Runnable theRunnable) {
assert theRunnable != null;
if (myRollbackUndoActions.isEmpty()) {
myRollbackUndoActions = new ArrayList<>();
}
myRollbackUndoActions.add(theRunnable);
}
/**
* Clears any previously added rollback actions
*
* @since 5.5.0
*/
public void clearRollbackUndoActions() {
if (!myRollbackUndoActions.isEmpty()) {
myRollbackUndoActions.clear();
}
}
/**
* A <b>Resolved Resource ID</b> is a mapping between a resource ID (e.g. "<code>Patient/ABC</code>" or
* "<code>Observation/123</code>") and a storage ID for that resource. Resources should only be placed within
@ -140,7 +175,18 @@ public class TransactionDetails {
}
/**
* Sets an arbitraty object that will last the lifetime of the current transaction
* Remove an item previously stored in user data
*
* @see #getUserData(String)
*/
public void clearUserData(String theKey) {
if (myUserData != null) {
myUserData.remove(theKey);
}
}
/**
* Sets an arbitrary object that will last the lifetime of the current transaction
*
* @see #getUserData(String)
*/
@ -242,5 +288,10 @@ public class TransactionDetails {
public void deferredBroadcastProcessingFinished() {
myIsPointcutDeferred = false;
}
public void clearResolvedItems() {
myResolvedResourceIds.clear();
myResolvedMatchUrls.clear();
}
}