5606 trigger subscription failure (#5607)

* Fix, test, changelog

* Fix, test, changelog

* Add backwards compat

* Add backwards compat

* set all partitions if enabled

* Test fixes
This commit is contained in:
Tadgh 2024-01-18 19:18:10 -08:00 committed by GitHub
parent 5a747051de
commit 84f0bb4f34
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 210 additions and 44 deletions

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 5606
jira: SMILE-7678
title: "Fixed an issue where executing $trigger-subscription with a search URL criteria on a partitioned Subscription resource would result in the failure to deliver the affected resources. This issue has now been resolved."

View File

@ -35,18 +35,21 @@ import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs; import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService; import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer; import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
import ca.uhn.fhir.rest.api.CacheControlDirective; import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails; import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException; import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.util.ParametersUtil; import ca.uhn.fhir.util.ParametersUtil;
import ca.uhn.fhir.util.StopWatch; import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.UrlUtil; import ca.uhn.fhir.util.UrlUtil;
@ -122,34 +125,47 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
@Autowired @Autowired
private ISearchSvc mySearchService; private ISearchSvc mySearchService;
@Autowired
IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@Autowired @Autowired
private SearchBuilderFactory mySearchBuilderFactory; private SearchBuilderFactory mySearchBuilderFactory;
@Autowired
private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
@Override @Override
public IBaseParameters triggerSubscription( public IBaseParameters triggerSubscription(
@Nullable List<IPrimitiveType<String>> theResourceIds, @Nullable List<IPrimitiveType<String>> theResourceIds,
@Nullable List<IPrimitiveType<String>> theSearchUrls, @Nullable List<IPrimitiveType<String>> theSearchUrls,
@Nullable IIdType theSubscriptionId) { @Nullable IIdType theSubscriptionId,
RequestDetails theRequestDetails) {
if (myStorageSettings.getSupportedSubscriptionTypes().isEmpty()) { if (myStorageSettings.getSupportedSubscriptionTypes().isEmpty()) {
throw new PreconditionFailedException(Msg.code(22) + "Subscription processing not active on this server"); throw new PreconditionFailedException(Msg.code(22) + "Subscription processing not active on this server");
} }
// Throw a 404 if the subscription doesn't exist RequestPartitionId requestPartitionId;
// Throw a 404 if the subscription doesn't exist, otherwise determine its partition.
if (theSubscriptionId != null) { if (theSubscriptionId != null) {
IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getSubscriptionDao(); IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getSubscriptionDao();
IIdType subscriptionId = theSubscriptionId; IBaseResource subscription = subscriptionDao.read(theSubscriptionId, theRequestDetails);
if (!subscriptionId.hasResourceType()) { if (mySubscriptionCanonicalizer.canonicalize(subscription).getCrossPartitionEnabled()) {
subscriptionId = subscriptionId.withResourceType(ResourceTypeEnum.SUBSCRIPTION.getCode()); requestPartitionId = RequestPartitionId.allPartitions();
} else {
// Otherwise, trust the partition passed in via tenant/interceptor.
requestPartitionId = myRequestPartitionHelperSvc.determineGenericPartitionForRequest(theRequestDetails);
} }
subscriptionDao.read(subscriptionId, SystemRequestDetails.forAllPartitions()); } else {
// If we have no specific subscription, allow standard partition selection
requestPartitionId = myRequestPartitionHelperSvc.determineGenericPartitionForRequest(theRequestDetails);
} }
List<IPrimitiveType<String>> resourceIds = defaultIfNull(theResourceIds, Collections.emptyList()); List<IPrimitiveType<String>> resourceIds = defaultIfNull(theResourceIds, Collections.emptyList());
List<IPrimitiveType<String>> searchUrls = defaultIfNull(theSearchUrls, Collections.emptyList()); List<IPrimitiveType<String>> searchUrls = defaultIfNull(theSearchUrls, Collections.emptyList());
// Make sure we have at least one resource ID or search URL // Make sure we have at least one resource ID or search URL
if (resourceIds.size() == 0 && searchUrls.size() == 0) { if (resourceIds.isEmpty() && searchUrls.isEmpty()) {
throw new InvalidRequestException(Msg.code(23) + "No resource IDs or search URLs specified for triggering"); throw new InvalidRequestException(Msg.code(23) + "No resource IDs or search URLs specified for triggering");
} }
@ -174,6 +190,8 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
SubscriptionTriggeringJobDetails jobDetails = new SubscriptionTriggeringJobDetails(); SubscriptionTriggeringJobDetails jobDetails = new SubscriptionTriggeringJobDetails();
jobDetails.setJobId(UUID.randomUUID().toString()); jobDetails.setJobId(UUID.randomUUID().toString());
jobDetails.setRequestPartitionId(
requestPartitionId == null ? RequestPartitionId.allPartitions() : requestPartitionId);
jobDetails.setRemainingResourceIds( jobDetails.setRemainingResourceIds(
resourceIds.stream().map(IPrimitiveType::getValue).collect(Collectors.toList())); resourceIds.stream().map(IPrimitiveType::getValue).collect(Collectors.toList()));
jobDetails.setRemainingSearchUrls( jobDetails.setRemainingSearchUrls(
@ -202,6 +220,15 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
return retVal; return retVal;
} }
@Override
public IBaseParameters triggerSubscription(
@Nullable List<IPrimitiveType<String>> theResourceIds,
@Nullable List<IPrimitiveType<String>> theSearchUrls,
@Nullable IIdType theSubscriptionId) {
return triggerSubscription(
theResourceIds, theSearchUrls, theSubscriptionId, SystemRequestDetails.newSystemRequestAllPartitions());
}
@Override @Override
public void runDeliveryPass() { public void runDeliveryPass() {
@ -246,7 +273,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
while (!theJobDetails.getRemainingResourceIds().isEmpty() && totalSubmitted.get() < myMaxSubmitPerPass) { while (!theJobDetails.getRemainingResourceIds().isEmpty() && totalSubmitted.get() < myMaxSubmitPerPass) {
totalSubmitted.incrementAndGet(); totalSubmitted.incrementAndGet();
String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0); String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0);
submitResource(theJobDetails.getSubscriptionId(), nextResourceId); submitResource(theJobDetails.getSubscriptionId(), theJobDetails.getRequestPartitionId(), nextResourceId);
} }
// Make sure these all succeeded in submitting // Make sure these all succeeded in submitting
@ -271,20 +298,18 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceType); IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceType);
ourLog.info("Triggering job[{}] is starting a search for {}", theJobDetails.getJobId(), nextSearchUrl); ourLog.info("Triggering job[{}] is starting a search for {}", theJobDetails.getJobId(), nextSearchUrl);
search = mySearchCoordinatorSvc.registerSearch( search = mySearchCoordinatorSvc.registerSearch(
callingDao, callingDao,
params, params,
resourceType, resourceType,
new CacheControlDirective(), new CacheControlDirective(),
null, null,
RequestPartitionId.allPartitions()); theJobDetails.getRequestPartitionId());
if (isNull(search.getUuid())) { if (isNull(search.getUuid())) {
// we don't have a search uuid i.e. we're setting up for synchronous processing // we don't have a search uuid i.e. we're setting up for synchronous processing
theJobDetails.setCurrentSearchUrl(nextSearchUrl); theJobDetails.setCurrentSearchUrl(nextSearchUrl);
theJobDetails.setCurrentOffset(params.getOffset()); theJobDetails.setCurrentOffset(params.getOffset());
} else { } else {
// populate properties for asynchronous path // populate properties for asynchronous path
theJobDetails.setCurrentSearchUuid(search.getUuid()); theJobDetails.setCurrentSearchUuid(search.getUuid());
@ -332,16 +357,22 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
} }
ourLog.info( ourLog.info(
"Triggering job[{}] search {} requesting resources {} - {}", "Triggering job[{}] search {} requesting resources {} - {} from partition {}",
theJobDetails.getJobId(), theJobDetails.getJobId(),
theJobDetails.getCurrentSearchUuid(), theJobDetails.getCurrentSearchUuid(),
fromIndex, fromIndex,
toIndex); toIndex,
theJobDetails.getRequestPartitionId());
List<? extends IResourcePersistentId<?>> allResourceIds; List<? extends IResourcePersistentId<?>> allResourceIds;
RequestPartitionId requestPartitionId = RequestPartitionId.allPartitions(); RequestPartitionId requestPartitionId = theJobDetails.getRequestPartitionId();
try {
allResourceIds = mySearchCoordinatorSvc.getResources( allResourceIds = mySearchCoordinatorSvc.getResources(
theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, null, requestPartitionId); theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, null, requestPartitionId);
} catch (ResourceGoneException e) {
ourLog.trace("Search has expired, submission is done.");
allResourceIds = new ArrayList<>();
}
ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), allResourceIds.size()); ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), allResourceIds.size());
AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex()); AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex());
@ -362,7 +393,8 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
}); });
for (IBaseResource nextResource : listToPopulate) { for (IBaseResource nextResource : listToPopulate) {
submitResource(theJobDetails.getSubscriptionId(), nextResource); submitResource(
theJobDetails.getSubscriptionId(), theJobDetails.getRequestPartitionId(), nextResource);
totalSubmitted.incrementAndGet(); totalSubmitted.incrementAndGet();
highestIndexSubmitted.incrementAndGet(); highestIndexSubmitted.incrementAndGet();
} }
@ -403,10 +435,11 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
String searchUrl = theJobDetails.getCurrentSearchUrl(); String searchUrl = theJobDetails.getCurrentSearchUrl();
ourLog.info( ourLog.info(
"Triggered job [{}] - Starting synchronous processing at offset {} and index {}", "Triggered job [{}] - Starting synchronous processing at offset {} and index {} on partition {}",
theJobDetails.getJobId(), theJobDetails.getJobId(),
theJobDetails.getCurrentOffset(), theJobDetails.getCurrentOffset(),
fromIndex); fromIndex,
theJobDetails.getRequestPartitionId());
int submittableCount = myMaxSubmitPerPass - totalSubmitted.get(); int submittableCount = myMaxSubmitPerPass - totalSubmitted.get();
int toIndex = fromIndex + submittableCount; int toIndex = fromIndex + submittableCount;
@ -418,7 +451,11 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
} }
// we already have data from the initial step so process as much as we can. // we already have data from the initial step so process as much as we can.
ourLog.info("Triggered job[{}] will process up to {} resources", theJobDetails.getJobId(), toIndex); ourLog.info(
"Triggered job[{}] will process up to {} resources from partition {}",
theJobDetails.getJobId(),
toIndex,
theJobDetails.getRequestPartitionId());
allCurrentResources = search.getResources(0, toIndex); allCurrentResources = search.getResources(0, toIndex);
} else { } else {
@ -439,7 +476,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
toIndex, toIndex,
offset); offset);
search = mySearchService.executeQuery(resourceDef.getName(), params, RequestPartitionId.allPartitions()); search = mySearchService.executeQuery(resourceDef.getName(), params, theJobDetails.getRequestPartitionId());
allCurrentResources = search.getResources(0, submittableCount); allCurrentResources = search.getResources(0, submittableCount);
} }
@ -447,8 +484,8 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex()); AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex());
for (IBaseResource nextResource : allCurrentResources) { for (IBaseResource nextResource : allCurrentResources) {
Future<?> future = Future<?> future = myExecutorService.submit(() -> submitResource(
myExecutorService.submit(() -> submitResource(theJobDetails.getSubscriptionId(), nextResource)); theJobDetails.getSubscriptionId(), theJobDetails.getRequestPartitionId(), nextResource));
futures.add(future); futures.add(future);
totalSubmitted.incrementAndGet(); totalSubmitted.incrementAndGet();
highestIndexSubmitted.incrementAndGet(); highestIndexSubmitted.incrementAndGet();
@ -502,15 +539,17 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
return false; return false;
} }
private void submitResource(String theSubscriptionId, String theResourceIdToTrigger) { private void submitResource(
String theSubscriptionId, RequestPartitionId theRequestPartitionId, String theResourceIdToTrigger) {
org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger); org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceId.getResourceType()); IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
IBaseResource resourceToTrigger = dao.read(resourceId, SystemRequestDetails.forAllPartitions()); IBaseResource resourceToTrigger = dao.read(resourceId, SystemRequestDetails.forAllPartitions());
submitResource(theSubscriptionId, resourceToTrigger); submitResource(theSubscriptionId, theRequestPartitionId, resourceToTrigger);
} }
private void submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) { private void submitResource(
String theSubscriptionId, RequestPartitionId theRequestPartitionId, IBaseResource theResourceToTrigger) {
ourLog.info( ourLog.info(
"Submitting resource {} to subscription {}", "Submitting resource {} to subscription {}",
@ -518,7 +557,10 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
theSubscriptionId); theSubscriptionId);
ResourceModifiedMessage msg = new ResourceModifiedMessage( ResourceModifiedMessage msg = new ResourceModifiedMessage(
myFhirContext, theResourceToTrigger, ResourceModifiedMessage.OperationTypeEnum.UPDATE); myFhirContext,
theResourceToTrigger,
ResourceModifiedMessage.OperationTypeEnum.MANUALLY_TRIGGERED,
theRequestPartitionId);
msg.setSubscriptionId(theSubscriptionId); msg.setSubscriptionId(theSubscriptionId);
for (int i = 0; ; i++) { for (int i = 0; ; i++) {
@ -629,6 +671,8 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
private int myCurrentSearchLastUploadedIndex; private int myCurrentSearchLastUploadedIndex;
private int myCurrentOffset; private int myCurrentOffset;
private RequestPartitionId myRequestPartitionId;
Integer getCurrentSearchCount() { Integer getCurrentSearchCount() {
return myCurrentSearchCount; return myCurrentSearchCount;
} }
@ -712,5 +756,13 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
public void setCurrentOffset(Integer theCurrentOffset) { public void setCurrentOffset(Integer theCurrentOffset) {
myCurrentOffset = ObjectUtils.defaultIfNull(theCurrentOffset, 0); myCurrentOffset = ObjectUtils.defaultIfNull(theCurrentOffset, 0);
} }
public void setRequestPartitionId(RequestPartitionId theRequestPartitionId) {
myRequestPartitionId = theRequestPartitionId;
}
public RequestPartitionId getRequestPartitionId() {
return myRequestPartitionId;
}
} }
} }

