added retry for PessimisticLockingFailureException on transactions (#5554)

This commit is contained in:
Emre Dincturk 2023-12-18 11:35:36 -05:00 committed by GitHub
parent 7863f03c68
commit 77da1deeda
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 305 additions and 23 deletions

View File

@ -0,0 +1,31 @@
package ca.uhn.fhir.util;
/**
* A utility class for thread sleeps.
* Uses non-static methods for easier mocking and unnecessary waits in unit tests
*/
public class SleepUtil {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SleepUtil.class);
public void sleepAtLeast(long theMillis) {
sleepAtLeast(theMillis, true);
}
@SuppressWarnings("BusyWait")
public void sleepAtLeast(long theMillis, boolean theLogProgress) {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() <= start + theMillis) {
try {
long timeSinceStarted = System.currentTimeMillis() - start;
long timeToSleep = Math.max(0, theMillis - timeSinceStarted);
if (theLogProgress) {
ourLog.info("Sleeping for {}ms", timeToSleep);
}
Thread.sleep(timeToSleep);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
ourLog.error("Interrupted", e);
}
}
}
}

View File

@ -31,6 +31,9 @@ import static org.apache.commons.lang3.StringUtils.defaultString;
public class TestUtil {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(TestUtil.class);
private static SleepUtil ourSleepUtil = new SleepUtil();
private static boolean ourShouldRandomizeTimezones = true;
public static void setShouldRandomizeTimezones(boolean theShouldRandomizeTimezones) {
@ -135,25 +138,22 @@ public class TestUtil {
return stripReturns(theString).replace(" ", "");
}
/**
*
* In production code, instead of this static method, it is better to use an instance of SleepUtil.
* Since SleepUtil isn't using static methods, it is easier to mock for unit test and avoid unnecessary waits in
* unit tests
*/
public static void sleepAtLeast(long theMillis) {
sleepAtLeast(theMillis, true);
ourSleepUtil.sleepAtLeast(theMillis);
}
@SuppressWarnings("BusyWait")
/**
* In production code, instead of this static method, it is better to use an instance of SleepUtil.
* Since SleepUtil isn't using static methods, it is easier to mock for unit test and avoid unnecessary waits in
* unit tests
*/
public static void sleepAtLeast(long theMillis, boolean theLogProgress) {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() <= start + theMillis) {
try {
long timeSinceStarted = System.currentTimeMillis() - start;
long timeToSleep = Math.max(0, theMillis - timeSinceStarted);
if (theLogProgress) {
ourLog.info("Sleeping for {}ms", timeToSleep);
}
Thread.sleep(timeToSleep);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
ourLog.error("Interrupted", e);
}
}
ourSleepUtil.sleepAtLeast(theMillis, theLogProgress);
}
}

View File

@ -0,0 +1,29 @@
package ca.uhn.fhir.util;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
class SleepUtilTest {
@Test
public void testSleepAtLeast() {
SleepUtil sleepUtil = new SleepUtil();
long amountToSleepMs = 10;
long start = System.currentTimeMillis();
sleepUtil.sleepAtLeast(amountToSleepMs);
long stop = System.currentTimeMillis();
long actualSleepDurationMs = stop - start;
assertTrue(actualSleepDurationMs >= amountToSleepMs);
}
@Test
public void testZeroMs() {
// 0 is a valid input
SleepUtil sleepUtil = new SleepUtil();
sleepUtil.sleepAtLeast(0);
}
}

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 5553
title: "mdm-clear jobs are prone to failing because of deadlocks when running on SQL Server. Such job failures have
been mitigated to some extent by increasing the retries on deadlocks."

View File

