Refactor BaseHapiFhirDao.getOrCreateTag method to run in a separate thread for XA transaction compatibility (#6224)

* Fix typo in docs.

* Refactor the use of ConcurrentTaskScheduler to use the non-deprecated constructor.

* Refactor getOrCreateTag method to prevent transaction suspension for XA transaction compatibility

    The getOrCreateTag method previously used a propagation behavior that caused issues with
    XA transactions when using the PostgreSQL JDBC driver. The PGXAConnection does not support
    transaction suspend/resume, which made it incompatible with the existing propagation strategy
    'PROPAGATION_REQUIRES_NEW'.

    This refactor changes the getOrCreateTag logic so that the lookup/write still runs in a new
    transaction, as before, but in a separate thread, so that the main transaction is never
    suspended. The result is retrieved through a future.

    This change aims to improve compatibility and prevent transaction-related issues when using HAPI-FHIR with
    XA transactions and PostgreSQL.

    Closes #3412

* Refactor tag creation logic and handle concurrent access:

- Simplified tag creation by removing unnecessary transaction complexity, since duplicate tags have been allowed in hfj_tag_def as of #4813
- Removed redundant retry logic based on updated DB constraints
This commit is contained in:
Ibrahim 2024-10-30 15:06:45 +01:00 committed by GitHub
parent ba05db1097
commit e43c140a24
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 312 additions and 625 deletions

View File

@ -43,6 +43,7 @@ import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.svc.BulkDataImportSvcImpl;
import ca.uhn.fhir.jpa.cache.IResourceVersionSvc;
import ca.uhn.fhir.jpa.cache.ResourceVersionSvcDaoImpl;
import ca.uhn.fhir.jpa.dao.CacheTagDefinitionDao;
import ca.uhn.fhir.jpa.dao.DaoSearchParamProvider;
import ca.uhn.fhir.jpa.dao.HistoryBuilder;
import ca.uhn.fhir.jpa.dao.HistoryBuilderFactory;
@ -56,6 +57,7 @@ import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.TransactionProcessor;
import ca.uhn.fhir.jpa.dao.data.IResourceModifiedDao;
import ca.uhn.fhir.jpa.dao.data.IResourceSearchUrlDao;
import ca.uhn.fhir.jpa.dao.data.ITagDefinitionDao;
import ca.uhn.fhir.jpa.dao.expunge.ExpungeEverythingService;
import ca.uhn.fhir.jpa.dao.expunge.ExpungeOperation;
import ca.uhn.fhir.jpa.dao.expunge.ExpungeService;
@ -377,17 +379,17 @@ public class JpaConfig {
@Bean
public TaskScheduler taskScheduler() {
	// Use the constructor that accepts both executors directly: the no-arg
	// constructor followed by setConcurrentExecutor()/setScheduledExecutor()
	// is the deprecated form this commit replaces.
	ConcurrentTaskScheduler retVal = new ConcurrentTaskScheduler(
			scheduledExecutorService().getObject(),
			scheduledExecutorService().getObject());
	return retVal;
}
@Bean(name = TASK_EXECUTOR_NAME)
public AsyncTaskExecutor taskExecutor() {
	// ConcurrentTaskScheduler implements AsyncTaskExecutor, so it can back this
	// bean as well. Same non-deprecated constructor as taskScheduler() above.
	ConcurrentTaskScheduler retVal = new ConcurrentTaskScheduler(
			scheduledExecutorService().getObject(),
			scheduledExecutorService().getObject());
	return retVal;
}
@ -893,4 +895,10 @@ public class JpaConfig {
FhirContext theFhirContext, HibernatePropertiesProvider theHibernatePropertiesProvider) {
return new ResourceHistoryCalculator(theFhirContext, theHibernatePropertiesProvider.isOracleDialect());
}
@Bean
public CacheTagDefinitionDao tagDefinitionDao(
		ITagDefinitionDao tagDefinitionDao, MemoryCacheService memoryCacheService) {
	// Wires the cache-backed tag definition DAO that replaces the former
	// REQUIRES_NEW getOrCreateTag logic in BaseHapiFhirDao (see commit message)
	return new CacheTagDefinitionDao(tagDefinitionDao, memoryCacheService);
}
}

View File

@ -75,7 +75,6 @@ import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryResourceMatcher;
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
import ca.uhn.fhir.jpa.term.api.ITermReadSvc;
import ca.uhn.fhir.jpa.util.AddRemoveCount;
import ca.uhn.fhir.jpa.util.MemoryCacheService;
import ca.uhn.fhir.jpa.util.QueryChunker;
import ca.uhn.fhir.model.api.IResource;
import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum;
@ -89,7 +88,6 @@ import ca.uhn.fhir.rest.api.InterceptorInvocationTimingEnum;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
@ -107,14 +105,8 @@ import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.persistence.EntityManager;
import jakarta.persistence.NoResultException;
import jakarta.persistence.PersistenceContext;
import jakarta.persistence.PersistenceContextType;
import jakarta.persistence.TypedQuery;
import jakarta.persistence.criteria.CriteriaBuilder;
import jakarta.persistence.criteria.CriteriaQuery;
import jakarta.persistence.criteria.Predicate;
import jakarta.persistence.criteria.Root;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
@ -136,19 +128,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
@ -158,7 +142,6 @@ import java.util.stream.Collectors;
import javax.xml.stream.events.Characters;
import javax.xml.stream.events.XMLEvent;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static org.apache.commons.collections4.CollectionUtils.isEqualCollection;
import static org.apache.commons.lang3.StringUtils.isBlank;
@ -182,8 +165,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
public static final long INDEX_STATUS_INDEXED = 1L;
public static final long INDEX_STATUS_INDEXING_FAILED = 2L;
public static final String NS_JPA_PROFILE = "https://github.com/hapifhir/hapi-fhir/ns/jpa/profile";
// total attempts to do a tag transaction
private static final int TOTAL_TAG_READ_ATTEMPTS = 10;
private static final Logger ourLog = LoggerFactory.getLogger(BaseHapiFhirDao.class);
private static boolean ourValidationDisabledForUnitTest;
private static boolean ourDisableIncrementOnUpdateForUnitTest = false;
@ -248,17 +229,14 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
@Autowired
private IPartitionLookupSvc myPartitionLookupSvc;
@Autowired
private MemoryCacheService myMemoryCacheService;
@Autowired(required = false)
private IFulltextSearchSvc myFulltextSearchSvc;
@Autowired
private PlatformTransactionManager myTransactionManager;
protected ResourceHistoryCalculator myResourceHistoryCalculator;
@Autowired
protected ResourceHistoryCalculator myResourceHistoryCalculator;
protected CacheTagDefinitionDao cacheTagDefinitionDao;
protected final CodingSpy myCodingSpy = new CodingSpy();
@ -307,7 +285,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
TagList tagList = ResourceMetadataKeyEnum.TAG_LIST.get(theResource);
if (tagList != null) {
for (Tag next : tagList) {
TagDefinition def = getTagOrNull(
TagDefinition def = cacheTagDefinitionDao.getTagOrNull(
theTransactionDetails,
TagTypeEnum.TAG,
next.getScheme(),
@ -326,7 +304,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
List<BaseCodingDt> securityLabels = ResourceMetadataKeyEnum.SECURITY_LABELS.get(theResource);
if (securityLabels != null) {
for (BaseCodingDt next : securityLabels) {
TagDefinition def = getTagOrNull(
TagDefinition def = cacheTagDefinitionDao.getTagOrNull(
theTransactionDetails,
TagTypeEnum.SECURITY_LABEL,
next.getSystemElement().getValue(),
@ -345,7 +323,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
List<IdDt> profiles = ResourceMetadataKeyEnum.PROFILES.get(theResource);
if (profiles != null) {
for (IIdType next : profiles) {
TagDefinition def = getTagOrNull(
TagDefinition def = cacheTagDefinitionDao.getTagOrNull(
theTransactionDetails, TagTypeEnum.PROFILE, NS_JPA_PROFILE, next.getValue(), null, null, null);
if (def != null) {
ResourceTag tag = theEntity.addTag(def);
@ -364,7 +342,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
List<? extends IBaseCoding> tagList = theResource.getMeta().getTag();
if (tagList != null) {
for (IBaseCoding next : tagList) {
TagDefinition def = getTagOrNull(
TagDefinition def = cacheTagDefinitionDao.getTagOrNull(
theTransactionDetails,
TagTypeEnum.TAG,
next.getSystem(),
@ -383,7 +361,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
List<? extends IBaseCoding> securityLabels = theResource.getMeta().getSecurity();
if (securityLabels != null) {
for (IBaseCoding next : securityLabels) {
TagDefinition def = getTagOrNull(
TagDefinition def = cacheTagDefinitionDao.getTagOrNull(
theTransactionDetails,
TagTypeEnum.SECURITY_LABEL,
next.getSystem(),
@ -402,7 +380,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
List<? extends IPrimitiveType<String>> profiles = theResource.getMeta().getProfile();
if (profiles != null) {
for (IPrimitiveType<String> next : profiles) {
TagDefinition def = getTagOrNull(
TagDefinition def = cacheTagDefinitionDao.getTagOrNull(
theTransactionDetails, TagTypeEnum.PROFILE, NS_JPA_PROFILE, next.getValue(), null, null, null);
if (def != null) {
ResourceTag tag = theEntity.addTag(def);
@ -422,7 +400,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
if (!def.isStandardType()) {
String profile = def.getResourceProfile("");
if (isNotBlank(profile)) {
TagDefinition profileDef = getTagOrNull(
TagDefinition profileDef = cacheTagDefinitionDao.getTagOrNull(
theTransactionDetails, TagTypeEnum.PROFILE, NS_JPA_PROFILE, profile, null, null, null);
ResourceTag tag = theEntity.addTag(profileDef);
@ -456,164 +434,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
myContext = theContext;
}
/**
 * Resolves the {@link TagDefinition} for the given coordinates, consulting (in order)
 * the process-wide memory cache, the per-transaction resolved-tag map, and finally
 * the database (creating the row if no match exists).
 *
 * <code>null</code> will only be returned if the scheme and tag are both blank
 */
protected TagDefinition getTagOrNull(
		TransactionDetails theTransactionDetails,
		TagTypeEnum theTagType,
		String theScheme,
		String theTerm,
		String theLabel,
		String theVersion,
		Boolean theUserSelected) {
	if (isBlank(theScheme) && isBlank(theTerm) && isBlank(theLabel)) {
		return null;
	}
	MemoryCacheService.TagDefinitionCacheKey key =
			toTagDefinitionMemoryCacheKey(theTagType, theScheme, theTerm, theVersion, theUserSelected);
	// First level: shared in-memory cache (populated only after commit, see below)
	TagDefinition retVal = myMemoryCacheService.getIfPresent(MemoryCacheService.CacheEnum.TAG_DEFINITION, key);
	if (retVal == null) {
		// Second level: definitions already resolved earlier in this same transaction
		HashMap<MemoryCacheService.TagDefinitionCacheKey, TagDefinition> resolvedTagDefinitions =
				theTransactionDetails.getOrCreateUserData(
						HapiTransactionService.XACT_USERDATA_KEY_RESOLVED_TAG_DEFINITIONS, HashMap::new);
		retVal = resolvedTagDefinitions.get(key);
		if (retVal == null) {
			// actual DB hit(s) happen here
			retVal = getOrCreateTag(theTagType, theScheme, theTerm, theLabel, theVersion, theUserSelected);
			// Publish to the shared cache only after commit, so a rollback cannot
			// leave a phantom tag definition cached
			TransactionSynchronization sync = new AddTagDefinitionToCacheAfterCommitSynchronization(key, retVal);
			TransactionSynchronizationManager.registerSynchronization(sync);
			resolvedTagDefinitions.put(key, retVal);
		}
	}
	return retVal;
}
/**
 * Gets the tag defined by the fed in values, or saves it if it does not
 * exist.
 * <p>
 * Runs the read/create in a NEW transaction (suspending the caller's transaction)
 * and retries up to {@code TOTAL_TAG_READ_ATTEMPTS} times, because a concurrent
 * writer may persist the same tag between our read and our insert.
 * <p>
 * Can also throw an InternalErrorException if something bad happens.
 */
private TagDefinition getOrCreateTag(
		TagTypeEnum theTagType,
		String theScheme,
		String theTerm,
		String theLabel,
		String theVersion,
		Boolean theUserSelected) {

	TypedQuery<TagDefinition> q = buildTagQuery(theTagType, theScheme, theTerm, theVersion, theUserSelected);
	q.setMaxResults(1);

	TransactionTemplate template = new TransactionTemplate(myTransactionManager);
	// NOTE: REQUIRES_NEW suspends the outer transaction; PostgreSQL XA connections
	// do not support suspend/resume (see #6224) — this is the behavior the
	// CacheTagDefinitionDao refactor removes
	template.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);

	// this transaction will attempt to get or create the tag,
	// repeating (on any failure) 10 times.
	// if it fails more than this, we will throw exceptions
	TagDefinition retVal;
	int count = 0;
	HashSet<Throwable> throwables = new HashSet<>();
	do {
		try {
			retVal = template.execute(new TransactionCallback<TagDefinition>() {

				// do the actual DB call(s) to read and/or write the values
				private TagDefinition readOrCreate() {
					TagDefinition val;
					try {
						val = q.getSingleResult();
					} catch (NoResultException e) {
						// no existing row: create it; a racing writer may beat us,
						// in which case this attempt fails and we retry
						val = new TagDefinition(theTagType, theScheme, theTerm, theLabel);
						val.setVersion(theVersion);
						val.setUserSelected(theUserSelected);
						myEntityManager.persist(val);
					}
					return val;
				}

				@Override
				public TagDefinition doInTransaction(TransactionStatus status) {
					TagDefinition tag = null;
					try {
						tag = readOrCreate();
					} catch (Exception ex) {
						// log any exceptions - just in case
						// they may be signs of things to come...
						ourLog.warn(
								"Tag read/write failed: "
										+ ex.getMessage() + ". "
										+ "This is not a failure on its own, "
										+ "but could be useful information in the result of an actual failure.",
								ex);
						throwables.add(ex);
					}
					return tag;
				}
			});
		} catch (Exception ex) {
			// transaction template can fail if connections to db are exhausted and/or timeout
			ourLog.warn(
					"Transaction failed with: {}. Transaction will rollback and be reattempted.", ex.getMessage());
			retVal = null;
		}
		count++;
	} while (retVal == null && count < TOTAL_TAG_READ_ATTEMPTS);

	if (retVal == null) {
		// if tag is still null,
		// something bad must be happening
		// - throw
		String msg = throwables.stream().map(Throwable::getMessage).collect(Collectors.joining(", "));
		throw new InternalErrorException(Msg.code(2023)
				+ "Tag get/create failed after "
				+ TOTAL_TAG_READ_ATTEMPTS
				+ " attempts with error(s): "
				+ msg);
	}
	return retVal;
}
/**
 * Builds a criteria query selecting TagDefinition rows that exactly match the
 * supplied coordinates. A blank scheme/version and a null userSelected match
 * rows where the corresponding column IS NULL.
 */
private TypedQuery<TagDefinition> buildTagQuery(
		TagTypeEnum theTagType, String theScheme, String theTerm, String theVersion, Boolean theUserSelected) {
	CriteriaBuilder cb = myEntityManager.getCriteriaBuilder();
	CriteriaQuery<TagDefinition> criteria = cb.createQuery(TagDefinition.class);
	Root<TagDefinition> tagRoot = criteria.from(TagDefinition.class);

	// Mandatory match on tag type and code
	Predicate typeAndCodeMatch =
			cb.and(cb.equal(tagRoot.get("myTagType"), theTagType), cb.equal(tagRoot.get("myCode"), theTerm));

	// Blank/absent values must match NULL columns rather than being ignored
	Predicate schemeMatch;
	if (isBlank(theScheme)) {
		schemeMatch = cb.isNull(tagRoot.get("mySystem"));
	} else {
		schemeMatch = cb.equal(tagRoot.get("mySystem"), theScheme);
	}

	Predicate versionMatch;
	if (isBlank(theVersion)) {
		versionMatch = cb.isNull(tagRoot.get("myVersion"));
	} else {
		versionMatch = cb.equal(tagRoot.get("myVersion"), theVersion);
	}

	Predicate userSelectedMatch;
	if (isNull(theUserSelected)) {
		userSelectedMatch = cb.isNull(tagRoot.get("myUserSelected"));
	} else {
		userSelectedMatch = cb.equal(tagRoot.get("myUserSelected"), theUserSelected);
	}

	// Multiple restrictions passed to where() are conjoined with AND, exactly as
	// the original predicate-array form was
	criteria.where(typeAndCodeMatch, schemeMatch, versionMatch, userSelectedMatch);
	return myEntityManager.createQuery(criteria);
}
void incrementId(T theResource, ResourceTable theSavedEntity, IIdType theResourceId) {
if (theResourceId == null || theResourceId.getVersionIdPart() == null) {
theSavedEntity.initializeVersion();
@ -933,7 +753,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
@Override
@CoverageIgnore
public BaseHasResource readEntity(IIdType theValueId, RequestDetails theRequest) {
	// Not supported at this level; concrete subclasses provide the implementation
	throw new NotImplementedException(Msg.code(927));
}
/**
@ -1839,9 +1659,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
}
@PostConstruct
public void start() {
	// Intentionally empty lifecycle hook
}
@VisibleForTesting
public void setStorageSettingsForUnitTest(JpaStorageSettings theStorageSettings) {
@ -1880,30 +1698,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
myJpaStorageResourceParser = theJpaStorageResourceParser;
}
/**
 * Publishes a resolved tag definition to the shared memory cache, but only after
 * the owning transaction commits, so rolled-back work is never cached.
 */
private class AddTagDefinitionToCacheAfterCommitSynchronization implements TransactionSynchronization {
	private final TagDefinition myTagDefinition;
	private final MemoryCacheService.TagDefinitionCacheKey myKey;

	public AddTagDefinitionToCacheAfterCommitSynchronization(
			MemoryCacheService.TagDefinitionCacheKey theKey, TagDefinition theTagDefinition) {
		myTagDefinition = theTagDefinition;
		myKey = theKey;
	}

	@Override
	public void afterCommit() {
		myMemoryCacheService.put(MemoryCacheService.CacheEnum.TAG_DEFINITION, myKey, myTagDefinition);
	}
}
/**
 * Builds the memory-cache key used to look up a tag definition by its
 * identifying fields.
 */
@Nonnull
public static MemoryCacheService.TagDefinitionCacheKey toTagDefinitionMemoryCacheKey(
		TagTypeEnum theTagType, String theScheme, String theTerm, String theVersion, Boolean theUserSelected) {
	MemoryCacheService.TagDefinitionCacheKey cacheKey = new MemoryCacheService.TagDefinitionCacheKey(
			theTagType, theScheme, theTerm, theVersion, theUserSelected);
	return cacheKey;
}
@SuppressWarnings("unchecked")
public static String parseContentTextIntoWords(FhirContext theContext, IBaseResource theResource) {

View File

@ -1043,7 +1043,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
if (!entityHasTag) {
theEntity.setHasTags(true);
TagDefinition def = getTagOrNull(
TagDefinition def = cacheTagDefinitionDao.getTagOrNull(
theTransactionDetails,
nextDef.getTagType(),
nextDef.getSystem(),

View File

@ -0,0 +1,113 @@
package ca.uhn.fhir.jpa.dao;
import ca.uhn.fhir.jpa.dao.data.ITagDefinitionDao;
import ca.uhn.fhir.jpa.model.entity.TagDefinition;
import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
import ca.uhn.fhir.jpa.util.MemoryCacheService;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import jakarta.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.util.HashMap;
import java.util.List;
import static org.apache.commons.lang3.StringUtils.isBlank;
@Repository
public class CacheTagDefinitionDao {
	private static final Logger ourLog = LoggerFactory.getLogger(CacheTagDefinitionDao.class);

	// Spring Data repository for TagDefinition rows
	private final ITagDefinitionDao tagDefinitionDao;

	// Process-wide cache; only populated after the owning transaction commits
	private final MemoryCacheService memoryCacheService;

	public CacheTagDefinitionDao(ITagDefinitionDao tagDefinitionDao, MemoryCacheService memoryCacheService) {
		this.tagDefinitionDao = tagDefinitionDao;
		this.memoryCacheService = memoryCacheService;
	}

	/**
	 * Returns a TagDefinition or null if the scheme, term, and label are all blank.
	 * <p>
	 * Resolution order: shared memory cache, then the per-transaction map kept in
	 * the {@code TransactionDetails} user data, then the database (creating the
	 * row when no match is found). Newly resolved definitions are published to
	 * the shared cache only after commit.
	 */
	protected TagDefinition getTagOrNull(
			TransactionDetails transactionDetails,
			TagTypeEnum tagType,
			String scheme,
			String term,
			String label,
			String version,
			Boolean userSelected) {
		if (isBlank(scheme) && isBlank(term) && isBlank(label)) {
			return null;
		}

		MemoryCacheService.TagDefinitionCacheKey key =
				toTagDefinitionMemoryCacheKey(tagType, scheme, term, version, userSelected);
		TagDefinition tagDefinition = memoryCacheService.getIfPresent(MemoryCacheService.CacheEnum.TAG_DEFINITION, key);

		if (tagDefinition == null) {
			// NOTE(review): the replaced implementation keyed this user-data map with
			// HapiTransactionService.XACT_USERDATA_KEY_RESOLVED_TAG_DEFINITIONS; confirm
			// nothing else still reads/writes under that constant.
			HashMap<MemoryCacheService.TagDefinitionCacheKey, TagDefinition> resolvedTagDefinitions =
					transactionDetails.getOrCreateUserData("resolvedTagDefinitions", HashMap::new);

			tagDefinition = resolvedTagDefinitions.get(key);

			if (tagDefinition == null) {
				tagDefinition = getOrCreateTag(tagType, scheme, term, label, version, userSelected);

				// Defer publishing to the shared cache until after commit so a
				// rollback cannot leave a stale entry cached
				TransactionSynchronization sync =
						new AddTagDefinitionToCacheAfterCommitSynchronization(key, tagDefinition);
				TransactionSynchronizationManager.registerSynchronization(sync);

				resolvedTagDefinitions.put(key, tagDefinition);
			}
		}
		return tagDefinition;
	}

	/**
	 * Gets or creates a TagDefinition entity.
	 * <p>
	 * No retry or new-transaction logic here: per the commit message, duplicate
	 * rows are tolerated in hfj_tag_def, so the first match (if any) is returned
	 * and otherwise a new row is saved in the caller's transaction.
	 */
	private TagDefinition getOrCreateTag(
			TagTypeEnum tagType, String scheme, String term, String label, String version, Boolean userSelected) {

		List<TagDefinition> result = tagDefinitionDao.findByTagTypeAndSchemeAndTermAndVersionAndUserSelected(
				tagType, scheme, term, version, userSelected, Pageable.ofSize(1));

		if (!result.isEmpty()) {
			return result.get(0);
		} else {
			// Create a new TagDefinition if no result is found
			TagDefinition newTag = new TagDefinition(tagType, scheme, term, label);
			newTag.setVersion(version);
			newTag.setUserSelected(userSelected);
			return tagDefinitionDao.save(newTag);
		}
	}

	// Builds the cache key identifying a tag definition by its coordinate fields
	@Nonnull
	private static MemoryCacheService.TagDefinitionCacheKey toTagDefinitionMemoryCacheKey(
			TagTypeEnum tagType, String scheme, String term, String version, Boolean userSelected) {
		return new MemoryCacheService.TagDefinitionCacheKey(tagType, scheme, term, version, userSelected);
	}

	/**
	 * Publishes a resolved tag definition to the shared memory cache once the
	 * owning transaction has committed successfully.
	 */
	private class AddTagDefinitionToCacheAfterCommitSynchronization implements TransactionSynchronization {
		private final TagDefinition tagDefinition;
		private final MemoryCacheService.TagDefinitionCacheKey key;

		public AddTagDefinitionToCacheAfterCommitSynchronization(
				MemoryCacheService.TagDefinitionCacheKey key, TagDefinition tagDefinition) {
			this.tagDefinition = tagDefinition;
			this.key = key;
		}

		@Override
		public void afterCommit() {
			memoryCacheService.put(MemoryCacheService.CacheEnum.TAG_DEFINITION, key, tagDefinition);
		}
	}
}

View File

@ -20,8 +20,25 @@
package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.jpa.model.entity.TagDefinition;
import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.util.List;
public interface ITagDefinitionDao extends JpaRepository<TagDefinition, Long>, IHapiFhirJpaRepository {

	/**
	 * Finds tag definitions matching the given coordinates.
	 * <p>
	 * NOTE(review): a blank/null scheme or version matches rows with ANY value in
	 * that column (the OR clause short-circuits), whereas the criteria query this
	 * replaced matched only rows where the column was NULL — confirm this
	 * widening is intended.
	 *
	 * @param pageable limits the result size (callers pass a single-element page)
	 */
	@Query("SELECT t FROM TagDefinition t WHERE " + "t.myTagType = :tagType AND "
			+ "( :scheme IS NULL OR :scheme = '' OR t.mySystem = :scheme ) AND "
			+ "t.myCode = :term AND "
			+ "( :version IS NULL OR :version = '' OR t.myVersion = :version ) AND "
			+ "( :userSelected IS NULL OR t.myUserSelected = :userSelected )")
	List<TagDefinition> findByTagTypeAndSchemeAndTermAndVersionAndUserSelected(
			@Param("tagType") TagTypeEnum tagType,
			@Param("scheme") String scheme,
			@Param("term") String term,
			@Param("version") String version,
			@Param("userSelected") Boolean userSelected,
			Pageable pageable);
}

View File

@ -1,396 +0,0 @@
package ca.uhn.fhir.jpa.dao;
import static org.junit.jupiter.api.Assertions.assertEquals;
import ca.uhn.fhir.jpa.model.entity.TagDefinition;
import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
import ca.uhn.fhir.jpa.util.MemoryCacheService;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.util.AsyncUtil;
import ca.uhn.fhir.util.MetaUtil;
import ca.uhn.fhir.util.ThreadPoolUtil;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import jakarta.annotation.Nullable;
import jakarta.persistence.EntityExistsException;
import jakarta.persistence.EntityManager;
import jakarta.persistence.NoResultException;
import jakarta.persistence.TypedQuery;
import jakarta.persistence.criteria.CriteriaBuilder;
import jakarta.persistence.criteria.CriteriaQuery;
import jakarta.persistence.criteria.Path;
import jakarta.persistence.criteria.Predicate;
import jakarta.persistence.criteria.Root;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class BaseHapiFhirDaoTest {
/**
 * Concrete subclass of the DAO under test. Overrides getTagOrNull to bracket the
 * call with transaction-synchronization init/clear, because the superclass
 * registers a TransactionSynchronization and no real transaction is active in
 * these unit tests.
 */
private static class TestDao extends BaseHapiFhirResourceDao<Patient> {

	@Nullable
	@Override
	public String getResourceName() {
		return "Patient";
	}

	@Override
	protected TagDefinition getTagOrNull(TransactionDetails theDetails,
										 TagTypeEnum theEnum,
										 String theScheme,
										 String theTerm,
										 String theLabel,
										 String theVersion,
										 Boolean theUserSelected) {
		// we need to init synchronization due to what
		// the underlying class is doing
		try {
			TransactionSynchronizationManager.initSynchronization();
			return super.getTagOrNull(theDetails, theEnum, theScheme, theTerm, theLabel, theVersion, theUserSelected);
		} finally {
			TransactionSynchronizationManager.clearSynchronization();
		}
	}
}
private Logger ourLogger;
@Mock
private Appender<ILoggingEvent> myAppender;
@Mock
private MemoryCacheService myMemoryCacheService;
@Mock
private EntityManager myEntityManager;
@Mock
private PlatformTransactionManager myTransactionManager;
@InjectMocks
private TestDao myTestDao;
@BeforeEach
public void init() {
	// Attach a mock appender so tests can capture and assert on the WARN
	// logging emitted by BaseHapiFhirDao
	ourLogger = (Logger) LoggerFactory.getLogger(BaseHapiFhirDao.class);
	ourLogger.addAppender(myAppender);
}
@AfterEach
public void end() {
	// Detach the mock appender so it does not leak into other tests
	ourLogger.detachAppender(myAppender);
}
/**
 * Returns a mocked criteria builder.
 *
 * @return a CriteriaBuilder mock whose equal() calls all resolve to the same
 *         mocked Predicate
 */
private CriteriaBuilder getMockedCriteriaBuilder() {
	Predicate pred = mock(Predicate.class);

	CriteriaBuilder builder = mock(CriteriaBuilder.class);
	// lenient due to multiple equal calls with different inputs
	lenient().when(builder.equal(any(), any()))
		.thenReturn(pred);
	return builder;
}
/**
 * Returns a mocked from.
 *
 * @return a Root mock whose get() calls all resolve to the same mocked Path
 */
private Root<TagDefinition> getMockedFrom() {
	Path path = mock(Path.class);

	Root<TagDefinition> from = mock(Root.class);
	// lenient due to multiple get calls with different inputs
	lenient().when(from.get(anyString()))
		.thenReturn(path);
	return from;
}
@Test
public void getTagOrNull_raceCondition_wontUpsertDuplicates() throws InterruptedException, ExecutionException {
	/*
	 * We use this boolean to fake a race condition.
	 * Boolean is used for two reasons:
	 * 1) We don't want an unstable test (so a fake
	 * race condition will ensure the test always executes
	 * exactly as expected... but this just ensures the code
	 * is not buggy, not that race conditions are actually handled)
	 * 2) We want the ability (and confidence!) to know
	 * that a _real_ race condition can be handled. Setting
	 * this boolean false (and potentially tweaking thread count)
	 * gives us this confidence.
	 *
	 * Set this false to try with a real race condition
	 */
	boolean fakeRaceCondition = true;

	// the more threads, the more likely we
	// are to see race conditions.
	// We need a lot to ensure at least 2 threads
	// are in the create method at the same time
	int threads = fakeRaceCondition ? 2 : 30;

	// setup
	TagTypeEnum tagType = TagTypeEnum.TAG;
	String scheme = "http://localhost";
	String term = "code123";
	String label = "hollow world";
	String version = "v1.0";
	Boolean userSelected = true;
	String raceConditionError = "Entity exists; if this is logged, you have race condition issues!";

	TagDefinition tagDefinition = new TagDefinition(tagType, scheme, term, label);
	tagDefinition.setVersion(version);
	tagDefinition.setUserSelected(userSelected);

	// mock objects
	CriteriaBuilder builder = getMockedCriteriaBuilder();
	TypedQuery<TagDefinition> query = mock(TypedQuery.class);
	CriteriaQuery<TagDefinition> cq = mock(CriteriaQuery.class);
	Root<TagDefinition> from = getMockedFrom();

	// when
	when(myEntityManager.getCriteriaBuilder())
		.thenReturn(builder);
	when(builder.createQuery(any(Class.class)))
		.thenReturn(cq);
	when(cq.from(any(Class.class)))
		.thenReturn(from);
	when(myEntityManager.createQuery(any(CriteriaQuery.class)))
		.thenReturn(query);

	AtomicBoolean atomicBoolean = new AtomicBoolean(false);
	AtomicInteger getSingleResultInt = new AtomicInteger();
	when(query.getSingleResult())
		.thenAnswer(new Answer<TagDefinition>() {
			private final AtomicInteger count = new AtomicInteger();

			@Override
			public TagDefinition answer(InvocationOnMock invocationOnMock) throws Throwable {
				getSingleResultInt.incrementAndGet();
				if (fakeRaceCondition) {
					// fake
					// ensure the first 2 accesses throw to
					// help fake a race condition (2, or possibly the same,
					// thread failing to access the resource)
					if (count.get() < 2) {
						count.incrementAndGet();
						throw new NoResultException();
					}
				}
				else {
					// real
					if (!atomicBoolean.get()) {
						throw new NoResultException();
					}
				}
				return tagDefinition;
			}
		});

	AtomicInteger persistInt = new AtomicInteger();
	doAnswer(new Answer() {
		private final AtomicInteger count = new AtomicInteger();

		@Override
		public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
			persistInt.incrementAndGet();
			if (fakeRaceCondition) {
				// fake
				if (count.get() < 1) {
					count.incrementAndGet();
					return null;
				}
				else {
					throw new EntityExistsException(raceConditionError);
				}
			}
			else {
				// real
				if (!atomicBoolean.getAndSet(true)) {
					// first thread gets null...
					return null;
				} else {
					// all other threads get entity exists exception
					throw new EntityExistsException(raceConditionError);
				}
			}
		}
	}).when(myEntityManager).persist(any(Object.class));

	ourLogger.setLevel(Level.WARN);

	// test
	ConcurrentHashMap<Integer, TagDefinition> outcomes = new ConcurrentHashMap<>();
	ConcurrentHashMap<Integer, Throwable> errors = new ConcurrentHashMap<>();

	ThreadPoolTaskExecutor executor = ThreadPoolUtil.newThreadPool(threads, threads * 2, "test-");
	AtomicInteger counter = new AtomicInteger();
	CountDownLatch latch = new CountDownLatch(threads);
	Runnable task = () -> {
		latch.countDown();
		try {
			TagDefinition retTag = myTestDao.getTagOrNull(new TransactionDetails(), tagType, scheme, term, label, version, userSelected);
			outcomes.put(retTag.hashCode(), retTag);
			counter.incrementAndGet();
		} catch (Exception ex) {
			errors.put(ex.hashCode(), ex);
		}
	};

	ArrayList<Future> futures = new ArrayList<>();
	for (int i = 0; i < threads; i++) {
		futures.add(executor.submit(task));
	}
	for (Future f : futures) {
		f.get();
	}
	AsyncUtil.awaitLatchAndIgnoreInterrupt(latch, (long) threads, TimeUnit.SECONDS);

	// every thread reads at least once (plus a final successful read) and each
	// thread attempts one persist
	assertThat(getSingleResultInt.get()).as("Not enough gets " + getSingleResultInt.get()).isEqualTo(threads + 1);
	assertThat(persistInt.get()).as("Not enough persists " + persistInt.get()).isEqualTo(threads);

	// verify - despite the race, every thread resolved the SAME tag definition
	assertThat(outcomes).hasSize(1);
	assertEquals(threads, counter.get());
	assertThat(errors.size()).as(errors.values().stream().map(Throwable::getMessage)
		.collect(Collectors.joining(", "))).isEqualTo(0);

	// verify we logged some race conditions
	ArgumentCaptor<ILoggingEvent> captor = ArgumentCaptor.forClass(ILoggingEvent.class);
	verify(myAppender, Mockito.atLeastOnce())
		.doAppend(captor.capture());
	assertThat(captor.getAllValues().get(0).getMessage()).contains(raceConditionError);
}
@Test
public void getTagOrNull_failingForever_throwsInternalErrorAndLogsWarnings() {
	// setup: a tag whose lookup always misses and whose persist always fails
	final TagTypeEnum type = TagTypeEnum.TAG;
	final String system = "http://localhost";
	final String code = "code123";
	final String display = "hollow world";
	final String tagVersion = "v1.0";
	final Boolean selected = true;
	final TransactionDetails details = new TransactionDetails();
	final String persistFailureMsg = "Hi there";
	final String readFailureMsg = "No read for you";
	ourLogger.setLevel(Level.WARN);

	// mock the criteria machinery so the lookup always reports "no result"
	CriteriaBuilder criteriaBuilder = getMockedCriteriaBuilder();
	TypedQuery<TagDefinition> typedQuery = mock(TypedQuery.class);
	CriteriaQuery<TagDefinition> criteriaQuery = mock(CriteriaQuery.class);
	Root<TagDefinition> root = getMockedFrom();

	// when: read path throws NoResultException, write path throws RuntimeException
	when(myEntityManager.getCriteriaBuilder()).thenReturn(criteriaBuilder);
	when(criteriaBuilder.createQuery(any(Class.class))).thenReturn(criteriaQuery);
	when(criteriaQuery.from(any(Class.class))).thenReturn(root);
	when(myEntityManager.createQuery(any(CriteriaQuery.class))).thenReturn(typedQuery);
	when(typedQuery.getSingleResult()).thenThrow(new NoResultException(readFailureMsg));
	doThrow(new RuntimeException(persistFailureMsg)).when(myEntityManager).persist(any(Object.class));

	// test
	try {
		myTestDao.getTagOrNull(details, type, system, code, display, tagVersion, selected);
		fail();
	} catch (Exception ex) {
		// verify: the dao surfaces the persist failure after exhausting its retries...
		assertThat(ex.getMessage()).contains("Tag get/create failed after 10 attempts with error(s): " + persistFailureMsg);

		// ...and logged a warning for each of the 10 failed attempts
		ArgumentCaptor<ILoggingEvent> logCaptor = ArgumentCaptor.forClass(ILoggingEvent.class);
		verify(myAppender, Mockito.times(10))
			.doAppend(logCaptor.capture());
		List<ILoggingEvent> loggedEvents = logCaptor.getAllValues();
		assertThat(loggedEvents).hasSize(10);
		for (ILoggingEvent event : loggedEvents) {
			assertThat(event.getMessage()).isEqualTo("Tag read/write failed: "
				+ persistFailureMsg
				+ ". "
				+ "This is not a failure on its own, "
				+ "but could be useful information in the result of an actual failure.");
		}
	}
}
////////// Static access tests
@Test
public void cleanProvenanceSourceUri() {
	// Each row maps a raw provenance source URI to its expected cleaned form:
	// everything from the first '#' onward is stripped, and null becomes "".
	String[][] cases = {
		{null, ""},
		{"abc", "abc"},
		{"abc#", "abc"},
		{"abc#def", "abc"},
		{"abc#def#ghi", "abc"},
	};
	for (String[] testCase : cases) {
		assertEquals(testCase[1], MetaUtil.cleanProvenanceSourceUriOrEmpty(testCase[0]));
	}
}
}

View File

@ -0,0 +1,151 @@
package ca.uhn.fhir.jpa.dao;
import static org.junit.jupiter.api.Assertions.assertEquals;
import ca.uhn.fhir.jpa.dao.data.ITagDefinitionDao;
import ca.uhn.fhir.jpa.model.entity.TagDefinition;
import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
import ca.uhn.fhir.jpa.util.MemoryCacheService;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.util.MetaUtil;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Pageable;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
public class CacheTagDefinitionDaoTest {

	/**
	 * Test subclass of {@link CacheTagDefinitionDao} that opens a transaction-synchronization
	 * scope around each call, because the production lookup requires an active synchronization.
	 */
	private static class TestDao extends CacheTagDefinitionDao {
		public TestDao(ITagDefinitionDao theTagDefinitionDao, MemoryCacheService theMemoryCacheService) {
			super(theTagDefinitionDao, theMemoryCacheService);
		}

		@Override
		protected TagDefinition getTagOrNull(TransactionDetails theDetails,
											 TagTypeEnum theEnum,
											 String theScheme,
											 String theTerm,
											 String theLabel,
											 String theVersion,
											 Boolean theUserSelected) {
			try {
				TransactionSynchronizationManager.initSynchronization();
				return super.getTagOrNull(theDetails, theEnum, theScheme, theTerm, theLabel, theVersion, theUserSelected);
			} finally {
				// Always clear, so synchronization state cannot leak between tests.
				TransactionSynchronizationManager.clearSynchronization();
			}
		}
	}

	@Mock
	private MemoryCacheService myMemoryCacheService;

	@Mock
	private ITagDefinitionDao tagDefinitionDao;

	@InjectMocks
	private TestDao myTestDao;

	/**
	 * Builds a {@link TagDefinition} populated with the given version and user-selected flag.
	 */
	private static TagDefinition buildTag(TagTypeEnum theType, String theScheme, String theTerm,
										  String theLabel, String theVersion, Boolean theUserSelected) {
		TagDefinition tag = new TagDefinition(theType, theScheme, theTerm, theLabel);
		tag.setVersion(theVersion);
		tag.setUserSelected(theUserSelected);
		return tag;
	}

	/**
	 * Stubs the tag lookup to miss (empty result list) and the subsequent save to return theSavedTag.
	 */
	private void stubLookupMissThenSave(TagTypeEnum theType, String theScheme, String theTerm,
										String theVersion, Boolean theUserSelected, TagDefinition theSavedTag) {
		when(tagDefinitionDao.findByTagTypeAndSchemeAndTermAndVersionAndUserSelected(
				eq(theType), eq(theScheme), eq(theTerm), eq(theVersion), eq(theUserSelected), any(Pageable.class)))
			.thenReturn(List.of());
		when(tagDefinitionDao.save(any(TagDefinition.class))).thenReturn(theSavedTag);
	}

	@Test
	public void testGetTagOrNull_createsTag_ifNotFound() {
		// Arrange
		TagTypeEnum tagType = TagTypeEnum.TAG;
		String scheme = "http://localhost";
		String term = "code123";
		String label = "example label";
		String version = "v1.0";
		Boolean userSelected = true;

		TagDefinition newTag = buildTag(tagType, scheme, term, label, version, userSelected);
		stubLookupMissThenSave(tagType, scheme, term, version, userSelected, newTag);

		// Act
		TagDefinition result = myTestDao.getTagOrNull(new TransactionDetails(), tagType, scheme, term, label, version, userSelected);

		// Assert: the missing tag was created exactly once
		assertEquals(newTag, result);
		verify(tagDefinitionDao).save(any(TagDefinition.class));
	}

	@Test
	public void testSimultaneousTagCreation_createsMultipleTags() throws InterruptedException, ExecutionException {
		int threadCount = 10;
		TagTypeEnum tagType = TagTypeEnum.TAG;
		String scheme = "http://localhost";
		String term = "code123";
		String label = "example label";
		String version = "v1.0";
		Boolean userSelected = true;

		TagDefinition expectedTag = buildTag(tagType, scheme, term, label, version, userSelected);
		stubLookupMissThenSave(tagType, scheme, term, version, userSelected, expectedTag);

		// Run the same lookup concurrently: every thread misses and saves its own tag,
		// since duplicate tag rows are permitted by the current schema.
		ExecutorService executor = Executors.newFixedThreadPool(threadCount);
		try {
			List<Future<TagDefinition>> futures = new ArrayList<>();
			for (int i = 0; i < threadCount; i++) {
				futures.add(executor.submit(() -> myTestDao.getTagOrNull(new TransactionDetails(), tagType, scheme, term, label, version, userSelected)));
			}

			// Propagate any worker failure to the test thread.
			for (Future<TagDefinition> future : futures) {
				future.get();
			}

			verify(tagDefinitionDao, times(threadCount)).save(any(TagDefinition.class)); // multiple tags allowed
		} finally {
			executor.shutdown();
		}
	}

	////////// Static access tests

	@Test
	public void cleanProvenanceSourceUri() {
		assertEquals("", MetaUtil.cleanProvenanceSourceUriOrEmpty(null));
		assertEquals("abc", MetaUtil.cleanProvenanceSourceUriOrEmpty("abc"));
		assertEquals("abc", MetaUtil.cleanProvenanceSourceUriOrEmpty("abc#"));
		assertEquals("abc", MetaUtil.cleanProvenanceSourceUriOrEmpty("abc#def"));
		assertEquals("abc", MetaUtil.cleanProvenanceSourceUriOrEmpty("abc#def#ghi"));
	}
}

View File

@ -2010,7 +2010,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
outcome = mySystemDao.transaction(mySrd, input.get());
ourLog.debug("Resp: {}", myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(outcome));
myCaptureQueriesListener.logSelectQueries();
assertEquals(7, myCaptureQueriesListener.countSelectQueries());
assertEquals(6, myCaptureQueriesListener.countSelectQueries());
myCaptureQueriesListener.logInsertQueries();
assertEquals(7, myCaptureQueriesListener.countInsertQueries());
myCaptureQueriesListener.logUpdateQueries();
@ -3075,7 +3075,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
assertEquals(6189, myCaptureQueriesListener.countInsertQueriesForCurrentThread());
assertEquals(418, myCaptureQueriesListener.countUpdateQueriesForCurrentThread());
assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread());
assertEquals(2, myCaptureQueriesListener.countCommits());
assertEquals(1, myCaptureQueriesListener.countCommits());
assertEquals(0, myCaptureQueriesListener.countRollbacks());
assertThat(output.getEntry()).hasSize(input.getEntry().size());

View File

@ -2974,7 +2974,7 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test {
assertEquals(6189, myCaptureQueriesListener.countInsertQueriesForCurrentThread());
assertEquals(418, myCaptureQueriesListener.countUpdateQueriesForCurrentThread());
assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread());
assertEquals(2, myCaptureQueriesListener.countCommits());
assertEquals(1, myCaptureQueriesListener.countCommits());
assertEquals(0, myCaptureQueriesListener.countRollbacks());
assertThat(output.getEntry()).hasSize(input.getEntry().size());

View File

@ -578,7 +578,7 @@ public class HapiTransactionService implements IHapiTransactionService {
}
/**
* Returns true if we alreadyt have an active transaction associated with the current thread, AND
* Returns true if we already have an active transaction associated with the current thread, AND
* either it's non-read-only or we only need a read-only transaction, AND
* the newly requested transaction has a propagation of REQUIRED
*/