transaction bundle multi-threading issues (#4739)
* a solution to avoid multi-threading issues in transaction bundle processing
* cleanup
* using retries
* minor tweaks
* checking in
* changes
* updating changelog
* some minor tweaks
* cleanup
* flip if

---------

Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-MacBook-Pro.local>
Co-authored-by: Ken Stevens <ken@smilecdr.com>
This commit is contained in:
parent 58850fed82
commit 1c0addafeb
@@ -0,0 +1,8 @@
---
type: fix
issue: 4738
title: "Processing multiple transaction bundles with similar entries
  on different threads would lead to constraint violations if
  the entries contained similar unique values.
  This error can now be handled by configuring retry handlers.
  "
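For context, the retry path this change relies on is opted into per request rather than by locking across threads. The following is a minimal sketch modelled on the test added in this commit; the wiring of the interceptor registry, the system DAO, and the loaded Bundle is assumed to exist as it would in any HAPI FHIR JPA server.

// Sketch only - mirrors the retry setup used by testTransaction_multiThreaded below.
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.interceptor.UserRequestRetryVersionConflictsInterceptor;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import org.hl7.fhir.r4.model.Bundle;

class RetryingBundleSubmitter {

	private final IInterceptorService myInterceptorService;
	private final IFhirSystemDao<Bundle, ?> mySystemDao;

	RetryingBundleSubmitter(IInterceptorService theInterceptorService, IFhirSystemDao<Bundle, ?> theSystemDao) {
		myInterceptorService = theInterceptorService;
		mySystemDao = theSystemDao;
		// This interceptor answers the STORAGE_VERSION_CONFLICT pointcut and requests retries
		myInterceptorService.registerInterceptor(new UserRequestRetryVersionConflictsInterceptor());
	}

	Bundle submit(Bundle theBundle) {
		// Mark the request as retryable so a constraint violation caused by a
		// concurrent bundle is retried instead of being surfaced to the caller.
		SystemRequestDetails requestDetails = new SystemRequestDetails();
		requestDetails.setRetry(true);
		requestDetails.setMaxRetries(3);
		return mySystemDao.transaction(requestDetails, theBundle);
	}
}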
@@ -22,7 +22,6 @@ package ca.uhn.fhir.jpa.dao.index;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.dao.data.IResourceIndexedComboStringUniqueDao;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
@@ -37,13 +36,9 @@ import ca.uhn.fhir.jpa.searchparam.extractor.ISearchParamWithInlineReferencesExt
import ca.uhn.fhir.jpa.searchparam.extractor.ResourceIndexedSearchParams;
import ca.uhn.fhir.jpa.searchparam.extractor.SearchParamExtractorService;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.api.server.storage.NotFoundPid;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import ca.uhn.fhir.rest.server.util.ResourceSearchParams;
import com.google.common.annotations.VisibleForTesting;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired;
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import javax.persistence.EntityManager;
import javax.persistence.FlushModeType;
import javax.persistence.PersistenceContext;
import javax.persistence.PersistenceContextType;
import javax.persistence.Query;
@@ -118,10 +119,16 @@ public class SearchQueryExecutor implements ISearchQueryExecutor {

ourLog.trace("About to execute SQL: {}", sql);

// This tells hibernate not to flush when we call scroll(), but rather to wait until the transaction commits and
// only flush then. We need to do this so that any exceptions that happen in the transaction happen when
// we try to commit the transaction, and not here.
// See the test called testTransaction_multiThreaded (in FhirResourceDaoR4ConcurrentWriteTest) which triggers
// the following exception if we don't set this flush mode:
// java.util.concurrent.ExecutionException: org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
hibernateQuery.setFlushMode(FlushModeType.COMMIT);
ScrollableResults scrollableResults = hibernateQuery.scroll(ScrollMode.FORWARD_ONLY);
myResultSet = new ScrollableResultsIterator<>(scrollableResults);
myQueryInitialized = true;

}

if (myResultSet == null || !myResultSet.hasNext()) {
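The flush-mode change above matters because this query can run inside a larger write transaction: with FlushModeType.COMMIT, Hibernate defers flushing pending writes until commit, so a constraint violation caused by a competing bundle surfaces at commit time, where the retry handling in HapiTransactionService can deal with it. A minimal sketch of the same setting on a plain JPA query; the entity manager and SQL string are assumed to be supplied by the caller.

import javax.persistence.EntityManager;
import javax.persistence.FlushModeType;
import javax.persistence.Query;

class FlushModeExample {

	// Defer flushing of pending writes until the surrounding transaction commits,
	// so exceptions from those writes are raised at commit time rather than here.
	Query buildQuery(EntityManager theEntityManager, String theSql) {
		Query query = theEntityManager.createNativeQuery(theSql);
		query.setFlushMode(FlushModeType.COMMIT);
		return query;
	}
}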
@@ -134,7 +141,7 @@ public class SearchQueryExecutor implements ISearchQueryExecutor {
} catch (Exception e) {
ourLog.error("Failed to create or execute SQL query", e);
close();
throw new InternalErrorException(Msg.code(1262) + e);
throw new InternalErrorException(Msg.code(1262) + e, e);
}
}
}
@@ -116,7 +116,6 @@ public class SearchParamExtractorService {
* {@literal theParams}.
*/
public void extractFromResource(RequestPartitionId theRequestPartitionId, RequestDetails theRequestDetails, ResourceIndexedSearchParams theNewParams, ResourceIndexedSearchParams theExistingParams, ResourceTable theEntity, IBaseResource theResource, TransactionDetails theTransactionDetails, boolean theFailOnInvalidReference, @Nonnull ISearchParamExtractor.ISearchParamFilter theSearchParamFilter) {

// All search parameter types except Reference
ResourceIndexedSearchParams normalParams = new ResourceIndexedSearchParams();
extractSearchIndexParameters(theRequestDetails, normalParams, theResource, theSearchParamFilter);
@@ -1,6 +1,7 @@
package ca.uhn.fhir.jpa.dao.r4;

import ca.uhn.fhir.interceptor.executor.InterceptorService;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.interceptor.TransactionConcurrencySemaphoreInterceptor;
import ca.uhn.fhir.jpa.interceptor.UserRequestRetryVersionConflictsInterceptor;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
@@ -14,7 +15,10 @@ import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.util.BundleBuilder;
import ca.uhn.fhir.util.ClasspathUtil;
import ca.uhn.fhir.util.HapiExtensions;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.test.concurrency.PointcutLatch;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Bundle;
@@ -33,15 +37,18 @@ import org.hl7.fhir.r4.model.StringType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.annotation.DirtiesContext;

import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -53,6 +60,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -60,24 +68,27 @@ import static org.mockito.Mockito.when;
@SuppressWarnings({"deprecation", "Duplicates"})
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public class FhirResourceDaoR4ConcurrentWriteTest extends BaseJpaR4Test {

private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(FhirResourceDaoR4ConcurrentWriteTest.class);

private static final int THREAD_COUNT = 10;

private ExecutorService myExecutor;
private UserRequestRetryVersionConflictsInterceptor myRetryInterceptor;
private TransactionConcurrencySemaphoreInterceptor myConcurrencySemaphoreInterceptor;

@Autowired
private JpaStorageSettings myStorageSettings;

@Override
@BeforeEach
public void before() throws Exception {
super.before();
myExecutor = Executors.newFixedThreadPool(10);
myExecutor = Executors.newFixedThreadPool(THREAD_COUNT);
myRetryInterceptor = new UserRequestRetryVersionConflictsInterceptor();
myConcurrencySemaphoreInterceptor = new TransactionConcurrencySemaphoreInterceptor(myMemoryCacheService);

RestfulServer server = new RestfulServer(myFhirContext);
when(mySrd.getServer()).thenReturn(server);

}

@AfterEach
@@ -87,6 +98,63 @@ public class FhirResourceDaoR4ConcurrentWriteTest extends BaseJpaR4Test {
myInterceptorRegistry.unregisterInterceptor(myConcurrencySemaphoreInterceptor);
}

@Test
public void testTransaction_multiThreaded()
throws InterruptedException, ExecutionException {
// setup
Bundle bundle1 = ClasspathUtil.loadResource(myFhirContext, Bundle.class, "/r4/test-bundle.json");
Bundle bundle2 = ClasspathUtil.loadResource(myFhirContext,
Bundle.class, "/r4/test-bundle2.json");
Bundle[] bundles = {
bundle1,
bundle2
};
int calls = bundles.length;
AtomicInteger counter = new AtomicInteger();
PointcutLatch latch = new PointcutLatch("transactionLatch");
Collection<Callable<Bundle>> callables = new ArrayList<>();

myInterceptorRegistry.registerInterceptor(myRetryInterceptor);

latch.setDefaultTimeoutSeconds(5);
latch.setExpectedCount(calls);
for (int i = 0; i < calls; i++) {
int mc = i;
Bundle bundle = bundles[i];
Callable<Bundle> task = () -> {
String name = "task_" + mc;
StopWatch watch = new StopWatch();
ourLog.info("Starting thread " + name);
watch.startTask(name);
SystemRequestDetails details = new SystemRequestDetails();
details.setRetry(true);
details.setMaxRetries(3);
Bundle b = mySystemDao.transaction(details,
bundle);
int c = counter.incrementAndGet();
latch.call(1);
watch.endCurrentTask();
long timeMS = watch.getMillis();
ourLog.info("Ending thread " + name + " after " + timeMS + "ms");
return b;
};
callables.add(task);
}

// test
List<Future<Bundle>> futures = myExecutor.invokeAll(callables);

// validate
assertEquals(futures.size(), calls);
for (Future<Bundle> future : futures) {
// make sure no exceptions
Bundle b = future.get();
assertNotNull(b);
}
latch.awaitExpected();
assertEquals(counter.get(), calls);
}

@Test
public void testTransactionCreates_NoGuard() {
myStorageSettings.setMatchUrlCache(true);
(Two file diffs suppressed because they are too large to display.)
@@ -59,7 +59,6 @@ import ca.uhn.fhir.rest.api.PreferReturnEnum;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.DeferredInterceptorBroadcasts;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.param.ParameterUtil;
import ca.uhn.fhir.rest.server.RestfulServerUtils;
@@ -109,7 +108,20 @@ import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;

import javax.annotation.Nonnull;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -133,7 +145,9 @@ public abstract class BaseTransactionProcessor {
private PlatformTransactionManager myTxManager;
@Autowired
private FhirContext myContext;

@Autowired
@SuppressWarnings("rawtypes")
private ITransactionProcessorVersionAdapter myVersionAdapter;
@Autowired
private DaoRegistry myDaoRegistry;
@@ -225,10 +239,12 @@ public abstract class BaseTransactionProcessor {
return resp;
}

@SuppressWarnings("unchecked")
private void populateEntryWithOperationOutcome(BaseServerResponseException caughtEx, IBase nextEntry) {
myVersionAdapter.populateEntryWithOperationOutcome(caughtEx, nextEntry);
}

@SuppressWarnings("unchecked")
private void handleTransactionCreateOrUpdateOutcome(IdSubstitutionMap idSubstitutions, Map<IIdType, DaoMethodOutcome> idToPersistedOutcome,
IIdType nextResourceId, DaoMethodOutcome outcome,
IBase newEntry, String theResourceType,
@@ -502,6 +518,7 @@ public abstract class BaseTransactionProcessor {
return response;
}

@SuppressWarnings("unchecked")
private void doTransactionReadOperations(final RequestDetails theRequestDetails, IBaseBundle theResponse,
List<IBase> theGetEntries, IdentityHashMap<IBase, Integer> theOriginalRequestOrder,
StopWatch theTransactionStopWatch, boolean theNestedMode) {
@@ -579,6 +596,7 @@ public abstract class BaseTransactionProcessor {
* heavy load with lots of concurrent transactions using all available
* database connections.
*/
@SuppressWarnings("unchecked")
private void prepareThenExecuteTransactionWriteOperations(RequestDetails theRequestDetails, String theActionName,
TransactionDetails theTransactionDetails, StopWatch theTransactionStopWatch,
IBaseBundle theResponse, IdentityHashMap<IBase, Integer> theOriginalRequestOrder,
@@ -639,6 +657,7 @@ public abstract class BaseTransactionProcessor {
CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, thePointcut, params);
}

@SuppressWarnings("unchecked")
private TransactionWriteOperationsDetails buildWriteOperationsDetails(List<IBase> theEntries) {
TransactionWriteOperationsDetails writeOperationsDetails;
List<String> updateRequestUrls = new ArrayList<>();
@@ -699,6 +718,7 @@ public abstract class BaseTransactionProcessor {
/**
* Searches for duplicate conditional creates and consolidates them.
*/
@SuppressWarnings("unchecked")
private void consolidateDuplicateConditionals(RequestDetails theRequestDetails, String theActionName, List<IBase> theEntries) {
final Set<String> keysWithNoFullUrl = new HashSet<>();
final HashMap<String, String> keyToUuid = new HashMap<>();
@@ -786,6 +806,7 @@ public abstract class BaseTransactionProcessor {
*/
private void replaceReferencesInEntriesWithConsolidatedUUID(List<IBase> theEntries, String theEntryFullUrl, String existingUuid) {
for (IBase nextEntry : theEntries) {
@SuppressWarnings("unchecked")
IBaseResource nextResource = myVersionAdapter.getResource(nextEntry);
if (nextResource != null) {
for (IBaseReference nextReference : myContext.newTerser().getAllPopulatedChildElementsOfType(nextResource, IBaseReference.class)) {
@@ -819,6 +840,7 @@ public abstract class BaseTransactionProcessor {
if (theBaseResource != null) {
nextResourceId = theBaseResource.getIdElement();

@SuppressWarnings("unchecked")
String fullUrl = myVersionAdapter.getFullUrl(theNextReqEntry);
if (isNotBlank(fullUrl)) {
IIdType fullUrlIdType = newIdType(fullUrl);
@@ -865,6 +887,7 @@ public abstract class BaseTransactionProcessor {
/**
* After pre-hooks have been called
*/
@SuppressWarnings({"unchecked", "rawtypes"})
protected EntriesToProcessMap doTransactionWriteOperations(final RequestDetails theRequest, String theActionName,
TransactionDetails theTransactionDetails, Set<IIdType> theAllIds,
IdSubstitutionMap theIdSubstitutions, Map<IIdType, DaoMethodOutcome> theIdToPersistedOutcome,
@@ -903,6 +926,7 @@ public abstract class BaseTransactionProcessor {

IBase nextReqEntry = theEntries.get(i);
IBaseResource res = myVersionAdapter.getResource(nextReqEntry);

IIdType nextResourceId = getNextResourceIdFromBaseResource(res, nextReqEntry, theAllIds);

String verb = myVersionAdapter.getEntryRequestVerb(myContext, nextReqEntry);
@@ -931,15 +955,18 @@ public abstract class BaseTransactionProcessor {
extractAndVerifyTransactionUrlForEntry(nextReqEntry, verb);
}
validateResourcePresent(res, order, verb);
@SuppressWarnings("rawtypes")

IFhirResourceDao resourceDao = getDaoOrThrowException(res.getClass());
res.setId((String) null);

DaoMethodOutcome outcome;
String matchUrl = myVersionAdapter.getEntryRequestIfNoneExist(nextReqEntry);
matchUrl = performIdSubstitutionsInMatchUrl(theIdSubstitutions, matchUrl);
// create individual resource
outcome = resourceDao.create(res, matchUrl, false, theTransactionDetails, theRequest);
setConditionalUrlToBeValidatedLater(conditionalUrlToIdMap, matchUrl, outcome.getId());
res.setId(outcome.getId());

if (nextResourceId != null) {
handleTransactionCreateOrUpdateOutcome(theIdSubstitutions, theIdToPersistedOutcome, nextResourceId, outcome, nextRespEntry, resourceType, res, theRequest);
}
@@ -1140,6 +1167,7 @@ public abstract class BaseTransactionProcessor {
// flush writes to db
theTransactionStopWatch.startTask("Flush writes to database");

// flush the changes
flushSession(theIdToPersistedOutcome);

theTransactionStopWatch.endCurrentTask();
@@ -1253,7 +1281,6 @@ public abstract class BaseTransactionProcessor {
if (!match.matched()) {
throw new PreconditionFailedException(Msg.code(539) + "Invalid conditional URL \"" + matchUrl + "\". The given resource is not matched by this URL.");
}
;
}
}
});
@@ -1636,7 +1663,7 @@ public abstract class BaseTransactionProcessor {

/**
* Extracts the transaction url from the entry and verifies it's:
* * not null or bloack
* * not null or blank
* * is a relative url matching the resourceType it is about
* <p>
* Returns the transaction url (or throws an InvalidRequestException if url is not valid)
@@ -39,6 +39,8 @@ import ca.uhn.fhir.util.ICallable;
import ca.uhn.fhir.util.TestUtil;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.hibernate.exception.ConstraintViolationException;
import org.hl7.fhir.instance.model.api.IBaseOperationOutcome;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -227,64 +229,78 @@ public class HapiTransactionService implements IHapiTransactionService {

return doExecuteCallback(theExecutionBuilder, theCallback);

} catch (ResourceVersionConflictException | DataIntegrityViolationException | ObjectOptimisticLockingFailureException e) {
ourLog.debug("Version conflict detected", e);
} catch (Exception e) {
if (!(ExceptionUtils.indexOfThrowable(e, ResourceVersionConflictException.class) != -1 ||
ExceptionUtils.indexOfThrowable(e, DataIntegrityViolationException.class) != -1 ||
ExceptionUtils.indexOfThrowable(e, ConstraintViolationException.class) != -1 ||
ExceptionUtils.indexOfThrowable(e, ObjectOptimisticLockingFailureException.class) != -1)) {
ourLog.error("Unexpected transaction exception. Will not be retried.", e);
throw e;
} else {

if (theExecutionBuilder.myOnRollback != null) {
theExecutionBuilder.myOnRollback.run();
}
ourLog.debug("Version conflict detected", e);

int maxRetries = 0;

/*
* If two client threads both concurrently try to add the same tag that isn't
* known to the system already, they'll both try to create a row in HFJ_TAG_DEF,
* which is the tag definition table. In that case, a constraint error will be
* thrown by one of the client threads, so we auto-retry in order to avoid
* annoying spurious failures for the client.
*/
if (DaoFailureUtil.isTagStorageFailure(e)) {
maxRetries = 3;
}

if (maxRetries == 0) {
HookParams params = new HookParams()
.add(RequestDetails.class, theExecutionBuilder.myRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theExecutionBuilder.myRequestDetails);
ResourceVersionConflictResolutionStrategy conflictResolutionStrategy = (ResourceVersionConflictResolutionStrategy) CompositeInterceptorBroadcaster.doCallHooksAndReturnObject(myInterceptorBroadcaster, theExecutionBuilder.myRequestDetails, Pointcut.STORAGE_VERSION_CONFLICT, params);
if (conflictResolutionStrategy != null && conflictResolutionStrategy.isRetry()) {
maxRetries = conflictResolutionStrategy.getMaxRetries();
if (theExecutionBuilder.myOnRollback != null) {
theExecutionBuilder.myOnRollback.run();
}
}

if (i < maxRetries) {
if (theExecutionBuilder.myTransactionDetails != null) {
theExecutionBuilder.myTransactionDetails.getRollbackUndoActions().forEach(Runnable::run);
theExecutionBuilder.myTransactionDetails.clearRollbackUndoActions();
theExecutionBuilder.myTransactionDetails.clearResolvedItems();
theExecutionBuilder.myTransactionDetails.clearUserData(XACT_USERDATA_KEY_RESOLVED_TAG_DEFINITIONS);
theExecutionBuilder.myTransactionDetails.clearUserData(XACT_USERDATA_KEY_EXISTING_SEARCH_PARAMS);
int maxRetries = 0;

/*
* If two client threads both concurrently try to add the same tag that isn't
* known to the system already, they'll both try to create a row in HFJ_TAG_DEF,
* which is the tag definition table. In that case, a constraint error will be
* thrown by one of the client threads, so we auto-retry in order to avoid
* annoying spurious failures for the client.
*/
if (DaoFailureUtil.isTagStorageFailure(e)) {
maxRetries = 3;
}
double sleepAmount = (250.0d * i) * Math.random();
long sleepAmountLong = (long) sleepAmount;
TestUtil.sleepAtLeast(sleepAmountLong, false);

ourLog.info("About to start a transaction retry due to conflict or constraint error. Sleeping {}ms first.", sleepAmountLong);
continue;
if (maxRetries == 0) {
HookParams params = new HookParams()
.add(RequestDetails.class, theExecutionBuilder.myRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theExecutionBuilder.myRequestDetails);
ResourceVersionConflictResolutionStrategy conflictResolutionStrategy = (ResourceVersionConflictResolutionStrategy) CompositeInterceptorBroadcaster.doCallHooksAndReturnObject(
myInterceptorBroadcaster,
theExecutionBuilder.myRequestDetails,
Pointcut.STORAGE_VERSION_CONFLICT,
params
);
if (conflictResolutionStrategy != null && conflictResolutionStrategy.isRetry()) {
maxRetries = conflictResolutionStrategy.getMaxRetries();
}
}

if (i < maxRetries) {
if (theExecutionBuilder.myTransactionDetails != null) {
theExecutionBuilder.myTransactionDetails.getRollbackUndoActions().forEach(Runnable::run);
theExecutionBuilder.myTransactionDetails.clearRollbackUndoActions();
theExecutionBuilder.myTransactionDetails.clearResolvedItems();
theExecutionBuilder.myTransactionDetails.clearUserData(XACT_USERDATA_KEY_RESOLVED_TAG_DEFINITIONS);
theExecutionBuilder.myTransactionDetails.clearUserData(XACT_USERDATA_KEY_EXISTING_SEARCH_PARAMS);
}
double sleepAmount = (250.0d * i) * Math.random();
long sleepAmountLong = (long) sleepAmount;
TestUtil.sleepAtLeast(sleepAmountLong, false);

ourLog.info("About to start a transaction retry due to conflict or constraint error. Sleeping {}ms first.", sleepAmountLong);
continue;
}

IBaseOperationOutcome oo = null;
if (e instanceof ResourceVersionConflictException) {
oo = ((ResourceVersionConflictException) e).getOperationOutcome();
}

if (maxRetries > 0) {
String msg = "Max retries (" + maxRetries + ") exceeded for version conflict: " + e.getMessage();
ourLog.info(msg, maxRetries);
throw new ResourceVersionConflictException(Msg.code(549) + msg);
}

throw new ResourceVersionConflictException(Msg.code(550) + e.getMessage(), e, oo);
}

IBaseOperationOutcome oo = null;
if (e instanceof ResourceVersionConflictException) {
oo = ((ResourceVersionConflictException) e).getOperationOutcome();
}

if (maxRetries > 0) {
String msg = "Max retries (" + maxRetries + ") exceeded for version conflict: " + e.getMessage();
ourLog.info(msg, maxRetries);
throw new ResourceVersionConflictException(Msg.code(549) + msg);
}

throw new ResourceVersionConflictException(Msg.code(550) + e.getMessage(), e, oo);
}
}
} finally {
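The broadened catch block above no longer keys off the exception's outermost type: a ConstraintViolationException thrown while flushing a unique index row is typically wrapped by the persistence layer, so the code walks the cause chain instead. Below is a minimal, self-contained sketch of that retryability test; the isRetryable helper name is illustrative and not part of the commit.

import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.hibernate.exception.ConstraintViolationException;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.orm.ObjectOptimisticLockingFailureException;

final class RetryabilityCheck {

	private RetryabilityCheck() {}

	/**
	 * Returns true if the throwable, or anything in its cause chain, is one of the
	 * conflict/constraint exceptions that the transaction service is willing to retry.
	 * ExceptionUtils.indexOfThrowable returns -1 when the type is absent from the chain.
	 */
	static boolean isRetryable(Exception e) {
		return ExceptionUtils.indexOfThrowable(e, ResourceVersionConflictException.class) != -1
			|| ExceptionUtils.indexOfThrowable(e, DataIntegrityViolationException.class) != -1
			|| ExceptionUtils.indexOfThrowable(e, ConstraintViolationException.class) != -1
			|| ExceptionUtils.indexOfThrowable(e, ObjectOptimisticLockingFailureException.class) != -1;
	}
}

Only exceptions that pass this kind of check fall into the retry loop; anything else is logged and rethrown immediately.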
@@ -23,8 +23,8 @@ import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.model.ResourceVersionConflictResolutionStrategy;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import org.apache.commons.lang3.Validate;

/**