@ -36,7 +36,7 @@ import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
import ca.uhn.fhir.util.ICallable;
import ca.uhn.fhir.util.TestUtil;
import ca.uhn.fhir.util.SleepUtil;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
@ -48,6 +48,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.dao.PessimisticLockingFailureException;
import org.springframework.orm.ObjectOptimisticLockingFailureException;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
@ -89,11 +90,18 @@ public class HapiTransactionService implements IHapiTransactionService {
private Propagation myTransactionPropagationWhenChangingPartitions = Propagation.REQUIRED;
private SleepUtil mySleepUtil = new SleepUtil();
@VisibleForTesting
public void setInterceptorBroadcaster(IInterceptorBroadcaster theInterceptorBroadcaster) {
myInterceptorBroadcaster = theInterceptorBroadcaster;
}
@VisibleForTesting
public void setSleepUtil(SleepUtil theSleepUtil) {
mySleepUtil = theSleepUtil;
}
@Override
public IExecutionBuilder withRequest(@Nullable RequestDetails theRequestDetails) {
return new ExecutionBuilder(theRequestDetails);
@ -281,6 +289,25 @@ public class HapiTransactionService implements IHapiTransactionService {
return doExecuteInTransaction(theExecutionBuilder, theCallback, requestPartitionId, previousRequestPartitionId);
}
private boolean isThrowableOrItsSubclassPresent(Throwable theThrowable, Class<? extends Throwable> theClass) {
return ExceptionUtils.indexOfType(theThrowable, theClass) != -1;
}
private boolean isThrowablePresent(Throwable theThrowable, Class<? extends Throwable> theClass) {
return ExceptionUtils.indexOfThrowable(theThrowable, theClass) != -1;
}
private boolean isRetriable(Throwable theThrowable) {
return isThrowablePresent(theThrowable, ResourceVersionConflictException.class)
|| isThrowablePresent(theThrowable, DataIntegrityViolationException.class)
|| isThrowablePresent(theThrowable, ConstraintViolationException.class)
|| isThrowablePresent(theThrowable, ObjectOptimisticLockingFailureException.class)
// calling isThrowableOrItsSubclassPresent instead of isThrowablePresent for
// PessimisticLockingFailureException, because we want to retry on its subclasses as well, especially
// CannotAcquireLockException, which is thrown in some deadlock situations which we want to retry
|| isThrowableOrItsSubclassPresent(theThrowable, PessimisticLockingFailureException.class);
}
@Nullable
private <T> T doExecuteInTransaction(
ExecutionBuilder theExecutionBuilder,
@ -294,11 +321,7 @@ public class HapiTransactionService implements IHapiTransactionService {
return doExecuteCallback(theExecutionBuilder, theCallback);
} catch (Exception e) {
if (!(ExceptionUtils.indexOfThrowable(e, ResourceVersionConflictException.class) != -1
|| ExceptionUtils.indexOfThrowable(e, DataIntegrityViolationException.class) != -1
|| ExceptionUtils.indexOfThrowable(e, ConstraintViolationException.class) != -1
|| ExceptionUtils.indexOfThrowable(e, ObjectOptimisticLockingFailureException.class)
!= -1)) {
if (!isRetriable(e)) {
ourLog.debug("Unexpected transaction exception. Will not be retried.", e);
throw e;
} else {
@ -354,7 +377,7 @@ public class HapiTransactionService implements IHapiTransactionService {
}
double sleepAmount = (250.0d * i) * Math.random();
long sleepAmountLong = (long) sleepAmount;
TestUtil.sleepAtLeast(sleepAmountLong, false);
mySleepUtil.sleepAtLeast(sleepAmountLong, false);
ourLog.info(
"About to start a transaction retry due to conflict or constraint error. Sleeping {}ms first.",

View File

@ -0,0 +1,194 @@
package ca.uhn.fhir.jpa.dao.tx;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.model.ResourceVersionConflictResolutionStrategy;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.util.SleepUtil;
import org.hibernate.exception.ConstraintViolationException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.dao.CannotAcquireLockException;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.orm.ObjectOptimisticLockingFailureException;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class HapiTransactionServiceTest {
@Mock
private IInterceptorBroadcaster myInterceptorBroadcasterMock;
@Mock
private PlatformTransactionManager myTransactionManagerMock;
@Mock
private IRequestPartitionHelperSvc myRequestPartitionHelperSvcMock;
@Mock
private PartitionSettings myPartitionSettingsMock;
@Mock
private SleepUtil mySleepUtilMock;
private HapiTransactionService myHapiTransactionService;
@BeforeEach
public void beforeEach() {
myHapiTransactionService = new HapiTransactionService();
myHapiTransactionService.setTransactionManager(myTransactionManagerMock);
myHapiTransactionService.setInterceptorBroadcaster(myInterceptorBroadcasterMock);
myHapiTransactionService.setPartitionSettingsForUnitTest(myPartitionSettingsMock);
myHapiTransactionService.setRequestPartitionSvcForUnitTest(myRequestPartitionHelperSvcMock);
myHapiTransactionService.setSleepUtil(mySleepUtilMock);
mockInterceptorBroadcaster();
}
private void mockInterceptorBroadcaster() {
lenient().when(myInterceptorBroadcasterMock.callHooksAndReturnObject(eq(Pointcut.STORAGE_VERSION_CONFLICT),
isA(HookParams.class)))
.thenAnswer(invocationOnMock -> {
HookParams hookParams = (HookParams) invocationOnMock.getArguments()[1];
//answer with whatever retry settings passed in as HookParam
RequestDetails requestDetails = hookParams.get(RequestDetails.class);
ResourceVersionConflictResolutionStrategy answer = new ResourceVersionConflictResolutionStrategy();
answer.setRetry(requestDetails.isRetry());
answer.setMaxRetries(requestDetails.getMaxRetries());
return answer;
});
}
/**
* A helper method to test retry logic on exceptions
* TransactionCallback interface allows only throwing RuntimeExceptions,
* that's why the parameter type is RunTimeException
*/
private Exception testRetriesOnException(RuntimeException theException,
boolean theRetryEnabled,
int theMaxRetries,
int theExpectedNumberOfCallsToTransactionCallback) {
RequestDetails requestDetails = new SystemRequestDetails();
requestDetails.setRetry(theRetryEnabled);
requestDetails.setMaxRetries(theMaxRetries);
HapiTransactionService.IExecutionBuilder executionBuilder = myHapiTransactionService
.withRequest(requestDetails)
.withTransactionDetails(new TransactionDetails());
AtomicInteger numberOfCalls = new AtomicInteger();
TransactionCallback<Void> transactionCallback = (TransactionStatus theStatus) -> {
numberOfCalls.incrementAndGet();
throw theException;
};
Exception theExceptionThrownByDoExecute = assertThrows(Exception.class, () -> {
myHapiTransactionService.doExecute((HapiTransactionService.ExecutionBuilder) executionBuilder, transactionCallback);
});
assertEquals(theExpectedNumberOfCallsToTransactionCallback, numberOfCalls.get());
verify(mySleepUtilMock, times(theExpectedNumberOfCallsToTransactionCallback - 1))
.sleepAtLeast(anyLong(), anyBoolean());
return theExceptionThrownByDoExecute;
}
private static Stream<Arguments> provideRetriableExceptionParameters() {
String exceptionMessage = "failed!";
return Stream.of(
arguments(new ResourceVersionConflictException(exceptionMessage)),
arguments(new DataIntegrityViolationException(exceptionMessage)),
arguments(new ConstraintViolationException(exceptionMessage, new SQLException(""), null)),
arguments(new ObjectOptimisticLockingFailureException(exceptionMessage, new Exception())),
//CannotAcquireLockException is a subclass of
//PessimisticLockingFailureException which we treat as a retriable exception
arguments(new CannotAcquireLockException(exceptionMessage))
);
}
@ParameterizedTest(name = "{index}: {0}")
@MethodSource(value = "provideRetriableExceptionParameters")
void testDoExecute_WhenRetryEnabled_RetriesOnRetriableExceptions(RuntimeException theException) {
testRetriesOnException(theException, true, 2, 3);
}
@ParameterizedTest(name = "{index}: {0}")
@MethodSource(value = "provideRetriableExceptionParameters")
void testDoExecute_WhenRetryEnabled_RetriesOnRetriableInnerExceptions(RuntimeException theException) {
//in this test we wrap the retriable exception to test that nested exceptions are covered as well
RuntimeException theWrapperException = new RuntimeException("this is the wrapper", theException);
testRetriesOnException(theWrapperException, true, 2, 3);
}
@ParameterizedTest(name = "{index}: {0}")
@MethodSource(value = "provideRetriableExceptionParameters")
void testDoExecute_WhenRetryIsDisabled_DoesNotRetryExceptions(RuntimeException theException) {
testRetriesOnException(theException, false, 10, 1);
}
@Test
void testDoExecute_WhenRetryEnabled_DoesNotRetryOnNonRetriableException() {
RuntimeException nonRetriableException = new RuntimeException("should not be retried");
Exception exceptionThrown = testRetriesOnException(nonRetriableException, true, 10, 1);
assertEquals(nonRetriableException, exceptionThrown);
verifyNoInteractions(myInterceptorBroadcasterMock);
}
@Test
void testDoExecute_WhenRetyEnabled_StopsRetryingWhenARetryIsSuccessfull() {
RequestDetails requestDetails = new SystemRequestDetails();
requestDetails.setRetry(true);
requestDetails.setMaxRetries(10);
HapiTransactionService.IExecutionBuilder executionBuilder = myHapiTransactionService
.withRequest(requestDetails)
.withTransactionDetails(new TransactionDetails());
AtomicInteger numberOfCalls = new AtomicInteger();
TransactionCallback<Void> transactionCallback = (TransactionStatus theStatus) -> {
int currentCallNum = numberOfCalls.incrementAndGet();
//fail for the first two calls then succeed on the third
if (currentCallNum < 3) {
// using ResourceVersionConflictException, since it is a retriable exception
throw new ResourceVersionConflictException("failed");
}
return null;
};
myHapiTransactionService.doExecute((HapiTransactionService.ExecutionBuilder) executionBuilder, transactionCallback);
assertEquals(3, numberOfCalls.get());
verify(mySleepUtilMock, times(2))
.sleepAtLeast(anyLong(), anyBoolean());
}
}