From 65398ca14f2e765057bc2e4322e41b2f0d193f23 Mon Sep 17 00:00:00 2001 From: TipzCM Date: Mon, 31 Jan 2022 11:49:12 -0500 Subject: [PATCH] 3156 multiple tag race condition (#3297) * 3156 fix race condition that results in multiple tags being created * 3156 added changelog * cleanup * added smile jira ticket * 3156 review fixes * review fixes * 3156 added an additional integratoin test * cleanup * review fix and master merge * threadsafe stuff * small change * upping min threads for tests to pass * changing test * test check * fix * test testing * one more update * using threadpoolutil * temporary measure * git push * all builds Co-authored-by: leif stawnyczy --- ...156-fix-race-condition-on-tag-creation.yml | 7 + .../ca/uhn/fhir/jpa/dao/BaseHapiFhirDao.java | 144 +++++-- .../fhir/jpa/dao/BaseHapiFhirResourceDao.java | 3 + .../uhn/fhir/jpa/config/TestDstu2Config.java | 4 +- .../uhn/fhir/jpa/config/TestDstu3Config.java | 6 +- .../ca/uhn/fhir/jpa/config/TestR5Config.java | 5 +- .../uhn/fhir/jpa/dao/BaseHapiFhirDaoTest.java | 382 ++++++++++++++++++ .../dao/r4/FhirResourceDaoR4CreateTest.java | 58 +++ .../FhirInstanceValidatorR4Test.java | 2 - 9 files changed, 578 insertions(+), 33 deletions(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_7_0/3156-fix-race-condition-on-tag-creation.yml diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_7_0/3156-fix-race-condition-on-tag-creation.yml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_7_0/3156-fix-race-condition-on-tag-creation.yml new file mode 100644 index 00000000000..03416dd5c4c --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_7_0/3156-fix-race-condition-on-tag-creation.yml @@ -0,0 +1,7 @@ +--- +issue: 3156 +type: fix +jira: smile-3382 +title: "Fixed race condition in BaseHapiFhirDao that resulted in exceptions + being thrown when multiple resources in a single bundle transaction + had the same tags." diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDao.java index fa8eca6724a..24dd749e3a9 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDao.java @@ -79,6 +79,7 @@ import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum; import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; +import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; @@ -118,8 +119,12 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Repository; import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; +import org.springframework.transaction.support.TransactionTemplate; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -148,6 +153,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.StringTokenizer; import java.util.UUID; +import java.util.stream.Collectors; import static org.apache.commons.lang3.StringUtils.defaultIfBlank; import static org.apache.commons.lang3.StringUtils.defaultString; @@ -180,6 +186,9 @@ import static org.apache.commons.lang3.StringUtils.trim; @Repository public abstract class BaseHapiFhirDao extends BaseStorageDao implements IDao, IJpaDao, ApplicationContextAware { + // total attempts to do a tag transaction + private static final int TOTAL_TAG_READ_ATTEMPTS = 10; + public static final long INDEX_STATUS_INDEXED = 1L; public static final long INDEX_STATUS_INDEXING_FAILED = 2L; public static final String NS_JPA_PROFILE = "https://github.com/hapifhir/hapi-fhir/ns/jpa/profile"; @@ -247,6 +256,9 @@ public abstract class BaseHapiFhirDao extends BaseStora @Autowired(required = false) private IFulltextSearchSvc myFulltextSearchSvc; + @Autowired + private PlatformTransactionManager myTransactionManager; + @VisibleForTesting public void setSearchParamPresenceSvc(ISearchParamPresenceSvc theSearchParamPresenceSvc) { mySearchParamPresenceSvc = theSearchParamPresenceSvc; @@ -398,40 +410,15 @@ public abstract class BaseHapiFhirDao extends BaseStora MemoryCacheService.TagDefinitionCacheKey key = toTagDefinitionMemoryCacheKey(theTagType, theScheme, theTerm); - TagDefinition retVal = myMemoryCacheService.getIfPresent(MemoryCacheService.CacheEnum.TAG_DEFINITION, key); - if (retVal == null) { HashMap resolvedTagDefinitions = theTransactionDetails.getOrCreateUserData(HapiTransactionService.XACT_USERDATA_KEY_RESOLVED_TAG_DEFINITIONS, () -> new HashMap<>()); retVal = resolvedTagDefinitions.get(key); if (retVal == null) { - CriteriaBuilder builder = myEntityManager.getCriteriaBuilder(); - CriteriaQuery cq = builder.createQuery(TagDefinition.class); - Root from = cq.from(TagDefinition.class); - - if (isNotBlank(theScheme)) { - cq.where( - builder.and( - builder.equal(from.get("myTagType"), theTagType), - builder.equal(from.get("mySystem"), theScheme), - builder.equal(from.get("myCode"), theTerm))); - } else { - cq.where( - builder.and( - builder.equal(from.get("myTagType"), theTagType), - builder.isNull(from.get("mySystem")), - builder.equal(from.get("myCode"), theTerm))); - } - - TypedQuery q = myEntityManager.createQuery(cq); - try { - retVal = q.getSingleResult(); - } catch (NoResultException e) { - retVal = new TagDefinition(theTagType, theScheme, theTerm, theLabel); - myEntityManager.persist(retVal); - } + // actual DB hit(s) happen here + retVal = getOrCreateTag(theTagType, theScheme, theTerm, theLabel); TransactionSynchronization sync = new AddTagDefinitionToCacheAfterCommitSynchronization(key, retVal); TransactionSynchronizationManager.registerSynchronization(sync); @@ -443,6 +430,109 @@ public abstract class BaseHapiFhirDao extends BaseStora return retVal; } + /** + * Gets the tag defined by the fed in values, or saves it if it does not + * exist. + * + * Can also throw an InternalErrorException if something bad happens. + */ + private TagDefinition getOrCreateTag(TagTypeEnum theTagType, String theScheme, String theTerm, String theLabel) { + CriteriaBuilder builder = myEntityManager.getCriteriaBuilder(); + CriteriaQuery cq = builder.createQuery(TagDefinition.class); + Root from = cq.from(TagDefinition.class); + + if (isNotBlank(theScheme)) { + cq.where( + builder.and( + builder.equal(from.get("myTagType"), theTagType), + builder.equal(from.get("mySystem"), theScheme), + builder.equal(from.get("myCode"), theTerm))); + } else { + cq.where( + builder.and( + builder.equal(from.get("myTagType"), theTagType), + builder.isNull(from.get("mySystem")), + builder.equal(from.get("myCode"), theTerm))); + } + + TypedQuery q = myEntityManager.createQuery(cq); + + TransactionTemplate template = new TransactionTemplate(myTransactionManager); + template.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); + + // this transaction will attempt to get or create the tag, + // repeating (on any failure) 10 times. + // if it fails more than this, we will throw exceptions + TagDefinition retVal; + int count = 0; + HashSet throwables = new HashSet<>(); + do { + try { + retVal = template.execute(new TransactionCallback() { + + // do the actual DB call(s) to read and/or write the values + private TagDefinition readOrCreate() { + TagDefinition val; + try { + val = q.getSingleResult(); + } catch (NoResultException e) { + val = new TagDefinition(theTagType, theScheme, theTerm, theLabel); + myEntityManager.persist(val); + } + return val; + } + + @Override + public TagDefinition doInTransaction(TransactionStatus status) { + TagDefinition tag = null; + + try { + tag = readOrCreate(); + } catch (Exception ex) { + // log any exceptions - just in case + // they may be signs of things to come... + ourLog.warn( + "Tag read/write failed: " + + ex.getMessage() + ". " + + "This is not a failure on its own, " + + "but could be useful information in the result of an actual failure." + ); + throwables.add(ex); + } + + return tag; + } + }); + } catch (Exception ex) { + // transaction template can fail if connections to db are exhausted + // and/or timeout + ourLog.warn("Transaction failed with: " + + ex.getMessage() + ". " + + "Transaction will rollback and be reattempted." + ); + retVal = null; + } + count++; + } while (retVal == null && count < TOTAL_TAG_READ_ATTEMPTS); + + if (retVal == null) { + // if tag is still null, + // something bad must be happening + // - throw + String msg = throwables.stream() + .map(Throwable::getMessage) + .collect(Collectors.joining(", ")); + throw new InternalErrorException( + "Tag get/create failed after " + + TOTAL_TAG_READ_ATTEMPTS + + " attempts with error(s): " + + msg + ); + } + + return retVal; + } + protected IBundleProvider history(RequestDetails theRequest, String theResourceType, Long theResourcePid, Date theRangeStartInclusive, Date theRangeEndInclusive, Integer theOffset) { String resourceName = defaultIfBlank(theResourceType, null); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java index 3cc5ae40603..aa1a01de1ed 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java @@ -323,6 +323,7 @@ public abstract class BaseHapiFhirResourceDao extends B } // Perform actual DB update + // this call will also update the metadata ResourceTable updatedEntity = updateEntity(theRequest, theResource, entity, null, thePerformIndexing, false, theTransactionDetails, false, thePerformIndexing); // Store the resource forced ID if necessary @@ -366,6 +367,8 @@ public abstract class BaseHapiFhirResourceDao extends B // Update the version/last updated in the resource so that interceptors get // the correct version + // TODO - the above updateEntity calls updateResourceMetadata + // Maybe we don't need this call here? updateResourceMetadata(entity, theResource); // Populate the PID in the resource so it is available to hooks diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu2Config.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu2Config.java index cf0013d8113..bfd2f975924 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu2Config.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu2Config.java @@ -41,8 +41,10 @@ public class TestDstu2Config extends BaseJavaConfigDstu2 { * We use a randomized number of maximum threads in order to try * and catch any potential deadlocks caused by database connection * starvation + * + * A minimum of 2 is required for most transactions. */ - ourMaxThreads = (int) (Math.random() * 6.0) + 1; + ourMaxThreads = (int) (Math.random() * 6.0) + 2; if ("true".equals(System.getProperty("single_db_connection"))) { ourMaxThreads = 1; diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu3Config.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu3Config.java index d710671edea..cbe940421dc 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu3Config.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu3Config.java @@ -102,9 +102,11 @@ public class TestDstu3Config extends BaseJavaConfigDstu3 { /* * We use a randomized number of maximum threads in order to try * and catch any potential deadlocks caused by database connection - * starvation + * starvation. + * + * We need a minimum of 2 for most transactions, so 2 is added */ - int maxThreads = (int) (Math.random() * 6.0) + 1; + int maxThreads = (int) (Math.random() * 6.0) + 2; if ("true".equals(System.getProperty("single_db_connection"))) { maxThreads = 1; diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR5Config.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR5Config.java index 3ad3351ef23..a72d9ce6a3d 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR5Config.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR5Config.java @@ -42,9 +42,12 @@ public class TestR5Config extends BaseJavaConfigR5 { * We use a randomized number of maximum threads in order to try * and catch any potential deadlocks caused by database connection * starvation + * + * A minimum of 2 is necessary for most transactions, + * so 2 will be our limit */ if (ourMaxThreads == null) { - ourMaxThreads = (int) (Math.random() * 6.0) + 1; + ourMaxThreads = (int) (Math.random() * 6.0) + 2; if ("true".equals(System.getProperty("single_db_connection"))) { ourMaxThreads = 1; diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDaoTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDaoTest.java index 30a96db0f2d..fc0d873e0ef 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDaoTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDaoTest.java @@ -1,11 +1,393 @@ package ca.uhn.fhir.jpa.dao; +import ca.uhn.fhir.jpa.model.entity.TagDefinition; +import ca.uhn.fhir.jpa.model.entity.TagTypeEnum; +import ca.uhn.fhir.jpa.util.MemoryCacheService; +import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; +import ca.uhn.fhir.util.AsyncUtil; +import ca.uhn.fhir.util.ThreadPoolUtil; +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.Appender; +import org.hl7.fhir.r4.model.Patient; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.stubbing.Answer; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.support.TransactionSynchronizationManager; + +import javax.persistence.EntityExistsException; +import javax.persistence.EntityManager; +import javax.persistence.NoResultException; +import javax.persistence.TypedQuery; +import javax.persistence.criteria.CriteriaBuilder; +import javax.persistence.criteria.CriteriaQuery; +import javax.persistence.criteria.Path; +import javax.persistence.criteria.Predicate; +import javax.persistence.criteria.Root; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +@ExtendWith(MockitoExtension.class) public class BaseHapiFhirDaoTest { + private static class TestDao extends BaseHapiFhirDao { + + @Nullable + @Override + protected String getResourceName() { + return "Patient"; + } + + @Override + protected TagDefinition getTagOrNull(TransactionDetails theDetails, + TagTypeEnum theEnum, + String theScheme, + String theTerm, + String theLabel) { + // we need to init synchronization due to what + // the underlying class is doing + try { + TransactionSynchronizationManager.initSynchronization(); + return super.getTagOrNull(theDetails, theEnum, theScheme, theTerm, theLabel); + } finally { + TransactionSynchronizationManager.clearSynchronization(); + } + } + } + + private Logger ourLogger; + + @Mock + private Appender myAppender; + + @Mock + private MemoryCacheService myMemoryCacheService; + + @Mock + private EntityManager myEntityManager; + + @Mock + private PlatformTransactionManager myTransactionManager; + + @InjectMocks + private TestDao myTestDao; + + @BeforeEach + public void init() { + ourLogger = (Logger) LoggerFactory.getLogger(BaseHapiFhirDao.class); + ourLogger.addAppender(myAppender); + } + + @AfterEach + public void end() { + ourLogger.detachAppender(myAppender); + } + + /** + * Returns a mocked criteria builder + * @return + */ + private CriteriaBuilder getMockedCriteriaBuilder() { + Predicate pred = mock(Predicate.class); + + CriteriaBuilder builder = mock(CriteriaBuilder.class); + // lenient due to multiple equal calls with different inputs + lenient().when(builder.equal(any(), any())) + .thenReturn(pred); + + return builder; + } + + /** + * Returns a mocked from + * @return + */ + private Root getMockedFrom() { + Path path = mock(Path.class); + + Root from = mock(Root.class); + // lenient due to multiple get calls with different inputs + lenient().when(from.get(anyString())) + .thenReturn(path); + return from; + } + + @Test + public void getTagOrNull_raceCondition_wontUpsertDuplicates() throws InterruptedException, ExecutionException { + /* + * We use this boolean to fake a race condition. + * Boolean is used for two reasons: + * 1) We don't want an unstable test (so a fake + * race condition will ensure the test always executes + * exactly as expected... but this just ensures the code + * is not buggy, not that race conditions are actually handled) + * 2) We want the ability (and confidence!) to know + * that a _real_ race condition can be handled. Setting + * this boolean false (and potentially tweaking thread count) + * gives us this confidence. + * + * Set this false to try with a real race condition + */ + boolean fakeRaceCondition = true; + + // the more threads, the more likely we + // are to see race conditions. + // We need a lot to ensure at least 2 threads + // are in the create method at the same time + int threads = fakeRaceCondition ? 2 : 30; + + // setup + TagTypeEnum tagType = TagTypeEnum.TAG; + String scheme = "http://localhost"; + String term = "code123"; + String label = "hollow world"; + TransactionDetails transactionDetails = new TransactionDetails(); + String raceConditionError = "Entity exists; if this is logged, you have race condition issues!"; + + TagDefinition tagDefinition = new TagDefinition(tagType, + scheme, + term, + label); + + // mock objects + CriteriaBuilder builder = getMockedCriteriaBuilder(); + TypedQuery query = mock(TypedQuery.class); + CriteriaQuery cq = mock(CriteriaQuery.class); + Root from = getMockedFrom(); + + // when + when(myEntityManager.getCriteriaBuilder()) + .thenReturn(builder); + when(builder.createQuery(any(Class.class))) + .thenReturn(cq); + when(cq.from(any(Class.class))) + .thenReturn(from); + when(myEntityManager.createQuery(any(CriteriaQuery.class))) + .thenReturn(query); + AtomicBoolean atomicBoolean = new AtomicBoolean(false); + AtomicInteger getSingleResultInt = new AtomicInteger(); + when(query.getSingleResult()) + .thenAnswer(new Answer() { + private final AtomicInteger count = new AtomicInteger(); + + @Override + public TagDefinition answer(InvocationOnMock invocationOnMock) throws Throwable { + getSingleResultInt.incrementAndGet(); + if (fakeRaceCondition) { + // fake + // ensure the first 2 accesses throw to + // help fake a race condition (2, or possibly the same, + // thread failing to access the resource) + if (count.get() < 2) { + count.incrementAndGet(); + throw new NoResultException(); + } + } + else { + // real + if (!atomicBoolean.get()) { + throw new NoResultException(); + } + } + return tagDefinition; + } + }); + AtomicInteger persistInt = new AtomicInteger(); + doAnswer(new Answer() { + private final AtomicInteger count = new AtomicInteger(); + + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + persistInt.incrementAndGet(); + if (fakeRaceCondition) { + // fake + if (count.get() < 1) { + count.incrementAndGet(); + return null; + } + else { + throw new EntityExistsException(raceConditionError); + } + } + else { + // real + if (!atomicBoolean.getAndSet(true)) { + // first thread gets null... + return null; + } else { + // all other threads get entity exists exception + throw new EntityExistsException(raceConditionError); + } + } + } + }).when(myEntityManager).persist(any(Object.class)); + + ourLogger.setLevel(Level.WARN); + + // test +// ExecutorService service = Executors.newFixedThreadPool(threads); + ConcurrentHashMap outcomes = new ConcurrentHashMap<>(); + ConcurrentHashMap errors = new ConcurrentHashMap<>(); + + ThreadPoolTaskExecutor executor = ThreadPoolUtil.newThreadPool(threads, threads * 2, "test-"); + + AtomicInteger counter = new AtomicInteger(); + + CountDownLatch latch = new CountDownLatch(threads); + Runnable task = () -> { + latch.countDown(); + try { + TagDefinition retTag = myTestDao.getTagOrNull(transactionDetails, tagType, scheme, term, label); + outcomes.put(retTag.hashCode(), retTag); + counter.incrementAndGet(); + } catch (Exception ex) { + errors.put(ex.hashCode(), ex); + } + }; + + ArrayList futures = new ArrayList<>(); + for (int i = 0; i < threads; i++) { + futures.add(executor.submit(task)); + } + for (Future f : futures) { + f.get(); + } + AsyncUtil.awaitLatchAndIgnoreInterrupt(latch, (long) threads, TimeUnit.SECONDS); + +// try { +// ArrayList futures = new ArrayList<>(); +// for (int i = 0; i < threads; i++) { +// futures.add(service.submit(task)); +// } +// for (Future f : futures) { +// f.get(); +// } +// } finally { +// service.shutdown(); +// } +// // should not take a second per thread. +// // but will take some time, due to the delays above. +// // a second per thread seems like a good threshold. +// Assertions.assertTrue( +// service.awaitTermination(threads, TimeUnit.SECONDS) +// ); + + Assertions.assertEquals(threads + 1, getSingleResultInt.get(), "Not enough gets " + getSingleResultInt.get()); + Assertions.assertEquals(threads, persistInt.get(), "Not enough persists " + persistInt.get()); + + // verify + Assertions.assertEquals(1, outcomes.size()); + Assertions.assertEquals(threads, counter.get()); + Assertions.assertEquals(0, errors.size(), + errors.values().stream().map(Throwable::getMessage) + .collect(Collectors.joining(", "))); + + // verify we logged some race conditions + ArgumentCaptor captor = ArgumentCaptor.forClass(ILoggingEvent.class); + verify(myAppender, Mockito.atLeastOnce()) + .doAppend(captor.capture()); + assertTrue(captor.getAllValues().get(0).getMessage() + .contains(raceConditionError)); + } + + @Test + public void getTagOrNull_failingForever_throwsInternalErrorAndLogsWarnings() { + // setup + TagTypeEnum tagType = TagTypeEnum.TAG; + String scheme = "http://localhost"; + String term = "code123"; + String label = "hollow world"; + TransactionDetails transactionDetails = new TransactionDetails(); + String exMsg = "Hi there"; + String readError = "No read for you"; + + ourLogger.setLevel(Level.WARN); + + // mock objects + CriteriaBuilder builder = getMockedCriteriaBuilder(); + TypedQuery query = mock(TypedQuery.class); + CriteriaQuery cq = mock(CriteriaQuery.class); + Root from = getMockedFrom(); + + // when + when(myEntityManager.getCriteriaBuilder()) + .thenReturn(builder); + when(builder.createQuery(any(Class.class))) + .thenReturn(cq); + when(cq.from(any(Class.class))) + .thenReturn(from); + when(myEntityManager.createQuery(any(CriteriaQuery.class))) + .thenReturn(query); + when(query.getSingleResult()) + .thenThrow(new NoResultException(readError)); + doThrow(new RuntimeException(exMsg)) + .when(myEntityManager).persist(any(Object.class)); + + // test + try { + myTestDao.getTagOrNull(transactionDetails, tagType, scheme, term, label); + fail(); + } catch (Exception ex) { + // verify + assertEquals("Tag get/create failed after 10 attempts with error(s): " + exMsg, ex.getMessage()); + + ArgumentCaptor appenderCaptor = ArgumentCaptor.forClass(ILoggingEvent.class); + verify(myAppender, Mockito.times(10)) + .doAppend(appenderCaptor.capture()); + List events = appenderCaptor.getAllValues(); + assertEquals(10, events.size()); + for (int i = 0; i < 10; i++) { + String actualMsg = events.get(i).getMessage(); + assertEquals( + "Tag read/write failed: " + + exMsg + + ". " + + "This is not a failure on its own, " + + "but could be useful information in the result of an actual failure.", + actualMsg + ); + } + } + } + + ////////// Static access tests + @Test public void cleanProvenanceSourceUri() { assertEquals("", BaseHapiFhirDao.cleanProvenanceSourceUri(null)); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4CreateTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4CreateTest.java index 2f710956ded..d4d017b42ca 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4CreateTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4CreateTest.java @@ -21,10 +21,12 @@ import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException; import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; import ca.uhn.fhir.util.BundleBuilder; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateUtils; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.r4.model.Bundle; +import org.hl7.fhir.r4.model.Coding; import org.hl7.fhir.r4.model.DateType; import org.hl7.fhir.r4.model.DecimalType; import org.hl7.fhir.r4.model.Encounter; @@ -46,9 +48,14 @@ import org.springframework.data.domain.PageRequest; import java.io.IOException; import java.math.BigDecimal; +import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.stream.Collectors; import static org.hamcrest.MatcherAssert.assertThat; @@ -1002,6 +1009,57 @@ public class FhirResourceDaoR4CreateTest extends BaseJpaR4Test { assertEquals(1, ids.size()); } + @Test + public void testResourceWithTagCreationNoFailures() throws ExecutionException, InterruptedException { + ExecutorService pool = Executors.newFixedThreadPool(5); + try { + Coding tag = new Coding(); + tag.setCode("code123"); + tag.setDisplay("Display Name"); + tag.setSystem("System123"); + + Patient p = new Patient(); + IIdType id = myPatientDao.create(p).getId(); + + List> futures = new ArrayList<>(); + for (int i = 0; i < 50; i++) { + Patient updatePatient = new Patient(); + updatePatient.setId(id.toUnqualifiedVersionless()); + updatePatient.addIdentifier().setSystem("" + i); + updatePatient.setActive(true); + updatePatient.getMeta().addTag(tag); + + int finalI = i; + Future future = pool.submit(() -> { + ourLog.info("Starting update {}", finalI); + try { + try { + myPatientDao.update(updatePatient); + } catch (ResourceVersionConflictException e) { + assertEquals("The operation has failed with a version constraint failure. This generally means that two clients/threads were trying to update the same resource at the same time, and this request was chosen as the failing request.", e.getMessage()); + } + } catch (Exception e) { + ourLog.error("Failure", e); + return e.toString(); + } + ourLog.info("Finished update {}", finalI); + return null; + }); + futures.add(future); + } + + for (Future next : futures) { + String nextError = next.get(); + if (StringUtils.isNotBlank(nextError)) { + fail(nextError); + } + } + + } finally { + pool.shutdown(); + } + } + @Test public void testCreateWithNormalizedQuantitySearchNotSupported_SmallerThanCanonicalUnit() { diff --git a/hapi-fhir-validation/src/test/java/org/hl7/fhir/r4/validation/FhirInstanceValidatorR4Test.java b/hapi-fhir-validation/src/test/java/org/hl7/fhir/r4/validation/FhirInstanceValidatorR4Test.java index e503acdc20d..32f3b66e1fa 100644 --- a/hapi-fhir-validation/src/test/java/org/hl7/fhir/r4/validation/FhirInstanceValidatorR4Test.java +++ b/hapi-fhir-validation/src/test/java/org/hl7/fhir/r4/validation/FhirInstanceValidatorR4Test.java @@ -79,7 +79,6 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.IOException; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -101,7 +100,6 @@ import static org.hamcrest.Matchers.not; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.nullable;