diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java index cbf4321461c..9b615da7d0d 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java @@ -1612,6 +1612,56 @@ public enum Pointcut implements IPointcut { ), + /** + * Storage Hook: + * Invoked during a FHIR transaction, immediately before processing all write operations (i.e. immediately + * before a database transaction will be opened) + *

+ * Hooks may accept the following parameters: + *

+ * + *

+ * Hooks should return void. + *

+ */ + STORAGE_TRANSACTION_WRITE_OPERATIONS_PRE(void.class, + "ca.uhn.fhir.interceptor.model.TransactionWriteOperationsDetails", + "ca.uhn.fhir.rest.api.server.storage.TransactionDetails" + ), + + /** + * Storage Hook: + * Invoked during a FHIR transaction, immediately after processing all write operations (i.e. immediately + * after the transaction has been committed or rolled back). This hook will always be called if + * {@link #STORAGE_TRANSACTION_WRITE_OPERATIONS_PRE} has been called, regardless of whether the operation + * succeeded or failed. + *

+ * Hooks may accept the following parameters: + *

+ * + *

+ * Hooks should return void. + *

+ */ + STORAGE_TRANSACTION_WRITE_OPERATIONS_POST(void.class, + "ca.uhn.fhir.interceptor.model.TransactionWriteOperationsDetails", + "ca.uhn.fhir.rest.api.server.storage.TransactionDetails" + ), + /** * Storage Hook: * Invoked when a resource delete operation is about to fail due to referential integrity checks. Intended for use with {@literal ca.uhn.fhir.jpa.interceptor.CascadingDeleteInterceptor}. diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/model/TransactionWriteOperationsDetails.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/model/TransactionWriteOperationsDetails.java new file mode 100644 index 00000000000..e13608bcaea --- /dev/null +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/model/TransactionWriteOperationsDetails.java @@ -0,0 +1,29 @@ +package ca.uhn.fhir.interceptor.model; + +import java.util.List; + +/** + * This is an experimental API, use with caution + */ +public class TransactionWriteOperationsDetails { + + private List myConditionalCreateRequestUrls; + private List myUpdateRequestUrls; + + public List getConditionalCreateRequestUrls() { + return myConditionalCreateRequestUrls; + } + + public void setConditionalCreateRequestUrls(List theConditionalCreateRequestUrls) { + myConditionalCreateRequestUrls = theConditionalCreateRequestUrls; + } + + public List getUpdateRequestUrls() { + return myUpdateRequestUrls; + } + + public void setUpdateRequestUrls(List theUpdateRequestUrls) { + myUpdateRequestUrls = theUpdateRequestUrls; + } + +} diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_5_0/2743-add-transaxction-semaphore-interceptor.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_5_0/2743-add-transaxction-semaphore-interceptor.yaml new file mode 100644 index 00000000000..53e9f2ff3c2 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_5_0/2743-add-transaxction-semaphore-interceptor.yaml @@ -0,0 +1,6 @@ +--- +type: add +issue: 2743 +title: "A new interceptor has been added for JPA servers that uses semaphores to avoid multiple concurrent + FHIR transactions from trying to create/update the same resource at the same time. This can improve + overall performance when writing many concurrent transactions since it avoids the need for retries." diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/HapiFhirHibernateJpaDialect.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/HapiFhirHibernateJpaDialect.java index f1407ffb7d8..ada43d0f291 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/HapiFhirHibernateJpaDialect.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/HapiFhirHibernateJpaDialect.java @@ -117,7 +117,8 @@ public class HapiFhirHibernateJpaDialect extends HibernateJpaDialect { } } - return super.convertHibernateAccessException(theException); + DataAccessException retVal = super.convertHibernateAccessException(theException); + return retVal; } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDao.java index 4ff003396da..2e53b186e0c 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDao.java @@ -183,10 +183,10 @@ public abstract class BaseHapiFhirDao extends BaseStora public static final String OO_SEVERITY_INFO = "information"; public static final String OO_SEVERITY_WARN = "warning"; public static final String XACT_USERDATA_KEY_RESOLVED_TAG_DEFINITIONS = BaseHapiFhirDao.class.getName() + "_RESOLVED_TAG_DEFINITIONS"; + public static final String XACT_USERDATA_KEY_EXISTING_SEARCH_PARAMS = BaseHapiFhirDao.class.getName() + "_EXISTING_SEARCH_PARAMS"; private static final Logger ourLog = LoggerFactory.getLogger(BaseHapiFhirDao.class); private static final Map ourRetrievalContexts = new HashMap<>(); private static final String PROCESSING_SUB_REQUEST = "BaseHapiFhirDao.processingSubRequest"; - private static final String TRANSACTION_DETAILS_CACHE_KEY_EXISTING_SEARCH_PARAMS = BaseHapiFhirDao.class.getName() + "_EXISTING_SEARCH_PARAMS"; private static boolean ourValidationDisabledForUnitTest; private static boolean ourDisableIncrementOnUpdateForUnitTest = false; @@ -1160,7 +1160,7 @@ public abstract class BaseHapiFhirDao extends BaseStora // CREATE or UPDATE - IdentityHashMap existingSearchParams = theTransactionDetails.getOrCreateUserData(TRANSACTION_DETAILS_CACHE_KEY_EXISTING_SEARCH_PARAMS, () -> new IdentityHashMap<>()); + IdentityHashMap existingSearchParams = theTransactionDetails.getOrCreateUserData(XACT_USERDATA_KEY_EXISTING_SEARCH_PARAMS, () -> new IdentityHashMap<>()); existingParams = existingSearchParams.get(entity); if (existingParams == null) { existingParams = new ResourceIndexedSearchParams(entity); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java index af0c0c77b15..6d74ef0cf43 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java @@ -211,7 +211,7 @@ public abstract class BaseHapiFhirResourceDao extends B @Override public DaoMethodOutcome create(T theResource, String theIfNoneExist, boolean thePerformIndexing, @Nonnull TransactionDetails theTransactionDetails, RequestDetails theRequestDetails) { - return myTransactionService.execute(theRequestDetails, tx -> doCreateForPost(theResource, theIfNoneExist, thePerformIndexing, theTransactionDetails, theRequestDetails)); + return myTransactionService.execute(theRequestDetails, theTransactionDetails, tx -> doCreateForPost(theResource, theIfNoneExist, thePerformIndexing, theTransactionDetails, theRequestDetails)); } @VisibleForTesting @@ -420,10 +420,12 @@ public abstract class BaseHapiFhirResourceDao extends B @Override public DaoMethodOutcome delete(IIdType theId, RequestDetails theRequestDetails) { + TransactionDetails transactionDetails = new TransactionDetails(); + validateIdPresentForDelete(theId); validateDeleteEnabled(); - return myTransactionService.execute(theRequestDetails, tx -> { + return myTransactionService.execute(theRequestDetails, transactionDetails, tx -> { DeleteConflictList deleteConflicts = new DeleteConflictList(); if (isNotBlank(theId.getValue())) { deleteConflicts.setResourceIdMarkedForDeletion(theId); @@ -431,7 +433,7 @@ public abstract class BaseHapiFhirResourceDao extends B StopWatch w = new StopWatch(); - DaoMethodOutcome retVal = delete(theId, deleteConflicts, theRequestDetails, new TransactionDetails()); + DaoMethodOutcome retVal = delete(theId, deleteConflicts, theRequestDetails, transactionDetails); DeleteConflictService.validateDeleteConflictsEmptyOrThrowException(getContext(), deleteConflicts); @@ -521,13 +523,15 @@ public abstract class BaseHapiFhirResourceDao extends B @Override public DeleteMethodOutcome deleteByUrl(String theUrl, RequestDetails theRequest) { validateDeleteEnabled(); + + TransactionDetails transactionDetails = new TransactionDetails(); ResourceSearch resourceSearch = myMatchUrlService.getResourceSearch(theUrl); if (resourceSearch.isDeleteExpunge()) { return deleteExpunge(theUrl, theRequest); } - return myTransactionService.execute(theRequest, tx -> { + return myTransactionService.execute(theRequest, transactionDetails, tx -> { DeleteConflictList deleteConflicts = new DeleteConflictList(); DeleteMethodOutcome outcome = deleteByUrl(theUrl, deleteConflicts, theRequest); DeleteConflictService.validateDeleteConflictsEmptyOrThrowException(getContext(), deleteConflicts); @@ -542,8 +546,9 @@ public abstract class BaseHapiFhirResourceDao extends B @Override public DeleteMethodOutcome deleteByUrl(String theUrl, DeleteConflictList deleteConflicts, RequestDetails theRequestDetails) { validateDeleteEnabled(); + TransactionDetails transactionDetails = new TransactionDetails(); - return myTransactionService.execute(theRequestDetails, tx -> doDeleteByUrl(theUrl, deleteConflicts, theRequestDetails)); + return myTransactionService.execute(theRequestDetails, transactionDetails, tx -> doDeleteByUrl(theUrl, deleteConflicts, theRequestDetails)); } @Nonnull @@ -1016,7 +1021,8 @@ public abstract class BaseHapiFhirResourceDao extends B @Override public DaoMethodOutcome patch(IIdType theId, String theConditionalUrl, PatchTypeEnum thePatchType, String thePatchBody, IBaseParameters theFhirPatchBody, RequestDetails theRequest) { - return myTransactionService.execute(theRequest, tx -> doPatch(theId, theConditionalUrl, thePatchType, thePatchBody, theFhirPatchBody, theRequest, new TransactionDetails())); + TransactionDetails transactionDetails = new TransactionDetails(); + return myTransactionService.execute(theRequest, transactionDetails, tx -> doPatch(theId, theConditionalUrl, thePatchType, thePatchBody, theFhirPatchBody, theRequest, transactionDetails)); } private DaoMethodOutcome doPatch(IIdType theId, String theConditionalUrl, PatchTypeEnum thePatchType, String thePatchBody, IBaseParameters theFhirPatchBody, RequestDetails theRequest, TransactionDetails theTransactionDetails) { @@ -1132,8 +1138,9 @@ public abstract class BaseHapiFhirResourceDao extends B @Override public T read(IIdType theId, RequestDetails theRequest, boolean theDeletedOk) { validateResourceTypeAndThrowInvalidRequestException(theId); + TransactionDetails transactionDetails = new TransactionDetails(); - return myTransactionService.execute(theRequest, tx -> doRead(theId, theRequest, theDeletedOk)); + return myTransactionService.execute(theRequest, transactionDetails, tx -> doRead(theId, theRequest, theDeletedOk)); } public T doRead(IIdType theId, RequestDetails theRequest, boolean theDeletedOk) { @@ -1459,7 +1466,9 @@ public abstract class BaseHapiFhirResourceDao extends B @Override public Set searchForIds(SearchParameterMap theParams, RequestDetails theRequest) { - return myTransactionService.execute(theRequest, tx -> { + TransactionDetails transactionDetails = new TransactionDetails(); + + return myTransactionService.execute(theRequest, transactionDetails, tx -> { if (theParams.getLoadSynchronousUpTo() != null) { theParams.setLoadSynchronousUpTo(Math.min(getConfig().getInternalSynchronousSearchSize(), theParams.getLoadSynchronousUpTo())); @@ -1566,8 +1575,8 @@ public abstract class BaseHapiFhirResourceDao extends B String id = theResource.getIdElement().getValue(); Runnable onRollback = () -> theResource.getIdElement().setValue(id); - // Execute the update in a retriable transaction - return myTransactionService.execute(theRequest, tx -> doUpdate(theResource, theMatchUrl, thePerformIndexing, theForceUpdateVersion, theRequest, theTransactionDetails), onRollback); + // Execute the update in a retryable transaction + return myTransactionService.execute(theRequest, theTransactionDetails, tx -> doUpdate(theResource, theMatchUrl, thePerformIndexing, theForceUpdateVersion, theRequest, theTransactionDetails), onRollback); } private DaoMethodOutcome doUpdate(T theResource, String theMatchUrl, boolean thePerformIndexing, boolean theForceUpdateVersion, RequestDetails theRequest, TransactionDetails theTransactionDetails) { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseTransactionProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseTransactionProcessor.java index 2e3c0c7c6f2..29c993d2109 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseTransactionProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseTransactionProcessor.java @@ -25,6 +25,7 @@ import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.interceptor.api.Pointcut; +import ca.uhn.fhir.interceptor.model.TransactionWriteOperationsDetails; import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; @@ -446,33 +447,7 @@ public abstract class BaseTransactionProcessor { } entries.sort(new TransactionSorter(placeholderIds)); - /* - * All of the write operations in the transaction (PUT, POST, etc.. basically anything - * except GET) are performed in their own database transaction before we do the reads. - * We do this because the reads (specifically the searches) often spawn their own - * secondary database transaction and if we allow that within the primary - * database transaction we can end up with deadlocks if the server is under - * heavy load with lots of concurrent transactions using all available - * database connections. - */ - TransactionCallback> txCallback = status -> { - final Set allIds = new LinkedHashSet<>(); - final Map idSubstitutions = new HashMap<>(); - final Map idToPersistedOutcome = new HashMap<>(); - Map retVal = doTransactionWriteOperations(theRequestDetails, theActionName, transactionDetails, allIds, idSubstitutions, idToPersistedOutcome, response, originalRequestOrder, entries, transactionStopWatch); - - transactionStopWatch.startTask("Commit writes to database"); - return retVal; - }; - Map entriesToProcess = myHapiTransactionService.execute(theRequestDetails, txCallback); - transactionStopWatch.endCurrentTask(); - - for (Map.Entry nextEntry : entriesToProcess.entrySet()) { - String responseLocation = nextEntry.getValue().toUnqualified().getValue(); - String responseEtag = nextEntry.getValue().getVersionIdPart(); - myVersionAdapter.setResponseLocation(nextEntry.getKey(), responseLocation); - myVersionAdapter.setResponseETag(nextEntry.getKey(), responseEtag); - } + doTransactionWriteOperations(theRequestDetails, theActionName, transactionDetails, transactionStopWatch, response, originalRequestOrder, entries); /* * Loop through the request and process any entries of type GET @@ -555,6 +530,79 @@ public abstract class BaseTransactionProcessor { return response; } + /** + * All of the write operations in the transaction (PUT, POST, etc.. basically anything + * except GET) are performed in their own database transaction before we do the reads. + * We do this because the reads (specifically the searches) often spawn their own + * secondary database transaction and if we allow that within the primary + * database transaction we can end up with deadlocks if the server is under + * heavy load with lots of concurrent transactions using all available + * database connections. + */ + private void doTransactionWriteOperations(RequestDetails theRequestDetails, String theActionName, TransactionDetails theTransactionDetails, StopWatch theTransactionStopWatch, IBaseBundle theResponse, IdentityHashMap theOriginalRequestOrder, List theEntries) { + TransactionWriteOperationsDetails writeOperationsDetails = null; + if (CompositeInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_PRE, myInterceptorBroadcaster, theRequestDetails) || + CompositeInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_POST, myInterceptorBroadcaster, theRequestDetails)) { + + List updateRequestUrls = new ArrayList<>(); + List conditionalCreateRequestUrls = new ArrayList<>(); + for (IBase nextEntry : theEntries) { + String method = myVersionAdapter.getEntryRequestVerb(myContext, nextEntry); + if ("PUT".equals(method)) { + String requestUrl = myVersionAdapter.getEntryRequestUrl(nextEntry); + if (isNotBlank(requestUrl)) { + updateRequestUrls.add(requestUrl); + } + } else if ("POST".equals(method)) { + String requestUrl = myVersionAdapter.getEntryRequestIfNoneExist(nextEntry); + if (isNotBlank(requestUrl) && requestUrl.contains("?")) { + conditionalCreateRequestUrls.add(requestUrl); + } + } + } + + writeOperationsDetails = new TransactionWriteOperationsDetails(); + writeOperationsDetails.setUpdateRequestUrls(updateRequestUrls); + writeOperationsDetails.setConditionalCreateRequestUrls(conditionalCreateRequestUrls); + HookParams params = new HookParams() + .add(TransactionDetails.class, theTransactionDetails) + .add(TransactionWriteOperationsDetails.class, writeOperationsDetails); + CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_PRE, params); + + } + + TransactionCallback> txCallback = status -> { + final Set allIds = new LinkedHashSet<>(); + final Map idSubstitutions = new HashMap<>(); + final Map idToPersistedOutcome = new HashMap<>(); + Map retVal = doTransactionWriteOperations(theRequestDetails, theActionName, theTransactionDetails, allIds, idSubstitutions, idToPersistedOutcome, theResponse, theOriginalRequestOrder, theEntries, theTransactionStopWatch); + + theTransactionStopWatch.startTask("Commit writes to database"); + return retVal; + }; + Map entriesToProcess; + + try { + entriesToProcess = myHapiTransactionService.execute(theRequestDetails, theTransactionDetails, txCallback); + } finally { + if (writeOperationsDetails != null) { + HookParams params = new HookParams() + .add(TransactionDetails.class, theTransactionDetails) + .add(TransactionWriteOperationsDetails.class, writeOperationsDetails); + CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_POST, params); + } + } + + theTransactionStopWatch.endCurrentTask(); + + for (Map.Entry nextEntry : entriesToProcess.entrySet()) { + String responseLocation = nextEntry.getValue().toUnqualified().getValue(); + String responseEtag = nextEntry.getValue().getVersionIdPart(); + myVersionAdapter.setResponseLocation(nextEntry.getKey(), responseLocation); + myVersionAdapter.setResponseETag(nextEntry.getKey(), responseEtag); + } + } + private boolean isValidVerb(String theVerb) { try { return org.hl7.fhir.r4.model.Bundle.HTTPVerb.fromCode(theVerb) != null; @@ -891,7 +939,6 @@ public abstract class BaseTransactionProcessor { theTransactionStopWatch.endCurrentTask(); } - /* * Make sure that there are no conflicts from deletions. E.g. we can't delete something * if something else has a reference to it.. Unless the thing that has a reference to it @@ -1094,6 +1141,8 @@ public abstract class BaseTransactionProcessor { } if (newId != null) { ourLog.debug(" * Replacing resource ref {} with {}", nextId, newId); + + addRollbackReferenceRestore(theTransactionDetails, resourceReference); if (theReferencesToAutoVersion.contains(resourceReference)) { resourceReference.setReference(newId.getValue()); resourceReference.setResource(null); @@ -1108,6 +1157,7 @@ public abstract class BaseTransactionProcessor { if (theReferencesToAutoVersion.contains(resourceReference)) { DaoMethodOutcome outcome = theIdToPersistedOutcome.get(nextId); if (!outcome.isNop() && !Boolean.TRUE.equals(outcome.getCreated())) { + addRollbackReferenceRestore(theTransactionDetails, resourceReference); resourceReference.setReference(nextId.getValue()); resourceReference.setResource(null); } @@ -1126,6 +1176,10 @@ public abstract class BaseTransactionProcessor { if (theIdSubstitutions.containsKey(nextUriString)) { IIdType newId = theIdSubstitutions.get(nextUriString); ourLog.debug(" * Replacing resource ref {} with {}", nextUriString, newId); + + String existingValue = nextRef.getValueAsString(); + theTransactionDetails.addRollbackUndoAction(() -> nextRef.setValueAsString(existingValue)); + nextRef.setValueAsString(newId.toVersionless().getValue()); } else { ourLog.debug(" * Reference [{}] does not exist in bundle", nextUriString); @@ -1180,6 +1234,11 @@ public abstract class BaseTransactionProcessor { } } + private void addRollbackReferenceRestore(TransactionDetails theTransactionDetails, IBaseReference resourceReference) { + String existingValue = resourceReference.getReferenceElement().getValue(); + theTransactionDetails.addRollbackUndoAction(() -> resourceReference.setReference(existingValue)); + } + private void validateNoDuplicates(RequestDetails theRequest, String theActionName, Map> conditionalRequestUrls, Collection thePersistedOutcomes) { IdentityHashMap resourceToIndexedParams = new IdentityHashMap<>(thePersistedOutcomes.size()); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/index/DaoSearchParamSynchronizer.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/index/DaoSearchParamSynchronizer.java index edff1568d94..158d576ffbf 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/index/DaoSearchParamSynchronizer.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/index/DaoSearchParamSynchronizer.java @@ -28,6 +28,8 @@ import ca.uhn.fhir.jpa.model.entity.ResourceTable; import ca.uhn.fhir.jpa.searchparam.extractor.ResourceIndexedSearchParams; import ca.uhn.fhir.jpa.util.AddRemoveCount; import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -41,6 +43,7 @@ import java.util.List; @Service public class DaoSearchParamSynchronizer { + private static final Logger ourLog = LoggerFactory.getLogger(DaoSearchParamSynchronizer.class); @PersistenceContext(type = PersistenceContextType.TRANSACTION) protected EntityManager myEntityManager; @Autowired diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java index 35428f31bd3..cb1e5b5e817 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java @@ -24,6 +24,8 @@ import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.api.model.ResourceVersionConflictResolutionStrategy; +import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao; +import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; @@ -65,11 +67,11 @@ public class HapiTransactionService { myTxTemplate = new TransactionTemplate(myTransactionManager); } - public T execute(RequestDetails theRequestDetails, TransactionCallback theCallback) { - return execute(theRequestDetails, theCallback, null); + public T execute(RequestDetails theRequestDetails, TransactionDetails theTransactionDetails, TransactionCallback theCallback) { + return execute(theRequestDetails, theTransactionDetails, theCallback, null); } - public T execute(RequestDetails theRequestDetails, TransactionCallback theCallback, Runnable theOnRollback) { + public T execute(RequestDetails theRequestDetails, TransactionDetails theTransactionDetails, TransactionCallback theCallback, Runnable theOnRollback) { for (int i = 0; ; i++) { try { @@ -108,7 +110,14 @@ public class HapiTransactionService { } if (i < maxRetries) { + theTransactionDetails.getRollbackUndoActions().forEach(t->t.run()); + theTransactionDetails.clearRollbackUndoActions(); + theTransactionDetails.clearResolvedItems(); + theTransactionDetails.clearUserData(BaseHapiFhirDao.XACT_USERDATA_KEY_RESOLVED_TAG_DEFINITIONS); + theTransactionDetails.clearUserData(BaseHapiFhirDao.XACT_USERDATA_KEY_EXISTING_SEARCH_PARAMS); sleepAtLeast(250, false); + + ourLog.info("About to start a transaction retry due to conflict or constraint error"); continue; } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/interceptor/TransactionConcurrencySemaphoreInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/interceptor/TransactionConcurrencySemaphoreInterceptor.java new file mode 100644 index 00000000000..11e50a31778 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/interceptor/TransactionConcurrencySemaphoreInterceptor.java @@ -0,0 +1,104 @@ +package ca.uhn.fhir.jpa.interceptor; + + +import ca.uhn.fhir.interceptor.api.Hook; +import ca.uhn.fhir.interceptor.api.Interceptor; +import ca.uhn.fhir.interceptor.api.Pointcut; +import ca.uhn.fhir.interceptor.model.TransactionWriteOperationsDetails; +import ca.uhn.fhir.jpa.util.MemoryCacheService; +import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +/** + * This interceptor uses semaphores to avoid multiple concurrent FHIR transaction + * bundles from processing the same records at the same time, avoiding concurrency + * issues. + */ +@Interceptor +public class TransactionConcurrencySemaphoreInterceptor { + + private static final Logger ourLog = LoggerFactory.getLogger(TransactionConcurrencySemaphoreInterceptor.class); + private static final String HELD_SEMAPHORES = TransactionConcurrencySemaphoreInterceptor.class.getName() + "_HELD_SEMAPHORES"; + private final Cache mySemaphoreCache; + private final MemoryCacheService myMemoryCacheService; + + /** + * Constructor + */ + public TransactionConcurrencySemaphoreInterceptor(MemoryCacheService theMemoryCacheService) { + myMemoryCacheService = theMemoryCacheService; + mySemaphoreCache = Caffeine + .newBuilder() + .expireAfterAccess(1, TimeUnit.MINUTES) + .build(); + } + + @Hook(Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_PRE) + public void pre(TransactionDetails theTransactionDetails, TransactionWriteOperationsDetails theWriteOperationsDetails) { + List heldSemaphores = new ArrayList<>(); + + acquireSemaphoresForUrlList(heldSemaphores, theWriteOperationsDetails.getUpdateRequestUrls(), false); + acquireSemaphoresForUrlList(heldSemaphores, theWriteOperationsDetails.getConditionalCreateRequestUrls(), true); + + theTransactionDetails.putUserData(HELD_SEMAPHORES, heldSemaphores); + } + + private void acquireSemaphoresForUrlList(List heldSemaphores, List urls, boolean isConditionalCreates) { + for (String next : urls) { + + if (isConditionalCreates) { + if (myMemoryCacheService.getIfPresent(MemoryCacheService.CacheEnum.MATCH_URL, next) != null) { + continue; + } + } + + Semaphore semaphore = mySemaphoreCache.get(next, t -> new Semaphore(1)); + if (heldSemaphores.contains(semaphore)) { + continue; + } + + assert semaphore != null; + try { + if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) { + ourLog.warn("Timed out waiting for semaphore on request URL: {}", next); + } else { + heldSemaphores.add(semaphore); + } + } catch (InterruptedException e) { + ourLog.warn("Interrupted during semaphore acquisition"); + } + } + } + + @Hook(Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_POST) + public void post(TransactionDetails theTransactionDetails) { + List heldSemaphores = theTransactionDetails.getUserData(HELD_SEMAPHORES); + for (Semaphore next : heldSemaphores) { + next.release(); + } + } + + /** + * Clear all semaphors from the list. This is really mostly intended for testing scenarios. + */ + public void clearSemaphores() { + mySemaphoreCache.invalidateAll(); + } + + /** + * Returns a count of all semaphores currently in the cache (incuding held and unheld semaphores) + */ + public long countSemaphores() { + return mySemaphoreCache.estimatedSize(); + } + + +} diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/TransactionProcessorTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/TransactionProcessorTest.java index 6655fb5b345..0bace079a8f 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/TransactionProcessorTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/TransactionProcessorTest.java @@ -29,6 +29,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionCallback; import javax.persistence.EntityManager; @@ -38,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @ExtendWith(SpringExtension.class) @@ -74,9 +76,9 @@ public class TransactionProcessorTest { @BeforeEach public void before() { - when(myHapiTransactionService.execute(any(), any())).thenAnswer(t -> { - TransactionCallback callback = t.getArgument(1, TransactionCallback.class); - return callback.doInTransaction(null); + when(myHapiTransactionService.execute(any(), any(), any())).thenAnswer(t -> { + TransactionCallback callback = t.getArgument(2, TransactionCallback.class); + return callback.doInTransaction(mock(TransactionStatus.class)); }); myTransactionProcessor.setEntityManagerForUnitTest(myEntityManager); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4ConcurrentWriteTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4ConcurrentWriteTest.java index f547044b7ba..73c6ac46275 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4ConcurrentWriteTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4ConcurrentWriteTest.java @@ -1,6 +1,7 @@ package ca.uhn.fhir.jpa.dao.r4; import ca.uhn.fhir.interceptor.executor.InterceptorService; +import ca.uhn.fhir.jpa.interceptor.TransactionConcurrencySemaphoreInterceptor; import ca.uhn.fhir.jpa.interceptor.UserRequestRetryVersionConflictsInterceptor; import ca.uhn.fhir.jpa.partition.SystemRequestDetails; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; @@ -33,6 +34,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.test.annotation.DirtiesContext; +import javax.annotation.Nonnull; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -61,12 +63,14 @@ public class FhirResourceDaoR4ConcurrentWriteTest extends BaseJpaR4Test { private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(FhirResourceDaoR4ConcurrentWriteTest.class); private ExecutorService myExecutor; private UserRequestRetryVersionConflictsInterceptor myRetryInterceptor; + private TransactionConcurrencySemaphoreInterceptor myConcurrencySemaphoreInterceptor; @BeforeEach public void before() { myExecutor = Executors.newFixedThreadPool(10); myRetryInterceptor = new UserRequestRetryVersionConflictsInterceptor(); + myConcurrencySemaphoreInterceptor = new TransactionConcurrencySemaphoreInterceptor(myMemoryCacheService); RestfulServer server = new RestfulServer(myFhirCtx); when(mySrd.getServer()).thenReturn(server); @@ -77,57 +81,19 @@ public class FhirResourceDaoR4ConcurrentWriteTest extends BaseJpaR4Test { public void after() { myExecutor.shutdown(); myInterceptorRegistry.unregisterInterceptor(myRetryInterceptor); + myInterceptorRegistry.unregisterInterceptor(myConcurrencySemaphoreInterceptor); } @Test - public void testConcurrentTransactionCreates() { + public void testTransactionCreates_NoGuard() { myDaoConfig.setMatchUrlCache(true); - AtomicInteger counter = new AtomicInteger(0); - Runnable creator = () -> { - BundleBuilder bb = new BundleBuilder(myFhirCtx); - String patientId = "Patient/PT" + counter.get(); - IdType practitionerId = IdType.newRandomUuid(); - IdType practitionerId2 = IdType.newRandomUuid(); - - ExplanationOfBenefit eob = new ExplanationOfBenefit(); - eob.addIdentifier().setSystem("foo").setValue("" + counter.get()); - eob.getPatient().setReference(patientId); - eob.addCareTeam().getProvider().setReference(practitionerId.getValue()); - eob.addCareTeam().getProvider().setReference(practitionerId2.getValue()); - bb.addTransactionUpdateEntry(eob).conditional("ExplanationOfBenefit?identifier=foo|" + counter.get()); - - Patient pt = new Patient(); - pt.setId(patientId); - pt.setActive(true); - bb.addTransactionUpdateEntry(pt); - - Coverage coverage = new Coverage(); - coverage.addIdentifier().setSystem("foo").setValue("" + counter.get()); - coverage.getBeneficiary().setReference(patientId); - bb.addTransactionUpdateEntry(coverage).conditional("Coverage?identifier=foo|" + counter.get()); - - Practitioner practitioner = new Practitioner(); - practitioner.setId(practitionerId); - practitioner.addIdentifier().setSystem("foo").setValue("" + counter.get()); - bb.addTransactionCreateEntry(practitioner).conditional("Practitioner?identifier=foo|" + counter.get()); - - Practitioner practitioner2 = new Practitioner(); - practitioner2.setId(practitionerId2); - practitioner2.addIdentifier().setSystem("foo2").setValue("" + counter.get()); - bb.addTransactionCreateEntry(practitioner2).conditional("Practitioner?identifier=foo2|" + counter.get()); - - Observation obs = new Observation(); - obs.setId("Observation/OBS" + counter); - obs.getSubject().setReference(pt.getId()); - bb.addTransactionUpdateEntry(obs); - - Bundle input = (Bundle) bb.getBundle(); - mySystemDao.transaction(new SystemRequestDetails(), input); - }; + AtomicInteger passCounter = new AtomicInteger(0); + AtomicInteger fuzzCounter = new AtomicInteger(0); + Runnable creator = newTransactionTaskWithUpdatesAndConditionalUpdates(passCounter, fuzzCounter); for (int i = 0; i < 10; i++) { - counter.set(i); + passCounter.set(i); ourLog.info("*********************************************************************************"); ourLog.info("Starting pass {}", i); ourLog.info("*********************************************************************************"); @@ -149,17 +115,7 @@ public class FhirResourceDaoR4ConcurrentWriteTest extends BaseJpaR4Test { } runInTransaction(() -> { - Map counts = new TreeMap<>(); - myResourceTableDao - .findAll() - .stream() - .forEach(t -> { - counts.putIfAbsent(t.getResourceType(), 0); - int value = counts.get(t.getResourceType()); - value++; - counts.put(t.getResourceType(), value); - }); - ourLog.info("Counts: {}", counts); + Map counts = getResourceCountMap(); assertEquals(10, counts.get("Patient"), counts.toString()); }); @@ -173,75 +129,203 @@ public class FhirResourceDaoR4ConcurrentWriteTest extends BaseJpaR4Test { * retry succeeds and that the data ultimately gets written. */ @Test - public void testConcurrentTransactionConditionalUpdates() throws ExecutionException, InterruptedException { + public void testTransactionCreates_WithRetry() throws ExecutionException, InterruptedException { myInterceptorRegistry.registerInterceptor(myRetryInterceptor); - Patient pt = new Patient(); - pt.setId("Patient/A"); - pt.addIdentifier().setSystem("http://foo").setValue("pt1"); - myPatientDao.update(pt); + AtomicInteger setCounter = new AtomicInteger(0); + AtomicInteger fuzzCounter = new AtomicInteger(0); + Runnable creator = newTransactionTaskWithUpdatesAndConditionalUpdates(setCounter, fuzzCounter); - Observation obs = new Observation(); - obs.setId("Observation/O"); - obs.addIdentifier().setSystem("http://foo").setValue("obs1"); - myObservationDao.update(obs); + for (int set = 0; set < 3; set++) { - AtomicInteger counter = new AtomicInteger(0); - Runnable creator = () -> { - BundleBuilder bb = new BundleBuilder(myFhirCtx); + ourLog.info("*********************************************************************************"); + ourLog.info("Starting pass {}", set); + ourLog.info("*********************************************************************************"); + fuzzCounter.set(set); - Patient patient = new Patient(); - patient.setId(IdType.newRandomUuid()); - patient.addIdentifier().setSystem("http://foo").setValue("pt1"); - patient.addName().setFamily("fam-" + counter.incrementAndGet()); - bb.addTransactionUpdateEntry(patient).conditional("Patient?identifier=http://foo|pt1"); + List> futures = new ArrayList<>(); + for (int j = 0; j < 10; j++) { + futures.add(myExecutor.submit(creator)); + } - Observation observation = new Observation(); - observation.setId(IdType.newRandomUuid()); - observation.addIdentifier().setSystem("http://foo").setValue("obs1"); - observation.getCode().setText("obs-" + counter.incrementAndGet()); - observation.getSubject().setReference(patient.getId()); - bb.addTransactionUpdateEntry(observation).conditional("Observation?identifier=http://foo|obs1"); + for (Future next : futures) { + next.get(); + } - Bundle input = (Bundle) bb.getBundle(); - input.getEntry().get(0).getResource().setId("Patient/A"); - input.getEntry().get(1).getResource().setId("Observation/O"); - SystemRequestDetails requestDetails = new SystemRequestDetails(); - UserRequestRetryVersionConflictsInterceptor.addRetryHeader(requestDetails, 20); - mySystemDao.transaction(requestDetails, input); - }; - List> futures = new ArrayList<>(); - for (int j = 0; j < 10; j++) { - futures.add(myExecutor.submit(creator)); - } - - for (Future next : futures) { - next.get(); } runInTransaction(() -> { - Map counts = new TreeMap<>(); - myResourceTableDao - .findAll() - .stream() - .forEach(t -> { - counts.putIfAbsent(t.getResourceType(), 0); - int value = counts.get(t.getResourceType()); - value++; - counts.put(t.getResourceType(), value); - }); - ourLog.info("Counts: {}", counts); + Map counts = getResourceCountMap(); assertEquals(1, counts.get("Patient"), counts.toString()); assertEquals(1, counts.get("Observation"), counts.toString()); logAllResourceLinks(); - assertEquals(2, myResourceLinkDao.count()); - assertEquals(22, myResourceHistoryTableDao.count()); + assertEquals(6, myResourceLinkDao.count()); + assertEquals(6, myResourceTableDao.count()); + assertEquals(14, myResourceHistoryTableDao.count()); }); } + @Test + public void testTransactionCreates_WithConcurrencySemaphore() throws ExecutionException, InterruptedException { + myInterceptorRegistry.registerInterceptor(myConcurrencySemaphoreInterceptor); + + AtomicInteger setCounter = new AtomicInteger(0); + AtomicInteger fuzzCounter = new AtomicInteger(0); + Runnable creator = newTransactionTaskWithUpdatesAndConditionalUpdates(setCounter, fuzzCounter); + + for (int set = 0; set < 3; set++) { + + ourLog.info("*********************************************************************************"); + ourLog.info("Starting pass {}", set); + ourLog.info("*********************************************************************************"); + fuzzCounter.set(set); + + List> futures = new ArrayList<>(); + for (int j = 0; j < 10; j++) { + futures.add(myExecutor.submit(creator)); + } + + for (Future next : futures) { + next.get(); + } + + } + + runInTransaction(() -> { + Map counts = getResourceCountMap(); + + assertEquals(1, counts.get("Patient"), counts.toString()); + assertEquals(1, counts.get("Observation"), counts.toString()); + logAllResourceLinks(); + assertEquals(6, myResourceLinkDao.count()); + assertEquals(6, myResourceTableDao.count()); + assertEquals(14, myResourceHistoryTableDao.count()); + }); + + assertEquals(6, myConcurrencySemaphoreInterceptor.countSemaphores()); + } + + @Test + public void testTransactionCreates_WithConcurrencySemaphore_DontLockOnCachedMatchUrlsForConditionalCreate() throws ExecutionException, InterruptedException { + myDaoConfig.setMatchUrlCacheEnabled(true); + myInterceptorRegistry.registerInterceptor(myConcurrencySemaphoreInterceptor); + + Runnable creator = ()->{ + BundleBuilder bb = new BundleBuilder(myFhirCtx); + + Patient patient1 = new Patient(); + patient1.addIdentifier().setSystem("http://foo").setValue("1"); + bb.addTransactionCreateEntry(patient1).conditional("Patient?identifier=http://foo|1"); + + Patient patient2 = new Patient(); + patient2.addIdentifier().setSystem("http://foo").setValue("2"); + bb.addTransactionCreateEntry(patient2).conditional("Patient?identifier=http://foo|2"); + + Bundle input = (Bundle) bb.getBundle(); + SystemRequestDetails requestDetails = new SystemRequestDetails(); + mySystemDao.transaction(requestDetails, input); + }; + + for (int set = 0; set < 3; set++) { + myConcurrencySemaphoreInterceptor.clearSemaphores(); + + List> futures = new ArrayList<>(); + for (int j = 0; j < 10; j++) { + futures.add(myExecutor.submit(creator)); + } + + for (Future next : futures) { + next.get(); + } + + if (set == 0) { + assertEquals(2, myConcurrencySemaphoreInterceptor.countSemaphores()); + } else { + assertEquals(0, myConcurrencySemaphoreInterceptor.countSemaphores()); + } + } + + runInTransaction(() -> { + Map counts = getResourceCountMap(); + assertEquals(2, counts.get("Patient"), counts.toString()); + }); + + } + + @Nonnull + private Map getResourceCountMap() { + Map counts = new TreeMap<>(); + myResourceTableDao + .findAll() + .stream() + .forEach(t -> { + counts.putIfAbsent(t.getResourceType(), 0); + int value = counts.get(t.getResourceType()); + value++; + counts.put(t.getResourceType(), value); + }); + ourLog.info("Counts: {}", counts); + return counts; + } + + + @Nonnull + private Runnable newTransactionTaskWithUpdatesAndConditionalUpdates(AtomicInteger theSetCounter, AtomicInteger theFuzzCounter) { + Runnable creator = () -> { + BundleBuilder bb = new BundleBuilder(myFhirCtx); + String patientId = "Patient/PT" + theSetCounter.get(); + IdType practitionerId = IdType.newRandomUuid(); + IdType practitionerId2 = IdType.newRandomUuid(); + + ExplanationOfBenefit eob = new ExplanationOfBenefit(); + eob.addIdentifier().setSystem("foo").setValue("" + theSetCounter.get()); + eob.getPatient().setReference(patientId); + eob.addCareTeam().getProvider().setReference(practitionerId.getValue()); + eob.addCareTeam().getProvider().setReference(practitionerId2.getValue()); + eob.getFormCode().setText("EOB " + theFuzzCounter.get()); + bb.addTransactionUpdateEntry(eob).conditional("ExplanationOfBenefit?identifier=foo|" + theSetCounter.get()); + + Patient pt = new Patient(); + pt.setId(patientId); + pt.setActive(true); + pt.addName().setFamily("FAMILY " + theFuzzCounter.get()); + bb.addTransactionUpdateEntry(pt); + + Coverage coverage = new Coverage(); + coverage.addIdentifier().setSystem("foo").setValue("" + theSetCounter.get()); + coverage.getBeneficiary().setReference(patientId); + coverage.setDependent("DEP " + theFuzzCounter.get()); + bb.addTransactionUpdateEntry(coverage).conditional("Coverage?identifier=foo|" + theSetCounter.get()); + + Practitioner practitioner = new Practitioner(); + practitioner.setId(practitionerId); + practitioner.addIdentifier().setSystem("foo").setValue("" + theSetCounter.get()); + practitioner.addName().setFamily("SET " + theFuzzCounter.get()); + bb.addTransactionCreateEntry(practitioner).conditional("Practitioner?identifier=foo|" + theSetCounter.get()); + + Practitioner practitioner2 = new Practitioner(); + practitioner2.setId(practitionerId2); + practitioner2.addIdentifier().setSystem("foo2").setValue("" + theSetCounter.get()); + practitioner2.addName().setFamily("SET " + theFuzzCounter.get()); + bb.addTransactionCreateEntry(practitioner2).conditional("Practitioner?identifier=foo2|" + theSetCounter.get()); + + Observation obs = new Observation(); + obs.setId("Observation/OBS" + theSetCounter); + obs.getSubject().setReference(pt.getId()); + obs.getCode().setText("SET " + theFuzzCounter.get()); + bb.addTransactionUpdateEntry(obs); + + Bundle input = (Bundle) bb.getBundle(); + SystemRequestDetails requestDetails = new SystemRequestDetails(); + UserRequestRetryVersionConflictsInterceptor.addRetryHeader(requestDetails, 20); + mySystemDao.transaction(requestDetails, input); + }; + return creator; + } + @Test public void testCreateWithClientAssignedId() { diff --git a/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/svc/candidate/CandidateSearcher.java b/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/svc/candidate/CandidateSearcher.java index c43e93e16c1..1b286bf2139 100644 --- a/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/svc/candidate/CandidateSearcher.java +++ b/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/svc/candidate/CandidateSearcher.java @@ -1,5 +1,25 @@ package ca.uhn.fhir.jpa.mdm.svc.candidate; +/*- + * #%L + * HAPI FHIR JPA Server - Master Data Management + * %% + * Copyright (C) 2014 - 2021 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.mdm.svc.MdmSearchParamSvc; diff --git a/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/svc/candidate/TooManyCandidatesException.java b/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/svc/candidate/TooManyCandidatesException.java index a9fa01d2ff5..aef979547ef 100644 --- a/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/svc/candidate/TooManyCandidatesException.java +++ b/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/svc/candidate/TooManyCandidatesException.java @@ -1,5 +1,25 @@ package ca.uhn.fhir.jpa.mdm.svc.candidate; +/*- + * #%L + * HAPI FHIR JPA Server - Master Data Management + * %% + * Copyright (C) 2014 - 2021 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + public class TooManyCandidatesException extends RuntimeException { public TooManyCandidatesException(String theMessage) { super(theMessage); diff --git a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/api/server/storage/TransactionDetails.java b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/api/server/storage/TransactionDetails.java index 2c3c6230cbe..d957d17acfd 100644 --- a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/api/server/storage/TransactionDetails.java +++ b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/api/server/storage/TransactionDetails.java @@ -30,6 +30,7 @@ import org.hl7.fhir.instance.model.api.IIdType; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.EnumSet; @@ -54,6 +55,7 @@ public class TransactionDetails { public static final ResourcePersistentId NOT_FOUND = new ResourcePersistentId(-1L); private final Date myTransactionDate; + private List myRollbackUndoActions = Collections.emptyList(); private Map myResolvedResourceIds = Collections.emptyMap(); private Map myResolvedMatchUrls = Collections.emptyMap(); private Map myUserData; @@ -75,6 +77,39 @@ public class TransactionDetails { myTransactionDate = theTransactionDate; } + /** + * Get the actions that should be executed if the transaction is rolled back + * + * @since 5.5.0 + */ + public List getRollbackUndoActions() { + return Collections.unmodifiableList(myRollbackUndoActions); + } + + /** + * Add an action that should be executed if the transaction is rolled back + * + * @since 5.5.0 + */ + public void addRollbackUndoAction(@Nonnull Runnable theRunnable) { + assert theRunnable != null; + if (myRollbackUndoActions.isEmpty()) { + myRollbackUndoActions = new ArrayList<>(); + } + myRollbackUndoActions.add(theRunnable); + } + + /** + * Clears any previously added rollback actions + * + * @since 5.5.0 + */ + public void clearRollbackUndoActions() { + if (!myRollbackUndoActions.isEmpty()) { + myRollbackUndoActions.clear(); + } + } + /** * A Resolved Resource ID is a mapping between a resource ID (e.g. "Patient/ABC" or * "Observation/123") and a storage ID for that resource. Resources should only be placed within @@ -140,7 +175,18 @@ public class TransactionDetails { } /** - * Sets an arbitraty object that will last the lifetime of the current transaction + * Remove an item previously stored in user data + * + * @see #getUserData(String) + */ + public void clearUserData(String theKey) { + if (myUserData != null) { + myUserData.remove(theKey); + } + } + + /** + * Sets an arbitrary object that will last the lifetime of the current transaction * * @see #getUserData(String) */ @@ -242,5 +288,10 @@ public class TransactionDetails { public void deferredBroadcastProcessingFinished() { myIsPointcutDeferred = false; } + + public void clearResolvedItems() { + myResolvedResourceIds.clear(); + myResolvedMatchUrls.clear(); + } }