Search paging timeout with 1500 < n < 2000 results (#1221)

* Avoid paging links when offset is artificially set very high

* Ongoing fixes

* Adding logging and clean up transaction processing

* Add changelog

* More work on queries

* Keep working on getting tests passing

* More test fixes

* More work on tests

* More test works

* One more test attempt

* Ongoing fixes

* One more change

* FIxes to search algorithm

* Add changelog

* Clean up for review

* Fixed NPE in token matcher when system is not null and value is null

* test opposite (null system not null value)

* added null protection to all matchers

* Address review comments

* Address broken test

* Trying to resolve DB ordering issues
This commit is contained in:
James Agnew 2019-03-06 05:45:05 -05:00 committed by GitHub
parent 4138832071
commit 9bc0f6784d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 568 additions and 150 deletions

View File

@ -77,6 +77,14 @@ public class TokenOrListParam extends BaseOrListParam<TokenOrListParam, TokenPar
return this;
}
/**
* Add a new token to this list
*/
public TokenOrListParam add(String theValue) {
add(new TokenParam(null, theValue));
return this;
}
public List<BaseCodingDt> getListAsCodings() {
ArrayList<BaseCodingDt> retVal = new ArrayList<BaseCodingDt>();
for (TokenParam next : getValuesAsQueryTokens()) {

View File

@ -63,6 +63,7 @@ ca.uhn.fhir.validation.ValidationResult.noIssuesDetected=No issues detected duri
ca.uhn.fhir.jpa.config.HapiFhirHibernateJpaDialect.resourceVersionConstraintFailure=The operation has failed with a version constraint failure. This generally means that two clients/threads were trying to update the same resource at the same time, and this request was chosen as the failing request.
ca.uhn.fhir.jpa.config.HapiFhirHibernateJpaDialect.resourceIndexedCompositeStringUniqueConstraintFailure=The operation has failed with a unique index constraint failure. This probably means that the operation was trying to create/update a resource that would have resulted in a duplicate value for a unique index.
ca.uhn.fhir.jpa.config.HapiFhirHibernateJpaDialect.forcedIdConstraintFailure=The operation has failed with a client-assigned ID constraint failure. This typically means that multiple client threads are trying to create a new resource with the same client-assigned ID at the same time, and this thread was chosen to be rejected.
ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.incomingNoopInTransaction=Transaction contains resource with operation NOOP. This is only valid as a response operation, not in a request
ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.invalidMatchUrlInvalidResourceType=Invalid match URL "{0}" - Unknown resource type: "{1}"

View File

@ -20,6 +20,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.*;
import org.springframework.core.env.Environment;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.dao.annotation.PersistenceExceptionTranslationPostProcessor;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaDialect;
@ -158,13 +159,24 @@ public abstract class BaseConfig implements SchedulingConfigurer {
return new CompositeInMemoryDaoSubscriptionMatcher(daoSubscriptionMatcher(), inMemorySubscriptionMatcher());
}
@Bean
public HapiFhirHibernateJpaDialect hibernateJpaDialect() {
return new HapiFhirHibernateJpaDialect(fhirContext().getLocalizer());
}
@Bean
public PersistenceExceptionTranslationPostProcessor persistenceExceptionTranslationPostProcessor() {
return new PersistenceExceptionTranslationPostProcessor();
}
public static void configureEntityManagerFactory(LocalContainerEntityManagerFactoryBean theFactory, FhirContext theCtx) {
theFactory.setJpaDialect(hibernateJpaDialect(theCtx.getLocalizer()));
theFactory.setPackagesToScan("ca.uhn.fhir.jpa.model.entity", "ca.uhn.fhir.jpa.entity");
theFactory.setPersistenceProvider(new HibernatePersistenceProvider());
}
private static HibernateJpaDialect hibernateJpaDialect(HapiLocalizer theLocalizer) {
private static HapiFhirHibernateJpaDialect hibernateJpaDialect(HapiLocalizer theLocalizer) {
return new HapiFhirHibernateJpaDialect(theLocalizer);
}
}

View File

@ -21,20 +21,26 @@ package ca.uhn.fhir.jpa.config;
*/
import ca.uhn.fhir.i18n.HapiLocalizer;
import ca.uhn.fhir.jpa.model.entity.ForcedId;
import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedCompositeStringUnique;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import org.hibernate.HibernateException;
import org.hibernate.exception.ConstraintViolationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.mapping.PreferredConstructor;
import org.springframework.orm.ObjectOptimisticLockingFailureException;
import org.springframework.orm.jpa.vendor.HibernateJpaDialect;
import javax.persistence.PersistenceException;
import static org.apache.commons.lang3.StringUtils.defaultString;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class HapiFhirHibernateJpaDialect extends HibernateJpaDialect {
private static final Logger ourLog = LoggerFactory.getLogger(HapiFhirHibernateJpaDialect.class);
private HapiLocalizer myLocalizer;
/**
@ -44,17 +50,40 @@ public class HapiFhirHibernateJpaDialect extends HibernateJpaDialect {
myLocalizer = theLocalizer;
}
public RuntimeException translate(PersistenceException theException, String theMessageToPrepend) {
if (theException.getCause() instanceof HibernateException) {
return new PersistenceException(convertHibernateAccessException((HibernateException) theException.getCause(), theMessageToPrepend));
}
return theException;
}
@Override
protected DataAccessException convertHibernateAccessException(HibernateException theException) {
return convertHibernateAccessException(theException, null);
}
private DataAccessException convertHibernateAccessException(HibernateException theException, String theMessageToPrepend) {
String messageToPrepend = "";
if (isNotBlank(theMessageToPrepend)) {
messageToPrepend = theMessageToPrepend + " - ";
}
if (theException.toString().contains("Batch update")) {
theException.toString();
}
// <editor-fold desc="I HATE YOU">
if (theException instanceof ConstraintViolationException) {
String constraintName = ((ConstraintViolationException) theException).getConstraintName();
switch (defaultString(constraintName)) {
case ResourceHistoryTable.IDX_RESVER_ID_VER:
throw new ResourceVersionConflictException(myLocalizer.getMessage(HapiFhirHibernateJpaDialect.class, "resourceVersionConstraintFailure"));
throw new ResourceVersionConflictException(messageToPrepend + myLocalizer.getMessage(HapiFhirHibernateJpaDialect.class, "resourceVersionConstraintFailure"));
case ResourceIndexedCompositeStringUnique.IDX_IDXCMPSTRUNIQ_STRING:
throw new ResourceVersionConflictException(myLocalizer.getMessage(HapiFhirHibernateJpaDialect.class, "resourceIndexedCompositeStringUniqueConstraintFailure"));
throw new ResourceVersionConflictException(messageToPrepend + myLocalizer.getMessage(HapiFhirHibernateJpaDialect.class, "resourceIndexedCompositeStringUniqueConstraintFailure"));
case ForcedId.IDX_FORCEDID_TYPE_FID:
throw new ResourceVersionConflictException(messageToPrepend + myLocalizer.getMessage(HapiFhirHibernateJpaDialect.class, "forcedIdConstraintFailure"));
}
}
// </editor-fold>
return super.convertHibernateAccessException(theException);
}

View File

@ -919,7 +919,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@Override
public T read(IIdType theId, RequestDetails theRequestDetails, boolean theDeletedOk) {
validateResourceTypeAndThrowIllegalArgumentException(theId);
validateResourceTypeAndThrowInvalidRequestException(theId);
// Notify interceptors
if (theRequestDetails != null) {
@ -956,7 +956,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@Override
public BaseHasResource readEntity(IIdType theId, boolean theCheckForForcedId) {
validateResourceTypeAndThrowIllegalArgumentException(theId);
validateResourceTypeAndThrowInvalidRequestException(theId);
Long pid = myIdHelperService.translateForcedIdToPid(getResourceName(), theId.getIdPart());
BaseHasResource entity = myEntityManager.find(ResourceTable.class, pid);
@ -1427,9 +1427,10 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
validateResourceType(entity, myResourceName);
}
private void validateResourceTypeAndThrowIllegalArgumentException(IIdType theId) {
private void validateResourceTypeAndThrowInvalidRequestException(IIdType theId) {
if (theId.hasResourceType() && !theId.getResourceType().equals(myResourceName)) {
throw new IllegalArgumentException("Incorrect resource type (" + theId.getResourceType() + ") for this DAO, wanted: " + myResourceName);
// Note- Throw a HAPI FHIR exception here so that hibernate doesn't try to translate it into a database exception
throw new InvalidRequestException("Incorrect resource type (" + theId.getResourceType() + ") for this DAO, wanted: " + myResourceName);
}
}

View File

@ -108,7 +108,11 @@ public class SearchBuilder implements ISearchBuilder {
private static final List<Long> EMPTY_LONG_LIST = Collections.unmodifiableList(new ArrayList<>());
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchBuilder.class);
private static final int maxLoad = 800;
/**
* @see #loadResourcesByPid(Collection, List, Set, boolean, EntityManager, FhirContext, IDao)
* for an explanation of why we use the constant 800
*/
private static final int MAXIMUM_PAGE_SIZE = 800;
private static Long NO_MORE = -1L;
private static HandlerTypeEnum ourLastHandlerMechanismForUnitTest;
private static SearchParameterMap ourLastHandlerParamsForUnitTest;
@ -1998,8 +2002,8 @@ public class SearchBuilder implements ISearchBuilder {
* but this should work too. Sigh.
*/
List<Long> pids = new ArrayList<>(theIncludePids);
for (int i = 0; i < pids.size(); i += maxLoad) {
int to = i + maxLoad;
for (int i = 0; i < pids.size(); i += MAXIMUM_PAGE_SIZE) {
int to = i + MAXIMUM_PAGE_SIZE;
to = Math.min(to, pids.size());
List<Long> pidsSubList = pids.subList(i, to);
doLoadPids(theResourceListToPopulate, theIncludedPids, theForHistoryOperation, entityManager, context, theDao, position, pidsSubList);
@ -2046,7 +2050,7 @@ public class SearchBuilder implements ISearchBuilder {
if (matchAll) {
String sql;
sql = "SELECT r FROM ResourceLink r WHERE r." + searchFieldName + " IN (:target_pids) ";
List<Collection<Long>> partitions = partition(nextRoundMatches, maxLoad);
List<Collection<Long>> partitions = partition(nextRoundMatches, MAXIMUM_PAGE_SIZE);
for (Collection<Long> nextPartition : partitions) {
TypedQuery<ResourceLink> q = theEntityManager.createQuery(sql, ResourceLink.class);
q.setParameter("target_pids", nextPartition);
@ -2099,7 +2103,7 @@ public class SearchBuilder implements ISearchBuilder {
sql = "SELECT r FROM ResourceLink r WHERE r.mySourcePath = :src_path AND r." + searchFieldName + " IN (:target_pids)";
}
List<Collection<Long>> partitions = partition(nextRoundMatches, maxLoad);
List<Collection<Long>> partitions = partition(nextRoundMatches, MAXIMUM_PAGE_SIZE);
for (Collection<Long> nextPartition : partitions) {
TypedQuery<ResourceLink> q = theEntityManager.createQuery(sql, ResourceLink.class);
q.setParameter("src_path", nextPath);
@ -2331,16 +2335,16 @@ public class SearchBuilder implements ISearchBuilder {
List<String> path = param.getPathsSplit();
/*
* SearchParameters can declare paths on multiple resources
* SearchParameters can declare paths on multiple resource
* types. Here we only want the ones that actually apply.
*/
path = new ArrayList<>(path);
for (int i = 0; i < path.size(); i++) {
String nextPath = trim(path.get(i));
ListIterator<String> iter = path.listIterator();
while (iter.hasNext()) {
String nextPath = trim(iter.next());
if (!nextPath.contains(theResourceType + ".")) {
path.remove(i);
i--;
iter.remove();
}
}
@ -2456,6 +2460,7 @@ public class SearchBuilder implements ISearchBuilder {
if (myMaxResultsToFetch == null) {
myMaxResultsToFetch = myDaoConfig.getFetchSizeDefaultMaximum();
}
final TypedQuery<Long> query = createQuery(mySort, myMaxResultsToFetch, false);
Query<Long> hibernateQuery = (Query<Long>) query;
@ -2478,8 +2483,6 @@ public class SearchBuilder implements ISearchBuilder {
if (myPidSet.add(next)) {
myNext = next;
break;
} else {
mySkipCount++;
}
}
}
@ -2509,8 +2512,6 @@ public class SearchBuilder implements ISearchBuilder {
if (myPidSet.add(next)) {
myNext = next;
break;
} else {
mySkipCount++;
}
}
if (myNext == null) {
@ -2555,6 +2556,7 @@ public class SearchBuilder implements ISearchBuilder {
public int getSkippedCount() {
return mySkipCount;
}
}
private class UniqueIndexIterator implements IResultIterator {
@ -2596,6 +2598,7 @@ public class SearchBuilder implements ISearchBuilder {
public int getSkippedCount() {
return 0;
}
}
private static class CountQueryIterator implements Iterator<Long> {

View File

@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.dao;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.jpa.config.HapiFhirHibernateJpaDialect;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
@ -49,6 +50,7 @@ import com.google.common.collect.ArrayListMultimap;
import org.apache.commons.lang3.Validate;
import org.apache.http.NameValuePair;
import org.hibernate.Session;
import org.hibernate.exception.ConstraintViolationException;
import org.hibernate.internal.SessionImpl;
import org.hl7.fhir.dstu3.model.Bundle;
import org.hl7.fhir.exceptions.FHIRException;
@ -64,7 +66,9 @@ import org.springframework.transaction.support.TransactionTemplate;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.PersistenceContextType;
import javax.persistence.PersistenceException;
import java.util.*;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.*;
@ -85,6 +89,8 @@ public class TransactionProcessor<BUNDLE extends IBaseBundle, BUNDLEENTRY> {
private MatchUrlService myMatchUrlService;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired(required = false)
private HapiFhirHibernateJpaDialect myHapiFhirHibernateJpaDialect;
public BUNDLE transaction(RequestDetails theRequestDetails, BUNDLE theRequest) {
if (theRequestDetails != null) {
@ -576,8 +582,8 @@ public class TransactionProcessor<BUNDLE extends IBaseBundle, BUNDLEENTRY> {
*/
for (int i = 0; i < theEntries.size(); i++) {
if (i % 100 == 0) {
ourLog.debug("Processed {} non-GET entries out of {}", i, theEntries.size());
if (i % 250 == 0) {
ourLog.info("Processed {} non-GET entries out of {} in transaction", i, theEntries.size());
}
BUNDLEENTRY nextReqEntry = theEntries.get(i);
@ -749,7 +755,13 @@ public class TransactionProcessor<BUNDLE extends IBaseBundle, BUNDLEENTRY> {
FhirTerser terser = myContext.newTerser();
theTransactionStopWatch.startTask("Index " + theIdToPersistedOutcome.size() + " resources");
int i = 0;
for (DaoMethodOutcome nextOutcome : theIdToPersistedOutcome.values()) {
if (i++ % 250 == 0) {
ourLog.info("Have indexed {} entities out of {} in transaction", i, theIdToPersistedOutcome.values().size());
}
IBaseResource nextResource = nextOutcome.getResource();
if (nextResource == null) {
continue;
@ -803,7 +815,16 @@ public class TransactionProcessor<BUNDLE extends IBaseBundle, BUNDLEENTRY> {
theTransactionStopWatch.endCurrentTask();
theTransactionStopWatch.startTask("Flush writes to database");
try {
flushJpaSession();
} catch (PersistenceException e) {
if (myHapiFhirHibernateJpaDialect != null) {
List<String> types = theIdToPersistedOutcome.keySet().stream().filter(t -> t != null).map(t -> t.getResourceType()).collect(Collectors.toList());
String message = "Error flushing transaction with resource types: " + types;
throw myHapiFhirHibernateJpaDialect.translate(e, message);
}
throw e;
}
theTransactionStopWatch.endCurrentTask();
if (conditionalRequestUrls.size() > 0) {

View File

@ -33,6 +33,6 @@ import ca.uhn.fhir.jpa.model.entity.SearchParamPresent;
public interface ISearchParamPresentDao extends JpaRepository<SearchParamPresent, Long> {
@Query("SELECT s FROM SearchParamPresent s WHERE s.myResource = :res")
public Collection<SearchParamPresent> findAllForResource(@Param("res") ResourceTable theResource);
Collection<SearchParamPresent> findAllForResource(@Param("res") ResourceTable theResource);
}

View File

@ -61,6 +61,7 @@ public class JpaStorageServices extends BaseHapiFhirDao<IBaseResource> implement
IFhirResourceDao<? extends IBaseResource> dao = getDao(typeDef.getImplementingClass());
SearchParameterMap params = new SearchParameterMap();
params.setLoadSynchronousUpTo(500);
for (Argument nextArgument : theSearchParams) {

View File

@ -201,16 +201,21 @@ public class PersistedJpaBundleProvider implements IBundleProvider {
switch (mySearchEntity.getSearchType()) {
case HISTORY:
return template.execute(new TransactionCallback<List<IBaseResource>>() {
@Override
public List<IBaseResource> doInTransaction(TransactionStatus theStatus) {
return doHistoryInTransaction(theFromIndex, theToIndex);
}
});
return template.execute(theStatus -> doHistoryInTransaction(theFromIndex, theToIndex));
case SEARCH:
case EVERYTHING:
default:
return doSearchOrEverything(theFromIndex, theToIndex);
List<IBaseResource> retVal = doSearchOrEverything(theFromIndex, theToIndex);
/*
* If we got fewer resources back than we asked for, it's possible that the search
* completed. If that's the case, the cached version of the search entity is probably
* no longer valid so let's force a reload if it gets asked for again (most likely
* because someone is calling size() on us)
*/
if (retVal.size() < theToIndex - theFromIndex) {
mySearchEntity = null;
}
return retVal;
}
}

View File

@ -463,11 +463,11 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
private final ArrayList<Long> myUnsyncedPids = new ArrayList<>();
private Search mySearch;
private boolean myAbortRequested;
private int myCountSaved = 0;
private int myCountSavedTotal = 0;
private int myCountSavedThisPass = 0;
private boolean myAdditionalPrefetchThresholdsRemaining;
private List<Long> myPreviouslyAddedResourcePids;
private Integer myMaxResultsToFetch;
private int myCountFetchedDuringThisPass;
/**
* Constructor
@ -490,7 +490,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
protected void setPreviouslyAddedResourcePids(List<Long> thePreviouslyAddedResourcePids) {
myPreviouslyAddedResourcePids = thePreviouslyAddedResourcePids;
myCountSaved = myPreviouslyAddedResourcePids.size();
myCountSavedTotal = myPreviouslyAddedResourcePids.size();
}
private ISearchBuilder newSearchBuilder() {
@ -590,36 +590,46 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
for (Long nextPid : myUnsyncedPids) {
SearchResult nextResult = new SearchResult(mySearch);
nextResult.setResourcePid(nextPid);
nextResult.setOrder(myCountSaved++);
nextResult.setOrder(myCountSavedTotal);
resultsToSave.add(nextResult);
ourLog.trace("Saving ORDER[{}] Resource {}", nextResult.getOrder(), nextResult.getResourcePid());
int order = nextResult.getOrder();
ourLog.trace("Saving ORDER[{}] Resource {}", order, nextResult.getResourcePid());
myCountSavedTotal++;
myCountSavedThisPass++;
}
mySearchResultDao.saveAll(resultsToSave);
synchronized (mySyncedPids) {
int numSyncedThisPass = myUnsyncedPids.size();
ourLog.trace("Syncing {} search results", numSyncedThisPass);
ourLog.trace("Syncing {} search results - Have more: {}", numSyncedThisPass, theResultIter.hasNext());
mySyncedPids.addAll(myUnsyncedPids);
myUnsyncedPids.clear();
if (theResultIter.hasNext() == false) {
mySearch.setNumFound(myCountSaved);
int loadedCountThisPass = theResultIter.getSkippedCount() + myCountSaved;
if (myMaxResultsToFetch != null && loadedCountThisPass < myMaxResultsToFetch) {
mySearch.setNumFound(myCountSavedTotal);
int skippedCount = theResultIter.getSkippedCount();
int totalFetched = skippedCount + myCountSavedThisPass;
ourLog.trace("MaxToFetch[{}] SkippedCount[{}] CountSavedThisPass[{}] CountSavedThisTotal[{}] AdditionalPrefetchRemaining[{}]", myMaxResultsToFetch, skippedCount, myCountSavedThisPass, myCountSavedTotal, myAdditionalPrefetchThresholdsRemaining);
if (myMaxResultsToFetch != null && totalFetched < myMaxResultsToFetch) {
ourLog.trace("Setting search status to FINISHED");
mySearch.setStatus(SearchStatusEnum.FINISHED);
mySearch.setTotalCount(myCountSaved);
mySearch.setTotalCount(myCountSavedTotal);
} else if (myAdditionalPrefetchThresholdsRemaining) {
ourLog.trace("Setting search status to PASSCMPLET");
mySearch.setStatus(SearchStatusEnum.PASSCMPLET);
mySearch.setSearchParameterMap(myParams);
} else {
ourLog.trace("Setting search status to FINISHED");
mySearch.setStatus(SearchStatusEnum.FINISHED);
mySearch.setTotalCount(myCountSaved);
mySearch.setTotalCount(myCountSavedTotal);
}
}
}
mySearch.setNumFound(myCountSaved);
mySearch.setNumFound(myCountSavedTotal);
int numSynced;
synchronized (mySyncedPids) {
@ -684,7 +694,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
}
});
ourLog.info("Completed search for [{}{}] and found {} resources in {}ms", mySearch.getResourceType(), mySearch.getSearchQueryString(), mySyncedPids.size(), sw.getMillis());
ourLog.info("Have completed search for [{}{}] and found {} resources in {}ms - Status is {}", mySearch.getResourceType(), mySearch.getSearchQueryString(), mySyncedPids.size(), sw.getMillis(), mySearch.getStatus());
} catch (Throwable t) {
@ -850,8 +860,8 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
/*
* Construct the SQL query we'll be sending to the database
*/
IResultIterator theResultIterator = sb.createQuery(myParams, mySearch.getUuid());
assert (theResultIterator != null);
IResultIterator resultIterator = sb.createQuery(myParams, mySearch.getUuid());
assert (resultIterator != null);
/*
* The following loop actually loads the PIDs of the resources
@ -859,9 +869,8 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
* every X results, we commit to the HFJ_SEARCH table.
*/
int syncSize = mySyncSize;
while (theResultIterator.hasNext()) {
myUnsyncedPids.add(theResultIterator.next());
myCountFetchedDuringThisPass++;
while (resultIterator.hasNext()) {
myUnsyncedPids.add(resultIterator.next());
boolean shouldSync = myUnsyncedPids.size() >= syncSize;
@ -879,7 +888,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
Validate.isTrue(isNotAborted(), "Abort has been requested");
if (shouldSync) {
saveUnsynced(theResultIterator);
saveUnsynced(resultIterator);
}
if (myLoadingThrottleForUnitTests != null) {
@ -895,7 +904,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
// If no abort was requested, bail out
Validate.isTrue(isNotAborted(), "Abort has been requested");
saveUnsynced(theResultIterator);
saveUnsynced(resultIterator);
}
}

View File

@ -131,7 +131,7 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc {
TransactionTemplate tt = new TransactionTemplate(myTransactionManager);
final Slice<Long> toDelete = tt.execute(theStatus ->
mySearchDao.findWhereLastReturnedBefore(cutoff, PageRequest.of(0, 1000))
mySearchDao.findWhereLastReturnedBefore(cutoff, PageRequest.of(0, 2000))
);
for (final Long nextSearchToDelete : toDelete) {
ourLog.debug("Deleting search with PID {}", nextSearchToDelete);
@ -148,9 +148,11 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc {
int count = toDelete.getContent().size();
if (count > 0) {
if (ourLog.isDebugEnabled()) {
long total = tt.execute(t -> mySearchDao.count());
ourLog.debug("Deleted {} searches, {} remaining", count, total);
}
}
}

View File

@ -19,6 +19,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import javax.annotation.PreDestroy;
@ -104,13 +106,28 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
private void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType) {
ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theNewResource, theOperationType);
// Interceptor call: SUBSCRIPTION_RESOURCE_MODIFIED
if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED, msg)) {
return;
/*
* We only want to submit the message to the processing queue once the
* transaction is committed. We do this in order to make sure that the
* data is actually in the DB, in case it's the database matcher.
*/
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public int getOrder() {
return 0;
}
@Override
public void afterCommit() {
submitResourceModified(msg);
}
});
} else {
submitResourceModified(msg);
}
}
private void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
ourLog.trace("Sending resource modified message to processing channel");
@ -127,6 +144,11 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
*/
@Override
public void submitResourceModified(final ResourceModifiedMessage theMsg) {
// Interceptor call: SUBSCRIPTION_RESOURCE_MODIFIED
if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED, theMsg)) {
return;
}
sendToProcessingChannel(theMsg);
}

View File

@ -36,16 +36,20 @@ import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;
public class DaoSubscriptionMatcher implements ISubscriptionMatcher {
private Logger ourLog = LoggerFactory.getLogger(DaoSubscriptionMatcher.class);
@Autowired
private FhirContext myCtx;
@Autowired
DaoRegistry myDaoRegistry;
@Autowired
MatchUrlService myMatchUrlService;
private Logger ourLog = LoggerFactory.getLogger(DaoSubscriptionMatcher.class);
@Autowired
private FhirContext myCtx;
@Autowired
private PlatformTransactionManager myTxManager;
@Override
public SubscriptionMatchResult match(CanonicalSubscription theSubscription, ResourceModifiedMessage theMsg) {
@ -75,6 +79,10 @@ public class DaoSubscriptionMatcher implements ISubscriptionMatcher {
IFhirResourceDao<? extends IBaseResource> responseDao = myDaoRegistry.getResourceDao(responseResourceDef.getImplementingClass());
responseCriteriaUrl.setLoadSynchronousUpTo(1);
return responseDao.search(responseCriteriaUrl);
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
return txTemplate.execute(t -> responseDao.search(responseCriteriaUrl));
}
}

View File

@ -3,6 +3,7 @@ package ca.uhn.fhir.jpa.config;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
import ca.uhn.fhir.validation.ResultSeverityEnum;
import net.ttddyy.dsproxy.listener.SingleQueryCountHolder;
import net.ttddyy.dsproxy.listener.logging.SLF4JLogLevel;
import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder;
import org.apache.commons.dbcp2.BasicDataSource;
import org.springframework.context.annotation.Bean;
@ -34,7 +35,6 @@ public class TestR4Config extends BaseJavaConfigR4 {
* starvation
*/
ourMaxThreads = (int) (Math.random() * 6.0) + 1;
ourMaxThreads = 1;
}
private Exception myLastStackTrace;
@ -96,7 +96,7 @@ public class TestR4Config extends BaseJavaConfigR4 {
DataSource dataSource = ProxyDataSourceBuilder
.create(retVal)
// .logQueryBySlf4j(SLF4JLogLevel.INFO, "SQL")
.logQueryBySlf4j(SLF4JLogLevel.INFO, "SQL")
// .logSlowQueryBySlf4j(10, TimeUnit.SECONDS)
// .countQuery(new ThreadQueryCountHolder())
.beforeQuery(new BlockLargeNumbersOfParamsListener())

View File

@ -15,6 +15,8 @@ import org.springframework.scheduling.concurrent.ExecutorConfigurationSupport;
*/
public class UnregisterScheduledProcessor implements BeanFactoryPostProcessor {
public static final String SCHEDULING_DISABLED = "scheduling_disabled";
private final Environment myEnvironment;
public UnregisterScheduledProcessor(Environment theEnv) {
@ -23,7 +25,7 @@ public class UnregisterScheduledProcessor implements BeanFactoryPostProcessor {
@Override
public void postProcessBeanFactory(final ConfigurableListableBeanFactory beanFactory) throws BeansException {
String schedulingDisabled = myEnvironment.getProperty("scheduling_disabled");
String schedulingDisabled = myEnvironment.getProperty(SCHEDULING_DISABLED);
if ("true".equals(schedulingDisabled)) {
for (String beanName : beanFactory.getBeanNamesForType(ScheduledAnnotationBeanPostProcessor.class)) {
((DefaultListableBeanFactory) beanFactory).removeBeanDefinition(beanName);

View File

@ -11,6 +11,7 @@ import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
import ca.uhn.fhir.jpa.subscription.dbmatcher.DaoSubscriptionMatcher;
import ca.uhn.fhir.jpa.term.VersionIndependentConcept;
import ca.uhn.fhir.jpa.util.ExpungeOptions;
import ca.uhn.fhir.jpa.util.JpaConstants;
@ -316,6 +317,14 @@ public abstract class BaseJpaTest {
return retVal;
}
protected List<String> toUnqualifiedVersionlessIdValues(List<? extends IBaseResource> theFound) {
List<String> retVal = new ArrayList<>();
for (IBaseResource next : theFound) {
retVal.add(next.getIdElement().toUnqualifiedVersionless().getValue());
}
return retVal;
}
protected List<IIdType> toUnqualifiedVersionlessIds(org.hl7.fhir.dstu3.model.Bundle theFound) {
List<IIdType> retVal = new ArrayList<IIdType>();
for (BundleEntryComponent next : theFound.getEntry()) {

View File

@ -1380,7 +1380,7 @@ public class FhirResourceDaoDstu2Test extends BaseJpaDstu2Test {
try {
myEncounterDao.read(outcome.getId(), mySrd);
fail();
} catch (IllegalArgumentException e) {
} catch (InvalidRequestException e) {
// expected
}
try {

View File

@ -1788,7 +1788,7 @@ public class FhirResourceDaoDstu3Test extends BaseJpaDstu3Test {
try {
myEncounterDao.read(outcome.getId(), mySrd);
fail();
} catch (IllegalArgumentException e) {
} catch (InvalidRequestException e) {
// expected
}
try {

View File

@ -329,8 +329,7 @@ public abstract class BaseJpaR4Test extends BaseJpaTest {
@Before
@Transactional()
public void beforePurgeDatabase() throws InterruptedException {
final EntityManager entityManager = this.myEntityManager;
public void beforePurgeDatabase() {
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry);
}

View File

@ -4,6 +4,7 @@ import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.util.TestUtil;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IIdType;
@ -136,7 +137,7 @@ public class FhirResourceDaoR4CreateTest extends BaseJpaR4Test {
try {
myPatientDao.create(p).getId();
fail();
} catch (DataIntegrityViolationException e) {
} catch (ResourceVersionConflictException e) {
// good
}

View File

@ -2003,7 +2003,7 @@ public class FhirResourceDaoR4Test extends BaseJpaR4Test {
try {
myEncounterDao.read(outcome.getId(), mySrd);
fail();
} catch (IllegalArgumentException e) {
} catch (InvalidRequestException e) {
// expected
}
try {

View File

@ -1037,11 +1037,12 @@ public class ResourceProviderDstu2Test extends BaseResourceProviderDstu2Test {
obs1.getDevice().setReference(devId1);
IIdType obsId1 = myObservationDao.create(obs1, mySrd).getId().toUnqualifiedVersionless();
IIdType obsId2;
Observation obs2 = new Observation();
obs2.getSubject().setReference(ptId1);
obs2.getCode().addCoding().setCode("CODE2");
obs2.setValue(new StringDt("obsvalue2"));
IIdType obsId2 = myObservationDao.create(obs2, mySrd).getId().toUnqualifiedVersionless();
obsId2 = myObservationDao.create(obs2, mySrd).getId().toUnqualifiedVersionless();
Observation obs3 = new Observation();
obs3.getSubject().setReference(ptId2);
@ -1057,17 +1058,15 @@ public class ResourceProviderDstu2Test extends BaseResourceProviderDstu2Test {
param = new StringAndListParam();
param.addAnd(new StringOrListParam().addOr(new StringParam("obsvalue1")));
//@formatter:off
Parameters response = ourClient
.operation()
.onInstance(ptId1)
.named("everything")
.withParameter(Parameters.class, Constants.PARAM_CONTENT, new StringDt("obsvalue1"))
.execute();
//@formatter:on
actual = toUnqualifiedVersionlessIds((ca.uhn.fhir.model.dstu2.resource.Bundle) response.getParameter().get(0).getResource());
assertThat(actual, containsInAnyOrder(ptId1, obsId1, devId1));
assertThat(actual.toString(), actual, containsInAnyOrder(ptId1, obsId1, devId1));
}
@ -1636,10 +1635,17 @@ public class ResourceProviderDstu2Test extends BaseResourceProviderDstu2Test {
nextUrl = response.getLink("next").getUrl();
response = ourClient.fetchResourceFromUrl(ca.uhn.fhir.model.dstu2.resource.Bundle.class, nextUrl);
assertEquals(1, response.getEntry().size());
// assertEquals(21, response.getTotal().intValue());
// assertEquals(null, response.getLink("next"));
// Again
response = ourClient.fetchResourceFromUrl(ca.uhn.fhir.model.dstu2.resource.Bundle.class, nextUrl);
assertEquals(1, response.getEntry().size());
assertEquals(21, response.getTotal().intValue());
assertEquals(null, response.getLink("next"));
}
@Test
@ -2087,7 +2093,7 @@ public class ResourceProviderDstu2Test extends BaseResourceProviderDstu2Test {
IOUtils.closeQuietly(response.getEntity().getContent());
ourLog.info(resp);
ca.uhn.fhir.model.dstu2.resource.Bundle bundle = myFhirCtx.newXmlParser().parseResource(ca.uhn.fhir.model.dstu2.resource.Bundle.class, resp);
matches = bundle.getTotal();
matches = bundle.getEntry().size();
assertThat(matches, greaterThan(0));
}

View File

@ -56,7 +56,7 @@ public class GraphQLProviderR4Test extends BaseResourceProviderR4Test {
try {
String resp = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
ourLog.info(resp);
assertEquals(TestUtil.stripReturns(resp), TestUtil.stripReturns("{\n" +
assertEquals(TestUtil.stripReturns("{\n" +
" \"PatientList\":[{\n" +
" \"name\":[{\n" +
" \"family\":\"FAM\",\n" +
@ -69,7 +69,7 @@ public class GraphQLProviderR4Test extends BaseResourceProviderR4Test {
" \"given\":[\"GivenOnlyB1\",\"GivenOnlyB2\"]\n" +
" }]\n" +
" }]\n" +
"}"));
"}"), TestUtil.stripReturns(resp));
} finally {
IOUtils.closeQuietly(response);
}

View File

@ -158,6 +158,35 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
@Test
public void testManualPagingLinkOffsetDoesntReturnBeyondEnd() {
myDaoConfig.setSearchPreFetchThresholds(Lists.newArrayList(10, 1000));
for (int i = 0; i < 50; i++) {
Organization o = new Organization();
o.setId("O" + i);
o.setName("O" + i);
ourClient.update().resource(o).execute().getId().toUnqualifiedVersionless();
}
Bundle output = ourClient
.search()
.forResource("Organization")
.count(3)
.returnBundle(Bundle.class)
.execute();
ourLog.info(myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(output));
String linkNext = output.getLink("next").getUrl();
linkNext = linkNext.replaceAll("_getpagesoffset=[0-9]+", "_getpagesoffset=3300");
assertThat(linkNext, containsString("_getpagesoffset=3300"));
Bundle nextPageBundle = ourClient.loadPage().byUrl(linkNext).andReturnBundle(Bundle.class).execute();
ourLog.info(myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(nextPageBundle));
assertEquals(null, nextPageBundle.getLink("next"));
}
@Test
public void testSearchLinksWorkWithIncludes() {
for (int i = 0; i < 5; i++) {
@ -200,35 +229,6 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
}
@Test
@Ignore
public void testManualPagingLinkOffsetDoesntReturnBeyondEnd() {
myDaoConfig.setSearchPreFetchThresholds(Lists.newArrayList(10, 1000));
for (int i = 0; i < 50; i++) {
Organization o = new Organization();
o.setId("O" + i);
o.setName("O" + i);
ourClient.update().resource(o).execute().getId().toUnqualifiedVersionless();
}
Bundle output = ourClient
.search()
.forResource("Organization")
.count(3)
.returnBundle(Bundle.class)
.execute();
ourLog.info(myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(output));
String linkNext = output.getLink("next").getUrl();
linkNext = linkNext.replaceAll("_getpagesoffset=[0-9]+", "_getpagesoffset=3300");
assertThat(linkNext, containsString("_getpagesoffset=3300"));
Bundle nextPageBundle = ourClient.loadPage().byUrl(linkNext).andReturnBundle(Bundle.class).execute();
ourLog.info(myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(nextPageBundle));
assertEquals(null, nextPageBundle.getLink("next"));
}
@Test
public void testSearchFetchPageBeyondEnd() {
@ -3550,7 +3550,7 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
response.getEntity().getContent().close();
ourLog.info(resp);
Bundle bundle = myFhirCtx.newXmlParser().parseResource(Bundle.class, resp);
matches = bundle.getTotal();
matches = bundle.getEntry().size();
assertThat(matches, greaterThan(0));
}

View File

@ -449,6 +449,7 @@ public class SearchCoordinatorSvcImplTest {
public int getSkippedCount() {
return myWrap.getSkippedCount();
}
}
public static class ResultIterator extends BaseIterator<Long> implements IResultIterator {

View File

@ -1,36 +1,64 @@
package ca.uhn.fhir.jpa.stresstest;
import ca.uhn.fhir.jpa.config.UnregisterScheduledProcessor;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.provider.r4.BaseResourceProviderR4Test;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.TokenOrListParam;
import ca.uhn.fhir.rest.server.IPagingProvider;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.TestUtil;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.hamcrest.Matchers;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.hapi.validation.FhirInstanceValidator;
import org.hl7.fhir.r4.model.*;
import org.hl7.fhir.r4.model.Bundle.BundleType;
import org.hl7.fhir.r4.model.Bundle.HTTPVerb;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.TestPropertySource;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import static org.junit.Assert.*;
import static org.apache.commons.lang3.StringUtils.*;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@TestPropertySource(properties = {
// Since scheduled tasks can cause searches, which messes up the
// value returned by SearchBuilder.getLastHandlerMechanismForUnitTest()
UnregisterScheduledProcessor.SCHEDULING_DISABLED + "=true"
})
public class StressTestR4Test extends BaseResourceProviderR4Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(StressTestR4Test.class);
private RequestValidatingInterceptor myRequestValidatingInterceptor;
@Autowired
private IPagingProvider myPagingProvider;
@Override
@After
@ -38,6 +66,8 @@ public class StressTestR4Test extends BaseResourceProviderR4Test {
super.after();
ourRestServer.unregisterInterceptor(myRequestValidatingInterceptor);
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED);
}
@Override
@ -51,6 +81,113 @@ public class StressTestR4Test extends BaseResourceProviderR4Test {
myRequestValidatingInterceptor.addValidatorModule(module);
}
@Test
public void testPageThroughLotsOfPages() {
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.DISABLED);
/*
* This test creates a really huge number of resources to make sure that even large scale
* searches work correctly. 5000 is arbitrary, this test was intended to demonstrate an
* issue that occurred with 1600 resources but I'm using a huge number here just to
* hopefully catch future issues.
*/
int count = 5000;
Bundle bundle = new Bundle();
DiagnosticReport dr = new DiagnosticReport();
dr.setId(IdType.newRandomUuid());
bundle.addEntry().setFullUrl(dr.getId()).setResource(dr).getRequest().setMethod(HTTPVerb.POST).setUrl("DiagnosticReport");
for (int i = 0; i < count; i++) {
Observation o = new Observation();
o.setId("A" + leftPad(Integer.toString(i), 4, '0'));
o.setStatus(Observation.ObservationStatus.FINAL);
bundle.addEntry().setFullUrl(o.getId()).setResource(o).getRequest().setMethod(HTTPVerb.PUT).setUrl("Observation/A" + i);
}
StopWatch sw = new StopWatch();
ourLog.info("Saving {} resources", bundle.getEntry().size());
mySystemDao.transaction(null, bundle);
ourLog.info("Saved {} resources in {}", bundle.getEntry().size(), sw.toString());
// Load from DAOs
List<String> ids = new ArrayList<>();
Bundle resultBundle = ourClient.search().forResource("Observation").count(100).returnBundle(Bundle.class).execute();
int pageIndex = 0;
while (true) {
ids.addAll(resultBundle.getEntry().stream().map(t -> t.getResource().getIdElement().toUnqualifiedVersionless().getValue()).collect(Collectors.toList()));
if (resultBundle.getLink("next") == null) {
break;
}
ourLog.info("Loading page {} - Have {} results: {}", pageIndex++, ids.size(), resultBundle.getLink("next").getUrl());
resultBundle = ourClient.loadPage().next(resultBundle).execute();
}
assertEquals(count, ids.size());
assertEquals(count, Sets.newHashSet(ids).size());
// Load from DAOs
ids = new ArrayList<>();
SearchParameterMap map = new SearchParameterMap();
map.add("status", new TokenOrListParam().add("final").add("aaa")); // add some noise to guarantee we don't reuse a previous query
IBundleProvider results = myObservationDao.search(map);
for (int i = 0; i <= count; i += 100) {
List<IBaseResource> resultsAndIncludes = results.getResources(i, i + 100);
ids.addAll(toUnqualifiedVersionlessIdValues(resultsAndIncludes));
results = myPagingProvider.retrieveResultList(results.getUuid());
}
assertEquals(count, ids.size());
assertEquals(count, Sets.newHashSet(ids).size());
// Load from DAOs starting half way through
ids = new ArrayList<>();
map = new SearchParameterMap();
map.add("status", new TokenOrListParam().add("final").add("aaa")); // add some noise to guarantee we don't reuse a previous query
results = myObservationDao.search(map);
for (int i = 1000; i <= count; i += 100) {
List<IBaseResource> resultsAndIncludes = results.getResources(i, i + 100);
ids.addAll(toUnqualifiedVersionlessIdValues(resultsAndIncludes));
results = myPagingProvider.retrieveResultList(results.getUuid());
}
assertEquals(count - 1000, ids.size());
assertEquals(count - 1000, Sets.newHashSet(ids).size());
}
@Test
public void testPageThroughLotsOfPages2() {
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.DISABLED);
Bundle bundle = new Bundle();
int count = 1603;
for (int i = 0; i < count; i++) {
Observation o = new Observation();
o.setId("A" + leftPad(Integer.toString(i), 4, '0'));
o.setStatus(Observation.ObservationStatus.FINAL);
bundle.addEntry().setFullUrl(o.getId()).setResource(o).getRequest().setMethod(HTTPVerb.PUT).setUrl("Observation/A" + i);
}
StopWatch sw = new StopWatch();
ourLog.info("Saving {} resources", bundle.getEntry().size());
mySystemDao.transaction(null, bundle);
ourLog.info("Saved {} resources in {}", bundle.getEntry().size(), sw.toString());
// Load from DAOs
List<String> ids = new ArrayList<>();
Bundle resultBundle = ourClient.search().forResource("Observation").count(300).returnBundle(Bundle.class).execute();
int pageIndex = 0;
while (true) {
ids.addAll(resultBundle.getEntry().stream().map(t -> t.getResource().getIdElement().toUnqualifiedVersionless().getValue()).collect(Collectors.toList()));
if (resultBundle.getLink("next") == null) {
break;
}
ourLog.info("Loading page {} - Have {} results: {}", pageIndex++, ids.size(), resultBundle.getLink("next").getUrl());
resultBundle = ourClient.loadPage().next(resultBundle).execute();
}
assertEquals(count, ids.size());
assertEquals(count, Sets.newHashSet(ids).size());
}
@Test
public void testSearchWithLargeNumberOfIncludes() {
@ -100,7 +237,6 @@ public class StressTestR4Test extends BaseResourceProviderR4Test {
}
@Test
public void testMultithreadedSearch() throws Exception {
Bundle input = new Bundle();
@ -130,6 +266,103 @@ public class StressTestR4Test extends BaseResourceProviderR4Test {
}
@Test
public void testMultiThreadedCreateWithDuplicateClientAssignedIdsInTransaction() throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(20);
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
int finalI = i;
Callable<String> task = () -> {
Bundle input = new Bundle();
input.setType(BundleType.TRANSACTION);
Patient p = new Patient();
p.setId("A" + finalI);
p.addIdentifier().setValue("A"+finalI);
input.addEntry().setResource(p).setFullUrl("Patient/A" + finalI).getRequest().setMethod(HTTPVerb.PUT).setUrl("Patient/A" + finalI);
try {
ourClient.transaction().withBundle(input).execute();
return null;
} catch (ResourceVersionConflictException e) {
assertThat(e.toString(), containsString("Error flushing transaction with resource types: [Patient] - The operation has failed with a client-assigned ID constraint failure"));
return e.toString();
}
};
for (int j = 0; j < 2; j++) {
Future<String> future = executor.submit(task);
futures.add(future);
}
}
List<String> results = new ArrayList<>();
for (Future<String> next : futures) {
String nextOutcome = next.get();
if (isNotBlank(nextOutcome)) {
results.add(nextOutcome);
}
}
ourLog.info("Results: {}", results);
assertThat(results, not(Matchers.empty()));
assertThat(results.get(0), containsString("HTTP 409 Conflict: Error flushing transaction with resource types: [Patient]"));
}
@Test
public void testMultiThreadedUpdateSameResourceInTransaction() throws Exception {
Patient p = new Patient();
p.setActive(true);
IIdType id = myPatientDao.create(p).getId().toUnqualifiedVersionless();
ExecutorService executor = Executors.newFixedThreadPool(20);
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
int finalI = i;
Callable<String> task = () -> {
Bundle input = new Bundle();
input.setType(BundleType.TRANSACTION);
Patient updatePatient = new Patient();
updatePatient.setId(id);
updatePatient.addIdentifier().setValue("A"+finalI);
input.addEntry().setResource(updatePatient).setFullUrl(updatePatient.getId()).getRequest().setMethod(HTTPVerb.PUT).setUrl(updatePatient.getId());
try {
ourClient.transaction().withBundle(input).execute();
return null;
} catch (ResourceVersionConflictException e) {
assertThat(e.toString(), containsString("Error flushing transaction with resource types: [Patient] - The operation has failed with a version constraint failure. This generally means that two clients/threads were trying to update the same resource at the same time, and this request was chosen as the failing request."));
return e.toString();
}
};
for (int j = 0; j < 2; j++) {
Future<String> future = executor.submit(task);
futures.add(future);
}
}
List<String> results = new ArrayList<>();
for (Future<String> next : futures) {
String nextOutcome = next.get();
if (isNotBlank(nextOutcome)) {
results.add(nextOutcome);
}
}
ourLog.info("Results: {}", results);
assertThat(results, not(Matchers.empty()));
assertThat(results.get(0), containsString("HTTP 409 Conflict: Error flushing transaction with resource types: [Patient]"));
}
/**
* This test prevents a deadlock that was detected with a large number of
* threads creating resources and blocking on the searchparamcache refreshing
@ -191,11 +424,6 @@ public class StressTestR4Test extends BaseResourceProviderR4Test {
ourLog.info("Loaded {} searches", total);
}
@AfterClass
public static void afterClassClearContext() {
TestUtil.clearAllStaticFieldsForUnitTest();
}
public class BaseTask extends Thread {
protected Throwable myError;
protected int myTaskCount = 0;
@ -280,5 +508,10 @@ public class StressTestR4Test extends BaseResourceProviderR4Test {
}
}
@AfterClass
public static void afterClassClearContext() {
TestUtil.clearAllStaticFieldsForUnitTest();
}
}

View File

@ -27,7 +27,7 @@ import javax.persistence.*;
@Entity()
@Table(name = "HFJ_FORCED_ID", uniqueConstraints = {
@UniqueConstraint(name = "IDX_FORCEDID_RESID", columnNames = {"RESOURCE_PID"}),
@UniqueConstraint(name = "IDX_FORCEDID_TYPE_FID", columnNames = {"RESOURCE_TYPE", "FORCED_ID"})
@UniqueConstraint(name = ForcedId.IDX_FORCEDID_TYPE_FID, columnNames = {"RESOURCE_TYPE", "FORCED_ID"})
}, indexes = {
/*
* NB: We previously had indexes named
@ -39,6 +39,7 @@ import javax.persistence.*;
public class ForcedId {
public static final int MAX_FORCED_ID_LENGTH = 100;
public static final String IDX_FORCEDID_TYPE_FID = "IDX_FORCEDID_TYPE_FID";
@Column(name = "FORCED_ID", nullable = false, length = MAX_FORCED_ID_LENGTH, updatable = false)
private String myForcedId;

View File

@ -33,6 +33,7 @@ import org.hibernate.search.annotations.NumericField;
import javax.persistence.*;
import java.math.BigDecimal;
import java.util.Objects;
@Embeddable
@Entity
@ -162,7 +163,7 @@ public class ResourceIndexedSearchParamNumber extends BaseResourceIndexedSearchP
return false;
}
NumberParam number = (NumberParam) theParam;
return getValue().equals(number.getValue());
return Objects.equals(getValue(), number.getValue());
}
}

View File

@ -33,6 +33,9 @@ import org.hibernate.search.annotations.NumericField;
import javax.persistence.*;
import java.math.BigDecimal;
import java.util.Objects;
import static org.apache.commons.lang3.StringUtils.defaultString;
//@formatter:off
@Embeddable
@ -242,27 +245,31 @@ public class ResourceIndexedSearchParamQuantity extends BaseResourceIndexedSearc
boolean retval = false;
// Only match on system if it wasn't specified
if (quantity.getSystem() == null && quantity.getUnits() == null) {
if (getValue().equals(quantity.getValue())) {
String quantityUnitsString = defaultString(quantity.getUnits());
if (quantity.getSystem() == null && quantityUnitsString == null) {
if (Objects.equals(getValue(),quantity.getValue())) {
retval = true;
}
} else if (quantity.getSystem() == null) {
if (getUnits().equalsIgnoreCase(quantity.getUnits()) &&
getValue().equals(quantity.getValue())) {
} else {
String unitsString = defaultString(getUnits());
if (quantity.getSystem() == null) {
if (unitsString.equalsIgnoreCase(quantityUnitsString) &&
Objects.equals(getValue(),quantity.getValue())) {
retval = true;
}
} else if (quantity.getUnits() == null) {
} else if (quantityUnitsString == null) {
if (getSystem().equalsIgnoreCase(quantity.getSystem()) &&
getValue().equals(quantity.getValue())) {
Objects.equals(getValue(),quantity.getValue())) {
retval = true;
}
} else {
if (getSystem().equalsIgnoreCase(quantity.getSystem()) &&
getUnits().equalsIgnoreCase(quantity.getUnits()) &&
getValue().equals(quantity.getValue())) {
unitsString.equalsIgnoreCase(quantityUnitsString) &&
Objects.equals(getValue(),quantity.getValue())) {
retval = true;
}
}
}
return retval;
}

View File

@ -33,6 +33,7 @@ import org.hibernate.search.annotations.*;
import javax.persistence.Index;
import javax.persistence.*;
import static org.apache.commons.lang3.StringUtils.defaultString;
import static org.apache.commons.lang3.StringUtils.left;
//@formatter:off
@ -243,7 +244,7 @@ public class ResourceIndexedSearchParamString extends BaseResourceIndexedSearchP
}
public void setValueExact(String theValueExact) {
if (StringUtils.defaultString(theValueExact).length() > MAX_LENGTH) {
if (defaultString(theValueExact).length() > MAX_LENGTH) {
throw new IllegalArgumentException("Value is too long: " + theValueExact.length());
}
myValueExact = theValueExact;
@ -254,7 +255,7 @@ public class ResourceIndexedSearchParamString extends BaseResourceIndexedSearchP
}
public void setValueNormalized(String theValueNormalized) {
if (StringUtils.defaultString(theValueNormalized).length() > MAX_LENGTH) {
if (defaultString(theValueNormalized).length() > MAX_LENGTH) {
throw new IllegalArgumentException("Value is too long: " + theValueNormalized.length());
}
myValueNormalized = theValueNormalized;
@ -313,7 +314,7 @@ public class ResourceIndexedSearchParamString extends BaseResourceIndexedSearchP
return false;
}
StringParam string = (StringParam)theParam;
String normalizedString = StringNormalizer.normalizeString(string.getValue());
return getValueNormalized().startsWith(normalizedString);
String normalizedString = StringNormalizer.normalizeString(defaultString(string.getValue()));
return defaultString(getValueNormalized()).startsWith(normalizedString);
}
}

View File

@ -252,18 +252,21 @@ public class ResourceIndexedSearchParamToken extends BaseResourceIndexedSearchPa
}
TokenParam token = (TokenParam) theParam;
boolean retval = false;
String valueString = defaultString(getValue());
String tokenValueString = defaultString(token.getValue());
// Only match on system if it wasn't specified
if (token.getSystem() == null || token.getSystem().isEmpty()) {
if (getValue().equalsIgnoreCase(token.getValue())) {
if (valueString.equalsIgnoreCase(tokenValueString)) {
retval = true;
}
} else if (token.getValue() == null || token.getValue().isEmpty()) {
} else if (tokenValueString == null || tokenValueString.isEmpty()) {
if (token.getSystem().equalsIgnoreCase(getSystem())) {
retval = true;
}
} else {
if (token.getSystem().equalsIgnoreCase(getSystem()) &&
getValue().equalsIgnoreCase(token.getValue())) {
valueString.equalsIgnoreCase(tokenValueString)) {
retval = true;
}
}

View File

@ -30,6 +30,8 @@ import org.hibernate.search.annotations.Field;
import javax.persistence.*;
import static org.apache.commons.lang3.StringUtils.defaultString;
@Embeddable
@Entity
@Table(name = "HFJ_SPIDX_URI", indexes = {
@ -188,7 +190,7 @@ public class ResourceIndexedSearchParamUri extends BaseResourceIndexedSearchPara
return false;
}
UriParam uri = (UriParam) theParam;
return getUri().equalsIgnoreCase(uri.getValueNotNull());
return defaultString(getUri()).equalsIgnoreCase(uri.getValueNotNull());
}
public static long calculateHashUri(String theResourceType, String theParamName, String theUri) {

View File

@ -558,4 +558,24 @@ public class InMemorySubscriptionMatcherTestR3 extends BaseSubscriptionDstu3Test
cr.getRequester().getAgent().setReference("Organization/O1276");
assertMatched(cr, criteria);
}
@Test
public void testSystemWithNullValue() {
String criteria = "Observation?code=17861-6";
Observation observation = new Observation();
CodeableConcept code = new CodeableConcept();
observation.getCode().addCoding().setSystem("http://loinc.org");
assertNotMatched(observation, criteria);
}
@Test
public void testNullSystemNotNullValue() {
String criteria = "Observation?code=17861-6";
Observation observation = new Observation();
CodeableConcept code = new CodeableConcept();
observation.getCode().addCoding().setCode("look ma no system");
assertNotMatched(observation, criteria);
}
}

View File

@ -48,6 +48,16 @@
using the string name of the datatype (e.g. "dateTime") in order to help
building Parameters resources in a version-independent way.
</action>
<action type="fix">
When performing a search using the JPA server, if a search returned between 1500
and 2000 results, a query for the final page of results would timeout due to
a page calculation error. This has been corrected.
</action>
<action type="add">
In the JPA server, a much more readable error message is now returned returned when
two client threads collide while trying to simultaneously create a resource with the
same client-assigned ID. In addition, better error messages are now returned
when conflicts such as this one are hit within a FHIR transaction operation.
<action type="add">
The JPA query builder has been optimized to take better advantage of SQL IN (..) expressions
when performing token searches with multiple OR values.