View File

@ -26,15 +26,12 @@ import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test; import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test;
import ca.uhn.fhir.jpa.search.PersistedJpaSearchFirstPageBundleProvider; import ca.uhn.fhir.jpa.search.PersistedJpaSearchFirstPageBundleProvider;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.submit.interceptor.SearchParamValidatingInterceptor;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionValidatingInterceptor;
import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc; import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc; import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc;
import ca.uhn.fhir.jpa.subscription.triggering.SubscriptionTriggeringSvcImpl; import ca.uhn.fhir.jpa.subscription.triggering.SubscriptionTriggeringSvcImpl;
import ca.uhn.fhir.jpa.term.TermReadSvcImpl; import ca.uhn.fhir.jpa.term.TermReadSvcImpl;
import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil; import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil;
import ca.uhn.fhir.jpa.util.SqlQuery; import ca.uhn.fhir.jpa.util.SqlQuery;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.api.SortSpec; import ca.uhn.fhir.rest.api.SortSpec;
import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.IBundleProvider;
@ -119,7 +116,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -3103,7 +3099,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
waitForActivatedSubscriptionCount(1); waitForActivatedSubscriptionCount(1);
mySubscriptionTriggeringSvc.triggerSubscription(null, List.of(new StringType("Patient?")), subscriptionId); mySubscriptionTriggeringSvc.triggerSubscription(null, List.of(new StringType("Patient?")), subscriptionId, mySrd);
// Test // Test
myCaptureQueriesListener.clear(); myCaptureQueriesListener.clear();

