From 4aa3b972282198215128e708dcbe9acbee189906 Mon Sep 17 00:00:00 2001 From: Michael Buckley Date: Sat, 20 Jan 2024 00:42:56 -0500 Subject: [PATCH] Dao support for searching for FHIR ids, and full resources. (#5612) Move several search transformations inside the resource dao. Create search for FHIR Ids, and search for Resources entry points. --- .../7_0_0/5612-new-search-methods.yaml | 4 + .../jpa/cache/ResourceVersionSvcDaoImpl.java | 41 ++----- .../fhir/jpa/dao/BaseHapiFhirResourceDao.java | 107 ++++++++++++++++-- .../dao/JpaResourceDaoSearchParameter.java | 11 +- .../jpa/search/builder/tasks/SearchTask.java | 4 +- .../fhir/jpa/cache/ResourceVersionMap.java | 8 +- .../jpa/cache/ResourceVersionMapTest.java | 31 +++++ .../SubscriptionActivatingSubscriber.java | 7 +- .../match/registry/SubscriptionLoader.java | 10 +- .../registry/SubscriptionLoaderTest.java | 11 +- .../SubscriptionTopicR4BTest.java | 6 +- .../subscription/BaseSubscriptionsR5Test.java | 2 +- .../cache/BaseResourceCacheSynchronizer.java | 21 +--- .../fhir/jpa/api/dao/IFhirResourceDao.java | 32 +++++- .../ca/uhn/fhir/jpa/dao/ISearchBuilder.java | 35 +++++- .../jpa/dao/tx/HapiTransactionService.java | 1 - .../jpa/dao/tx/IHapiTransactionService.java | 15 +++ 17 files changed, 264 insertions(+), 82 deletions(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_0_0/5612-new-search-methods.yaml create mode 100644 hapi-fhir-jpaserver-searchparam/src/test/java/ca/uhn/fhir/jpa/cache/ResourceVersionMapTest.java diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_0_0/5612-new-search-methods.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_0_0/5612-new-search-methods.yaml new file mode 100644 index 00000000000..c395e8c3268 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_0_0/5612-new-search-methods.yaml @@ -0,0 +1,4 @@ +--- +type: change +issue: 5612 +title: "The resource dao interface now supports searching for IIdType or full resources." diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/cache/ResourceVersionSvcDaoImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/cache/ResourceVersionSvcDaoImpl.java index e1bc2ca5cb1..6cae759b3dc 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/cache/ResourceVersionSvcDaoImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/cache/ResourceVersionSvcDaoImpl.java @@ -25,23 +25,19 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.svc.IIdHelperService; import ca.uhn.fhir.jpa.dao.data.IResourceTableDao; import ca.uhn.fhir.jpa.model.dao.JpaPid; -import ca.uhn.fhir.jpa.model.entity.ResourceTable; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; -import ca.uhn.fhir.jpa.util.QueryChunker; import ca.uhn.fhir.rest.api.server.SystemRequestDetails; import jakarta.annotation.Nonnull; import org.hl7.fhir.instance.model.api.IIdType; import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; import static org.slf4j.LoggerFactory.getLogger; @@ -64,29 +60,20 @@ public class ResourceVersionSvcDaoImpl implements IResourceVersionSvc { @Override @Nonnull - @Transactional public ResourceVersionMap getVersionMap( RequestPartitionId theRequestPartitionId, String theResourceName, SearchParameterMap theSearchParamMap) { - IFhirResourceDao dao = myDaoRegistry.getResourceDao(theResourceName); - if (ourLog.isDebugEnabled()) { ourLog.debug("About to retrieve version map for resource type: {}", theResourceName); } - List jpaPids = dao.searchForIds( - theSearchParamMap, new SystemRequestDetails().setRequestPartitionId(theRequestPartitionId)); - List matchingIds = jpaPids.stream().map(JpaPid::getId).collect(Collectors.toList()); + IFhirResourceDao dao = myDaoRegistry.getResourceDao(theResourceName); + SystemRequestDetails request = new SystemRequestDetails().setRequestPartitionId(theRequestPartitionId); - List allById = new ArrayList<>(); - new QueryChunker().chunk(matchingIds, t -> { - List nextBatch = myResourceTableDao.findAllById(t); - allById.addAll(nextBatch); - }); + List fhirIds = dao.searchForResourceIds(theSearchParamMap, request); - return ResourceVersionMap.fromResourceTableEntities(allById); + return ResourceVersionMap.fromIdsWithVersions(fhirIds); } - @Override /** * Retrieves the latest versions for any resourceid that are found. * If they are not found, they will not be contained in the returned map. @@ -98,8 +85,8 @@ public class ResourceVersionSvcDaoImpl implements IResourceVersionSvc { * * @param theRequestPartitionId - request partition id * @param theIds - list of IIdTypes for resources of interest. - * @return */ + @Override public ResourcePersistentIdMap getLatestVersionIdsForResourceIds( RequestPartitionId theRequestPartitionId, List theIds) { ResourcePersistentIdMap idToPID = new ResourcePersistentIdMap(); @@ -113,9 +100,8 @@ public class ResourceVersionSvcDaoImpl implements IResourceVersionSvc { resourceTypeToIds.get(resourceType).add(id); } - for (String resourceType : resourceTypeToIds.keySet()) { - ResourcePersistentIdMap idAndPID = - getIdsOfExistingResources(theRequestPartitionId, resourceTypeToIds.get(resourceType)); + for (List nextIds : resourceTypeToIds.values()) { + ResourcePersistentIdMap idAndPID = getIdsOfExistingResources(theRequestPartitionId, nextIds); idToPID.putAll(idAndPID); } @@ -128,7 +114,6 @@ public class ResourceVersionSvcDaoImpl implements IResourceVersionSvc { * If it's not found, it won't be included in the set. * * @param theIds - list of IIdType ids (for the same resource) - * @return */ private ResourcePersistentIdMap getIdsOfExistingResources( RequestPartitionId thePartitionId, Collection theIds) { @@ -157,9 +142,7 @@ public class ResourceVersionSvcDaoImpl implements IResourceVersionSvc { // this should always be present // since it was passed in. // but land of optionals... - idOp.ifPresent(id -> { - retval.put(id, pid); - }); + idOp.ifPresent(id -> retval.put(id, pid)); } // set any versions we don't already have @@ -167,11 +150,11 @@ public class ResourceVersionSvcDaoImpl implements IResourceVersionSvc { Collection resourceEntries = myResourceTableDao.getResourceVersionsForPid(new ArrayList<>(pidsToVersionToResourcePid.keySet())); - for (Object[] record : resourceEntries) { + for (Object[] nextRecord : resourceEntries) { // order matters! - Long retPid = (Long) record[0]; - String resType = (String) record[1]; - Long version = (Long) record[2]; + Long retPid = (Long) nextRecord[0]; + String resType = (String) nextRecord[1]; + Long version = (Long) nextRecord[2]; pidsToVersionToResourcePid.get(retPid).setVersion(version); } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java index 6a79e566d5f..079f9dc8215 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java @@ -65,11 +65,13 @@ import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider; import ca.uhn.fhir.jpa.search.PersistedJpaBundleProviderFactory; import ca.uhn.fhir.jpa.search.ResourceSearchUrlSvc; +import ca.uhn.fhir.jpa.search.builder.SearchBuilder; import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum; import ca.uhn.fhir.jpa.searchparam.MatchUrlService; import ca.uhn.fhir.jpa.searchparam.ResourceSearch; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.util.MemoryCacheService; +import ca.uhn.fhir.jpa.util.QueryChunker; import ca.uhn.fhir.model.api.IQueryParameterType; import ca.uhn.fhir.model.api.StorageResponseCodeEnum; import ca.uhn.fhir.model.dstu2.resource.BaseResource; @@ -117,7 +119,6 @@ import ca.uhn.fhir.validation.IValidatorModule; import ca.uhn.fhir.validation.ValidationOptions; import ca.uhn.fhir.validation.ValidationResult; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Streams; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import jakarta.annotation.PostConstruct; @@ -125,7 +126,6 @@ import jakarta.persistence.LockModeType; import jakarta.persistence.NoResultException; import jakarta.persistence.TypedQuery; import jakarta.servlet.http.HttpServletResponse; -import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.Validate; import org.hl7.fhir.instance.model.api.IBaseCoding; import org.hl7.fhir.instance.model.api.IBaseMetaType; @@ -156,6 +156,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.function.BiFunction; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -224,6 +225,7 @@ public abstract class BaseHapiFhirResourceDao extends B @Autowired private IFhirSystemDao mySystemDao; + @Nullable public static T invokeStoragePreShowResources( IInterceptorBroadcaster theInterceptorBroadcaster, RequestDetails theRequest, T retVal) { if (CompositeInterceptorBroadcaster.hasHooks( @@ -1568,6 +1570,7 @@ public abstract class BaseHapiFhirResourceDao extends B return retVal; } + @Nullable private T invokeStoragePreShowResources(RequestDetails theRequest, T retVal) { retVal = invokeStoragePreShowResources(myInterceptorBroadcaster, theRequest, retVal); return retVal; @@ -1577,6 +1580,23 @@ public abstract class BaseHapiFhirResourceDao extends B invokeStoragePreAccessResources(myInterceptorBroadcaster, theRequest, theId, theResource); } + private Optional invokeStoragePreAccessResources(RequestDetails theRequest, T theResource) { + if (CompositeInterceptorBroadcaster.hasHooks( + Pointcut.STORAGE_PREACCESS_RESOURCES, myInterceptorBroadcaster, theRequest)) { + SimplePreResourceAccessDetails accessDetails = new SimplePreResourceAccessDetails(theResource); + HookParams params = new HookParams() + .add(IPreResourceAccessDetails.class, accessDetails) + .add(RequestDetails.class, theRequest) + .addIfMatchesType(ServletRequestDetails.class, theRequest); + CompositeInterceptorBroadcaster.doCallHooks( + myInterceptorBroadcaster, theRequest, Pointcut.STORAGE_PREACCESS_RESOURCES, params); + if (accessDetails.isDontReturnResourceAtIndex(0)) { + return Optional.empty(); + } + } + return Optional.of(theResource); + } + @Override public BaseHasResource readEntity(IIdType theId, RequestDetails theRequest) { RequestPartitionId requestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequestForRead( @@ -2043,12 +2063,12 @@ public abstract class BaseHapiFhirResourceDao extends B .withRequest(theRequest) .withTransactionDetails(transactionDetails) .withRequestPartitionId(requestPartitionId) - .execute(() -> { + .searchList(() -> { if (isNull(theParams.getLoadSynchronousUpTo())) { theParams.setLoadSynchronousUpTo(myStorageSettings.getInternalSynchronousSearchSize()); } - ISearchBuilder builder = + ISearchBuilder builder = mySearchBuilderFactory.newSearchBuilder(this, getResourceName(), getResourceType()); List ids = new ArrayList<>(); @@ -2074,6 +2094,7 @@ public abstract class BaseHapiFhirResourceDao extends B SearchParameterMap theParams, RequestDetails theRequest, @Nullable IBaseResource theConditionalOperationTargetOrNull) { + // the Stream is useless outside the bound connection time, so require our caller to have a session. HapiTransactionService.requireTransaction(); @@ -2081,15 +2102,83 @@ public abstract class BaseHapiFhirResourceDao extends B myRequestPartitionHelperService.determineReadPartitionForRequestForSearchType( theRequest, myResourceName, theParams, theConditionalOperationTargetOrNull); - ISearchBuilder builder = mySearchBuilderFactory.newSearchBuilder(this, getResourceName(), getResourceType()); + ISearchBuilder builder = + mySearchBuilderFactory.newSearchBuilder(this, getResourceName(), getResourceType()); String uuid = UUID.randomUUID().toString(); SearchRuntimeDetails searchRuntimeDetails = new SearchRuntimeDetails(theRequest, uuid); - IResultIterator iter = - builder.createQuery(theParams, searchRuntimeDetails, theRequest, requestPartitionId); - // Adapt IResultIterator to stream, and connect the close handler. - return Streams.stream(iter).onClose(() -> IOUtils.closeQuietly(iter)); + //noinspection unchecked + return (Stream) myTransactionService + .withRequest(theRequest) + .search(() -> + builder.createQueryStream(theParams, searchRuntimeDetails, theRequest, requestPartitionId)); + } + + @Override + public List searchForResources(SearchParameterMap theParams, RequestDetails theRequest) { + return searchForTransformedIds(theParams, theRequest, this::pidsToResource); + } + + @Override + public List searchForResourceIds(SearchParameterMap theParams, RequestDetails theRequest) { + return searchForTransformedIds(theParams, theRequest, this::pidsToIds); + } + + private List searchForTransformedIds( + SearchParameterMap theParams, + RequestDetails theRequest, + BiFunction, Stream> transform) { + RequestPartitionId requestPartitionId = + myRequestPartitionHelperService.determineReadPartitionForRequestForSearchType( + theRequest, myResourceName, theParams, null); + + String uuid = UUID.randomUUID().toString(); + + SearchRuntimeDetails searchRuntimeDetails = new SearchRuntimeDetails(theRequest, uuid); + return myTransactionService + .withRequest(theRequest) + .withPropagation(Propagation.REQUIRED) + .searchList(() -> { + ISearchBuilder builder = + mySearchBuilderFactory.newSearchBuilder(this, getResourceName(), getResourceType()); + Stream pidStream = + builder.createQueryStream(theParams, searchRuntimeDetails, theRequest, requestPartitionId); + + Stream transformedStream = transform.apply(theRequest, pidStream); + + return transformedStream.collect(Collectors.toList()); + }); + } + + /** + * Fetch the resources in chunks and apply PreAccess/PreShow interceptors. + */ + @Nonnull + private Stream pidsToResource(RequestDetails theRequest, Stream pidStream) { + ISearchBuilder searchBuilder = + mySearchBuilderFactory.newSearchBuilder(this, getResourceName(), getResourceType()); + @SuppressWarnings("unchecked") + Stream resourceStream = (Stream) new QueryChunker<>() + .chunk(pidStream, SearchBuilder.getMaximumPageSize()) + .flatMap(pidChunk -> searchBuilder.loadResourcesByPid(pidChunk, theRequest).stream()); + // apply interceptors + return resourceStream + .flatMap(resource -> invokeStoragePreAccessResources(theRequest, resource).stream()) + .flatMap(resource -> Optional.ofNullable(invokeStoragePreShowResources(theRequest, resource)).stream()); + } + + /** + * get the Ids from the ResourceTable entities in chunks. + */ + @Nonnull + private Stream pidsToIds(RequestDetails theRequestDetails, Stream thePidStream) { + Stream longStream = thePidStream.map(JpaPid::getId); + + return new QueryChunker<>() + .chunk(longStream, SearchBuilder.getMaximumPageSize()) + .flatMap(ids -> myResourceTableDao.findAllById(ids).stream()) + .map(ResourceTable::getIdDt); } protected MT toMetaDt(Class theType, Collection tagDefinitions) { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/JpaResourceDaoSearchParameter.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/JpaResourceDaoSearchParameter.java index 38870ac0f6e..1217ef2bee5 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/JpaResourceDaoSearchParameter.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/JpaResourceDaoSearchParameter.java @@ -28,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.r5.model.Enumeration; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; @@ -60,8 +61,14 @@ public class JpaResourceDaoSearchParameter extends Base TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCommit() { - myCacheReloadTriggered.set(false); - mySearchParamRegistry.forceRefresh(); + myTransactionService + .withSystemRequest() + .withPropagation(Propagation.NOT_SUPPORTED) + .execute(() -> { + // do this outside any current tx. + myCacheReloadTriggered.set(false); + mySearchParamRegistry.forceRefresh(); + }); } }); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/tasks/SearchTask.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/tasks/SearchTask.java index d7cb95d4a5d..cbec057c33a 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/tasks/SearchTask.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/builder/tasks/SearchTask.java @@ -301,7 +301,7 @@ public class SearchTask implements Callable { // the user has a chance to know that they were in the results if (mySearchRuntimeDetails.getRequestDetails() != null && !unsyncedPids.isEmpty()) { JpaPreResourceAccessDetails accessDetails = - new JpaPreResourceAccessDetails(unsyncedPids, () -> newSearchBuilder()); + new JpaPreResourceAccessDetails(unsyncedPids, this::newSearchBuilder); HookParams params = new HookParams() .add(IPreResourceAccessDetails.class, accessDetails) .add(RequestDetails.class, mySearchRuntimeDetails.getRequestDetails()) @@ -446,7 +446,7 @@ public class SearchTask implements Callable { myTxService .withRequest(myRequest) .withRequestPartitionId(myRequestPartitionId) - .execute(() -> doSearch()); + .execute(this::doSearch); mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus()); if (mySearch.getStatus() == SearchStatusEnum.FINISHED) { diff --git a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/cache/ResourceVersionMap.java b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/cache/ResourceVersionMap.java index 476014383c5..4699ede1c28 100644 --- a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/cache/ResourceVersionMap.java +++ b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/cache/ResourceVersionMap.java @@ -44,7 +44,7 @@ public class ResourceVersionMap { private ResourceVersionMap() {} - public static ResourceVersionMap fromResourceTableEntities(List theEntities) { + public static ResourceVersionMap fromResourceTableEntities(List> theEntities) { ResourceVersionMap retval = new ResourceVersionMap(); theEntities.forEach(entity -> retval.add(entity.getIdDt())); return retval; @@ -60,6 +60,12 @@ public class ResourceVersionMap { return new ResourceVersionMap(); } + public static ResourceVersionMap fromIdsWithVersions(List theFhirIds) { + ResourceVersionMap retval = new ResourceVersionMap(); + theFhirIds.forEach(retval::add); + return retval; + } + private void add(IIdType theId) { if (theId.getVersionIdPart() == null) { ourLog.warn("Not storing {} in ResourceVersionMap because it does not have a version.", theId); diff --git a/hapi-fhir-jpaserver-searchparam/src/test/java/ca/uhn/fhir/jpa/cache/ResourceVersionMapTest.java b/hapi-fhir-jpaserver-searchparam/src/test/java/ca/uhn/fhir/jpa/cache/ResourceVersionMapTest.java new file mode 100644 index 00000000000..0b766fdc749 --- /dev/null +++ b/hapi-fhir-jpaserver-searchparam/src/test/java/ca/uhn/fhir/jpa/cache/ResourceVersionMapTest.java @@ -0,0 +1,31 @@ +package ca.uhn.fhir.jpa.cache; + +import ca.uhn.fhir.model.primitive.IdDt; +import org.hl7.fhir.instance.model.api.IIdType; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.*; + +class ResourceVersionMapTest { + + @Test + void testCreate_fromIds() { + // given + List ids = List.of( + new IdDt("Patient", "p1", "2"), + new IdDt("Patient", "p2", "1"), + new IdDt("Observation", "o1", "1") + ); + + // when + ResourceVersionMap resourceVersionMap = ResourceVersionMap.fromIdsWithVersions(ids); + + // then + assertEquals(Set.copyOf(ids), resourceVersionMap.getSourceIds()); + assertEquals(2, resourceVersionMap.get(new IdDt("Patient", "p1"))); + } + +} diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionActivatingSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionActivatingSubscriber.java index efd0ee984d8..a07d0eaf361 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionActivatingSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionActivatingSubscriber.java @@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber; import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.model.entity.StorageSettings; @@ -27,6 +28,7 @@ import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; +import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.server.SystemRequestDetails; import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; @@ -156,7 +158,10 @@ public class SubscriptionActivatingSubscriber implements MessageHandler { SubscriptionConstants.REQUESTED_STATUS, SubscriptionConstants.ACTIVE_STATUS); SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ACTIVE_STATUS); - subscriptionDao.update(subscription, srd); + + RequestPartitionId partitionId = + (RequestPartitionId) subscription.getUserData(Constants.RESOURCE_PARTITION_ID); + subscriptionDao.update(subscription, new SystemRequestDetails().setRequestPartitionId(partitionId)); return true; } catch (final UnprocessableEntityException | ResourceGoneException e) { subscription = subscription != null ? subscription : theSubscription; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoader.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoader.java index 130fe8edace..4bea23c546f 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoader.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoader.java @@ -26,6 +26,7 @@ import ca.uhn.fhir.rest.param.TokenOrListParam; import ca.uhn.fhir.rest.param.TokenParam; import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; import ca.uhn.fhir.subscription.SubscriptionConstants; +import com.google.common.annotations.VisibleForTesting; import jakarta.annotation.Nonnull; import org.apache.commons.lang3.StringUtils; import org.hl7.fhir.instance.model.api.IBaseResource; @@ -60,8 +61,9 @@ public class SubscriptionLoader extends BaseResourceCacheSynchronizer { super("Subscription"); } + @VisibleForTesting public int doSyncSubscriptionsForUnitTest() { - return super.doSyncResourcessForUnitTest(); + return super.doSyncResourcesForUnitTest(); } @Override @@ -119,7 +121,7 @@ public class SubscriptionLoader extends BaseResourceCacheSynchronizer { } /** - * @param theSubscription + * Check status of theSubscription and update to "active" if needed. * @return true if activated */ private boolean activateSubscriptionIfRequested(IBaseResource theSubscription) { @@ -162,8 +164,8 @@ public class SubscriptionLoader extends BaseResourceCacheSynchronizer { error = ""; } ourLog.error( - "Subscription {} could not be activated." - + " This will not prevent startup, but it could lead to undesirable outcomes! {}", + "Subscription {} could not be activated. " + + "This will not prevent startup, but it could lead to undesirable outcomes! {}", theSubscription.getIdElement().getIdPart(), (StringUtils.isBlank(error) ? "" : "Error: " + error)); } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoaderTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoaderTest.java index e262fd813ae..52b615c277c 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoaderTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoaderTest.java @@ -26,7 +26,6 @@ import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; -import java.util.Collections; import java.util.List; import static org.hamcrest.MatcherAssert.assertThat; @@ -108,11 +107,9 @@ public class SubscriptionLoaderTest { when(myDaoRegistry.getResourceDao("Subscription")) .thenReturn(mySubscriptionDao); when(myDaoRegistry.isResourceTypeSupported("Subscription")) - .thenReturn(true); - when(mySubscriptionDao.search(any(SearchParameterMap.class), any(SystemRequestDetails.class))) - .thenReturn(getSubscriptionList( - Collections.singletonList(subscription) - )); + .thenReturn(true); + when(mySubscriptionDao.searchForResources(any(SearchParameterMap.class), any(SystemRequestDetails.class))) + .thenReturn(List.of(subscription)); when(mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(any(IBaseResource.class))) .thenReturn(false); @@ -127,7 +124,7 @@ public class SubscriptionLoaderTest { // verify verify(mySubscriptionDao) - .search(any(SearchParameterMap.class), any(SystemRequestDetails.class)); + .searchForResources(any(SearchParameterMap.class), any(SystemRequestDetails.class)); String expected = "Subscription " + subscription.getIdElement().getIdPart() diff --git a/hapi-fhir-jpaserver-test-r4b/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTopicR4BTest.java b/hapi-fhir-jpaserver-test-r4b/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTopicR4BTest.java index 42480efa364..4c3110a56a2 100644 --- a/hapi-fhir-jpaserver-test-r4b/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTopicR4BTest.java +++ b/hapi-fhir-jpaserver-test-r4b/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTopicR4BTest.java @@ -78,7 +78,7 @@ public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest { public void testCreate() throws Exception { // WIP SR4B test update, delete, etc createEncounterSubscriptionTopic(SubscriptionTopic.InteractionTrigger.CREATE); - mySubscriptionTopicLoader.doSyncResourcessForUnitTest(); + mySubscriptionTopicLoader.doSyncResourcesForUnitTest(); waitForRegisteredSubscriptionTopicCount(); Subscription subscription = createTopicSubscription(); @@ -105,7 +105,7 @@ public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest { public void testUpdate() throws Exception { // WIP SR4B test update, delete, etc createEncounterSubscriptionTopic(SubscriptionTopic.InteractionTrigger.CREATE, SubscriptionTopic.InteractionTrigger.UPDATE); - mySubscriptionTopicLoader.doSyncResourcessForUnitTest(); + mySubscriptionTopicLoader.doSyncResourcesForUnitTest(); waitForRegisteredSubscriptionTopicCount(); Subscription subscription = createTopicSubscription(); @@ -168,7 +168,7 @@ public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest { if (size == 1) { return true; } - mySubscriptionTopicLoader.doSyncResourcessForUnitTest(); + mySubscriptionTopicLoader.doSyncResourcesForUnitTest(); return mySubscriptionTopicRegistry.size() == 1; } diff --git a/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR5Test.java b/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR5Test.java index 44dd5e99568..e5dbce55528 100644 --- a/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR5Test.java +++ b/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR5Test.java @@ -283,7 +283,7 @@ public abstract class BaseSubscriptionsR5Test extends BaseResourceProviderR5Test if (size == theTarget) { return true; } - mySubscriptionTopicLoader.doSyncResourcessForUnitTest(); + mySubscriptionTopicLoader.doSyncResourcesForUnitTest(); return mySubscriptionTopicRegistry.size() == theTarget; } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/cache/BaseResourceCacheSynchronizer.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/cache/BaseResourceCacheSynchronizer.java index bacdb0e64e8..26f064a3630 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/cache/BaseResourceCacheSynchronizer.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/cache/BaseResourceCacheSynchronizer.java @@ -27,9 +27,7 @@ import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache; import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.retry.Retrier; -import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.SystemRequestDetails; -import ca.uhn.fhir.subscription.SubscriptionConstants; import com.google.common.annotations.VisibleForTesting; import jakarta.annotation.Nonnull; import jakarta.annotation.PostConstruct; @@ -127,7 +125,7 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi } @VisibleForTesting - public int doSyncResourcessForUnitTest() { + public int doSyncResourcesForUnitTest() { // Two passes for delete flag to take effect int first = doSyncResourcesWithRetry(); int second = doSyncResourcesWithRetry(); @@ -141,6 +139,7 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi return syncResourceRetrier.runWithRetry(); } + @SuppressWarnings("unchecked") private int doSyncResources() { if (isStopping()) { return 0; @@ -149,20 +148,8 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi synchronized (mySyncResourcesLock) { ourLog.debug("Starting sync {}s", myResourceName); - IBundleProvider resourceBundleList = getResourceDao().search(mySearchParameterMap, mySystemRequestDetails); - - Integer resourceCount = resourceBundleList.size(); - assert resourceCount != null; - if (resourceCount >= SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS) { - ourLog.error( - "Currently over {} {}s. Some {}s have not been loaded.", - SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS, - myResourceName, - myResourceName); - } - - List resourceList = resourceBundleList.getResources(0, resourceCount); - + List resourceList = (List) + getResourceDao().searchForResources(mySearchParameterMap, mySystemRequestDetails); return syncResourcesIntoCache(resourceList); } } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/IFhirResourceDao.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/IFhirResourceDao.java index 01a8becf7d8..7082fed8240 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/IFhirResourceDao.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/IFhirResourceDao.java @@ -54,6 +54,7 @@ import java.util.Collection; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -370,8 +371,10 @@ public interface IFhirResourceDao extends IDao { /** * Search results matching theParams. - * The Stream MUST be called within a transaction because the stream wraps an open query ResultSet. + * This call does not currently invoke any interceptors, so should only be used for infrastructure that + * will not need to participate in the consent services, or caching. * The Stream MUST be closed to avoid leaking resources. + * If called within a transaction, the Stream will fail if passed outside the tx boundary. * @param theParams the search * @param theRequest for partition target info * @return a Stream that MUST only be used within the calling transaction. @@ -384,9 +387,30 @@ public interface IFhirResourceDao extends IDao { return iResourcePersistentIds.stream(); } - default > Stream searchForIdStream( - SearchParameterMap theParams, RequestDetails theRequest) { - return searchForIdStream(theParams, theRequest, null); + /** + * Return all search results matching theParams. + * Will load all resources into ram, so not appropriate for large data sets. + * This call invokes both preaccess and preshow interceptors. + * @param theParams the search + * @param theRequest for partition target info + */ + default List searchForResources(SearchParameterMap theParams, RequestDetails theRequest) { + IBundleProvider provider = search(theParams, theRequest); + //noinspection unchecked + return (List) provider.getAllResources(); + } + + /** + * Return the FHIR Ids matching theParams. + * This call does not currently invoke any interceptors, so should only be used for infrastructure that + * will not need to participate in the consent services, or caching. + * @param theParams the search + * @param theRequest for partition target info + */ + default List searchForResourceIds(SearchParameterMap theParams, RequestDetails theRequest) { + return searchForResources(theParams, theRequest).stream() + .map(IBaseResource::getIdElement) + .collect(Collectors.toList()); } /** diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/ISearchBuilder.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/ISearchBuilder.java index 6ca5ce2350f..7b79e60658d 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/ISearchBuilder.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/ISearchBuilder.java @@ -28,23 +28,47 @@ import ca.uhn.fhir.model.api.Include; import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; import ca.uhn.fhir.rest.param.DateRangeParam; +import com.google.common.collect.Streams; import jakarta.annotation.Nonnull; import jakarta.persistence.EntityManager; +import org.apache.commons.io.IOUtils; import org.hl7.fhir.instance.model.api.IBaseResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.stream.Stream; public interface ISearchBuilder> { + static final Logger ourLog = LoggerFactory.getLogger(ISearchBuilder.class); + String SEARCH_BUILDER_BEAN_NAME = "SearchBuilder"; - IResultIterator createQuery( + IResultIterator createQuery( SearchParameterMap theParams, SearchRuntimeDetails theSearchRuntime, RequestDetails theRequest, @Nonnull RequestPartitionId theRequestPartitionId); + /** + * Stream equivalent of createQuery. + * Note: the Stream must be closed. + */ + default Stream createQueryStream( + SearchParameterMap theParams, + SearchRuntimeDetails theSearchRuntime, + RequestDetails theRequest, + @Nonnull RequestPartitionId theRequestPartitionId) { + IResultIterator iter = createQuery(theParams, theSearchRuntime, theRequest, theRequestPartitionId); + // Adapt IResultIterator to stream + Stream stream = Streams.stream(iter); + // The iterator might have an open ResultSet. Connect the close handler. + return stream.onClose(() -> IOUtils.closeQuietly(iter)); + } + Long createCountQuery( SearchParameterMap theParams, String theSearchUuid, @@ -60,6 +84,15 @@ public interface ISearchBuilder> { boolean theForHistoryOperation, RequestDetails theDetails); + default List loadResourcesByPid(Collection thePids, RequestDetails theDetails) { + ArrayList result = new ArrayList<>(); + loadResourcesByPid(thePids, List.of(), result, false, theDetails); + if (result.size() != thePids.size()) { + ourLog.warn("Only found {} resources for {} pids", result.size(), thePids.size()); + } + return result; + } + /** * Use the loadIncludes that takes a parameters object instead. */ diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java index 46023360cba..247eb8d0232 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java @@ -439,7 +439,6 @@ public class HapiTransactionService implements IHapiTransactionService { } } - // wipmb is Clone ok, or do we want an explicit copy constructor? protected class ExecutionBuilder implements IExecutionBuilder, TransactionOperations, Cloneable { private final RequestDetails myRequestDetails; diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/IHapiTransactionService.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/IHapiTransactionService.java index 401f0f66e1a..a3627d16083 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/IHapiTransactionService.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/IHapiTransactionService.java @@ -30,6 +30,7 @@ import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.support.TransactionCallback; import org.springframework.transaction.support.TransactionOperations; +import java.util.List; import java.util.concurrent.Callable; import java.util.stream.Stream; @@ -109,12 +110,26 @@ public interface IHapiTransactionService { T execute(@Nonnull TransactionCallback callback); + /** + * Read query path. + */ default T read(Callable theCallback) { return execute(theCallback); } + /** + * Search for open Stream. + * The Stream may not be readable outside an outermost transaction. + */ default Stream search(Callable> theCallback) { return execute(theCallback); } + + /** + * Search for concrete List. + */ + default List searchList(Callable> theCallback) { + return execute(theCallback); + } } }