3156 multiple tag race condition (#3297)
* 3156 fix race condition that results in multiple tags being created * 3156 added changelog * cleanup * added smile jira ticket * 3156 review fixes * review fixes * 3156 added an additional integratoin test * cleanup * review fix and master merge * threadsafe stuff * small change * upping min threads for tests to pass * changing test * test check * fix * test testing * one more update * using threadpoolutil * temporary measure * git push * all builds Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-MacBook-Pro.local>
This commit is contained in:
parent
ad3e282b8b
commit
65398ca14f
|
@ -0,0 +1,7 @@
|
||||||
|
---
|
||||||
|
issue: 3156
|
||||||
|
type: fix
|
||||||
|
jira: smile-3382
|
||||||
|
title: "Fixed race condition in BaseHapiFhirDao that resulted in exceptions
|
||||||
|
being thrown when multiple resources in a single bundle transaction
|
||||||
|
had the same tags."
|
|
@ -79,6 +79,7 @@ import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum;
|
||||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
|
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
|
||||||
|
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
|
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
|
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
|
||||||
|
@ -118,8 +119,12 @@ import org.springframework.context.ApplicationContext;
|
||||||
import org.springframework.context.ApplicationContextAware;
|
import org.springframework.context.ApplicationContextAware;
|
||||||
import org.springframework.stereotype.Repository;
|
import org.springframework.stereotype.Repository;
|
||||||
import org.springframework.transaction.PlatformTransactionManager;
|
import org.springframework.transaction.PlatformTransactionManager;
|
||||||
|
import org.springframework.transaction.TransactionDefinition;
|
||||||
|
import org.springframework.transaction.TransactionStatus;
|
||||||
|
import org.springframework.transaction.support.TransactionCallback;
|
||||||
import org.springframework.transaction.support.TransactionSynchronization;
|
import org.springframework.transaction.support.TransactionSynchronization;
|
||||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||||
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
|
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
@ -148,6 +153,7 @@ import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.StringTokenizer;
|
import java.util.StringTokenizer;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
|
import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
|
||||||
import static org.apache.commons.lang3.StringUtils.defaultString;
|
import static org.apache.commons.lang3.StringUtils.defaultString;
|
||||||
|
@ -180,6 +186,9 @@ import static org.apache.commons.lang3.StringUtils.trim;
|
||||||
@Repository
|
@Repository
|
||||||
public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStorageDao implements IDao, IJpaDao<T>, ApplicationContextAware {
|
public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStorageDao implements IDao, IJpaDao<T>, ApplicationContextAware {
|
||||||
|
|
||||||
|
// total attempts to do a tag transaction
|
||||||
|
private static final int TOTAL_TAG_READ_ATTEMPTS = 10;
|
||||||
|
|
||||||
public static final long INDEX_STATUS_INDEXED = 1L;
|
public static final long INDEX_STATUS_INDEXED = 1L;
|
||||||
public static final long INDEX_STATUS_INDEXING_FAILED = 2L;
|
public static final long INDEX_STATUS_INDEXING_FAILED = 2L;
|
||||||
public static final String NS_JPA_PROFILE = "https://github.com/hapifhir/hapi-fhir/ns/jpa/profile";
|
public static final String NS_JPA_PROFILE = "https://github.com/hapifhir/hapi-fhir/ns/jpa/profile";
|
||||||
|
@ -247,6 +256,9 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
|
||||||
@Autowired(required = false)
|
@Autowired(required = false)
|
||||||
private IFulltextSearchSvc myFulltextSearchSvc;
|
private IFulltextSearchSvc myFulltextSearchSvc;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private PlatformTransactionManager myTransactionManager;
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public void setSearchParamPresenceSvc(ISearchParamPresenceSvc theSearchParamPresenceSvc) {
|
public void setSearchParamPresenceSvc(ISearchParamPresenceSvc theSearchParamPresenceSvc) {
|
||||||
mySearchParamPresenceSvc = theSearchParamPresenceSvc;
|
mySearchParamPresenceSvc = theSearchParamPresenceSvc;
|
||||||
|
@ -398,15 +410,33 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
|
||||||
|
|
||||||
MemoryCacheService.TagDefinitionCacheKey key = toTagDefinitionMemoryCacheKey(theTagType, theScheme, theTerm);
|
MemoryCacheService.TagDefinitionCacheKey key = toTagDefinitionMemoryCacheKey(theTagType, theScheme, theTerm);
|
||||||
|
|
||||||
|
|
||||||
TagDefinition retVal = myMemoryCacheService.getIfPresent(MemoryCacheService.CacheEnum.TAG_DEFINITION, key);
|
TagDefinition retVal = myMemoryCacheService.getIfPresent(MemoryCacheService.CacheEnum.TAG_DEFINITION, key);
|
||||||
|
|
||||||
|
|
||||||
if (retVal == null) {
|
if (retVal == null) {
|
||||||
HashMap<MemoryCacheService.TagDefinitionCacheKey, TagDefinition> resolvedTagDefinitions = theTransactionDetails.getOrCreateUserData(HapiTransactionService.XACT_USERDATA_KEY_RESOLVED_TAG_DEFINITIONS, () -> new HashMap<>());
|
HashMap<MemoryCacheService.TagDefinitionCacheKey, TagDefinition> resolvedTagDefinitions = theTransactionDetails.getOrCreateUserData(HapiTransactionService.XACT_USERDATA_KEY_RESOLVED_TAG_DEFINITIONS, () -> new HashMap<>());
|
||||||
retVal = resolvedTagDefinitions.get(key);
|
retVal = resolvedTagDefinitions.get(key);
|
||||||
|
|
||||||
if (retVal == null) {
|
if (retVal == null) {
|
||||||
|
// actual DB hit(s) happen here
|
||||||
|
retVal = getOrCreateTag(theTagType, theScheme, theTerm, theLabel);
|
||||||
|
|
||||||
|
TransactionSynchronization sync = new AddTagDefinitionToCacheAfterCommitSynchronization(key, retVal);
|
||||||
|
TransactionSynchronizationManager.registerSynchronization(sync);
|
||||||
|
|
||||||
|
resolvedTagDefinitions.put(key, retVal);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return retVal;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the tag defined by the fed in values, or saves it if it does not
|
||||||
|
* exist.
|
||||||
|
*
|
||||||
|
* Can also throw an InternalErrorException if something bad happens.
|
||||||
|
*/
|
||||||
|
private TagDefinition getOrCreateTag(TagTypeEnum theTagType, String theScheme, String theTerm, String theLabel) {
|
||||||
CriteriaBuilder builder = myEntityManager.getCriteriaBuilder();
|
CriteriaBuilder builder = myEntityManager.getCriteriaBuilder();
|
||||||
CriteriaQuery<TagDefinition> cq = builder.createQuery(TagDefinition.class);
|
CriteriaQuery<TagDefinition> cq = builder.createQuery(TagDefinition.class);
|
||||||
Root<TagDefinition> from = cq.from(TagDefinition.class);
|
Root<TagDefinition> from = cq.from(TagDefinition.class);
|
||||||
|
@ -426,18 +456,78 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
|
||||||
}
|
}
|
||||||
|
|
||||||
TypedQuery<TagDefinition> q = myEntityManager.createQuery(cq);
|
TypedQuery<TagDefinition> q = myEntityManager.createQuery(cq);
|
||||||
|
|
||||||
|
TransactionTemplate template = new TransactionTemplate(myTransactionManager);
|
||||||
|
template.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
|
||||||
|
|
||||||
|
// this transaction will attempt to get or create the tag,
|
||||||
|
// repeating (on any failure) 10 times.
|
||||||
|
// if it fails more than this, we will throw exceptions
|
||||||
|
TagDefinition retVal;
|
||||||
|
int count = 0;
|
||||||
|
HashSet<Throwable> throwables = new HashSet<>();
|
||||||
|
do {
|
||||||
try {
|
try {
|
||||||
retVal = q.getSingleResult();
|
retVal = template.execute(new TransactionCallback<TagDefinition>() {
|
||||||
|
|
||||||
|
// do the actual DB call(s) to read and/or write the values
|
||||||
|
private TagDefinition readOrCreate() {
|
||||||
|
TagDefinition val;
|
||||||
|
try {
|
||||||
|
val = q.getSingleResult();
|
||||||
} catch (NoResultException e) {
|
} catch (NoResultException e) {
|
||||||
retVal = new TagDefinition(theTagType, theScheme, theTerm, theLabel);
|
val = new TagDefinition(theTagType, theScheme, theTerm, theLabel);
|
||||||
myEntityManager.persist(retVal);
|
myEntityManager.persist(val);
|
||||||
|
}
|
||||||
|
return val;
|
||||||
}
|
}
|
||||||
|
|
||||||
TransactionSynchronization sync = new AddTagDefinitionToCacheAfterCommitSynchronization(key, retVal);
|
@Override
|
||||||
TransactionSynchronizationManager.registerSynchronization(sync);
|
public TagDefinition doInTransaction(TransactionStatus status) {
|
||||||
|
TagDefinition tag = null;
|
||||||
|
|
||||||
resolvedTagDefinitions.put(key, retVal);
|
try {
|
||||||
|
tag = readOrCreate();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
// log any exceptions - just in case
|
||||||
|
// they may be signs of things to come...
|
||||||
|
ourLog.warn(
|
||||||
|
"Tag read/write failed: "
|
||||||
|
+ ex.getMessage() + ". "
|
||||||
|
+ "This is not a failure on its own, "
|
||||||
|
+ "but could be useful information in the result of an actual failure."
|
||||||
|
);
|
||||||
|
throwables.add(ex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return tag;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (Exception ex) {
|
||||||
|
// transaction template can fail if connections to db are exhausted
|
||||||
|
// and/or timeout
|
||||||
|
ourLog.warn("Transaction failed with: "
|
||||||
|
+ ex.getMessage() + ". "
|
||||||
|
+ "Transaction will rollback and be reattempted."
|
||||||
|
);
|
||||||
|
retVal = null;
|
||||||
|
}
|
||||||
|
count++;
|
||||||
|
} while (retVal == null && count < TOTAL_TAG_READ_ATTEMPTS);
|
||||||
|
|
||||||
|
if (retVal == null) {
|
||||||
|
// if tag is still null,
|
||||||
|
// something bad must be happening
|
||||||
|
// - throw
|
||||||
|
String msg = throwables.stream()
|
||||||
|
.map(Throwable::getMessage)
|
||||||
|
.collect(Collectors.joining(", "));
|
||||||
|
throw new InternalErrorException(
|
||||||
|
"Tag get/create failed after "
|
||||||
|
+ TOTAL_TAG_READ_ATTEMPTS
|
||||||
|
+ " attempts with error(s): "
|
||||||
|
+ msg
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
return retVal;
|
return retVal;
|
||||||
|
|
|
@ -323,6 +323,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
|
||||||
}
|
}
|
||||||
|
|
||||||
// Perform actual DB update
|
// Perform actual DB update
|
||||||
|
// this call will also update the metadata
|
||||||
ResourceTable updatedEntity = updateEntity(theRequest, theResource, entity, null, thePerformIndexing, false, theTransactionDetails, false, thePerformIndexing);
|
ResourceTable updatedEntity = updateEntity(theRequest, theResource, entity, null, thePerformIndexing, false, theTransactionDetails, false, thePerformIndexing);
|
||||||
|
|
||||||
// Store the resource forced ID if necessary
|
// Store the resource forced ID if necessary
|
||||||
|
@ -366,6 +367,8 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
|
||||||
|
|
||||||
// Update the version/last updated in the resource so that interceptors get
|
// Update the version/last updated in the resource so that interceptors get
|
||||||
// the correct version
|
// the correct version
|
||||||
|
// TODO - the above updateEntity calls updateResourceMetadata
|
||||||
|
// Maybe we don't need this call here?
|
||||||
updateResourceMetadata(entity, theResource);
|
updateResourceMetadata(entity, theResource);
|
||||||
|
|
||||||
// Populate the PID in the resource so it is available to hooks
|
// Populate the PID in the resource so it is available to hooks
|
||||||
|
|
|
@ -41,8 +41,10 @@ public class TestDstu2Config extends BaseJavaConfigDstu2 {
|
||||||
* We use a randomized number of maximum threads in order to try
|
* We use a randomized number of maximum threads in order to try
|
||||||
* and catch any potential deadlocks caused by database connection
|
* and catch any potential deadlocks caused by database connection
|
||||||
* starvation
|
* starvation
|
||||||
|
*
|
||||||
|
* A minimum of 2 is required for most transactions.
|
||||||
*/
|
*/
|
||||||
ourMaxThreads = (int) (Math.random() * 6.0) + 1;
|
ourMaxThreads = (int) (Math.random() * 6.0) + 2;
|
||||||
|
|
||||||
if ("true".equals(System.getProperty("single_db_connection"))) {
|
if ("true".equals(System.getProperty("single_db_connection"))) {
|
||||||
ourMaxThreads = 1;
|
ourMaxThreads = 1;
|
||||||
|
|
|
@ -102,9 +102,11 @@ public class TestDstu3Config extends BaseJavaConfigDstu3 {
|
||||||
/*
|
/*
|
||||||
* We use a randomized number of maximum threads in order to try
|
* We use a randomized number of maximum threads in order to try
|
||||||
* and catch any potential deadlocks caused by database connection
|
* and catch any potential deadlocks caused by database connection
|
||||||
* starvation
|
* starvation.
|
||||||
|
*
|
||||||
|
* We need a minimum of 2 for most transactions, so 2 is added
|
||||||
*/
|
*/
|
||||||
int maxThreads = (int) (Math.random() * 6.0) + 1;
|
int maxThreads = (int) (Math.random() * 6.0) + 2;
|
||||||
|
|
||||||
if ("true".equals(System.getProperty("single_db_connection"))) {
|
if ("true".equals(System.getProperty("single_db_connection"))) {
|
||||||
maxThreads = 1;
|
maxThreads = 1;
|
||||||
|
|
|
@ -42,9 +42,12 @@ public class TestR5Config extends BaseJavaConfigR5 {
|
||||||
* We use a randomized number of maximum threads in order to try
|
* We use a randomized number of maximum threads in order to try
|
||||||
* and catch any potential deadlocks caused by database connection
|
* and catch any potential deadlocks caused by database connection
|
||||||
* starvation
|
* starvation
|
||||||
|
*
|
||||||
|
* A minimum of 2 is necessary for most transactions,
|
||||||
|
* so 2 will be our limit
|
||||||
*/
|
*/
|
||||||
if (ourMaxThreads == null) {
|
if (ourMaxThreads == null) {
|
||||||
ourMaxThreads = (int) (Math.random() * 6.0) + 1;
|
ourMaxThreads = (int) (Math.random() * 6.0) + 2;
|
||||||
|
|
||||||
if ("true".equals(System.getProperty("single_db_connection"))) {
|
if ("true".equals(System.getProperty("single_db_connection"))) {
|
||||||
ourMaxThreads = 1;
|
ourMaxThreads = 1;
|
||||||
|
|
|
@ -1,11 +1,393 @@
|
||||||
package ca.uhn.fhir.jpa.dao;
|
package ca.uhn.fhir.jpa.dao;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.jpa.model.entity.TagDefinition;
|
||||||
|
import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
|
||||||
|
import ca.uhn.fhir.jpa.util.MemoryCacheService;
|
||||||
|
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
|
||||||
|
import ca.uhn.fhir.util.AsyncUtil;
|
||||||
|
import ca.uhn.fhir.util.ThreadPoolUtil;
|
||||||
|
import ch.qos.logback.classic.Level;
|
||||||
|
import ch.qos.logback.classic.Logger;
|
||||||
|
import ch.qos.logback.classic.spi.ILoggingEvent;
|
||||||
|
import ch.qos.logback.core.Appender;
|
||||||
|
import org.hl7.fhir.r4.model.Patient;
|
||||||
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
|
import org.mockito.InjectMocks;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
|
import org.springframework.transaction.PlatformTransactionManager;
|
||||||
|
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||||
|
|
||||||
|
import javax.persistence.EntityExistsException;
|
||||||
|
import javax.persistence.EntityManager;
|
||||||
|
import javax.persistence.NoResultException;
|
||||||
|
import javax.persistence.TypedQuery;
|
||||||
|
import javax.persistence.criteria.CriteriaBuilder;
|
||||||
|
import javax.persistence.criteria.CriteriaQuery;
|
||||||
|
import javax.persistence.criteria.Path;
|
||||||
|
import javax.persistence.criteria.Predicate;
|
||||||
|
import javax.persistence.criteria.Root;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
import static org.junit.jupiter.api.Assertions.fail;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.doThrow;
|
||||||
|
import static org.mockito.Mockito.lenient;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
public class BaseHapiFhirDaoTest {
|
public class BaseHapiFhirDaoTest {
|
||||||
|
|
||||||
|
private static class TestDao extends BaseHapiFhirDao<Patient> {
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
@Override
|
||||||
|
protected String getResourceName() {
|
||||||
|
return "Patient";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TagDefinition getTagOrNull(TransactionDetails theDetails,
|
||||||
|
TagTypeEnum theEnum,
|
||||||
|
String theScheme,
|
||||||
|
String theTerm,
|
||||||
|
String theLabel) {
|
||||||
|
// we need to init synchronization due to what
|
||||||
|
// the underlying class is doing
|
||||||
|
try {
|
||||||
|
TransactionSynchronizationManager.initSynchronization();
|
||||||
|
return super.getTagOrNull(theDetails, theEnum, theScheme, theTerm, theLabel);
|
||||||
|
} finally {
|
||||||
|
TransactionSynchronizationManager.clearSynchronization();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Logger ourLogger;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private Appender<ILoggingEvent> myAppender;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private MemoryCacheService myMemoryCacheService;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private EntityManager myEntityManager;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private PlatformTransactionManager myTransactionManager;
|
||||||
|
|
||||||
|
@InjectMocks
|
||||||
|
private TestDao myTestDao;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void init() {
|
||||||
|
ourLogger = (Logger) LoggerFactory.getLogger(BaseHapiFhirDao.class);
|
||||||
|
ourLogger.addAppender(myAppender);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterEach
|
||||||
|
public void end() {
|
||||||
|
ourLogger.detachAppender(myAppender);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a mocked criteria builder
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private CriteriaBuilder getMockedCriteriaBuilder() {
|
||||||
|
Predicate pred = mock(Predicate.class);
|
||||||
|
|
||||||
|
CriteriaBuilder builder = mock(CriteriaBuilder.class);
|
||||||
|
// lenient due to multiple equal calls with different inputs
|
||||||
|
lenient().when(builder.equal(any(), any()))
|
||||||
|
.thenReturn(pred);
|
||||||
|
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a mocked from
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private Root<TagDefinition> getMockedFrom() {
|
||||||
|
Path path = mock(Path.class);
|
||||||
|
|
||||||
|
Root<TagDefinition> from = mock(Root.class);
|
||||||
|
// lenient due to multiple get calls with different inputs
|
||||||
|
lenient().when(from.get(anyString()))
|
||||||
|
.thenReturn(path);
|
||||||
|
return from;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void getTagOrNull_raceCondition_wontUpsertDuplicates() throws InterruptedException, ExecutionException {
|
||||||
|
/*
|
||||||
|
* We use this boolean to fake a race condition.
|
||||||
|
* Boolean is used for two reasons:
|
||||||
|
* 1) We don't want an unstable test (so a fake
|
||||||
|
* race condition will ensure the test always executes
|
||||||
|
* exactly as expected... but this just ensures the code
|
||||||
|
* is not buggy, not that race conditions are actually handled)
|
||||||
|
* 2) We want the ability (and confidence!) to know
|
||||||
|
* that a _real_ race condition can be handled. Setting
|
||||||
|
* this boolean false (and potentially tweaking thread count)
|
||||||
|
* gives us this confidence.
|
||||||
|
*
|
||||||
|
* Set this false to try with a real race condition
|
||||||
|
*/
|
||||||
|
boolean fakeRaceCondition = true;
|
||||||
|
|
||||||
|
// the more threads, the more likely we
|
||||||
|
// are to see race conditions.
|
||||||
|
// We need a lot to ensure at least 2 threads
|
||||||
|
// are in the create method at the same time
|
||||||
|
int threads = fakeRaceCondition ? 2 : 30;
|
||||||
|
|
||||||
|
// setup
|
||||||
|
TagTypeEnum tagType = TagTypeEnum.TAG;
|
||||||
|
String scheme = "http://localhost";
|
||||||
|
String term = "code123";
|
||||||
|
String label = "hollow world";
|
||||||
|
TransactionDetails transactionDetails = new TransactionDetails();
|
||||||
|
String raceConditionError = "Entity exists; if this is logged, you have race condition issues!";
|
||||||
|
|
||||||
|
TagDefinition tagDefinition = new TagDefinition(tagType,
|
||||||
|
scheme,
|
||||||
|
term,
|
||||||
|
label);
|
||||||
|
|
||||||
|
// mock objects
|
||||||
|
CriteriaBuilder builder = getMockedCriteriaBuilder();
|
||||||
|
TypedQuery<TagDefinition> query = mock(TypedQuery.class);
|
||||||
|
CriteriaQuery<TagDefinition> cq = mock(CriteriaQuery.class);
|
||||||
|
Root<TagDefinition> from = getMockedFrom();
|
||||||
|
|
||||||
|
// when
|
||||||
|
when(myEntityManager.getCriteriaBuilder())
|
||||||
|
.thenReturn(builder);
|
||||||
|
when(builder.createQuery(any(Class.class)))
|
||||||
|
.thenReturn(cq);
|
||||||
|
when(cq.from(any(Class.class)))
|
||||||
|
.thenReturn(from);
|
||||||
|
when(myEntityManager.createQuery(any(CriteriaQuery.class)))
|
||||||
|
.thenReturn(query);
|
||||||
|
AtomicBoolean atomicBoolean = new AtomicBoolean(false);
|
||||||
|
AtomicInteger getSingleResultInt = new AtomicInteger();
|
||||||
|
when(query.getSingleResult())
|
||||||
|
.thenAnswer(new Answer<TagDefinition>() {
|
||||||
|
private final AtomicInteger count = new AtomicInteger();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TagDefinition answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||||
|
getSingleResultInt.incrementAndGet();
|
||||||
|
if (fakeRaceCondition) {
|
||||||
|
// fake
|
||||||
|
// ensure the first 2 accesses throw to
|
||||||
|
// help fake a race condition (2, or possibly the same,
|
||||||
|
// thread failing to access the resource)
|
||||||
|
if (count.get() < 2) {
|
||||||
|
count.incrementAndGet();
|
||||||
|
throw new NoResultException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// real
|
||||||
|
if (!atomicBoolean.get()) {
|
||||||
|
throw new NoResultException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tagDefinition;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
AtomicInteger persistInt = new AtomicInteger();
|
||||||
|
doAnswer(new Answer() {
|
||||||
|
private final AtomicInteger count = new AtomicInteger();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||||
|
persistInt.incrementAndGet();
|
||||||
|
if (fakeRaceCondition) {
|
||||||
|
// fake
|
||||||
|
if (count.get() < 1) {
|
||||||
|
count.incrementAndGet();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw new EntityExistsException(raceConditionError);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// real
|
||||||
|
if (!atomicBoolean.getAndSet(true)) {
|
||||||
|
// first thread gets null...
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
// all other threads get entity exists exception
|
||||||
|
throw new EntityExistsException(raceConditionError);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).when(myEntityManager).persist(any(Object.class));
|
||||||
|
|
||||||
|
ourLogger.setLevel(Level.WARN);
|
||||||
|
|
||||||
|
// test
|
||||||
|
// ExecutorService service = Executors.newFixedThreadPool(threads);
|
||||||
|
ConcurrentHashMap<Integer, TagDefinition> outcomes = new ConcurrentHashMap<>();
|
||||||
|
ConcurrentHashMap<Integer, Throwable> errors = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
ThreadPoolTaskExecutor executor = ThreadPoolUtil.newThreadPool(threads, threads * 2, "test-");
|
||||||
|
|
||||||
|
AtomicInteger counter = new AtomicInteger();
|
||||||
|
|
||||||
|
CountDownLatch latch = new CountDownLatch(threads);
|
||||||
|
Runnable task = () -> {
|
||||||
|
latch.countDown();
|
||||||
|
try {
|
||||||
|
TagDefinition retTag = myTestDao.getTagOrNull(transactionDetails, tagType, scheme, term, label);
|
||||||
|
outcomes.put(retTag.hashCode(), retTag);
|
||||||
|
counter.incrementAndGet();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
errors.put(ex.hashCode(), ex);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
ArrayList<Future> futures = new ArrayList<>();
|
||||||
|
for (int i = 0; i < threads; i++) {
|
||||||
|
futures.add(executor.submit(task));
|
||||||
|
}
|
||||||
|
for (Future f : futures) {
|
||||||
|
f.get();
|
||||||
|
}
|
||||||
|
AsyncUtil.awaitLatchAndIgnoreInterrupt(latch, (long) threads, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
// try {
|
||||||
|
// ArrayList<Future> futures = new ArrayList<>();
|
||||||
|
// for (int i = 0; i < threads; i++) {
|
||||||
|
// futures.add(service.submit(task));
|
||||||
|
// }
|
||||||
|
// for (Future f : futures) {
|
||||||
|
// f.get();
|
||||||
|
// }
|
||||||
|
// } finally {
|
||||||
|
// service.shutdown();
|
||||||
|
// }
|
||||||
|
// // should not take a second per thread.
|
||||||
|
// // but will take some time, due to the delays above.
|
||||||
|
// // a second per thread seems like a good threshold.
|
||||||
|
// Assertions.assertTrue(
|
||||||
|
// service.awaitTermination(threads, TimeUnit.SECONDS)
|
||||||
|
// );
|
||||||
|
|
||||||
|
Assertions.assertEquals(threads + 1, getSingleResultInt.get(), "Not enough gets " + getSingleResultInt.get());
|
||||||
|
Assertions.assertEquals(threads, persistInt.get(), "Not enough persists " + persistInt.get());
|
||||||
|
|
||||||
|
// verify
|
||||||
|
Assertions.assertEquals(1, outcomes.size());
|
||||||
|
Assertions.assertEquals(threads, counter.get());
|
||||||
|
Assertions.assertEquals(0, errors.size(),
|
||||||
|
errors.values().stream().map(Throwable::getMessage)
|
||||||
|
.collect(Collectors.joining(", ")));
|
||||||
|
|
||||||
|
// verify we logged some race conditions
|
||||||
|
ArgumentCaptor<ILoggingEvent> captor = ArgumentCaptor.forClass(ILoggingEvent.class);
|
||||||
|
verify(myAppender, Mockito.atLeastOnce())
|
||||||
|
.doAppend(captor.capture());
|
||||||
|
assertTrue(captor.getAllValues().get(0).getMessage()
|
||||||
|
.contains(raceConditionError));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void getTagOrNull_failingForever_throwsInternalErrorAndLogsWarnings() {
|
||||||
|
// setup
|
||||||
|
TagTypeEnum tagType = TagTypeEnum.TAG;
|
||||||
|
String scheme = "http://localhost";
|
||||||
|
String term = "code123";
|
||||||
|
String label = "hollow world";
|
||||||
|
TransactionDetails transactionDetails = new TransactionDetails();
|
||||||
|
String exMsg = "Hi there";
|
||||||
|
String readError = "No read for you";
|
||||||
|
|
||||||
|
ourLogger.setLevel(Level.WARN);
|
||||||
|
|
||||||
|
// mock objects
|
||||||
|
CriteriaBuilder builder = getMockedCriteriaBuilder();
|
||||||
|
TypedQuery<TagDefinition> query = mock(TypedQuery.class);
|
||||||
|
CriteriaQuery<TagDefinition> cq = mock(CriteriaQuery.class);
|
||||||
|
Root<TagDefinition> from = getMockedFrom();
|
||||||
|
|
||||||
|
// when
|
||||||
|
when(myEntityManager.getCriteriaBuilder())
|
||||||
|
.thenReturn(builder);
|
||||||
|
when(builder.createQuery(any(Class.class)))
|
||||||
|
.thenReturn(cq);
|
||||||
|
when(cq.from(any(Class.class)))
|
||||||
|
.thenReturn(from);
|
||||||
|
when(myEntityManager.createQuery(any(CriteriaQuery.class)))
|
||||||
|
.thenReturn(query);
|
||||||
|
when(query.getSingleResult())
|
||||||
|
.thenThrow(new NoResultException(readError));
|
||||||
|
doThrow(new RuntimeException(exMsg))
|
||||||
|
.when(myEntityManager).persist(any(Object.class));
|
||||||
|
|
||||||
|
// test
|
||||||
|
try {
|
||||||
|
myTestDao.getTagOrNull(transactionDetails, tagType, scheme, term, label);
|
||||||
|
fail();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
// verify
|
||||||
|
assertEquals("Tag get/create failed after 10 attempts with error(s): " + exMsg, ex.getMessage());
|
||||||
|
|
||||||
|
ArgumentCaptor<ILoggingEvent> appenderCaptor = ArgumentCaptor.forClass(ILoggingEvent.class);
|
||||||
|
verify(myAppender, Mockito.times(10))
|
||||||
|
.doAppend(appenderCaptor.capture());
|
||||||
|
List<ILoggingEvent> events = appenderCaptor.getAllValues();
|
||||||
|
assertEquals(10, events.size());
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
String actualMsg = events.get(i).getMessage();
|
||||||
|
assertEquals(
|
||||||
|
"Tag read/write failed: "
|
||||||
|
+ exMsg
|
||||||
|
+ ". "
|
||||||
|
+ "This is not a failure on its own, "
|
||||||
|
+ "but could be useful information in the result of an actual failure.",
|
||||||
|
actualMsg
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
////////// Static access tests
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void cleanProvenanceSourceUri() {
|
public void cleanProvenanceSourceUri() {
|
||||||
assertEquals("", BaseHapiFhirDao.cleanProvenanceSourceUri(null));
|
assertEquals("", BaseHapiFhirDao.cleanProvenanceSourceUri(null));
|
||||||
|
|
|
@ -21,10 +21,12 @@ import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
|
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
|
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
|
||||||
import ca.uhn.fhir.util.BundleBuilder;
|
import ca.uhn.fhir.util.BundleBuilder;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.lang3.time.DateUtils;
|
import org.apache.commons.lang3.time.DateUtils;
|
||||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
import org.hl7.fhir.instance.model.api.IIdType;
|
import org.hl7.fhir.instance.model.api.IIdType;
|
||||||
import org.hl7.fhir.r4.model.Bundle;
|
import org.hl7.fhir.r4.model.Bundle;
|
||||||
|
import org.hl7.fhir.r4.model.Coding;
|
||||||
import org.hl7.fhir.r4.model.DateType;
|
import org.hl7.fhir.r4.model.DateType;
|
||||||
import org.hl7.fhir.r4.model.DecimalType;
|
import org.hl7.fhir.r4.model.DecimalType;
|
||||||
import org.hl7.fhir.r4.model.Encounter;
|
import org.hl7.fhir.r4.model.Encounter;
|
||||||
|
@ -46,9 +48,14 @@ import org.springframework.data.domain.PageRequest;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
@ -1002,6 +1009,57 @@ public class FhirResourceDaoR4CreateTest extends BaseJpaR4Test {
|
||||||
assertEquals(1, ids.size());
|
assertEquals(1, ids.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testResourceWithTagCreationNoFailures() throws ExecutionException, InterruptedException {
|
||||||
|
ExecutorService pool = Executors.newFixedThreadPool(5);
|
||||||
|
try {
|
||||||
|
Coding tag = new Coding();
|
||||||
|
tag.setCode("code123");
|
||||||
|
tag.setDisplay("Display Name");
|
||||||
|
tag.setSystem("System123");
|
||||||
|
|
||||||
|
Patient p = new Patient();
|
||||||
|
IIdType id = myPatientDao.create(p).getId();
|
||||||
|
|
||||||
|
List<Future<String>> futures = new ArrayList<>();
|
||||||
|
for (int i = 0; i < 50; i++) {
|
||||||
|
Patient updatePatient = new Patient();
|
||||||
|
updatePatient.setId(id.toUnqualifiedVersionless());
|
||||||
|
updatePatient.addIdentifier().setSystem("" + i);
|
||||||
|
updatePatient.setActive(true);
|
||||||
|
updatePatient.getMeta().addTag(tag);
|
||||||
|
|
||||||
|
int finalI = i;
|
||||||
|
Future<String> future = pool.submit(() -> {
|
||||||
|
ourLog.info("Starting update {}", finalI);
|
||||||
|
try {
|
||||||
|
try {
|
||||||
|
myPatientDao.update(updatePatient);
|
||||||
|
} catch (ResourceVersionConflictException e) {
|
||||||
|
assertEquals("The operation has failed with a version constraint failure. This generally means that two clients/threads were trying to update the same resource at the same time, and this request was chosen as the failing request.", e.getMessage());
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
ourLog.error("Failure", e);
|
||||||
|
return e.toString();
|
||||||
|
}
|
||||||
|
ourLog.info("Finished update {}", finalI);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
futures.add(future);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Future<String> next : futures) {
|
||||||
|
String nextError = next.get();
|
||||||
|
if (StringUtils.isNotBlank(nextError)) {
|
||||||
|
fail(nextError);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
pool.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreateWithNormalizedQuantitySearchNotSupported_SmallerThanCanonicalUnit() {
|
public void testCreateWithNormalizedQuantitySearchNotSupported_SmallerThanCanonicalUnit() {
|
||||||
|
|
||||||
|
|
|
@ -79,7 +79,6 @@ import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Method;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -101,7 +100,6 @@ import static org.hamcrest.Matchers.not;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
import static org.junit.jupiter.api.Assertions.fail;
|
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.anyString;
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
import static org.mockito.ArgumentMatchers.nullable;
|
import static org.mockito.ArgumentMatchers.nullable;
|
||||||
|
|
Loading…
Reference in New Issue