View File

@ -13,11 +13,18 @@ import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test; import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
import ca.uhn.fhir.jpa.subscription.resthook.RestHookTestR4Test; import ca.uhn.fhir.jpa.subscription.resthook.RestHookTestR4Test;
import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc; import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc;
import ca.uhn.fhir.jpa.subscription.triggering.SubscriptionTriggeringSvcImpl;
import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber; import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.model.primitive.StringDt;
import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails; import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.util.HapiExtensions;
import jakarta.servlet.ServletException;
import org.awaitility.core.ConditionTimeoutException; import org.awaitility.core.ConditionTimeoutException;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Observation; import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Parameters; import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Patient; import org.hl7.fhir.r4.model.Patient;
@ -30,12 +37,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import jakarta.servlet.ServletException;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.Month; import java.time.Month;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.fail;
@ -49,7 +59,9 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
private ISubscriptionTriggeringSvc mySubscriptionTriggeringSvc; private ISubscriptionTriggeringSvc mySubscriptionTriggeringSvc;
static final String PARTITION_1 = "PART-1"; static final String PARTITION_1 = "PART-1";
public static final RequestPartitionId REQ_PART_1 = RequestPartitionId.fromPartitionNames(PARTITION_1);
static final String PARTITION_2 = "PART-2"; static final String PARTITION_2 = "PART-2";
public static final RequestPartitionId REQ_PART_2 = RequestPartitionId.fromPartitionNames(PARTITION_2);
protected MyReadWriteInterceptor myPartitionInterceptor; protected MyReadWriteInterceptor myPartitionInterceptor;
protected LocalDate myPartitionDate; protected LocalDate myPartitionDate;
@ -60,6 +72,7 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
@BeforeEach @BeforeEach
public void beforeEach() throws ServletException { public void beforeEach() throws ServletException {
myStorageSettings.setCrossPartitionSubscriptionEnabled(true);
myPartitionSettings.setPartitioningEnabled(true); myPartitionSettings.setPartitioningEnabled(true);
myPartitionSettings.setIncludePartitionInSearchHashes(new PartitionSettings().isIncludePartitionInSearchHashes()); myPartitionSettings.setIncludePartitionInSearchHashes(new PartitionSettings().isIncludePartitionInSearchHashes());
@ -73,13 +86,12 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
myPartitionId2 = 2; myPartitionId2 = 2;
myPartitionInterceptor = new MyReadWriteInterceptor(); myPartitionInterceptor = new MyReadWriteInterceptor();
myPartitionInterceptor.setResultPartitionId(RequestPartitionId.fromPartitionNames(PARTITION_1)); myPartitionInterceptor.setRequestPartitionId(REQ_PART_1);
mySrdInterceptorService.registerInterceptor(myPartitionInterceptor); mySrdInterceptorService.registerInterceptor(myPartitionInterceptor);
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(1).setName(PARTITION_1), null); myPartitionConfigSvc.createPartition(new PartitionEntity().setId(1).setName(PARTITION_1), null);
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(2).setName(PARTITION_2), null); myPartitionConfigSvc.createPartition(new PartitionEntity().setId(2).setName(PARTITION_2), null);
myStorageSettings.setIndexMissingFields(JpaStorageSettings.IndexEnabledEnum.ENABLED); myStorageSettings.setIndexMissingFields(JpaStorageSettings.IndexEnabledEnum.ENABLED);
} }
@ -90,6 +102,7 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
myStoppableSubscriptionDeliveringRestHookSubscriber.unPause(); myStoppableSubscriptionDeliveringRestHookSubscriber.unPause();
myStorageSettings.setTriggerSubscriptionsForNonVersioningChanges(new JpaStorageSettings().isTriggerSubscriptionsForNonVersioningChanges()); myStorageSettings.setTriggerSubscriptionsForNonVersioningChanges(new JpaStorageSettings().isTriggerSubscriptionsForNonVersioningChanges());
myStorageSettings.setCrossPartitionSubscriptionEnabled(false);
myPartitionSettings.setPartitioningEnabled(false); myPartitionSettings.setPartitioningEnabled(false);
myPartitionSettings.setUnnamedPartitionMode(false); myPartitionSettings.setUnnamedPartitionMode(false);
@ -100,6 +113,10 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
myStorageSettings.setAllowMultipleDelete(new JpaStorageSettings().isAllowMultipleDelete()); myStorageSettings.setAllowMultipleDelete(new JpaStorageSettings().isAllowMultipleDelete());
mySrdInterceptorService.unregisterInterceptorsIf(t -> t instanceof BasePartitioningR4Test.MyReadWriteInterceptor); mySrdInterceptorService.unregisterInterceptorsIf(t -> t instanceof BasePartitioningR4Test.MyReadWriteInterceptor);
await().until(() -> {
mySubscriptionTriggeringSvc.runDeliveryPass();
return ((SubscriptionTriggeringSvcImpl)mySubscriptionTriggeringSvc).getActiveJobCount() == 0;
});
super.afterUnregisterRestHookListener(); super.afterUnregisterRestHookListener();
} }
@ -160,6 +177,72 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
} }
} }
@Test
public void testManualTriggeredSubscriptionDoesNotCheckOutsideOfPartition() throws Exception {
String payload = "application/fhir+json";
String code = "1000000050";
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
//Given: We store a resource in partition 2
myPartitionInterceptor.setRequestPartitionId(REQ_PART_2);
IIdType observationIdPartitionTwo = myDaoRegistry.getResourceDao("Observation").create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
//Given: We store a similar resource in partition 1
myPartitionInterceptor.setRequestPartitionId(REQ_PART_1);
IIdType observationIdPartitionOne = myDaoRegistry.getResourceDao("Observation").create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
//Given: We create a subscrioption on Partition 1
IIdType subscriptionId= myDaoRegistry.getResourceDao("Subscription").create(newSubscription(criteria1, payload), mySrd).getId();
waitForActivatedSubscriptionCount(1);
ArrayList<IPrimitiveType<String>> searchUrlList = new ArrayList<>();
searchUrlList.add(new StringDt("Observation?"));
Parameters resultParameters = (Parameters) mySubscriptionTriggeringSvc.triggerSubscription(null, searchUrlList, subscriptionId, mySrd);
mySubscriptionTriggeringSvc.runDeliveryPass();
waitForQueueToDrain();
List<Observation> resourceUpdates = BaseSubscriptionsR4Test.ourObservationProvider.getResourceUpdates();
assertThat(resourceUpdates.size(), is(equalTo(1)));
assertThat(resourceUpdates.get(0).getId(), is(equalTo(observationIdPartitionOne.toString())));
String responseValue = resultParameters.getParameter().get(0).getValue().primitiveValue();
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
}
@Test
public void testManualTriggeredSubscriptionWithCrossPartitionChecksBothPartitions() throws Exception {
String payload = "application/fhir+json";
String code = "1000000050";
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
//Given: We store a resource in partition 2
myPartitionInterceptor.setRequestPartitionId(REQ_PART_2);
myDaoRegistry.getResourceDao("Observation").create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
//Given: We store a similar resource in partition 1
myPartitionInterceptor.setRequestPartitionId(REQ_PART_1);
myDaoRegistry.getResourceDao("Observation").create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
//Given: We create a subscription on Partition 1
Subscription theResource = newSubscription(criteria1, payload);
theResource.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new BooleanType(Boolean.TRUE));
myPartitionInterceptor.setRequestPartitionId(RequestPartitionId.defaultPartition());
IIdType subscriptionId= myDaoRegistry.getResourceDao("Subscription").create(theResource, mySrd).getId();
waitForActivatedSubscriptionCount(1);
ArrayList<IPrimitiveType<String>> searchUrlList = new ArrayList<>();
searchUrlList.add(new StringDt("Observation?"));
myPartitionInterceptor.setRequestPartitionId(RequestPartitionId.defaultPartition());
mySubscriptionTriggeringSvc.triggerSubscription(null, searchUrlList, subscriptionId, mySrd);
mySubscriptionTriggeringSvc.runDeliveryPass();
waitForQueueToDrain();
List<Observation> resourceUpdates = BaseSubscriptionsR4Test.ourObservationProvider.getResourceUpdates();
assertThat(resourceUpdates.size(), is(equalTo(2)));
}
@Test @Test
public void testManualTriggeredSubscriptionInPartition() throws Exception { public void testManualTriggeredSubscriptionInPartition() throws Exception {
String payload = "application/fhir+json"; String payload = "application/fhir+json";
@ -184,10 +267,11 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
resourceIdList.add(observation.getIdElement()); resourceIdList.add(observation.getIdElement());
Parameters resultParameters = (Parameters) mySubscriptionTriggeringSvc.triggerSubscription(resourceIdList, null, subscription.getIdElement()); Parameters resultParameters = (Parameters) mySubscriptionTriggeringSvc.triggerSubscription(resourceIdList, null, subscription.getIdElement(), mySrd);
mySubscriptionTriggeringSvc.runDeliveryPass();
waitForQueueToDrain(); waitForQueueToDrain();
Assertions.assertEquals(0, BaseSubscriptionsR4Test.ourObservationProvider.getCountCreate()); Assertions.assertEquals(1, BaseSubscriptionsR4Test.ourObservationProvider.getCountUpdate());
String responseValue = resultParameters.getParameter().get(0).getValue().primitiveValue(); String responseValue = resultParameters.getParameter().get(0).getValue().primitiveValue();
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
@ -197,22 +281,27 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
public static class MyReadWriteInterceptor { public static class MyReadWriteInterceptor {
private RequestPartitionId myReadPartitionId; private RequestPartitionId myReadPartitionId;
public void setResultPartitionId(RequestPartitionId theRequestPartitionId) { public void setRequestPartitionId(RequestPartitionId theRequestPartitionId) {
myReadPartitionId = theRequestPartitionId; myReadPartitionId = theRequestPartitionId;
} }
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_READ) @Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_READ)
public RequestPartitionId read() { public RequestPartitionId read(ServletRequestDetails theSrd) {
RequestPartitionId retVal = myReadPartitionId; RequestPartitionId retVal = myReadPartitionId;
ourLog.info("Returning partition for read: {}", retVal); ourLog.info("Returning partition for read: {}", retVal);
return retVal; return retVal;
} }
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE) @Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE)
public RequestPartitionId create() { public RequestPartitionId create(ServletRequestDetails theSrd) {
RequestPartitionId retVal = myReadPartitionId; RequestPartitionId retVal = myReadPartitionId;
ourLog.info("Returning partition for write: {}", retVal); ourLog.info("Returning partition for write: {}", retVal);
return retVal; return retVal;
} }
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_ANY)
public RequestPartitionId any() {
return myReadPartitionId;
}
} }
} }

