Refactor tag creation logic and handle concurrent access:
- Simplified tag creation by removing unnecessary transaction complexity, since we allow duplicate tags in hfj_tag_def from #4813 - Removed redundant retry logic based on updated DB constraints
This commit is contained in:
parent
06c8fd0e31
commit
b18c22b703
|
@ -43,6 +43,7 @@ import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
|
|||
import ca.uhn.fhir.jpa.bulk.imprt.svc.BulkDataImportSvcImpl;
|
||||
import ca.uhn.fhir.jpa.cache.IResourceVersionSvc;
|
||||
import ca.uhn.fhir.jpa.cache.ResourceVersionSvcDaoImpl;
|
||||
import ca.uhn.fhir.jpa.dao.CacheTagDefinitionDao;
|
||||
import ca.uhn.fhir.jpa.dao.DaoSearchParamProvider;
|
||||
import ca.uhn.fhir.jpa.dao.HistoryBuilder;
|
||||
import ca.uhn.fhir.jpa.dao.HistoryBuilderFactory;
|
||||
|
@ -56,6 +57,7 @@ import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
|
|||
import ca.uhn.fhir.jpa.dao.TransactionProcessor;
|
||||
import ca.uhn.fhir.jpa.dao.data.IResourceModifiedDao;
|
||||
import ca.uhn.fhir.jpa.dao.data.IResourceSearchUrlDao;
|
||||
import ca.uhn.fhir.jpa.dao.data.ITagDefinitionDao;
|
||||
import ca.uhn.fhir.jpa.dao.expunge.ExpungeEverythingService;
|
||||
import ca.uhn.fhir.jpa.dao.expunge.ExpungeOperation;
|
||||
import ca.uhn.fhir.jpa.dao.expunge.ExpungeService;
|
||||
|
@ -893,4 +895,10 @@ public class JpaConfig {
|
|||
FhirContext theFhirContext, HibernatePropertiesProvider theHibernatePropertiesProvider) {
|
||||
return new ResourceHistoryCalculator(theFhirContext, theHibernatePropertiesProvider.isOracleDialect());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public CacheTagDefinitionDao tagDefinitionDao(
|
||||
ITagDefinitionDao tagDefinitionDao, MemoryCacheService memoryCacheService) {
|
||||
return new CacheTagDefinitionDao(tagDefinitionDao, memoryCacheService);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -75,7 +75,6 @@ import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryResourceMatcher;
|
|||
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
|
||||
import ca.uhn.fhir.jpa.term.api.ITermReadSvc;
|
||||
import ca.uhn.fhir.jpa.util.AddRemoveCount;
|
||||
import ca.uhn.fhir.jpa.util.MemoryCacheService;
|
||||
import ca.uhn.fhir.jpa.util.QueryChunker;
|
||||
import ca.uhn.fhir.model.api.IResource;
|
||||
import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum;
|
||||
|
@ -106,7 +105,6 @@ import jakarta.annotation.Nonnull;
|
|||
import jakarta.annotation.Nullable;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.persistence.EntityManager;
|
||||
import jakarta.persistence.EntityManagerFactory;
|
||||
import jakarta.persistence.PersistenceContext;
|
||||
import jakarta.persistence.PersistenceContextType;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
|
@ -130,15 +128,11 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.support.TransactionSynchronization;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.List;
|
||||
|
@ -235,9 +229,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
|
|||
@Autowired
|
||||
private IPartitionLookupSvc myPartitionLookupSvc;
|
||||
|
||||
@Autowired
|
||||
private MemoryCacheService myMemoryCacheService;
|
||||
|
||||
@Autowired(required = false)
|
||||
private IFulltextSearchSvc myFulltextSearchSvc;
|
||||
|
||||
|
@ -245,12 +236,8 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
|
|||
protected ResourceHistoryCalculator myResourceHistoryCalculator;
|
||||
|
||||
@Autowired
|
||||
private PlatformTransactionManager myTransactionManager;
|
||||
protected CacheTagDefinitionDao cacheTagDefinitionDao;
|
||||
|
||||
@Autowired
|
||||
private EntityManagerFactory myEntityManagerFactory;
|
||||
|
||||
protected TagDefinitionDao tagDefinitionDao;
|
||||
protected final CodingSpy myCodingSpy = new CodingSpy();
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -298,7 +285,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
|
|||
TagList tagList = ResourceMetadataKeyEnum.TAG_LIST.get(theResource);
|
||||
if (tagList != null) {
|
||||
for (Tag next : tagList) {
|
||||
TagDefinition def = getTagOrNull(
|
||||
TagDefinition def = cacheTagDefinitionDao.getTagOrNull(
|
||||
theTransactionDetails,
|
||||
TagTypeEnum.TAG,
|
||||
next.getScheme(),
|
||||
|
@ -317,7 +304,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
|
|||
List<BaseCodingDt> securityLabels = ResourceMetadataKeyEnum.SECURITY_LABELS.get(theResource);
|
||||
if (securityLabels != null) {
|
||||
for (BaseCodingDt next : securityLabels) {
|
||||
TagDefinition def = getTagOrNull(
|
||||
TagDefinition def = cacheTagDefinitionDao.getTagOrNull(
|
||||
theTransactionDetails,
|
||||
TagTypeEnum.SECURITY_LABEL,
|
||||
next.getSystemElement().getValue(),
|
||||
|
@ -336,7 +323,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
|
|||
List<IdDt> profiles = ResourceMetadataKeyEnum.PROFILES.get(theResource);
|
||||
if (profiles != null) {
|
||||
for (IIdType next : profiles) {
|
||||
TagDefinition def = getTagOrNull(
|
||||
TagDefinition def = cacheTagDefinitionDao.getTagOrNull(
|
||||
theTransactionDetails, TagTypeEnum.PROFILE, NS_JPA_PROFILE, next.getValue(), null, null, null);
|
||||
if (def != null) {
|
||||
ResourceTag tag = theEntity.addTag(def);
|
||||
|
@ -355,7 +342,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
|
|||
List<? extends IBaseCoding> tagList = theResource.getMeta().getTag();
|
||||
if (tagList != null) {
|
||||
for (IBaseCoding next : tagList) {
|
||||
TagDefinition def = getTagOrNull(
|
||||
TagDefinition def = cacheTagDefinitionDao.getTagOrNull(
|
||||
theTransactionDetails,
|
||||
TagTypeEnum.TAG,
|
||||
next.getSystem(),
|
||||
|
@ -374,7 +361,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
|
|||
List<? extends IBaseCoding> securityLabels = theResource.getMeta().getSecurity();
|
||||
if (securityLabels != null) {
|
||||
for (IBaseCoding next : securityLabels) {
|
||||
TagDefinition def = getTagOrNull(
|
||||
TagDefinition def = cacheTagDefinitionDao.getTagOrNull(
|
||||
theTransactionDetails,
|
||||
TagTypeEnum.SECURITY_LABEL,
|
||||
next.getSystem(),
|
||||
|
@ -393,7 +380,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
|
|||
List<? extends IPrimitiveType<String>> profiles = theResource.getMeta().getProfile();
|
||||
if (profiles != null) {
|
||||
for (IPrimitiveType<String> next : profiles) {
|
||||
TagDefinition def = getTagOrNull(
|
||||
TagDefinition def = cacheTagDefinitionDao.getTagOrNull(
|
||||
theTransactionDetails, TagTypeEnum.PROFILE, NS_JPA_PROFILE, next.getValue(), null, null, null);
|
||||
if (def != null) {
|
||||
ResourceTag tag = theEntity.addTag(def);
|
||||
|
@ -413,7 +400,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
|
|||
if (!def.isStandardType()) {
|
||||
String profile = def.getResourceProfile("");
|
||||
if (isNotBlank(profile)) {
|
||||
TagDefinition profileDef = getTagOrNull(
|
||||
TagDefinition profileDef = cacheTagDefinitionDao.getTagOrNull(
|
||||
theTransactionDetails, TagTypeEnum.PROFILE, NS_JPA_PROFILE, profile, null, null, null);
|
||||
|
||||
ResourceTag tag = theEntity.addTag(profileDef);
|
||||
|
@ -447,47 +434,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
|
|||
myContext = theContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* <code>null</code> will only be returned if the scheme and tag are both blank
|
||||
*/
|
||||
protected TagDefinition getTagOrNull(
|
||||
TransactionDetails theTransactionDetails,
|
||||
TagTypeEnum theTagType,
|
||||
String theScheme,
|
||||
String theTerm,
|
||||
String theLabel,
|
||||
String theVersion,
|
||||
Boolean theUserSelected) {
|
||||
if (isBlank(theScheme) && isBlank(theTerm) && isBlank(theLabel)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
MemoryCacheService.TagDefinitionCacheKey key =
|
||||
toTagDefinitionMemoryCacheKey(theTagType, theScheme, theTerm, theVersion, theUserSelected);
|
||||
|
||||
TagDefinition retVal = myMemoryCacheService.getIfPresent(MemoryCacheService.CacheEnum.TAG_DEFINITION, key);
|
||||
if (retVal == null) {
|
||||
HashMap<MemoryCacheService.TagDefinitionCacheKey, TagDefinition> resolvedTagDefinitions =
|
||||
theTransactionDetails.getOrCreateUserData(
|
||||
HapiTransactionService.XACT_USERDATA_KEY_RESOLVED_TAG_DEFINITIONS, HashMap::new);
|
||||
|
||||
retVal = resolvedTagDefinitions.get(key);
|
||||
|
||||
if (retVal == null) {
|
||||
// actual DB hit(s) happen here
|
||||
retVal = tagDefinitionDao.getOrCreateTag(
|
||||
theTagType, theScheme, theTerm, theLabel, theVersion, theUserSelected);
|
||||
|
||||
TransactionSynchronization sync = new AddTagDefinitionToCacheAfterCommitSynchronization(key, retVal);
|
||||
TransactionSynchronizationManager.registerSynchronization(sync);
|
||||
|
||||
resolvedTagDefinitions.put(key, retVal);
|
||||
}
|
||||
}
|
||||
|
||||
return retVal;
|
||||
}
|
||||
|
||||
void incrementId(T theResource, ResourceTable theSavedEntity, IIdType theResourceId) {
|
||||
if (theResourceId == null || theResourceId.getVersionIdPart() == null) {
|
||||
theSavedEntity.initializeVersion();
|
||||
|
@ -1713,9 +1659,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
|
|||
}
|
||||
|
||||
@PostConstruct
|
||||
public void start() {
|
||||
this.tagDefinitionDao = new TagDefinitionDao(myEntityManagerFactory, myTransactionManager);
|
||||
}
|
||||
public void start() {}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setStorageSettingsForUnitTest(JpaStorageSettings theStorageSettings) {
|
||||
|
@ -1754,30 +1698,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
|
|||
myJpaStorageResourceParser = theJpaStorageResourceParser;
|
||||
}
|
||||
|
||||
private class AddTagDefinitionToCacheAfterCommitSynchronization implements TransactionSynchronization {
|
||||
|
||||
private final TagDefinition myTagDefinition;
|
||||
private final MemoryCacheService.TagDefinitionCacheKey myKey;
|
||||
|
||||
public AddTagDefinitionToCacheAfterCommitSynchronization(
|
||||
MemoryCacheService.TagDefinitionCacheKey theKey, TagDefinition theTagDefinition) {
|
||||
myTagDefinition = theTagDefinition;
|
||||
myKey = theKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterCommit() {
|
||||
myMemoryCacheService.put(MemoryCacheService.CacheEnum.TAG_DEFINITION, myKey, myTagDefinition);
|
||||
}
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
public static MemoryCacheService.TagDefinitionCacheKey toTagDefinitionMemoryCacheKey(
|
||||
TagTypeEnum theTagType, String theScheme, String theTerm, String theVersion, Boolean theUserSelected) {
|
||||
return new MemoryCacheService.TagDefinitionCacheKey(
|
||||
theTagType, theScheme, theTerm, theVersion, theUserSelected);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static String parseContentTextIntoWords(FhirContext theContext, IBaseResource theResource) {
|
||||
|
||||
|
|
|
@ -1043,7 +1043,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
|
|||
if (!entityHasTag) {
|
||||
theEntity.setHasTags(true);
|
||||
|
||||
TagDefinition def = getTagOrNull(
|
||||
TagDefinition def = cacheTagDefinitionDao.getTagOrNull(
|
||||
theTransactionDetails,
|
||||
nextDef.getTagType(),
|
||||
nextDef.getSystem(),
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
package ca.uhn.fhir.jpa.dao;
|
||||
|
||||
import ca.uhn.fhir.jpa.dao.data.ITagDefinitionDao;
|
||||
import ca.uhn.fhir.jpa.model.entity.TagDefinition;
|
||||
import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
|
||||
import ca.uhn.fhir.jpa.util.MemoryCacheService;
|
||||
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
|
||||
import jakarta.annotation.Nonnull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.data.domain.Pageable;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.transaction.support.TransactionSynchronization;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.isBlank;
|
||||
|
||||
@Repository
|
||||
public class CacheTagDefinitionDao {
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(CacheTagDefinitionDao.class);
|
||||
|
||||
private final ITagDefinitionDao tagDefinitionDao;
|
||||
private final MemoryCacheService memoryCacheService;
|
||||
|
||||
public CacheTagDefinitionDao(ITagDefinitionDao tagDefinitionDao, MemoryCacheService memoryCacheService) {
|
||||
this.tagDefinitionDao = tagDefinitionDao;
|
||||
this.memoryCacheService = memoryCacheService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a TagDefinition or null if the scheme, term, and label are all blank.
|
||||
*/
|
||||
protected TagDefinition getTagOrNull(
|
||||
TransactionDetails transactionDetails,
|
||||
TagTypeEnum tagType,
|
||||
String scheme,
|
||||
String term,
|
||||
String label,
|
||||
String version,
|
||||
Boolean userSelected) {
|
||||
|
||||
if (isBlank(scheme) && isBlank(term) && isBlank(label)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
MemoryCacheService.TagDefinitionCacheKey key =
|
||||
toTagDefinitionMemoryCacheKey(tagType, scheme, term, version, userSelected);
|
||||
TagDefinition tagDefinition = memoryCacheService.getIfPresent(MemoryCacheService.CacheEnum.TAG_DEFINITION, key);
|
||||
|
||||
if (tagDefinition == null) {
|
||||
HashMap<MemoryCacheService.TagDefinitionCacheKey, TagDefinition> resolvedTagDefinitions =
|
||||
transactionDetails.getOrCreateUserData("resolvedTagDefinitions", HashMap::new);
|
||||
|
||||
tagDefinition = resolvedTagDefinitions.get(key);
|
||||
|
||||
if (tagDefinition == null) {
|
||||
tagDefinition = getOrCreateTag(tagType, scheme, term, label, version, userSelected);
|
||||
|
||||
TransactionSynchronization sync =
|
||||
new AddTagDefinitionToCacheAfterCommitSynchronization(key, tagDefinition);
|
||||
TransactionSynchronizationManager.registerSynchronization(sync);
|
||||
|
||||
resolvedTagDefinitions.put(key, tagDefinition);
|
||||
}
|
||||
}
|
||||
|
||||
return tagDefinition;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets or creates a TagDefinition entity.
|
||||
*/
|
||||
private TagDefinition getOrCreateTag(
|
||||
TagTypeEnum tagType, String scheme, String term, String label, String version, Boolean userSelected) {
|
||||
List<TagDefinition> result = tagDefinitionDao.findByTagTypeAndSchemeAndTermAndVersionAndUserSelected(
|
||||
tagType, scheme, term, version, userSelected, Pageable.ofSize(1));
|
||||
|
||||
if (!result.isEmpty()) {
|
||||
return result.get(0);
|
||||
} else {
|
||||
// Create a new TagDefinition if no result is found
|
||||
TagDefinition newTag = new TagDefinition(tagType, scheme, term, label);
|
||||
newTag.setVersion(version);
|
||||
newTag.setUserSelected(userSelected);
|
||||
return tagDefinitionDao.save(newTag);
|
||||
}
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private static MemoryCacheService.TagDefinitionCacheKey toTagDefinitionMemoryCacheKey(
|
||||
TagTypeEnum tagType, String scheme, String term, String version, Boolean userSelected) {
|
||||
return new MemoryCacheService.TagDefinitionCacheKey(tagType, scheme, term, version, userSelected);
|
||||
}
|
||||
|
||||
private class AddTagDefinitionToCacheAfterCommitSynchronization implements TransactionSynchronization {
|
||||
private final TagDefinition tagDefinition;
|
||||
private final MemoryCacheService.TagDefinitionCacheKey key;
|
||||
|
||||
public AddTagDefinitionToCacheAfterCommitSynchronization(
|
||||
MemoryCacheService.TagDefinitionCacheKey key, TagDefinition tagDefinition) {
|
||||
this.tagDefinition = tagDefinition;
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterCommit() {
|
||||
memoryCacheService.put(MemoryCacheService.CacheEnum.TAG_DEFINITION, key, tagDefinition);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,249 +0,0 @@
|
|||
package ca.uhn.fhir.jpa.dao;
|
||||
|
||||
import ca.uhn.fhir.i18n.Msg;
|
||||
import ca.uhn.fhir.jpa.model.entity.TagDefinition;
|
||||
import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
||||
import ca.uhn.fhir.util.ThreadPoolUtil;
|
||||
import jakarta.persistence.EntityManager;
|
||||
import jakarta.persistence.EntityManagerFactory;
|
||||
import jakarta.persistence.NoResultException;
|
||||
import jakarta.persistence.TypedQuery;
|
||||
import jakarta.persistence.criteria.CriteriaBuilder;
|
||||
import jakarta.persistence.criteria.CriteriaQuery;
|
||||
import jakarta.persistence.criteria.Predicate;
|
||||
import jakarta.persistence.criteria.Root;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.TransactionDefinition;
|
||||
import org.springframework.transaction.TransactionStatus;
|
||||
import org.springframework.transaction.support.TransactionCallback;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Objects.isNull;
|
||||
import static org.apache.commons.lang3.StringUtils.isBlank;
|
||||
|
||||
/**
|
||||
* This class is responsible for getting or creating tags.
|
||||
* The class is designed to do the tag retrieval in a separate thread and transaction to avoid any form of
|
||||
* suspension of any current transaction, an operation that is not widely supported in an XA environment.
|
||||
*/
|
||||
public class TagDefinitionDao {
|
||||
// total attempts to do a tag transaction
|
||||
private static final int TOTAL_TAG_READ_ATTEMPTS = 10;
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(TagDefinitionDao.class);
|
||||
private final ThreadPoolTaskExecutor taskExecutor = ThreadPoolUtil.newThreadPool(1, 5, "getOrCreateTag-");
|
||||
|
||||
private final EntityManagerFactory myEntityManagerFactory;
|
||||
private final PlatformTransactionManager myTransactionManager;
|
||||
|
||||
public TagDefinitionDao(
|
||||
EntityManagerFactory theEntityManagerFactory, PlatformTransactionManager theTransactionManager) {
|
||||
this.myEntityManagerFactory = theEntityManagerFactory;
|
||||
this.myTransactionManager = theTransactionManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the tag defined by the fed in values, or saves it if it does not
|
||||
* exist.
|
||||
* <p>
|
||||
* Can also throw an InternalErrorException if something bad happens.
|
||||
*/
|
||||
TagDefinition getOrCreateTag(
|
||||
TagTypeEnum theTagType,
|
||||
String theScheme,
|
||||
String theTerm,
|
||||
String theLabel,
|
||||
String theVersion,
|
||||
Boolean theUserSelected) {
|
||||
try {
|
||||
// Execute tag retrieval in a separate thread to avoid transaction suspension issues,
|
||||
// which can cause problems in an XA environment that does not support suspend/resuming transactions,
|
||||
// such as when using the PostgreSQL JDBC driver.
|
||||
Future<TagDefinition> future = taskExecutor.submit(new RetryableGetOrCreateTagUnitOfWork(
|
||||
myEntityManagerFactory,
|
||||
myTransactionManager,
|
||||
theTagType,
|
||||
theScheme,
|
||||
theTerm,
|
||||
theLabel,
|
||||
theVersion,
|
||||
theUserSelected));
|
||||
return future.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new InternalErrorException(
|
||||
Msg.code(2547) + "Tag get/create thread failed to execute: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
static class RetryableGetOrCreateTagUnitOfWork implements Callable<TagDefinition> {
|
||||
private final EntityManagerFactory myEntityManagerFactory;
|
||||
private final PlatformTransactionManager myTransactionManager;
|
||||
private final TagTypeEnum theTagType;
|
||||
private final String theScheme;
|
||||
private final String theTerm;
|
||||
private final String theLabel;
|
||||
private final String theVersion;
|
||||
private final Boolean theUserSelected;
|
||||
|
||||
public RetryableGetOrCreateTagUnitOfWork(
|
||||
EntityManagerFactory theEntityManagerFactory,
|
||||
PlatformTransactionManager theTransactionManager,
|
||||
TagTypeEnum theTagType,
|
||||
String theScheme,
|
||||
String theTerm,
|
||||
String theLabel,
|
||||
String theVersion,
|
||||
Boolean theUserSelected) {
|
||||
this.myEntityManagerFactory = theEntityManagerFactory;
|
||||
this.myTransactionManager = theTransactionManager;
|
||||
this.theTagType = theTagType;
|
||||
this.theScheme = theScheme;
|
||||
this.theTerm = theTerm;
|
||||
this.theLabel = theLabel;
|
||||
this.theVersion = theVersion;
|
||||
this.theUserSelected = theUserSelected;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TagDefinition call() throws Exception {
|
||||
// Create a new entity manager for the current thread.
|
||||
try (EntityManager myEntityManager = myEntityManagerFactory.createEntityManager()) {
|
||||
TransactionTemplate template = new TransactionTemplate(myTransactionManager);
|
||||
template.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
|
||||
|
||||
// this transaction will attempt to get or create the tag,
|
||||
// repeating (on any failure) 10 times.
|
||||
// if it fails more than this, we will throw exceptions
|
||||
TagDefinition retVal;
|
||||
int count = 0;
|
||||
HashSet<Throwable> throwables = new HashSet<>();
|
||||
do {
|
||||
try {
|
||||
retVal = template.execute(new TransactionCallback<>() {
|
||||
// do the actual DB call(s) to read and/or write the values
|
||||
private TagDefinition readOrCreate() {
|
||||
TagDefinition val;
|
||||
try {
|
||||
// Join transaction if needed.
|
||||
if (!myEntityManager.isJoinedToTransaction()) {
|
||||
myEntityManager.joinTransaction();
|
||||
}
|
||||
|
||||
TypedQuery<TagDefinition> q =
|
||||
buildTagQuery(myEntityManager, theTagType, theScheme, theTerm, theVersion, theUserSelected);
|
||||
q.setMaxResults(1);
|
||||
|
||||
val = q.getSingleResult();
|
||||
} catch (NoResultException e) {
|
||||
val = new TagDefinition(theTagType, theScheme, theTerm, theLabel);
|
||||
val.setVersion(theVersion);
|
||||
val.setUserSelected(theUserSelected);
|
||||
myEntityManager.persist(val);
|
||||
myEntityManager.flush();
|
||||
}
|
||||
return val;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TagDefinition doInTransaction(TransactionStatus status) {
|
||||
TagDefinition tag = null;
|
||||
|
||||
try {
|
||||
tag = readOrCreate();
|
||||
} catch (Exception ex) {
|
||||
// log any exceptions - just in case
|
||||
// they may be signs of things to come...
|
||||
ourLog.warn(
|
||||
"Tag read/write failed: "
|
||||
+ ex.getMessage() + ". "
|
||||
+ "This is not a failure on its own, "
|
||||
+ "but could be useful information in the result of an actual failure.",
|
||||
ex);
|
||||
throwables.add(ex);
|
||||
}
|
||||
|
||||
return tag;
|
||||
}
|
||||
});
|
||||
} catch (Exception ex) {
|
||||
// transaction template can fail if connections to db are exhausted and/or timeout
|
||||
ourLog.warn(
|
||||
"Transaction failed with: {}. Transaction will rollback and be reattempted.",
|
||||
ex.getMessage());
|
||||
retVal = null;
|
||||
}
|
||||
|
||||
// Clear the persistence context to avoid stale data on retry
|
||||
if (retVal == null) {
|
||||
myEntityManager.clear();
|
||||
}
|
||||
|
||||
count++;
|
||||
} while (retVal == null && count < TOTAL_TAG_READ_ATTEMPTS);
|
||||
|
||||
if (retVal == null) {
|
||||
// if tag is still null,
|
||||
// something bad must be happening
|
||||
// - throw
|
||||
String msg = throwables.stream().map(Throwable::getMessage).collect(Collectors.joining(", "));
|
||||
throw new InternalErrorException(Msg.code(2023)
|
||||
+ "Tag get/create failed after "
|
||||
+ TOTAL_TAG_READ_ATTEMPTS
|
||||
+ " attempts with error(s): "
|
||||
+ msg);
|
||||
}
|
||||
|
||||
return retVal;
|
||||
}
|
||||
}
|
||||
|
||||
private TypedQuery<TagDefinition> buildTagQuery(
|
||||
EntityManager theEntityManager,
|
||||
TagTypeEnum theTagType,
|
||||
String theScheme,
|
||||
String theTerm,
|
||||
String theVersion,
|
||||
Boolean theUserSelected) {
|
||||
CriteriaBuilder builder = theEntityManager.getCriteriaBuilder();
|
||||
CriteriaQuery<TagDefinition> cq = builder.createQuery(TagDefinition.class);
|
||||
Root<TagDefinition> from = cq.from(TagDefinition.class);
|
||||
|
||||
List<Predicate> predicates = new ArrayList<>();
|
||||
predicates.add(builder.and(
|
||||
builder.equal(from.get("myTagType"), theTagType), builder.equal(from.get("myCode"), theTerm)));
|
||||
|
||||
predicates.add(
|
||||
isBlank(theScheme)
|
||||
? builder.isNull(from.get("mySystem"))
|
||||
: builder.equal(from.get("mySystem"), theScheme));
|
||||
|
||||
predicates.add(
|
||||
isBlank(theVersion)
|
||||
? builder.isNull(from.get("myVersion"))
|
||||
: builder.equal(from.get("myVersion"), theVersion));
|
||||
|
||||
predicates.add(
|
||||
isNull(theUserSelected)
|
||||
? builder.isNull(from.get("myUserSelected"))
|
||||
: builder.equal(from.get("myUserSelected"), theUserSelected));
|
||||
|
||||
cq.where(predicates.toArray(new Predicate[0]));
|
||||
|
||||
TypedQuery<TagDefinition> query = theEntityManager.createQuery(cq);
|
||||
query.setMaxResults(1);
|
||||
|
||||
return query;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -20,8 +20,25 @@
|
|||
package ca.uhn.fhir.jpa.dao.data;
|
||||
|
||||
import ca.uhn.fhir.jpa.model.entity.TagDefinition;
|
||||
import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
|
||||
import org.springframework.data.domain.Pageable;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface ITagDefinitionDao extends JpaRepository<TagDefinition, Long>, IHapiFhirJpaRepository {
|
||||
// nothing
|
||||
@Query("SELECT t FROM TagDefinition t WHERE " + "t.myTagType = :tagType AND "
|
||||
+ "( :scheme IS NULL OR :scheme = '' OR t.mySystem = :scheme ) AND "
|
||||
+ "t.myCode = :term AND "
|
||||
+ "( :version IS NULL OR :version = '' OR t.myVersion = :version ) AND "
|
||||
+ "( :userSelected IS NULL OR t.myUserSelected = :userSelected )")
|
||||
List<TagDefinition> findByTagTypeAndSchemeAndTermAndVersionAndUserSelected(
|
||||
@Param("tagType") TagTypeEnum tagType,
|
||||
@Param("scheme") String scheme,
|
||||
@Param("term") String term,
|
||||
@Param("version") String version,
|
||||
@Param("userSelected") Boolean userSelected,
|
||||
Pageable pageable);
|
||||
}
|
||||
|
|
|
@ -1,396 +0,0 @@
|
|||
package ca.uhn.fhir.jpa.dao;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import ca.uhn.fhir.jpa.model.entity.TagDefinition;
|
||||
import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
|
||||
import ca.uhn.fhir.jpa.util.MemoryCacheService;
|
||||
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
|
||||
import ca.uhn.fhir.util.AsyncUtil;
|
||||
import ca.uhn.fhir.util.MetaUtil;
|
||||
import ca.uhn.fhir.util.ThreadPoolUtil;
|
||||
import ch.qos.logback.classic.Level;
|
||||
import ch.qos.logback.classic.Logger;
|
||||
import ch.qos.logback.classic.spi.ILoggingEvent;
|
||||
import ch.qos.logback.core.Appender;
|
||||
import org.hl7.fhir.r4.model.Patient;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.persistence.EntityExistsException;
|
||||
import jakarta.persistence.EntityManager;
|
||||
import jakarta.persistence.NoResultException;
|
||||
import jakarta.persistence.TypedQuery;
|
||||
import jakarta.persistence.criteria.CriteriaBuilder;
|
||||
import jakarta.persistence.criteria.CriteriaQuery;
|
||||
import jakarta.persistence.criteria.Path;
|
||||
import jakarta.persistence.criteria.Predicate;
|
||||
import jakarta.persistence.criteria.Root;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.lenient;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class BaseHapiFhirDaoTest {
|
||||
|
||||
private static class TestDao extends BaseHapiFhirResourceDao<Patient> {
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String getResourceName() {
|
||||
return "Patient";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TagDefinition getTagOrNull(TransactionDetails theDetails,
|
||||
TagTypeEnum theEnum,
|
||||
String theScheme,
|
||||
String theTerm,
|
||||
String theLabel,
|
||||
String theVersion,
|
||||
Boolean theUserSelected ) {
|
||||
// we need to init synchronization due to what
|
||||
// the underlying class is doing
|
||||
try {
|
||||
TransactionSynchronizationManager.initSynchronization();
|
||||
return super.getTagOrNull(theDetails, theEnum, theScheme, theTerm, theLabel, theVersion, theUserSelected);
|
||||
} finally {
|
||||
TransactionSynchronizationManager.clearSynchronization();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Logger ourLogger;
|
||||
|
||||
@Mock
|
||||
private Appender<ILoggingEvent> myAppender;
|
||||
|
||||
@Mock
|
||||
private MemoryCacheService myMemoryCacheService;
|
||||
|
||||
@Mock
|
||||
private EntityManager myEntityManager;
|
||||
|
||||
@Mock
|
||||
private PlatformTransactionManager myTransactionManager;
|
||||
|
||||
@InjectMocks
|
||||
private TestDao myTestDao;
|
||||
|
||||
@BeforeEach
|
||||
public void init() {
|
||||
ourLogger = (Logger) LoggerFactory.getLogger(BaseHapiFhirDao.class);
|
||||
ourLogger.addAppender(myAppender);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void end() {
|
||||
ourLogger.detachAppender(myAppender);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a mocked criteria builder
|
||||
* @return
|
||||
*/
|
||||
private CriteriaBuilder getMockedCriteriaBuilder() {
|
||||
Predicate pred = mock(Predicate.class);
|
||||
|
||||
CriteriaBuilder builder = mock(CriteriaBuilder.class);
|
||||
// lenient due to multiple equal calls with different inputs
|
||||
lenient().when(builder.equal(any(), any()))
|
||||
.thenReturn(pred);
|
||||
|
||||
return builder;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a mocked from
|
||||
* @return
|
||||
*/
|
||||
private Root<TagDefinition> getMockedFrom() {
|
||||
Path path = mock(Path.class);
|
||||
|
||||
Root<TagDefinition> from = mock(Root.class);
|
||||
// lenient due to multiple get calls with different inputs
|
||||
lenient().when(from.get(anyString()))
|
||||
.thenReturn(path);
|
||||
return from;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getTagOrNull_raceCondition_wontUpsertDuplicates() throws InterruptedException, ExecutionException {
|
||||
/*
|
||||
* We use this boolean to fake a race condition.
|
||||
* Boolean is used for two reasons:
|
||||
* 1) We don't want an unstable test (so a fake
|
||||
* race condition will ensure the test always executes
|
||||
* exactly as expected... but this just ensures the code
|
||||
* is not buggy, not that race conditions are actually handled)
|
||||
* 2) We want the ability (and confidence!) to know
|
||||
* that a _real_ race condition can be handled. Setting
|
||||
* this boolean false (and potentially tweaking thread count)
|
||||
* gives us this confidence.
|
||||
*
|
||||
* Set this false to try with a real race condition
|
||||
*/
|
||||
boolean fakeRaceCondition = true;
|
||||
|
||||
// the more threads, the more likely we
|
||||
// are to see race conditions.
|
||||
// We need a lot to ensure at least 2 threads
|
||||
// are in the create method at the same time
|
||||
int threads = fakeRaceCondition ? 2 : 30;
|
||||
|
||||
// setup
|
||||
TagTypeEnum tagType = TagTypeEnum.TAG;
|
||||
String scheme = "http://localhost";
|
||||
String term = "code123";
|
||||
String label = "hollow world";
|
||||
String version = "v1.0";
|
||||
Boolean userSelected = true;
|
||||
String raceConditionError = "Entity exists; if this is logged, you have race condition issues!";
|
||||
|
||||
TagDefinition tagDefinition = new TagDefinition(tagType, scheme, term, label);
|
||||
tagDefinition.setVersion(version);
|
||||
tagDefinition.setUserSelected(userSelected);
|
||||
|
||||
// mock objects
|
||||
CriteriaBuilder builder = getMockedCriteriaBuilder();
|
||||
TypedQuery<TagDefinition> query = mock(TypedQuery.class);
|
||||
CriteriaQuery<TagDefinition> cq = mock(CriteriaQuery.class);
|
||||
Root<TagDefinition> from = getMockedFrom();
|
||||
|
||||
// when
|
||||
when(myEntityManager.getCriteriaBuilder())
|
||||
.thenReturn(builder);
|
||||
when(builder.createQuery(any(Class.class)))
|
||||
.thenReturn(cq);
|
||||
when(cq.from(any(Class.class)))
|
||||
.thenReturn(from);
|
||||
when(myEntityManager.createQuery(any(CriteriaQuery.class)))
|
||||
.thenReturn(query);
|
||||
AtomicBoolean atomicBoolean = new AtomicBoolean(false);
|
||||
AtomicInteger getSingleResultInt = new AtomicInteger();
|
||||
when(query.getSingleResult())
|
||||
.thenAnswer(new Answer<TagDefinition>() {
|
||||
private final AtomicInteger count = new AtomicInteger();
|
||||
|
||||
@Override
|
||||
public TagDefinition answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||
getSingleResultInt.incrementAndGet();
|
||||
if (fakeRaceCondition) {
|
||||
// fake
|
||||
// ensure the first 2 accesses throw to
|
||||
// help fake a race condition (2, or possibly the same,
|
||||
// thread failing to access the resource)
|
||||
if (count.get() < 2) {
|
||||
count.incrementAndGet();
|
||||
throw new NoResultException();
|
||||
}
|
||||
}
|
||||
else {
|
||||
// real
|
||||
if (!atomicBoolean.get()) {
|
||||
throw new NoResultException();
|
||||
}
|
||||
}
|
||||
return tagDefinition;
|
||||
}
|
||||
});
|
||||
AtomicInteger persistInt = new AtomicInteger();
|
||||
doAnswer(new Answer() {
|
||||
private final AtomicInteger count = new AtomicInteger();
|
||||
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||
persistInt.incrementAndGet();
|
||||
if (fakeRaceCondition) {
|
||||
// fake
|
||||
if (count.get() < 1) {
|
||||
count.incrementAndGet();
|
||||
return null;
|
||||
}
|
||||
else {
|
||||
throw new EntityExistsException(raceConditionError);
|
||||
}
|
||||
}
|
||||
else {
|
||||
// real
|
||||
if (!atomicBoolean.getAndSet(true)) {
|
||||
// first thread gets null...
|
||||
return null;
|
||||
} else {
|
||||
// all other threads get entity exists exception
|
||||
throw new EntityExistsException(raceConditionError);
|
||||
}
|
||||
}
|
||||
}
|
||||
}).when(myEntityManager).persist(any(Object.class));
|
||||
|
||||
ourLogger.setLevel(Level.WARN);
|
||||
|
||||
// test
|
||||
// ExecutorService service = Executors.newFixedThreadPool(threads);
|
||||
ConcurrentHashMap<Integer, TagDefinition> outcomes = new ConcurrentHashMap<>();
|
||||
ConcurrentHashMap<Integer, Throwable> errors = new ConcurrentHashMap<>();
|
||||
|
||||
ThreadPoolTaskExecutor executor = ThreadPoolUtil.newThreadPool(threads, threads * 2, "test-");
|
||||
|
||||
AtomicInteger counter = new AtomicInteger();
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(threads);
|
||||
Runnable task = () -> {
|
||||
latch.countDown();
|
||||
try {
|
||||
TagDefinition retTag = myTestDao.getTagOrNull(new TransactionDetails(), tagType, scheme, term, label, version, userSelected);
|
||||
outcomes.put(retTag.hashCode(), retTag);
|
||||
counter.incrementAndGet();
|
||||
} catch (Exception ex) {
|
||||
errors.put(ex.hashCode(), ex);
|
||||
}
|
||||
};
|
||||
|
||||
ArrayList<Future> futures = new ArrayList<>();
|
||||
for (int i = 0; i < threads; i++) {
|
||||
futures.add(executor.submit(task));
|
||||
}
|
||||
for (Future f : futures) {
|
||||
f.get();
|
||||
}
|
||||
AsyncUtil.awaitLatchAndIgnoreInterrupt(latch, (long) threads, TimeUnit.SECONDS);
|
||||
|
||||
// try {
|
||||
// ArrayList<Future> futures = new ArrayList<>();
|
||||
// for (int i = 0; i < threads; i++) {
|
||||
// futures.add(service.submit(task));
|
||||
// }
|
||||
// for (Future f : futures) {
|
||||
// f.get();
|
||||
// }
|
||||
// } finally {
|
||||
// service.shutdown();
|
||||
// }
|
||||
// // should not take a second per thread.
|
||||
// // but will take some time, due to the delays above.
|
||||
// // a second per thread seems like a good threshold.
|
||||
// Assertions.assertTrue(
|
||||
// service.awaitTermination(threads, TimeUnit.SECONDS)
|
||||
// );
|
||||
|
||||
assertThat(getSingleResultInt.get()).as("Not enough gets " + getSingleResultInt.get()).isEqualTo(threads + 1);
|
||||
assertThat(persistInt.get()).as("Not enough persists " + persistInt.get()).isEqualTo(threads);
|
||||
|
||||
// verify
|
||||
assertThat(outcomes).hasSize(1);
|
||||
assertEquals(threads, counter.get());
|
||||
assertThat(errors.size()).as(errors.values().stream().map(Throwable::getMessage)
|
||||
.collect(Collectors.joining(", "))).isEqualTo(0);
|
||||
|
||||
// verify we logged some race conditions
|
||||
ArgumentCaptor<ILoggingEvent> captor = ArgumentCaptor.forClass(ILoggingEvent.class);
|
||||
verify(myAppender, Mockito.atLeastOnce())
|
||||
.doAppend(captor.capture());
|
||||
assertThat(captor.getAllValues().get(0).getMessage()).contains(raceConditionError);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getTagOrNull_failingForever_throwsInternalErrorAndLogsWarnings() {
|
||||
// setup
|
||||
TagTypeEnum tagType = TagTypeEnum.TAG;
|
||||
String scheme = "http://localhost";
|
||||
String term = "code123";
|
||||
String label = "hollow world";
|
||||
String version = "v1.0";
|
||||
Boolean userSelected = true;
|
||||
TransactionDetails transactionDetails = new TransactionDetails();
|
||||
String exMsg = "Hi there";
|
||||
String readError = "No read for you";
|
||||
|
||||
ourLogger.setLevel(Level.WARN);
|
||||
|
||||
// mock objects
|
||||
CriteriaBuilder builder = getMockedCriteriaBuilder();
|
||||
TypedQuery<TagDefinition> query = mock(TypedQuery.class);
|
||||
CriteriaQuery<TagDefinition> cq = mock(CriteriaQuery.class);
|
||||
Root<TagDefinition> from = getMockedFrom();
|
||||
|
||||
// when
|
||||
when(myEntityManager.getCriteriaBuilder())
|
||||
.thenReturn(builder);
|
||||
when(builder.createQuery(any(Class.class)))
|
||||
.thenReturn(cq);
|
||||
when(cq.from(any(Class.class)))
|
||||
.thenReturn(from);
|
||||
when(myEntityManager.createQuery(any(CriteriaQuery.class)))
|
||||
.thenReturn(query);
|
||||
when(query.getSingleResult())
|
||||
.thenThrow(new NoResultException(readError));
|
||||
doThrow(new RuntimeException(exMsg))
|
||||
.when(myEntityManager).persist(any(Object.class));
|
||||
|
||||
// test
|
||||
try {
|
||||
myTestDao.getTagOrNull(transactionDetails, tagType, scheme, term, label, version, userSelected);
|
||||
fail();
|
||||
} catch (Exception ex) {
|
||||
// verify
|
||||
assertThat(ex.getMessage()).contains("Tag get/create failed after 10 attempts with error(s): " + exMsg);
|
||||
|
||||
ArgumentCaptor<ILoggingEvent> appenderCaptor = ArgumentCaptor.forClass(ILoggingEvent.class);
|
||||
verify(myAppender, Mockito.times(10))
|
||||
.doAppend(appenderCaptor.capture());
|
||||
List<ILoggingEvent> events = appenderCaptor.getAllValues();
|
||||
assertThat(events).hasSize(10);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
String actualMsg = events.get(i).getMessage();
|
||||
assertThat(actualMsg).isEqualTo("Tag read/write failed: "
|
||||
+ exMsg
|
||||
+ ". "
|
||||
+ "This is not a failure on its own, "
|
||||
+ "but could be useful information in the result of an actual failure.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
////////// Static access tests
|
||||
|
||||
@Test
|
||||
public void cleanProvenanceSourceUri() {
|
||||
assertEquals("", MetaUtil.cleanProvenanceSourceUriOrEmpty(null));
|
||||
assertEquals("abc", MetaUtil.cleanProvenanceSourceUriOrEmpty("abc"));
|
||||
assertEquals("abc", MetaUtil.cleanProvenanceSourceUriOrEmpty("abc#"));
|
||||
assertEquals("abc", MetaUtil.cleanProvenanceSourceUriOrEmpty("abc#def"));
|
||||
assertEquals("abc", MetaUtil.cleanProvenanceSourceUriOrEmpty("abc#def#ghi"));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,151 @@
|
|||
package ca.uhn.fhir.jpa.dao;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
import ca.uhn.fhir.jpa.dao.data.ITagDefinitionDao;
|
||||
import ca.uhn.fhir.jpa.model.entity.TagDefinition;
|
||||
import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
|
||||
import ca.uhn.fhir.jpa.util.MemoryCacheService;
|
||||
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
|
||||
import ca.uhn.fhir.util.MetaUtil;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.data.domain.Pageable;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class CacheTagDefinitionDaoTest {
|
||||
|
||||
private static class TestDao extends CacheTagDefinitionDao {
|
||||
public TestDao(ITagDefinitionDao theTagDefinitionDao, MemoryCacheService theMemoryCacheService) {
|
||||
super(theTagDefinitionDao, theMemoryCacheService);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TagDefinition getTagOrNull(TransactionDetails theDetails,
|
||||
TagTypeEnum theEnum,
|
||||
String theScheme,
|
||||
String theTerm,
|
||||
String theLabel,
|
||||
String theVersion,
|
||||
Boolean theUserSelected) {
|
||||
try {
|
||||
TransactionSynchronizationManager.initSynchronization();
|
||||
return super.getTagOrNull(theDetails, theEnum, theScheme, theTerm, theLabel, theVersion, theUserSelected);
|
||||
} finally {
|
||||
TransactionSynchronizationManager.clearSynchronization();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Mock
|
||||
private MemoryCacheService myMemoryCacheService;
|
||||
|
||||
@Mock
|
||||
private ITagDefinitionDao tagDefinitionDao;
|
||||
|
||||
@InjectMocks
|
||||
private TestDao myTestDao;
|
||||
|
||||
@BeforeEach
|
||||
public void init() {
|
||||
LoggerFactory.getLogger(BaseHapiFhirDao.class);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void tearDown() {
|
||||
// Cleanup logic if needed
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetTagOrNull_createsTag_ifNotFound() {
|
||||
// Arrange
|
||||
TagTypeEnum tagType = TagTypeEnum.TAG;
|
||||
String scheme = "http://localhost";
|
||||
String term = "code123";
|
||||
String label = "example label";
|
||||
String version = "v1.0";
|
||||
Boolean userSelected = true;
|
||||
|
||||
TagDefinition newTag = new TagDefinition(tagType, scheme, term, label);
|
||||
newTag.setVersion(version);
|
||||
newTag.setUserSelected(userSelected);
|
||||
|
||||
when(tagDefinitionDao.findByTagTypeAndSchemeAndTermAndVersionAndUserSelected(
|
||||
eq(tagType), eq(scheme), eq(term), eq(version), eq(userSelected), any(Pageable.class)))
|
||||
.thenReturn(List.of());
|
||||
|
||||
when(tagDefinitionDao.save(any(TagDefinition.class))).thenReturn(newTag);
|
||||
|
||||
// Act
|
||||
TagDefinition result = myTestDao.getTagOrNull(new TransactionDetails(), tagType, scheme, term, label, version, userSelected);
|
||||
|
||||
// Assert
|
||||
assertEquals(newTag, result);
|
||||
verify(tagDefinitionDao).save(any(TagDefinition.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimultaneousTagCreation_createsMultipleTags() throws InterruptedException, ExecutionException {
|
||||
int threadCount = 10;
|
||||
TagTypeEnum tagType = TagTypeEnum.TAG;
|
||||
String scheme = "http://localhost";
|
||||
String term = "code123";
|
||||
String label = "example label";
|
||||
String version = "v1.0";
|
||||
Boolean userSelected = true;
|
||||
|
||||
TagDefinition expectedTag = new TagDefinition(tagType, scheme, term, label);
|
||||
expectedTag.setVersion(version);
|
||||
expectedTag.setUserSelected(userSelected);
|
||||
|
||||
when(tagDefinitionDao.findByTagTypeAndSchemeAndTermAndVersionAndUserSelected(
|
||||
eq(tagType), eq(scheme), eq(term), eq(version), eq(userSelected), any(Pageable.class)))
|
||||
.thenReturn(List.of());
|
||||
when(tagDefinitionDao.save(any(TagDefinition.class))).thenReturn(expectedTag);
|
||||
|
||||
// Run the test with multiple threads
|
||||
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
|
||||
try {
|
||||
List<Future<TagDefinition>> futures = new ArrayList<>();
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
futures.add(executor.submit(() -> myTestDao.getTagOrNull(new TransactionDetails(), tagType, scheme, term, label, version, userSelected)));
|
||||
}
|
||||
|
||||
// Check results
|
||||
for (Future<TagDefinition> future : futures) {
|
||||
future.get();
|
||||
}
|
||||
verify(tagDefinitionDao, times(threadCount)).save(any(TagDefinition.class)); // multiple tags allowed
|
||||
} finally {
|
||||
executor.shutdown();
|
||||
}
|
||||
}
|
||||
////////// Static access tests
|
||||
|
||||
@Test
|
||||
public void cleanProvenanceSourceUri() {
|
||||
assertEquals("", MetaUtil.cleanProvenanceSourceUriOrEmpty(null));
|
||||
assertEquals("abc", MetaUtil.cleanProvenanceSourceUriOrEmpty("abc"));
|
||||
assertEquals("abc", MetaUtil.cleanProvenanceSourceUriOrEmpty("abc#"));
|
||||
assertEquals("abc", MetaUtil.cleanProvenanceSourceUriOrEmpty("abc#def"));
|
||||
assertEquals("abc", MetaUtil.cleanProvenanceSourceUriOrEmpty("abc#def#ghi"));
|
||||
}
|
||||
}
|
|
@ -1998,7 +1998,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
|
|||
outcome = mySystemDao.transaction(mySrd, input.get());
|
||||
ourLog.debug("Resp: {}", myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(outcome));
|
||||
myCaptureQueriesListener.logSelectQueries();
|
||||
assertEquals(7, myCaptureQueriesListener.countSelectQueries());
|
||||
assertEquals(6, myCaptureQueriesListener.countSelectQueries());
|
||||
myCaptureQueriesListener.logInsertQueries();
|
||||
assertEquals(7, myCaptureQueriesListener.countInsertQueries());
|
||||
myCaptureQueriesListener.logUpdateQueries();
|
||||
|
@ -3063,7 +3063,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
|
|||
assertEquals(6189, myCaptureQueriesListener.countInsertQueriesForCurrentThread());
|
||||
assertEquals(418, myCaptureQueriesListener.countUpdateQueriesForCurrentThread());
|
||||
assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread());
|
||||
assertEquals(2, myCaptureQueriesListener.countCommits());
|
||||
assertEquals(1, myCaptureQueriesListener.countCommits());
|
||||
assertEquals(0, myCaptureQueriesListener.countRollbacks());
|
||||
|
||||
assertThat(output.getEntry()).hasSize(input.getEntry().size());
|
||||
|
|
|
@ -2973,7 +2973,7 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test {
|
|||
assertEquals(6189, myCaptureQueriesListener.countInsertQueriesForCurrentThread());
|
||||
assertEquals(418, myCaptureQueriesListener.countUpdateQueriesForCurrentThread());
|
||||
assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread());
|
||||
assertEquals(2, myCaptureQueriesListener.countCommits());
|
||||
assertEquals(1, myCaptureQueriesListener.countCommits());
|
||||
assertEquals(0, myCaptureQueriesListener.countRollbacks());
|
||||
|
||||
assertThat(output.getEntry()).hasSize(input.getEntry().size());
|
||||
|
|
Loading…
Reference in New Issue