Dao support for searching for FHIR ids, and full resources. (#5612)

Move several search transformations inside the resource dao.

Create search for FHIR Ids, and search for Resources entry points.
This commit is contained in:
Michael Buckley 2024-01-20 00:42:56 -05:00 committed by GitHub
parent 9438754b72
commit 4aa3b97228
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 264 additions and 82 deletions

View File

@ -0,0 +1,4 @@
---
type: change
issue: 5612
title: "The resource dao interface now supports searching for IIdType or full resources."

View File

@ -25,23 +25,19 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.QueryChunker;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import jakarta.annotation.Nonnull;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.slf4j.LoggerFactory.getLogger;
@ -64,29 +60,20 @@ public class ResourceVersionSvcDaoImpl implements IResourceVersionSvc {
@Override
@Nonnull
@Transactional
public ResourceVersionMap getVersionMap(
RequestPartitionId theRequestPartitionId, String theResourceName, SearchParameterMap theSearchParamMap) {
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(theResourceName);
if (ourLog.isDebugEnabled()) {
ourLog.debug("About to retrieve version map for resource type: {}", theResourceName);
}
List<JpaPid> jpaPids = dao.searchForIds(
theSearchParamMap, new SystemRequestDetails().setRequestPartitionId(theRequestPartitionId));
List<Long> matchingIds = jpaPids.stream().map(JpaPid::getId).collect(Collectors.toList());
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(theResourceName);
SystemRequestDetails request = new SystemRequestDetails().setRequestPartitionId(theRequestPartitionId);
List<ResourceTable> allById = new ArrayList<>();
new QueryChunker<Long>().chunk(matchingIds, t -> {
List<ResourceTable> nextBatch = myResourceTableDao.findAllById(t);
allById.addAll(nextBatch);
});
List<IIdType> fhirIds = dao.searchForResourceIds(theSearchParamMap, request);
return ResourceVersionMap.fromResourceTableEntities(allById);
return ResourceVersionMap.fromIdsWithVersions(fhirIds);
}
@Override
/**
* Retrieves the latest versions for any resourceid that are found.
* If they are not found, they will not be contained in the returned map.
@ -98,8 +85,8 @@ public class ResourceVersionSvcDaoImpl implements IResourceVersionSvc {
*
* @param theRequestPartitionId - request partition id
* @param theIds - list of IIdTypes for resources of interest.
* @return
*/
@Override
public ResourcePersistentIdMap getLatestVersionIdsForResourceIds(
RequestPartitionId theRequestPartitionId, List<IIdType> theIds) {
ResourcePersistentIdMap idToPID = new ResourcePersistentIdMap();
@ -113,9 +100,8 @@ public class ResourceVersionSvcDaoImpl implements IResourceVersionSvc {
resourceTypeToIds.get(resourceType).add(id);
}
for (String resourceType : resourceTypeToIds.keySet()) {
ResourcePersistentIdMap idAndPID =
getIdsOfExistingResources(theRequestPartitionId, resourceTypeToIds.get(resourceType));
for (List<IIdType> nextIds : resourceTypeToIds.values()) {
ResourcePersistentIdMap idAndPID = getIdsOfExistingResources(theRequestPartitionId, nextIds);
idToPID.putAll(idAndPID);
}
@ -128,7 +114,6 @@ public class ResourceVersionSvcDaoImpl implements IResourceVersionSvc {
* If it's not found, it won't be included in the set.
*
* @param theIds - list of IIdType ids (for the same resource)
* @return
*/
private ResourcePersistentIdMap getIdsOfExistingResources(
RequestPartitionId thePartitionId, Collection<IIdType> theIds) {
@ -157,9 +142,7 @@ public class ResourceVersionSvcDaoImpl implements IResourceVersionSvc {
// this should always be present
// since it was passed in.
// but land of optionals...
idOp.ifPresent(id -> {
retval.put(id, pid);
});
idOp.ifPresent(id -> retval.put(id, pid));
}
// set any versions we don't already have
@ -167,11 +150,11 @@ public class ResourceVersionSvcDaoImpl implements IResourceVersionSvc {
Collection<Object[]> resourceEntries =
myResourceTableDao.getResourceVersionsForPid(new ArrayList<>(pidsToVersionToResourcePid.keySet()));
for (Object[] record : resourceEntries) {
for (Object[] nextRecord : resourceEntries) {
// order matters!
Long retPid = (Long) record[0];
String resType = (String) record[1];
Long version = (Long) record[2];
Long retPid = (Long) nextRecord[0];
String resType = (String) nextRecord[1];
Long version = (Long) nextRecord[2];
pidsToVersionToResourcePid.get(retPid).setVersion(version);
}
}

View File

@ -65,11 +65,13 @@ import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider;
import ca.uhn.fhir.jpa.search.PersistedJpaBundleProviderFactory;
import ca.uhn.fhir.jpa.search.ResourceSearchUrlSvc;
import ca.uhn.fhir.jpa.search.builder.SearchBuilder;
import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.ResourceSearch;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.MemoryCacheService;
import ca.uhn.fhir.jpa.util.QueryChunker;
import ca.uhn.fhir.model.api.IQueryParameterType;
import ca.uhn.fhir.model.api.StorageResponseCodeEnum;
import ca.uhn.fhir.model.dstu2.resource.BaseResource;
@ -117,7 +119,6 @@ import ca.uhn.fhir.validation.IValidatorModule;
import ca.uhn.fhir.validation.ValidationOptions;
import ca.uhn.fhir.validation.ValidationResult;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Streams;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
@ -125,7 +126,6 @@ import jakarta.persistence.LockModeType;
import jakarta.persistence.NoResultException;
import jakarta.persistence.TypedQuery;
import jakarta.servlet.http.HttpServletResponse;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseCoding;
import org.hl7.fhir.instance.model.api.IBaseMetaType;
@ -156,6 +156,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -224,6 +225,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@Autowired
private IFhirSystemDao<?, ?> mySystemDao;
@Nullable
public static <T extends IBaseResource> T invokeStoragePreShowResources(
IInterceptorBroadcaster theInterceptorBroadcaster, RequestDetails theRequest, T retVal) {
if (CompositeInterceptorBroadcaster.hasHooks(
@ -1568,6 +1570,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
return retVal;
}
@Nullable
private T invokeStoragePreShowResources(RequestDetails theRequest, T retVal) {
retVal = invokeStoragePreShowResources(myInterceptorBroadcaster, theRequest, retVal);
return retVal;
@ -1577,6 +1580,23 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
invokeStoragePreAccessResources(myInterceptorBroadcaster, theRequest, theId, theResource);
}
private Optional<T> invokeStoragePreAccessResources(RequestDetails theRequest, T theResource) {
if (CompositeInterceptorBroadcaster.hasHooks(
Pointcut.STORAGE_PREACCESS_RESOURCES, myInterceptorBroadcaster, theRequest)) {
SimplePreResourceAccessDetails accessDetails = new SimplePreResourceAccessDetails(theResource);
HookParams params = new HookParams()
.add(IPreResourceAccessDetails.class, accessDetails)
.add(RequestDetails.class, theRequest)
.addIfMatchesType(ServletRequestDetails.class, theRequest);
CompositeInterceptorBroadcaster.doCallHooks(
myInterceptorBroadcaster, theRequest, Pointcut.STORAGE_PREACCESS_RESOURCES, params);
if (accessDetails.isDontReturnResourceAtIndex(0)) {
return Optional.empty();
}
}
return Optional.of(theResource);
}
@Override
public BaseHasResource readEntity(IIdType theId, RequestDetails theRequest) {
RequestPartitionId requestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequestForRead(
@ -2043,12 +2063,12 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
.withRequest(theRequest)
.withTransactionDetails(transactionDetails)
.withRequestPartitionId(requestPartitionId)
.execute(() -> {
.searchList(() -> {
if (isNull(theParams.getLoadSynchronousUpTo())) {
theParams.setLoadSynchronousUpTo(myStorageSettings.getInternalSynchronousSearchSize());
}
ISearchBuilder<?> builder =
ISearchBuilder<JpaPid> builder =
mySearchBuilderFactory.newSearchBuilder(this, getResourceName(), getResourceType());
List<JpaPid> ids = new ArrayList<>();
@ -2074,6 +2094,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
SearchParameterMap theParams,
RequestDetails theRequest,
@Nullable IBaseResource theConditionalOperationTargetOrNull) {
// the Stream is useless outside the bound connection time, so require our caller to have a session.
HapiTransactionService.requireTransaction();
@ -2081,15 +2102,83 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
myRequestPartitionHelperService.determineReadPartitionForRequestForSearchType(
theRequest, myResourceName, theParams, theConditionalOperationTargetOrNull);
ISearchBuilder<?> builder = mySearchBuilderFactory.newSearchBuilder(this, getResourceName(), getResourceType());
ISearchBuilder<JpaPid> builder =
mySearchBuilderFactory.newSearchBuilder(this, getResourceName(), getResourceType());
String uuid = UUID.randomUUID().toString();
SearchRuntimeDetails searchRuntimeDetails = new SearchRuntimeDetails(theRequest, uuid);
IResultIterator<PID> iter =
builder.createQuery(theParams, searchRuntimeDetails, theRequest, requestPartitionId);
// Adapt IResultIterator to stream, and connect the close handler.
return Streams.stream(iter).onClose(() -> IOUtils.closeQuietly(iter));
//noinspection unchecked
return (Stream<PID>) myTransactionService
.withRequest(theRequest)
.search(() ->
builder.createQueryStream(theParams, searchRuntimeDetails, theRequest, requestPartitionId));
}
@Override
public List<T> searchForResources(SearchParameterMap theParams, RequestDetails theRequest) {
return searchForTransformedIds(theParams, theRequest, this::pidsToResource);
}
@Override
public List<IIdType> searchForResourceIds(SearchParameterMap theParams, RequestDetails theRequest) {
return searchForTransformedIds(theParams, theRequest, this::pidsToIds);
}
private <V> List<V> searchForTransformedIds(
SearchParameterMap theParams,
RequestDetails theRequest,
BiFunction<RequestDetails, Stream<JpaPid>, Stream<V>> transform) {
RequestPartitionId requestPartitionId =
myRequestPartitionHelperService.determineReadPartitionForRequestForSearchType(
theRequest, myResourceName, theParams, null);
String uuid = UUID.randomUUID().toString();
SearchRuntimeDetails searchRuntimeDetails = new SearchRuntimeDetails(theRequest, uuid);
return myTransactionService
.withRequest(theRequest)
.withPropagation(Propagation.REQUIRED)
.searchList(() -> {
ISearchBuilder<JpaPid> builder =
mySearchBuilderFactory.newSearchBuilder(this, getResourceName(), getResourceType());
Stream<JpaPid> pidStream =
builder.createQueryStream(theParams, searchRuntimeDetails, theRequest, requestPartitionId);
Stream<V> transformedStream = transform.apply(theRequest, pidStream);
return transformedStream.collect(Collectors.toList());
});
}
/**
* Fetch the resources in chunks and apply PreAccess/PreShow interceptors.
*/
@Nonnull
private Stream<T> pidsToResource(RequestDetails theRequest, Stream<JpaPid> pidStream) {
ISearchBuilder<JpaPid> searchBuilder =
mySearchBuilderFactory.newSearchBuilder(this, getResourceName(), getResourceType());
@SuppressWarnings("unchecked")
Stream<T> resourceStream = (Stream<T>) new QueryChunker<>()
.chunk(pidStream, SearchBuilder.getMaximumPageSize())
.flatMap(pidChunk -> searchBuilder.loadResourcesByPid(pidChunk, theRequest).stream());
// apply interceptors
return resourceStream
.flatMap(resource -> invokeStoragePreAccessResources(theRequest, resource).stream())
.flatMap(resource -> Optional.ofNullable(invokeStoragePreShowResources(theRequest, resource)).stream());
}
/**
* get the Ids from the ResourceTable entities in chunks.
*/
@Nonnull
private Stream<IIdType> pidsToIds(RequestDetails theRequestDetails, Stream<JpaPid> thePidStream) {
Stream<Long> longStream = thePidStream.map(JpaPid::getId);
return new QueryChunker<>()
.chunk(longStream, SearchBuilder.getMaximumPageSize())
.flatMap(ids -> myResourceTableDao.findAllById(ids).stream())
.map(ResourceTable::getIdDt);
}
protected <MT extends IBaseMetaType> MT toMetaDt(Class<MT> theType, Collection<TagDefinition> tagDefinitions) {

View File

@ -28,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r5.model.Enumeration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
@ -60,8 +61,14 @@ public class JpaResourceDaoSearchParameter<T extends IBaseResource> extends Base
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
myCacheReloadTriggered.set(false);
mySearchParamRegistry.forceRefresh();
myTransactionService
.withSystemRequest()
.withPropagation(Propagation.NOT_SUPPORTED)
.execute(() -> {
// do this outside any current tx.
myCacheReloadTriggered.set(false);
mySearchParamRegistry.forceRefresh();
});
}
});
}

View File

@ -301,7 +301,7 @@ public class SearchTask implements Callable<Void> {
// the user has a chance to know that they were in the results
if (mySearchRuntimeDetails.getRequestDetails() != null && !unsyncedPids.isEmpty()) {
JpaPreResourceAccessDetails accessDetails =
new JpaPreResourceAccessDetails(unsyncedPids, () -> newSearchBuilder());
new JpaPreResourceAccessDetails(unsyncedPids, this::newSearchBuilder);
HookParams params = new HookParams()
.add(IPreResourceAccessDetails.class, accessDetails)
.add(RequestDetails.class, mySearchRuntimeDetails.getRequestDetails())
@ -446,7 +446,7 @@ public class SearchTask implements Callable<Void> {
myTxService
.withRequest(myRequest)
.withRequestPartitionId(myRequestPartitionId)
.execute(() -> doSearch());
.execute(this::doSearch);
mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus());
if (mySearch.getStatus() == SearchStatusEnum.FINISHED) {

View File

@ -44,7 +44,7 @@ public class ResourceVersionMap {
private ResourceVersionMap() {}
public static ResourceVersionMap fromResourceTableEntities(List<? extends IBasePersistedResource> theEntities) {
public static ResourceVersionMap fromResourceTableEntities(List<? extends IBasePersistedResource<?>> theEntities) {
ResourceVersionMap retval = new ResourceVersionMap();
theEntities.forEach(entity -> retval.add(entity.getIdDt()));
return retval;
@ -60,6 +60,12 @@ public class ResourceVersionMap {
return new ResourceVersionMap();
}
public static ResourceVersionMap fromIdsWithVersions(List<IIdType> theFhirIds) {
ResourceVersionMap retval = new ResourceVersionMap();
theFhirIds.forEach(retval::add);
return retval;
}
private void add(IIdType theId) {
if (theId.getVersionIdPart() == null) {
ourLog.warn("Not storing {} in ResourceVersionMap because it does not have a version.", theId);

View File

@ -0,0 +1,31 @@
package ca.uhn.fhir.jpa.cache;
import ca.uhn.fhir.model.primitive.IdDt;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.*;
class ResourceVersionMapTest {
@Test
void testCreate_fromIds() {
// given
List<IIdType> ids = List.of(
new IdDt("Patient", "p1", "2"),
new IdDt("Patient", "p2", "1"),
new IdDt("Observation", "o1", "1")
);
// when
ResourceVersionMap resourceVersionMap = ResourceVersionMap.fromIdsWithVersions(ids);
// then
assertEquals(Set.copyOf(ids), resourceVersionMap.getSourceIds());
assertEquals(2, resourceVersionMap.get(new IdDt("Patient", "p1")));
}
}

View File

@ -20,6 +20,7 @@
package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
@ -27,6 +28,7 @@ import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
@ -156,7 +158,10 @@ public class SubscriptionActivatingSubscriber implements MessageHandler {
SubscriptionConstants.REQUESTED_STATUS,
SubscriptionConstants.ACTIVE_STATUS);
SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ACTIVE_STATUS);
subscriptionDao.update(subscription, srd);
RequestPartitionId partitionId =
(RequestPartitionId) subscription.getUserData(Constants.RESOURCE_PARTITION_ID);
subscriptionDao.update(subscription, new SystemRequestDetails().setRequestPartitionId(partitionId));
return true;
} catch (final UnprocessableEntityException | ResourceGoneException e) {
subscription = subscription != null ? subscription : theSubscription;

View File

@ -26,6 +26,7 @@ import ca.uhn.fhir.rest.param.TokenOrListParam;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
@ -60,8 +61,9 @@ public class SubscriptionLoader extends BaseResourceCacheSynchronizer {
super("Subscription");
}
@VisibleForTesting
public int doSyncSubscriptionsForUnitTest() {
return super.doSyncResourcessForUnitTest();
return super.doSyncResourcesForUnitTest();
}
@Override
@ -119,7 +121,7 @@ public class SubscriptionLoader extends BaseResourceCacheSynchronizer {
}
/**
* @param theSubscription
* Check status of theSubscription and update to "active" if needed.
* @return true if activated
*/
private boolean activateSubscriptionIfRequested(IBaseResource theSubscription) {
@ -162,8 +164,8 @@ public class SubscriptionLoader extends BaseResourceCacheSynchronizer {
error = "";
}
ourLog.error(
"Subscription {} could not be activated."
+ " This will not prevent startup, but it could lead to undesirable outcomes! {}",
"Subscription {} could not be activated. "
+ "This will not prevent startup, but it could lead to undesirable outcomes! {}",
theSubscription.getIdElement().getIdPart(),
(StringUtils.isBlank(error) ? "" : "Error: " + error));
}

View File

@ -26,7 +26,6 @@ import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
@ -108,11 +107,9 @@ public class SubscriptionLoaderTest {
when(myDaoRegistry.getResourceDao("Subscription"))
.thenReturn(mySubscriptionDao);
when(myDaoRegistry.isResourceTypeSupported("Subscription"))
.thenReturn(true);
when(mySubscriptionDao.search(any(SearchParameterMap.class), any(SystemRequestDetails.class)))
.thenReturn(getSubscriptionList(
Collections.singletonList(subscription)
));
.thenReturn(true);
when(mySubscriptionDao.searchForResources(any(SearchParameterMap.class), any(SystemRequestDetails.class)))
.thenReturn(List.of(subscription));
when(mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(any(IBaseResource.class)))
.thenReturn(false);
@ -127,7 +124,7 @@ public class SubscriptionLoaderTest {
// verify
verify(mySubscriptionDao)
.search(any(SearchParameterMap.class), any(SystemRequestDetails.class));
.searchForResources(any(SearchParameterMap.class), any(SystemRequestDetails.class));
String expected = "Subscription "
+ subscription.getIdElement().getIdPart()

View File

@ -78,7 +78,7 @@ public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest {
public void testCreate() throws Exception {
// WIP SR4B test update, delete, etc
createEncounterSubscriptionTopic(SubscriptionTopic.InteractionTrigger.CREATE);
mySubscriptionTopicLoader.doSyncResourcessForUnitTest();
mySubscriptionTopicLoader.doSyncResourcesForUnitTest();
waitForRegisteredSubscriptionTopicCount();
Subscription subscription = createTopicSubscription();
@ -105,7 +105,7 @@ public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest {
public void testUpdate() throws Exception {
// WIP SR4B test update, delete, etc
createEncounterSubscriptionTopic(SubscriptionTopic.InteractionTrigger.CREATE, SubscriptionTopic.InteractionTrigger.UPDATE);
mySubscriptionTopicLoader.doSyncResourcessForUnitTest();
mySubscriptionTopicLoader.doSyncResourcesForUnitTest();
waitForRegisteredSubscriptionTopicCount();
Subscription subscription = createTopicSubscription();
@ -168,7 +168,7 @@ public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest {
if (size == 1) {
return true;
}
mySubscriptionTopicLoader.doSyncResourcessForUnitTest();
mySubscriptionTopicLoader.doSyncResourcesForUnitTest();
return mySubscriptionTopicRegistry.size() == 1;
}

View File

@ -283,7 +283,7 @@ public abstract class BaseSubscriptionsR5Test extends BaseResourceProviderR5Test
if (size == theTarget) {
return true;
}
mySubscriptionTopicLoader.doSyncResourcessForUnitTest();
mySubscriptionTopicLoader.doSyncResourcesForUnitTest();
return mySubscriptionTopicRegistry.size() == theTarget;
}

View File

@ -27,9 +27,7 @@ import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache;
import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import jakarta.annotation.PostConstruct;
@ -127,7 +125,7 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi
}
@VisibleForTesting
public int doSyncResourcessForUnitTest() {
public int doSyncResourcesForUnitTest() {
// Two passes for delete flag to take effect
int first = doSyncResourcesWithRetry();
int second = doSyncResourcesWithRetry();
@ -141,6 +139,7 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi
return syncResourceRetrier.runWithRetry();
}
@SuppressWarnings("unchecked")
private int doSyncResources() {
if (isStopping()) {
return 0;
@ -149,20 +148,8 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi
synchronized (mySyncResourcesLock) {
ourLog.debug("Starting sync {}s", myResourceName);
IBundleProvider resourceBundleList = getResourceDao().search(mySearchParameterMap, mySystemRequestDetails);
Integer resourceCount = resourceBundleList.size();
assert resourceCount != null;
if (resourceCount >= SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS) {
ourLog.error(
"Currently over {} {}s. Some {}s have not been loaded.",
SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS,
myResourceName,
myResourceName);
}
List<IBaseResource> resourceList = resourceBundleList.getResources(0, resourceCount);
List<IBaseResource> resourceList = (List<IBaseResource>)
getResourceDao().searchForResources(mySearchParameterMap, mySystemRequestDetails);
return syncResourcesIntoCache(resourceList);
}
}

View File

@ -54,6 +54,7 @@ import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
@ -370,8 +371,10 @@ public interface IFhirResourceDao<T extends IBaseResource> extends IDao {
/**
* Search results matching theParams.
* The Stream MUST be called within a transaction because the stream wraps an open query ResultSet.
* This call does not currently invoke any interceptors, so should only be used for infrastructure that
* will not need to participate in the consent services, or caching.
* The Stream MUST be closed to avoid leaking resources.
* If called within a transaction, the Stream will fail if passed outside the tx boundary.
* @param theParams the search
* @param theRequest for partition target info
* @return a Stream that MUST only be used within the calling transaction.
@ -384,9 +387,30 @@ public interface IFhirResourceDao<T extends IBaseResource> extends IDao {
return iResourcePersistentIds.stream();
}
default <PID extends IResourcePersistentId<?>> Stream<PID> searchForIdStream(
SearchParameterMap theParams, RequestDetails theRequest) {
return searchForIdStream(theParams, theRequest, null);
/**
* Return all search results matching theParams.
* Will load all resources into ram, so not appropriate for large data sets.
* This call invokes both preaccess and preshow interceptors.
* @param theParams the search
* @param theRequest for partition target info
*/
default List<T> searchForResources(SearchParameterMap theParams, RequestDetails theRequest) {
IBundleProvider provider = search(theParams, theRequest);
//noinspection unchecked
return (List<T>) provider.getAllResources();
}
/**
* Return the FHIR Ids matching theParams.
* This call does not currently invoke any interceptors, so should only be used for infrastructure that
* will not need to participate in the consent services, or caching.
* @param theParams the search
* @param theRequest for partition target info
*/
default List<IIdType> searchForResourceIds(SearchParameterMap theParams, RequestDetails theRequest) {
return searchForResources(theParams, theRequest).stream()
.map(IBaseResource::getIdElement)
.collect(Collectors.toList());
}
/**

View File

@ -28,23 +28,47 @@ import ca.uhn.fhir.model.api.Include;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import com.google.common.collect.Streams;
import jakarta.annotation.Nonnull;
import jakarta.persistence.EntityManager;
import org.apache.commons.io.IOUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
public interface ISearchBuilder<T extends IResourcePersistentId<?>> {
static final Logger ourLog = LoggerFactory.getLogger(ISearchBuilder.class);
String SEARCH_BUILDER_BEAN_NAME = "SearchBuilder";
IResultIterator createQuery(
IResultIterator<T> createQuery(
SearchParameterMap theParams,
SearchRuntimeDetails theSearchRuntime,
RequestDetails theRequest,
@Nonnull RequestPartitionId theRequestPartitionId);
/**
* Stream equivalent of createQuery.
* Note: the Stream must be closed.
*/
default Stream<T> createQueryStream(
SearchParameterMap theParams,
SearchRuntimeDetails theSearchRuntime,
RequestDetails theRequest,
@Nonnull RequestPartitionId theRequestPartitionId) {
IResultIterator<T> iter = createQuery(theParams, theSearchRuntime, theRequest, theRequestPartitionId);
// Adapt IResultIterator to stream
Stream<T> stream = Streams.stream(iter);
// The iterator might have an open ResultSet. Connect the close handler.
return stream.onClose(() -> IOUtils.closeQuietly(iter));
}
Long createCountQuery(
SearchParameterMap theParams,
String theSearchUuid,
@ -60,6 +84,15 @@ public interface ISearchBuilder<T extends IResourcePersistentId<?>> {
boolean theForHistoryOperation,
RequestDetails theDetails);
default List<IBaseResource> loadResourcesByPid(Collection<T> thePids, RequestDetails theDetails) {
ArrayList<IBaseResource> result = new ArrayList<>();
loadResourcesByPid(thePids, List.of(), result, false, theDetails);
if (result.size() != thePids.size()) {
ourLog.warn("Only found {} resources for {} pids", result.size(), thePids.size());
}
return result;
}
/**
* Use the loadIncludes that takes a parameters object instead.
*/

View File

@ -439,7 +439,6 @@ public class HapiTransactionService implements IHapiTransactionService {
}
}
// wipmb is Clone ok, or do we want an explicit copy constructor?
protected class ExecutionBuilder implements IExecutionBuilder, TransactionOperations, Cloneable {
private final RequestDetails myRequestDetails;

View File

@ -30,6 +30,7 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionOperations;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.stream.Stream;
@ -109,12 +110,26 @@ public interface IHapiTransactionService {
<T> T execute(@Nonnull TransactionCallback<T> callback);
/**
* Read query path.
*/
default <T> T read(Callable<T> theCallback) {
return execute(theCallback);
}
/**
* Search for open Stream.
* The Stream may not be readable outside an outermost transaction.
*/
default <T> Stream<T> search(Callable<Stream<T>> theCallback) {
return execute(theCallback);
}
/**
* Search for concrete List.
*/
default <T> List<T> searchList(Callable<List<T>> theCallback) {
return execute(theCallback);
}
}
}