View File

@ -45,6 +45,7 @@ public class SubscriptionTriggeringProvider implements IResourceProvider {
@Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) @Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
public IBaseParameters triggerSubscription( public IBaseParameters triggerSubscription(
ca.uhn.fhir.rest.api.server.RequestDetails theRequestDetails,
@OperationParam( @OperationParam(
name = ProviderConstants.SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID, name = ProviderConstants.SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID,
min = 0, min = 0,
@ -57,11 +58,12 @@ public class SubscriptionTriggeringProvider implements IResourceProvider {
max = OperationParam.MAX_UNLIMITED, max = OperationParam.MAX_UNLIMITED,
typeName = "string") typeName = "string")
List<IPrimitiveType<String>> theSearchUrls) { List<IPrimitiveType<String>> theSearchUrls) {
return mySubscriptionTriggeringSvc.triggerSubscription(theResourceIds, theSearchUrls, null); return mySubscriptionTriggeringSvc.triggerSubscription(theResourceIds, theSearchUrls, null, theRequestDetails);
} }
@Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) @Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
public IBaseParameters triggerSubscription( public IBaseParameters triggerSubscription(
ca.uhn.fhir.rest.api.server.RequestDetails theRequestDetails,
@IdParam IIdType theSubscriptionId, @IdParam IIdType theSubscriptionId,
@OperationParam( @OperationParam(
name = ProviderConstants.SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID, name = ProviderConstants.SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID,
@ -75,7 +77,8 @@ public class SubscriptionTriggeringProvider implements IResourceProvider {
max = OperationParam.MAX_UNLIMITED, max = OperationParam.MAX_UNLIMITED,
typeName = "string") typeName = "string")
List<IPrimitiveType<String>> theSearchUrls) { List<IPrimitiveType<String>> theSearchUrls) {
return mySubscriptionTriggeringSvc.triggerSubscription(theResourceIds, theSearchUrls, theSubscriptionId); return mySubscriptionTriggeringSvc.triggerSubscription(
theResourceIds, theSearchUrls, theSubscriptionId, theRequestDetails);
} }
@Override @Override

View File

@ -59,6 +59,15 @@ public class ResourceModifiedMessage extends BaseResourceModifiedMessage {
setPartitionId(RequestPartitionId.defaultPartition()); setPartitionId(RequestPartitionId.defaultPartition());
} }
public ResourceModifiedMessage(
FhirContext theFhirContext,
IBaseResource theResource,
OperationTypeEnum theOperationType,
RequestPartitionId theRequestPartitionId) {
super(theFhirContext, theResource, theOperationType);
setPartitionId(theRequestPartitionId);
}
public ResourceModifiedMessage( public ResourceModifiedMessage(
FhirContext theFhirContext, FhirContext theFhirContext,
IBaseResource theNewResource, IBaseResource theNewResource,

View File

@ -19,6 +19,7 @@
*/ */
package ca.uhn.fhir.jpa.subscription.triggering; package ca.uhn.fhir.jpa.subscription.triggering;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import jakarta.annotation.Nullable; import jakarta.annotation.Nullable;
import org.hl7.fhir.instance.model.api.IBaseParameters; import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
@ -28,6 +29,17 @@ import java.util.List;
public interface ISubscriptionTriggeringSvc { public interface ISubscriptionTriggeringSvc {
IBaseParameters triggerSubscription(
@Nullable List<IPrimitiveType<String>> theResourceIds,
@Nullable List<IPrimitiveType<String>> theSearchUrls,
@Nullable IIdType theSubscriptionId,
RequestDetails theRequestDetails);
@Deprecated(forRemoval = true)
/**
* Use {@link ISubscriptionTriggeringSvc#triggerSubscription(List, List, IIdType, RequestDetails)} instead.
* This implementation uses a SystemRequestDetails for All Partitions, as the previous behaviour did.
*/
IBaseParameters triggerSubscription( IBaseParameters triggerSubscription(
@Nullable List<IPrimitiveType<String>> theResourceIds, @Nullable List<IPrimitiveType<String>> theResourceIds,
@Nullable List<IPrimitiveType<String>> theSearchUrls, @Nullable List<IPrimitiveType<String>> theSearchUrls,