2718 cross partition support for subscription (#3298)

* added settings for cross-partition to Dao and Model config

* modified subscription validator to validate cross partition subscriptions

* modify subscription matcher to allow matching on all partitions when subscription is set to cross partition

* added cross partition to canonicalSubscription

* end of day commit, changed how we are checking if default partition in subscription validator

* fixed issue with i memory matching for cross partition subscriptions

* added test for parsing legacy CanonicalSubscription

* added changelog and doc changes

* addressed comments in pr

* added assertdoesnotthrow

* test fixes, added new mocks

* added check for if resource can have extension before checking for extensions

* removed unnecessary semicolon

Co-authored-by: Steven Li <steven@smilecdr.com>
Co-authored-by: Long Ma <long@smilecdr.com>
This commit is contained in:
longma1 2022-01-21 15:04:39 -07:00 committed by GitHub
parent 62966de58b
commit ab22fd6e82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 426 additions and 26 deletions

View File

@ -135,6 +135,11 @@ public class HapiExtensions {
*/
public static final String EX_SEND_DELETE_MESSAGES = "http://hapifhir.io/fhir/StructureDefinition/subscription-send-delete-messages";
/**
* This entension allows subscriptions to be marked as cross partition and with correct settings, listen to incoming resources from all partitions.
*/
public static final String EXTENSION_SUBSCRIPTION_CROSS_PARTITION = "https://smilecdr.com/fhir/ns/StructureDefinition/subscription-cross-partition";
/**
* Non instantiable
*/

View File

@ -5,11 +5,15 @@ import ca.uhn.fhir.context.BaseRuntimeElementDefinition;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import org.hl7.fhir.instance.model.api.IBase;
import org.hl7.fhir.instance.model.api.IBaseBooleanDatatype;
import org.hl7.fhir.instance.model.api.IBaseExtension;
import org.hl7.fhir.instance.model.api.IBaseHasExtensions;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.thymeleaf.util.Validate;
import java.util.List;
import java.util.Objects;
/*
* #%L
@ -61,4 +65,18 @@ public class SubscriptionUtil {
populatePrimitiveValue(theContext, theSubscription, "status", theStatus);
}
public static boolean isCrossPartition(IBaseResource theSubscription) {
if (theSubscription instanceof IBaseHasExtensions) {
IBaseExtension extension = ExtensionUtil.getExtensionByUrl(theSubscription, HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION);
if (Objects.nonNull(extension)) {
try {
IBaseBooleanDatatype booleanDatatype = (IBaseBooleanDatatype) (extension.getValue());
return booleanDatatype.getValue();
} catch (ClassCastException theClassCastException) {
return false;
}
}
}
return false;
}
}

View File

@ -0,0 +1,4 @@
---
type: add
issue: 2718
title: "Added support for cross-partition subscriptions. Subscription in the default partition can now listen to resource changes from all partitions"

View File

@ -173,4 +173,4 @@ None of the limitations listed here are considered permanent. Over time the HAPI
* **Advanced Elasticsearch indexing is not partition optimized**: The results are correctly partitioned, but the extended indexing is not optimized to account for partitions.
* **Subscriptions are partition aware**: Subscriptions can be placed on any partition and will deliver matching resources from the same partition.
* **Subscriptions are partition aware**: Subscriptions can be placed on any partition and will deliver matching resources from the same partition. A subscription on the default can deliver resource from all partition if it is placed in the default partition with the cross-partition extension and the server allows cross-partition subscriptions.

View File

@ -1,22 +1,39 @@
package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionValidatingInterceptor;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.util.HapiExtensions;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@ -28,6 +45,10 @@ public class SubscriptionValidatingInterceptorTest {
private final FhirContext myCtx = FhirContext.forR4Cached();
@Mock
private SubscriptionStrategyEvaluator mySubscriptionStrategyEvaluator;
@Mock
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@Mock
private DaoConfig myDaoConfig;
@BeforeEach
public void before() {
@ -36,6 +57,8 @@ public class SubscriptionValidatingInterceptorTest {
mySvc.setDaoRegistryForUnitTest(myDaoRegistry);
mySvc.setSubscriptionStrategyEvaluatorForUnitTest(mySubscriptionStrategyEvaluator);
mySvc.setFhirContextForUnitTest(myCtx);
mySvc.setDaoConfigForUnitTest(myDaoConfig);
mySvc.setRequestPartitionHelperSvcForUnitTest(myRequestPartitionHelperSvc);
}
@Test
@ -168,4 +191,99 @@ public class SubscriptionValidatingInterceptorTest {
}
}
@Test
public void testValidate_Cross_Partition_Subscription() {
when(myDaoRegistry.isResourceTypeSupported(eq("Patient"))).thenReturn(true);
when(myDaoConfig.isCrossPartitionSubscription()).thenReturn(true);
when(myRequestPartitionHelperSvc.determineCreatePartitionForRequest(isA(RequestDetails.class), isA(Subscription.class), eq("Subscription"))).thenReturn(RequestPartitionId.defaultPartition());
Subscription subscription = new Subscription();
subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE);
subscription.setCriteria("Patient?identifier=foo");
subscription.getChannel().setType(Subscription.SubscriptionChannelType.RESTHOOK);
subscription.getChannel().setEndpoint("http://foo");
subscription.addExtension().setUrl(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION).setValue(new BooleanType(true));
RequestDetails requestDetails = new ServletRequestDetails();
requestDetails.setRestOperationType(RestOperationTypeEnum.CREATE);
// No asserts here because the function should throw an UnprocessableEntityException exception if the subscription
// is invalid
assertDoesNotThrow(() -> mySvc.validateSubmittedSubscription(subscription, requestDetails));
Mockito.verify(myDaoConfig, times(1)).isCrossPartitionSubscription();
Mockito.verify(myDaoRegistry, times(1)).isResourceTypeSupported(eq("Patient"));
Mockito.verify(myRequestPartitionHelperSvc, times(1)).determineCreatePartitionForRequest(isA(RequestDetails.class), isA(Subscription.class), eq("Subscription"));
}
@Test
public void testValidate_Cross_Partition_Subscription_On_Wrong_Partition() {
when(myDaoConfig.isCrossPartitionSubscription()).thenReturn(true);
when(myRequestPartitionHelperSvc.determineCreatePartitionForRequest(isA(RequestDetails.class), isA(Subscription.class), eq("Subscription"))).thenReturn(RequestPartitionId.fromPartitionId(1));
Subscription subscription = new Subscription();
subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE);
subscription.setCriteria("Patient?identifier=foo");
subscription.getChannel().setType(Subscription.SubscriptionChannelType.RESTHOOK);
subscription.getChannel().setEndpoint("http://foo");
subscription.addExtension().setUrl(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION).setValue(new BooleanType(true));
RequestDetails requestDetails = new ServletRequestDetails();
requestDetails.setRestOperationType(RestOperationTypeEnum.CREATE);
try {
mySvc.validateSubmittedSubscription(subscription, requestDetails);
fail();
} catch (UnprocessableEntityException theUnprocessableEntityException) {
assertEquals("Cross partition subscription must be created on the default partition", theUnprocessableEntityException.getMessage());
}
}
@Test
public void testValidate_Cross_Partition_Subscription_Without_Setting() {
when(myDaoConfig.isCrossPartitionSubscription()).thenReturn(false);
Subscription subscription = new Subscription();
subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE);
subscription.setCriteria("Patient?identifier=foo");
subscription.getChannel().setType(Subscription.SubscriptionChannelType.RESTHOOK);
subscription.getChannel().setEndpoint("http://foo");
subscription.addExtension().setUrl(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION).setValue(new BooleanType(true));
RequestDetails requestDetails = new ServletRequestDetails();
requestDetails.setRestOperationType(RestOperationTypeEnum.CREATE);
try {
mySvc.validateSubmittedSubscription(subscription, requestDetails);
fail();
} catch (UnprocessableEntityException theUnprocessableEntityException) {
assertEquals("Cross partition subscription is not enabled on this server", theUnprocessableEntityException.getMessage());
}
}
@Test
public void testValidate_Cross_Partition_System_Subscription_Without_Setting() {
when(myDaoRegistry.isResourceTypeSupported(eq("Patient"))).thenReturn(true);
Subscription subscription = new Subscription();
subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE);
subscription.setCriteria("Patient?identifier=foo");
subscription.getChannel().setType(Subscription.SubscriptionChannelType.RESTHOOK);
subscription.getChannel().setEndpoint("http://foo");
subscription.addExtension().setUrl(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION).setValue(new BooleanType(true));
RequestDetails requestDetails = new SystemRequestDetails();
requestDetails.setRestOperationType(RestOperationTypeEnum.CREATE);
// No asserts here because the function should throw an UnprocessableEntityException exception if the subscription
// is invalid
mySvc.validateSubmittedSubscription(subscription, requestDetails);
Mockito.verify(myDaoConfig, never()).isCrossPartitionSubscription();
Mockito.verify(myDaoRegistry, times(1)).isResourceTypeSupported(eq("Patient"));
Mockito.verify(myRequestPartitionHelperSvc, never()).determineCreatePartitionForRequest(isA(RequestDetails.class), isA(Patient.class), eq("Patient"));
}
}

View File

@ -77,6 +77,7 @@ public class ModelConfig {
private Set<String> myTreatReferencesAsLogical = new HashSet<>(DEFAULT_LOGICAL_BASE_URLS);
private boolean myDefaultSearchParamsCanBeOverridden = true;
private Set<Subscription.SubscriptionChannelType> mySupportedSubscriptionTypes = new HashSet<>();
private boolean myCrossPartitionSubscription = false;
private String myEmailFromAddress = "noreply@unknown.com";
private String myWebsocketContextPath = DEFAULT_WEBSOCKET_CONTEXT_PATH;
/**
@ -872,4 +873,34 @@ public class ModelConfig {
}
/**
* If enabled, the server will support cross-partition subscription.
* This subscription will be the responsible for all the requests from all the partitions on this server.
* For example, if the server has 3 partitions, P1, P2, P3
* The subscription will live in the DEFAULT partition. Resource posted to DEFAULT, P1, P2, and P3 will trigger this subscription.
* <p>
* Default is <code>false</code>
* </p>
*
* @since 7.5.0
*/
public boolean isCrossPartitionSubscription() {
return myCrossPartitionSubscription;
}
/**
* If enabled, the server will support cross-partition subscription.
* This subscription will be the responsible for all the requests from all the partitions on this server.
* For example, if the server has 3 partitions, P1, P2, P3
* The subscription will live in the DEFAULT partition. Resource posted to DEFAULT, P1, P2, and P3 will trigger this subscription.
* <p>
* Default is <code>false</code>
* </p>
*
* @since 7.5.0
*/
public void setCrossPartitionSubscription(boolean theAllowCrossPartitionSubscription) {
myCrossPartitionSubscription = theAllowCrossPartitionSubscription;
}
}

View File

@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.subscription.match.matcher.matching;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
@ -29,6 +30,7 @@ import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.util.SubscriptionUtil;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
@ -36,8 +38,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import static ca.uhn.fhir.jpa.subscription.util.SubscriptionUtil.createRequestDetailForPartitionedRequest;
public class DaoSubscriptionMatcher implements ISubscriptionMatcher {
private final Logger ourLog = LoggerFactory.getLogger(DaoSubscriptionMatcher.class);
@ -76,7 +76,7 @@ public class DaoSubscriptionMatcher implements ISubscriptionMatcher {
IFhirResourceDao<? extends IBaseResource> responseDao = myDaoRegistry.getResourceDao(responseResourceDef.getImplementingClass());
responseCriteriaUrl.setLoadSynchronousUpTo(1);
return responseDao.search(responseCriteriaUrl, createRequestDetailForPartitionedRequest(theSubscription));
return responseDao.search(responseCriteriaUrl, SubscriptionUtil.createRequestDetailForPartitionedRequest(theSubscription));
}
}

View File

@ -127,7 +127,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
// skip if the partitions don't match
CanonicalSubscription subscription = nextActiveSubscription.getSubscription();
if (subscription != null && subscription.getRequestPartitionId() != null && theMsg.getPartitionId() != null
&& !theMsg.getPartitionId().hasPartitionId(subscription.getRequestPartitionId())) {
&& !subscription.getCrossPartitionEnabled() && !theMsg.getPartitionId().hasPartitionId(subscription.getRequestPartitionId())) {
continue;
}
String nextSubscriptionId = getId(nextActiveSubscription);

View File

@ -24,7 +24,11 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionCriteriaParser;
@ -33,15 +37,18 @@ import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.HapiExtensions;
import ca.uhn.fhir.util.SubscriptionUtil;
import com.google.common.annotations.VisibleForTesting;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Objects;
import static org.apache.commons.lang3.StringUtils.isBlank;
@ -53,18 +60,22 @@ public class SubscriptionValidatingInterceptor {
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private DaoConfig myDaoConfig;
@Autowired
private SubscriptionStrategyEvaluator mySubscriptionStrategyEvaluator;
@Autowired
private FhirContext myFhirContext;
@Autowired
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@Hook(Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED)
public void resourcePreCreate(IBaseResource theResource) {
validateSubmittedSubscription(theResource);
public void resourcePreCreate(IBaseResource theResource, RequestDetails theRequestDetails) {
validateSubmittedSubscription(theResource, theRequestDetails);
}
@Hook(Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED)
public void resourcePreCreate(IBaseResource theOldResource, IBaseResource theResource) {
validateSubmittedSubscription(theResource);
public void resourcePreCreate(IBaseResource theOldResource, IBaseResource theResource, RequestDetails theRequestDetails) {
validateSubmittedSubscription(theResource, theRequestDetails);
}
@VisibleForTesting
@ -72,7 +83,12 @@ public class SubscriptionValidatingInterceptor {
myFhirContext = theFhirContext;
}
@Deprecated
public void validateSubmittedSubscription(IBaseResource theSubscription) {
validateSubmittedSubscription(theSubscription, null);
}
public void validateSubmittedSubscription(IBaseResource theSubscription, RequestDetails theRequestDetails) {
if (!"Subscription".equals(myFhirContext.getResourceType(theSubscription))) {
return;
}
@ -94,6 +110,17 @@ public class SubscriptionValidatingInterceptor {
break;
}
// If the subscription has the cross partition tag &&
if (SubscriptionUtil.isCrossPartition(theSubscription) && !(theRequestDetails instanceof SystemRequestDetails)) {
if (!myDaoConfig.isCrossPartitionSubscription()){
throw new UnprocessableEntityException("Cross partition subscription is not enabled on this server");
}
if (!determinePartition(theRequestDetails, theSubscription).isDefaultPartition()) {
throw new UnprocessableEntityException("Cross partition subscription must be created on the default partition");
}
}
mySubscriptionCanonicalizer.setMatchingStrategyTag(theSubscription, null);
if (!finished) {
@ -123,6 +150,17 @@ public class SubscriptionValidatingInterceptor {
}
}
private RequestPartitionId determinePartition(RequestDetails theRequestDetails, IBaseResource theResource) {
switch (theRequestDetails.getRestOperationType()) {
case CREATE:
return myRequestPartitionHelperSvc.determineCreatePartitionForRequest(theRequestDetails, theResource, "Subscription");
case UPDATE:
return myRequestPartitionHelperSvc.determineReadPartitionForRequestForRead(theRequestDetails, "Subscription", theResource.getIdElement());
default:
return null;
}
}
public void validateQuery(String theQuery, String theFieldName) {
if (isBlank(theQuery)) {
throw new UnprocessableEntityException(theFieldName + " must be populated");
@ -215,6 +253,16 @@ public class SubscriptionValidatingInterceptor {
myDaoRegistry = theDaoRegistry;
}
@VisibleForTesting
public void setDaoConfigForUnitTest(DaoConfig theDaoConfig) {
myDaoConfig = theDaoConfig;
}
@VisibleForTesting
public void setRequestPartitionHelperSvcForUnitTest(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) {
myRequestPartitionHelperSvc = theRequestPartitionHelperSvc;
}
@VisibleForTesting
@SuppressWarnings("WeakerAccess")

View File

@ -13,6 +13,11 @@ public class SubscriptionUtil {
public static RequestDetails createRequestDetailForPartitionedRequest(CanonicalSubscription theSubscription) {
RequestPartitionId requestPartitionId = new PartitionablePartitionId(theSubscription.getRequestPartitionId(), null).toPartitionId();
if (theSubscription.getCrossPartitionEnabled()) {
requestPartitionId = RequestPartitionId.allPartitions();
}
return new SystemRequestDetails().setRequestPartitionId(requestPartitionId);
}
}

View File

@ -5,11 +5,17 @@ import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.util.HapiExtensions;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.assertj.core.util.Lists;
import org.hamcrest.Matchers;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.StringType;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
@ -20,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class CanonicalSubscriptionTest {
private static final Logger ourLog = LoggerFactory.getLogger(CanonicalSubscriptionTest.class);
private static final String TAG_SYSTEM = "https://hapifhir.org/NamingSystem/managing-mdm-system";
private static final String TAG_VALUE = "HAPI-MDM";
@ -73,6 +80,51 @@ public class CanonicalSubscriptionTest {
assertTrue(sub1.equals(sub2));
}
@Test
public void testSerializeMultiPartitionSubscription(){
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4());
Subscription subscription = makeEmailSubscription();
subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new BooleanType().setValue(true));
CanonicalSubscription canonicalSubscription = canonicalizer.canonicalize(subscription);
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), true);
}
@Test
public void testSerializeIncorrectMultiPartitionSubscription(){
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4());
Subscription subscription = makeEmailSubscription();
subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new StringType().setValue("false"));
CanonicalSubscription canonicalSubscription = canonicalizer.canonicalize(subscription);
System.out.print(canonicalSubscription);
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), false);
}
@Test
public void testSerializeNonMultiPartitionSubscription(){
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4());
Subscription subscription = makeEmailSubscription();
subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new BooleanType().setValue(false));
CanonicalSubscription canonicalSubscription = canonicalizer.canonicalize(subscription);
System.out.print(canonicalSubscription);
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), false);
}
@Test
public void testLegacyCanonicalSubscription() throws JsonProcessingException {
String legacyCanonical = "{\"headers\":{\"retryCount\":0,\"customHeaders\":{}},\"payload\":{\"canonicalSubscription\":{\"extensions\":{\"key1\":[\"VALUE1\"],\"key2\":[\"VALUE2a\",\"VALUE2b\"]},\"sendDeleteMessages\":false},\"partitionId\":{\"allPartitions\":false,\"partitionIds\":[null]}}}";
ObjectMapper mapper = new ObjectMapper();
ResourceDeliveryJsonMessage resourceDeliveryMessage = mapper.readValue(legacyCanonical, ResourceDeliveryJsonMessage.class);
CanonicalSubscription payload = resourceDeliveryMessage.getPayload().getSubscription();
assertEquals(payload.getCrossPartitionEnabled(), false);
}
private Subscription makeEmailSubscription() {
Subscription retVal = new Subscription();
Subscription.SubscriptionChannelComponent channel = new Subscription.SubscriptionChannelComponent();

View File

@ -7,7 +7,6 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer;
@ -45,7 +44,6 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -92,6 +90,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer;
@Autowired
protected PartitionSettings myPartitionSettings;
@Autowired
protected DaoConfig myDaoConfig;
protected String myCode = "1000000050";

View File

@ -15,7 +15,9 @@ import ca.uhn.fhir.jpa.subscription.module.standalone.BaseBlockingQueueSubscriba
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage;
import ca.uhn.fhir.util.HapiExtensions;
import com.google.common.collect.Lists;
import org.hl7.fhir.dstu3.model.BooleanType;
import org.hl7.fhir.dstu3.model.Observation;
import org.hl7.fhir.dstu3.model.Subscription;
import org.junit.jupiter.api.BeforeEach;
@ -283,18 +285,94 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
mySubscriptionResourceNotMatched.awaitExpected();
}
@Test
public void testCrossPartitionSubscriptionForResourceOnTheSamePartitionMatch() throws InterruptedException {
myPartitionSettings.setPartitioningEnabled(true);
myDaoConfig.setCrossPartitionSubscription(true);
String payload = "application/fhir+json";
String code = "1000000050";
String criteria = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
RequestPartitionId subscriptionPartitionId = RequestPartitionId.defaultPartition();
Subscription subscription = makeActiveSubscription(criteria, payload, ourListenerServerBase);
subscription.addExtension().setUrl(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION).setValue(new BooleanType(true));
mockSubscriptionRead(subscriptionPartitionId, subscription);
sendSubscription(subscription, subscriptionPartitionId, true);
ourObservationListener.setExpectedCount(1);
mySubscriptionResourceMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT", subscriptionPartitionId);
mySubscriptionResourceMatched.awaitExpected();
ourObservationListener.awaitExpected();
}
@Test
public void testCrossPartitionSubscriptionForResourceOnDifferentPartitionMatch() throws InterruptedException {
myPartitionSettings.setPartitioningEnabled(true);
myDaoConfig.setCrossPartitionSubscription(true);
String payload = "application/fhir+json";
String code = "1000000050";
String criteria = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
RequestPartitionId subscriptionPartitionId = RequestPartitionId.defaultPartition();
RequestPartitionId requestPartitionId = RequestPartitionId.fromPartitionId(1);
Subscription subscription = makeActiveSubscription(criteria, payload, ourListenerServerBase);
subscription.addExtension().setUrl(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION).setValue(new BooleanType(true));
mockSubscriptionRead(subscriptionPartitionId, subscription);
sendSubscription(subscription, subscriptionPartitionId, true);
ourObservationListener.setExpectedCount(1);
mySubscriptionResourceMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT", requestPartitionId);
mySubscriptionResourceMatched.awaitExpected();
ourObservationListener.awaitExpected();
}
@Test
public void testCrossPartitionSubscriptionForMultipleResourceOnDifferentPartitionMatch() throws InterruptedException {
myPartitionSettings.setPartitioningEnabled(true);
myDaoConfig.setCrossPartitionSubscription(true);
String payload = "application/fhir+json";
String code = "1000000050";
String criteria = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
RequestPartitionId subscriptionPartitionId = RequestPartitionId.defaultPartition();
RequestPartitionId requestPartitionId = RequestPartitionId.fromPartitionId(1);
RequestPartitionId requestPartitionId2 = RequestPartitionId.fromPartitionId(2);
Subscription subscription = makeActiveSubscription(criteria, payload, ourListenerServerBase);
subscription.addExtension().setUrl(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION).setValue(new BooleanType(true));
mockSubscriptionRead(subscriptionPartitionId, subscription);
sendSubscription(subscription, subscriptionPartitionId, true);
ourObservationListener.setExpectedCount(2);
mySubscriptionResourceMatched.setExpectedCount(2);
sendObservation(code, "SNOMED-CT", requestPartitionId);
sendObservation(code, "SNOMED-CT", requestPartitionId2);
mySubscriptionResourceMatched.awaitExpected();
ourObservationListener.awaitExpected();
}
@Nested
public class TestDeleteMessages {
private final SubscriptionMatchingSubscriber subscriber = new SubscriptionMatchingSubscriber();
@Mock ResourceModifiedMessage message;
@Mock IInterceptorBroadcaster myInterceptorBroadcaster;
@Mock SubscriptionRegistry mySubscriptionRegistry;
@Mock(answer = Answers.RETURNS_DEEP_STUBS) ActiveSubscription myActiveSubscription;
@Mock CanonicalSubscription myCanonicalSubscription;
@Mock SubscriptionCriteriaParser.SubscriptionCriteria mySubscriptionCriteria;
@Mock
ResourceModifiedMessage message;
@Mock
IInterceptorBroadcaster myInterceptorBroadcaster;
@Mock
SubscriptionRegistry mySubscriptionRegistry;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
ActiveSubscription myActiveSubscription;
@Mock
CanonicalSubscription myCanonicalSubscription;
@Mock
SubscriptionCriteriaParser.SubscriptionCriteria mySubscriptionCriteria;
@Test
public void testAreNotIgnored() {
public void testAreNotIgnored() {
ReflectionTestUtils.setField(subscriber, "myInterceptorBroadcaster", myInterceptorBroadcaster);
ReflectionTestUtils.setField(subscriber, "mySubscriptionRegistry", mySubscriptionRegistry);
@ -354,7 +432,6 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
}
private void mockSubscriptionRead(RequestPartitionId theRequestPartitionId, Subscription subscription) {
Subscription modifiedSubscription = subscription.copy();
// the original partition info was the request info, but we need the actual storage partition.

View File

@ -1,7 +1,9 @@
package ca.uhn.fhir.jpa.subscription.submit.interceptor;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
@ -29,6 +31,10 @@ public class SubscriptionValidatingInterceptorTest {
private DaoRegistry myDaoRegistry;
@MockBean
private SubscriptionStrategyEvaluator mySubscriptionStrategyEvaluator;
@MockBean
private DaoConfig myDaoConfig;
@MockBean
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@BeforeEach
public void before() {

View File

@ -292,8 +292,8 @@ public class DaoConfig {
*/
private boolean myStoreResourceInLuceneIndex;
/**
* @see FhirValidator#isConcurrentBundleValidation()
/**
* @see FhirValidator#isConcurrentBundleValidation()
* @since 5.7.0
*/
private boolean myConcurrentBundleValidation;
@ -2760,10 +2760,10 @@ public class DaoConfig {
*/
public void setStoreResourceInLuceneIndex(boolean theStoreResourceInLuceneIndex) {
myStoreResourceInLuceneIndex = theStoreResourceInLuceneIndex;
}
/**
* @see FhirValidator#isConcurrentBundleValidation()
}
/**
* @see FhirValidator#isConcurrentBundleValidation()
* @since 5.7.0
*/
public boolean isConcurrentBundleValidation() {
@ -2779,6 +2779,26 @@ public class DaoConfig {
return this;
}
/**
* This setting indicates if a cross-partition subscription can be made.
*
* @see ModelConfig#setCrossPartitionSubscription(boolean)
* @since 7.5.0
*/
public boolean isCrossPartitionSubscription() {
return this.myModelConfig.isCrossPartitionSubscription();
}
/**
* This setting indicates if a cross-partition subscription can be made.
*
* @see ModelConfig#setCrossPartitionSubscription(boolean)
* @since 7.5.0
*/
public void setCrossPartitionSubscription(boolean theAllowCrossPartitionSubscription) {
this.myModelConfig.setCrossPartitionSubscription(theAllowCrossPartitionSubscription);
}
public enum StoreMetaSourceInformationEnum {
NONE(false, false),
SOURCE_URI(true, false),

View File

@ -31,6 +31,7 @@ import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.util.HapiExtensions;
import ca.uhn.fhir.util.SubscriptionUtil;
import org.hl7.fhir.exceptions.FHIRException;
import org.hl7.fhir.instance.model.api.IBaseCoding;
import org.hl7.fhir.instance.model.api.IBaseHasExtensions;
@ -95,6 +96,7 @@ public class SubscriptionCanonicalizer {
retVal.setIdElement(subscription.getIdElement());
retVal.setPayloadString(subscription.getChannel().getPayload());
retVal.setTags(extractTags(subscription));
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
} catch (FHIRException theE) {
throw new InternalErrorException(theE);
}
@ -135,6 +137,7 @@ public class SubscriptionCanonicalizer {
retVal.setPayloadString(subscription.getChannel().getPayload());
retVal.setPayloadSearchCriteria(getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA));
retVal.setTags(extractTags(subscription));
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
if (retVal.getChannelType() == CanonicalSubscriptionChannelType.EMAIL) {
String from;
@ -235,6 +238,7 @@ public class SubscriptionCanonicalizer {
retVal.setPayloadSearchCriteria(getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA));
retVal.setTags(extractTags(subscription));
setPartitionIdOnReturnValue(theSubscription, retVal);
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
if (retVal.getChannelType() == CanonicalSubscriptionChannelType.EMAIL) {
String from;

View File

@ -74,6 +74,8 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso
private String myPayloadSearchCriteria;
@JsonProperty("partitionId")
private Integer myPartitionId;
@JsonProperty("crossPartitionEnabled")
private boolean myCrossPartitionEnabled;
@JsonProperty("sendDeleteMessages")
private boolean mySendDeleteMessages;
@ -239,6 +241,14 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso
myPartitionId = thePartitionId;
}
public boolean getCrossPartitionEnabled() {
return myCrossPartitionEnabled;
}
public void setCrossPartitionEnabled(boolean myCrossPartitionEnabled) {
this.myCrossPartitionEnabled = myCrossPartitionEnabled;
}
/**
* For now we're using the R4 triggerdefinition, but this
* may change in the future when things stabilize
@ -247,7 +257,9 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso
return myTrigger;
}
public boolean getSendDeleteMessages() { return mySendDeleteMessages; }
public boolean getSendDeleteMessages() {
return mySendDeleteMessages;
}
public void setSendDeleteMessages(boolean theSendDeleteMessages) {
mySendDeleteMessages = theSendDeleteMessages;