From 77da1deedad7ae8144db3268895731e4bd951e97 Mon Sep 17 00:00:00 2001 From: Emre Dincturk <74370953+mrdnctrk@users.noreply.github.com> Date: Mon, 18 Dec 2023 11:35:36 -0500 Subject: [PATCH] added retry for PessimisticLockingFailureException on transactions (#5554) --- .../main/java/ca/uhn/fhir/util/SleepUtil.java | 31 +++ .../main/java/ca/uhn/fhir/util/TestUtil.java | 32 +-- .../java/ca/uhn/fhir/util/SleepUtilTest.java | 29 +++ .../7_0_0/5553-deadlocks-on-mdm-clear.yaml | 5 + .../jpa/dao/tx/HapiTransactionService.java | 37 +++- .../dao/tx/HapiTransactionServiceTest.java | 194 ++++++++++++++++++ 6 files changed, 305 insertions(+), 23 deletions(-) create mode 100644 hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SleepUtil.java create mode 100644 hapi-fhir-base/src/test/java/ca/uhn/fhir/util/SleepUtilTest.java create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_0_0/5553-deadlocks-on-mdm-clear.yaml create mode 100644 hapi-fhir-storage/src/test/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionServiceTest.java diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SleepUtil.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SleepUtil.java new file mode 100644 index 00000000000..635bfae71bc --- /dev/null +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/SleepUtil.java @@ -0,0 +1,31 @@ +package ca.uhn.fhir.util; + +/** + * A utility class for thread sleeps. + * Uses non-static methods for easier mocking and unnecessary waits in unit tests + */ +public class SleepUtil { + private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SleepUtil.class); + + public void sleepAtLeast(long theMillis) { + sleepAtLeast(theMillis, true); + } + + @SuppressWarnings("BusyWait") + public void sleepAtLeast(long theMillis, boolean theLogProgress) { + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() <= start + theMillis) { + try { + long timeSinceStarted = System.currentTimeMillis() - start; + long timeToSleep = Math.max(0, theMillis - timeSinceStarted); + if (theLogProgress) { + ourLog.info("Sleeping for {}ms", timeToSleep); + } + Thread.sleep(timeToSleep); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + ourLog.error("Interrupted", e); + } + } + } +} diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/TestUtil.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/TestUtil.java index 36ff4c3f536..32b725f8f4c 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/TestUtil.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/TestUtil.java @@ -31,6 +31,9 @@ import static org.apache.commons.lang3.StringUtils.defaultString; public class TestUtil { private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(TestUtil.class); + + private static SleepUtil ourSleepUtil = new SleepUtil(); + private static boolean ourShouldRandomizeTimezones = true; public static void setShouldRandomizeTimezones(boolean theShouldRandomizeTimezones) { @@ -135,25 +138,22 @@ public class TestUtil { return stripReturns(theString).replace(" ", ""); } + /** + * + * In production code, instead of this static method, it is better to use an instance of SleepUtil. + * Since SleepUtil isn't using static methods, it is easier to mock for unit test and avoid unnecessary waits in + * unit tests + */ public static void sleepAtLeast(long theMillis) { - sleepAtLeast(theMillis, true); + ourSleepUtil.sleepAtLeast(theMillis); } - @SuppressWarnings("BusyWait") + /** + * In production code, instead of this static method, it is better to use an instance of SleepUtil. + * Since SleepUtil isn't using static methods, it is easier to mock for unit test and avoid unnecessary waits in + * unit tests + */ public static void sleepAtLeast(long theMillis, boolean theLogProgress) { - long start = System.currentTimeMillis(); - while (System.currentTimeMillis() <= start + theMillis) { - try { - long timeSinceStarted = System.currentTimeMillis() - start; - long timeToSleep = Math.max(0, theMillis - timeSinceStarted); - if (theLogProgress) { - ourLog.info("Sleeping for {}ms", timeToSleep); - } - Thread.sleep(timeToSleep); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - ourLog.error("Interrupted", e); - } - } + ourSleepUtil.sleepAtLeast(theMillis, theLogProgress); } } diff --git a/hapi-fhir-base/src/test/java/ca/uhn/fhir/util/SleepUtilTest.java b/hapi-fhir-base/src/test/java/ca/uhn/fhir/util/SleepUtilTest.java new file mode 100644 index 00000000000..fb6cd85788c --- /dev/null +++ b/hapi-fhir-base/src/test/java/ca/uhn/fhir/util/SleepUtilTest.java @@ -0,0 +1,29 @@ +package ca.uhn.fhir.util; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class SleepUtilTest { + + @Test + public void testSleepAtLeast() { + SleepUtil sleepUtil = new SleepUtil(); + long amountToSleepMs = 10; + + long start = System.currentTimeMillis(); + sleepUtil.sleepAtLeast(amountToSleepMs); + long stop = System.currentTimeMillis(); + + long actualSleepDurationMs = stop - start; + assertTrue(actualSleepDurationMs >= amountToSleepMs); + } + + @Test + public void testZeroMs() { + // 0 is a valid input + SleepUtil sleepUtil = new SleepUtil(); + sleepUtil.sleepAtLeast(0); + } + +} diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_0_0/5553-deadlocks-on-mdm-clear.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_0_0/5553-deadlocks-on-mdm-clear.yaml new file mode 100644 index 00000000000..fe9f7a0bfd5 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_0_0/5553-deadlocks-on-mdm-clear.yaml @@ -0,0 +1,5 @@ +--- +type: fix +issue: 5553 +title: "mdm-clear jobs are prone to failing because of deadlocks when running on SQL Server. Such job failures have +been mitigated to some extent by increasing the retries on deadlocks." diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java index 990ef103309..719d8891b34 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java @@ -36,7 +36,7 @@ import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException; import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; import ca.uhn.fhir.util.ICallable; -import ca.uhn.fhir.util.TestUtil; +import ca.uhn.fhir.util.SleepUtil; import com.google.common.annotations.VisibleForTesting; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; @@ -48,6 +48,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DataIntegrityViolationException; +import org.springframework.dao.PessimisticLockingFailureException; import org.springframework.orm.ObjectOptimisticLockingFailureException; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionStatus; @@ -89,11 +90,18 @@ public class HapiTransactionService implements IHapiTransactionService { private Propagation myTransactionPropagationWhenChangingPartitions = Propagation.REQUIRED; + private SleepUtil mySleepUtil = new SleepUtil(); + @VisibleForTesting public void setInterceptorBroadcaster(IInterceptorBroadcaster theInterceptorBroadcaster) { myInterceptorBroadcaster = theInterceptorBroadcaster; } + @VisibleForTesting + public void setSleepUtil(SleepUtil theSleepUtil) { + mySleepUtil = theSleepUtil; + } + @Override public IExecutionBuilder withRequest(@Nullable RequestDetails theRequestDetails) { return new ExecutionBuilder(theRequestDetails); @@ -281,6 +289,25 @@ public class HapiTransactionService implements IHapiTransactionService { return doExecuteInTransaction(theExecutionBuilder, theCallback, requestPartitionId, previousRequestPartitionId); } + private boolean isThrowableOrItsSubclassPresent(Throwable theThrowable, Class theClass) { + return ExceptionUtils.indexOfType(theThrowable, theClass) != -1; + } + + private boolean isThrowablePresent(Throwable theThrowable, Class theClass) { + return ExceptionUtils.indexOfThrowable(theThrowable, theClass) != -1; + } + + private boolean isRetriable(Throwable theThrowable) { + return isThrowablePresent(theThrowable, ResourceVersionConflictException.class) + || isThrowablePresent(theThrowable, DataIntegrityViolationException.class) + || isThrowablePresent(theThrowable, ConstraintViolationException.class) + || isThrowablePresent(theThrowable, ObjectOptimisticLockingFailureException.class) + // calling isThrowableOrItsSubclassPresent instead of isThrowablePresent for + // PessimisticLockingFailureException, because we want to retry on its subclasses as well, especially + // CannotAcquireLockException, which is thrown in some deadlock situations which we want to retry + || isThrowableOrItsSubclassPresent(theThrowable, PessimisticLockingFailureException.class); + } + @Nullable private T doExecuteInTransaction( ExecutionBuilder theExecutionBuilder, @@ -294,11 +321,7 @@ public class HapiTransactionService implements IHapiTransactionService { return doExecuteCallback(theExecutionBuilder, theCallback); } catch (Exception e) { - if (!(ExceptionUtils.indexOfThrowable(e, ResourceVersionConflictException.class) != -1 - || ExceptionUtils.indexOfThrowable(e, DataIntegrityViolationException.class) != -1 - || ExceptionUtils.indexOfThrowable(e, ConstraintViolationException.class) != -1 - || ExceptionUtils.indexOfThrowable(e, ObjectOptimisticLockingFailureException.class) - != -1)) { + if (!isRetriable(e)) { ourLog.debug("Unexpected transaction exception. Will not be retried.", e); throw e; } else { @@ -354,7 +377,7 @@ public class HapiTransactionService implements IHapiTransactionService { } double sleepAmount = (250.0d * i) * Math.random(); long sleepAmountLong = (long) sleepAmount; - TestUtil.sleepAtLeast(sleepAmountLong, false); + mySleepUtil.sleepAtLeast(sleepAmountLong, false); ourLog.info( "About to start a transaction retry due to conflict or constraint error. Sleeping {}ms first.", diff --git a/hapi-fhir-storage/src/test/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionServiceTest.java b/hapi-fhir-storage/src/test/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionServiceTest.java new file mode 100644 index 00000000000..1547cf2114d --- /dev/null +++ b/hapi-fhir-storage/src/test/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionServiceTest.java @@ -0,0 +1,194 @@ +package ca.uhn.fhir.jpa.dao.tx; + +import ca.uhn.fhir.interceptor.api.HookParams; +import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; +import ca.uhn.fhir.interceptor.api.Pointcut; +import ca.uhn.fhir.jpa.api.model.ResourceVersionConflictResolutionStrategy; +import ca.uhn.fhir.jpa.model.config.PartitionSettings; +import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import ca.uhn.fhir.rest.api.server.SystemRequestDetails; +import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; +import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException; +import ca.uhn.fhir.util.SleepUtil; +import org.hibernate.exception.ConstraintViolationException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.dao.CannotAcquireLockException; +import org.springframework.dao.DataIntegrityViolationException; +import org.springframework.orm.ObjectOptimisticLockingFailureException; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; + +import java.sql.SQLException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.params.provider.Arguments.arguments; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class HapiTransactionServiceTest { + + @Mock + private IInterceptorBroadcaster myInterceptorBroadcasterMock; + + @Mock + private PlatformTransactionManager myTransactionManagerMock; + + @Mock + private IRequestPartitionHelperSvc myRequestPartitionHelperSvcMock; + @Mock + private PartitionSettings myPartitionSettingsMock; + + @Mock + private SleepUtil mySleepUtilMock; + + private HapiTransactionService myHapiTransactionService; + + + @BeforeEach + public void beforeEach() { + myHapiTransactionService = new HapiTransactionService(); + myHapiTransactionService.setTransactionManager(myTransactionManagerMock); + myHapiTransactionService.setInterceptorBroadcaster(myInterceptorBroadcasterMock); + myHapiTransactionService.setPartitionSettingsForUnitTest(myPartitionSettingsMock); + myHapiTransactionService.setRequestPartitionSvcForUnitTest(myRequestPartitionHelperSvcMock); + myHapiTransactionService.setSleepUtil(mySleepUtilMock); + mockInterceptorBroadcaster(); + } + + private void mockInterceptorBroadcaster() { + lenient().when(myInterceptorBroadcasterMock.callHooksAndReturnObject(eq(Pointcut.STORAGE_VERSION_CONFLICT), + isA(HookParams.class))) + .thenAnswer(invocationOnMock -> { + HookParams hookParams = (HookParams) invocationOnMock.getArguments()[1]; + //answer with whatever retry settings passed in as HookParam + RequestDetails requestDetails = hookParams.get(RequestDetails.class); + ResourceVersionConflictResolutionStrategy answer = new ResourceVersionConflictResolutionStrategy(); + answer.setRetry(requestDetails.isRetry()); + answer.setMaxRetries(requestDetails.getMaxRetries()); + return answer; + }); + } + + + /** + * A helper method to test retry logic on exceptions + * TransactionCallback interface allows only throwing RuntimeExceptions, + * that's why the parameter type is RunTimeException + */ + private Exception testRetriesOnException(RuntimeException theException, + boolean theRetryEnabled, + int theMaxRetries, + int theExpectedNumberOfCallsToTransactionCallback) { + RequestDetails requestDetails = new SystemRequestDetails(); + requestDetails.setRetry(theRetryEnabled); + requestDetails.setMaxRetries(theMaxRetries); + + HapiTransactionService.IExecutionBuilder executionBuilder = myHapiTransactionService + .withRequest(requestDetails) + .withTransactionDetails(new TransactionDetails()); + + AtomicInteger numberOfCalls = new AtomicInteger(); + TransactionCallback transactionCallback = (TransactionStatus theStatus) -> { + numberOfCalls.incrementAndGet(); + throw theException; + }; + + Exception theExceptionThrownByDoExecute = assertThrows(Exception.class, () -> { + myHapiTransactionService.doExecute((HapiTransactionService.ExecutionBuilder) executionBuilder, transactionCallback); + }); + + assertEquals(theExpectedNumberOfCallsToTransactionCallback, numberOfCalls.get()); + verify(mySleepUtilMock, times(theExpectedNumberOfCallsToTransactionCallback - 1)) + .sleepAtLeast(anyLong(), anyBoolean()); + return theExceptionThrownByDoExecute; + } + + private static Stream provideRetriableExceptionParameters() { + String exceptionMessage = "failed!"; + return Stream.of( + arguments(new ResourceVersionConflictException(exceptionMessage)), + arguments(new DataIntegrityViolationException(exceptionMessage)), + arguments(new ConstraintViolationException(exceptionMessage, new SQLException(""), null)), + arguments(new ObjectOptimisticLockingFailureException(exceptionMessage, new Exception())), + //CannotAcquireLockException is a subclass of + //PessimisticLockingFailureException which we treat as a retriable exception + arguments(new CannotAcquireLockException(exceptionMessage)) + ); + } + + @ParameterizedTest(name = "{index}: {0}") + @MethodSource(value = "provideRetriableExceptionParameters") + void testDoExecute_WhenRetryEnabled_RetriesOnRetriableExceptions(RuntimeException theException) { + testRetriesOnException(theException, true, 2, 3); + } + + + @ParameterizedTest(name = "{index}: {0}") + @MethodSource(value = "provideRetriableExceptionParameters") + void testDoExecute_WhenRetryEnabled_RetriesOnRetriableInnerExceptions(RuntimeException theException) { + //in this test we wrap the retriable exception to test that nested exceptions are covered as well + RuntimeException theWrapperException = new RuntimeException("this is the wrapper", theException); + testRetriesOnException(theWrapperException, true, 2, 3); + } + + @ParameterizedTest(name = "{index}: {0}") + @MethodSource(value = "provideRetriableExceptionParameters") + void testDoExecute_WhenRetryIsDisabled_DoesNotRetryExceptions(RuntimeException theException) { + testRetriesOnException(theException, false, 10, 1); + } + + @Test + void testDoExecute_WhenRetryEnabled_DoesNotRetryOnNonRetriableException() { + RuntimeException nonRetriableException = new RuntimeException("should not be retried"); + Exception exceptionThrown = testRetriesOnException(nonRetriableException, true, 10, 1); + assertEquals(nonRetriableException, exceptionThrown); + verifyNoInteractions(myInterceptorBroadcasterMock); + } + + @Test + void testDoExecute_WhenRetyEnabled_StopsRetryingWhenARetryIsSuccessfull() { + RequestDetails requestDetails = new SystemRequestDetails(); + requestDetails.setRetry(true); + requestDetails.setMaxRetries(10); + + HapiTransactionService.IExecutionBuilder executionBuilder = myHapiTransactionService + .withRequest(requestDetails) + .withTransactionDetails(new TransactionDetails()); + + AtomicInteger numberOfCalls = new AtomicInteger(); + TransactionCallback transactionCallback = (TransactionStatus theStatus) -> { + int currentCallNum = numberOfCalls.incrementAndGet(); + //fail for the first two calls then succeed on the third + if (currentCallNum < 3) { + // using ResourceVersionConflictException, since it is a retriable exception + throw new ResourceVersionConflictException("failed"); + } + return null; + }; + + myHapiTransactionService.doExecute((HapiTransactionService.ExecutionBuilder) executionBuilder, transactionCallback); + + assertEquals(3, numberOfCalls.get()); + verify(mySleepUtilMock, times(2)) + .sleepAtLeast(anyLong(), anyBoolean()); + } +}