6188 subscription not marked as a cross partition subscription matches operation on resources in other partitions (#6191)

* initial failing test

* WIP

* fixing/adding tests

* added changelog

* spotless

* fixing tests

* Cleaning up tests

* addressing commetns from first code review.

* no-op to get pipelines going

---------

Co-authored-by: peartree <etienne.poirier@smilecdr.com>
This commit is contained in:
Etienne Poirier 2024-08-07 13:44:11 -04:00 committed by GitHub
parent 7a5bdee7ea
commit c53458b5c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 323 additions and 226 deletions

View File

@ -10,3 +10,4 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -332,6 +332,14 @@ public class RequestPartitionId implements IModelJson {
return new RequestPartitionId(thePartitionNames, thePartitionIds, thePartitionDate);
}
public static boolean isDefaultPartition(@Nullable RequestPartitionId thePartitionId) {
if (thePartitionId == null) {
return false;
}
return thePartitionId.isDefaultPartition();
}
/**
* Create a string representation suitable for use as a cache key. Null aware.
* <p>

View File

@ -65,7 +65,7 @@ public class SubscriptionUtil {
populatePrimitiveValue(theContext, theSubscription, "status", theStatus);
}
public static boolean isCrossPartition(IBaseResource theSubscription) {
public static boolean isDefinedAsCrossPartitionSubcription(IBaseResource theSubscription) {
if (theSubscription instanceof IBaseHasExtensions) {
IBaseExtension extension = ExtensionUtil.getExtensionByUrl(
theSubscription, HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION);

View File

@ -0,0 +1,6 @@
---
type: fix
issue: 6188
jira: SMILE-8759
title: "Previously, a Subscription not marked as a cross-partition subscription could listen to incoming resources from
other partitions. This issue is fixed."

View File

@ -151,15 +151,17 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
*/
private boolean processSubscription(
ResourceModifiedMessage theMsg, IIdType theResourceId, ActiveSubscription theActiveSubscription) {
// skip if the partitions don't match
CanonicalSubscription subscription = theActiveSubscription.getSubscription();
if (subscription != null
&& theMsg.getPartitionId() != null
&& theMsg.getPartitionId().hasPartitionIds()
&& !subscription.getCrossPartitionEnabled()
&& !subscription.isCrossPartitionEnabled()
&& !theMsg.getPartitionId().hasPartitionId(subscription.getRequestPartitionId())) {
return false;
}
String nextSubscriptionId = theActiveSubscription.getId();
if (isNotBlank(theMsg.getSubscriptionId())) {

View File

@ -241,7 +241,7 @@ public class SubscriptionValidatingInterceptor {
RequestPartitionId theRequestPartitionId,
Pointcut thePointcut) {
// If the subscription has the cross partition tag
if (SubscriptionUtil.isCrossPartition(theSubscription)
if (SubscriptionUtil.isDefinedAsCrossPartitionSubcription(theSubscription)
&& !(theRequestDetails instanceof SystemRequestDetails)) {
if (!mySubscriptionSettings.isCrossPartitionSubscriptionEnabled()) {
throw new UnprocessableEntityException(

View File

@ -151,7 +151,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
if (theSubscriptionId != null) {
IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getSubscriptionDao();
IBaseResource subscription = subscriptionDao.read(theSubscriptionId, theRequestDetails);
if (mySubscriptionCanonicalizer.canonicalize(subscription).getCrossPartitionEnabled()) {
if (mySubscriptionCanonicalizer.canonicalize(subscription).isCrossPartitionEnabled()) {
requestPartitionId = RequestPartitionId.allPartitions();
} else {
// Otherwise, trust the partition passed in via tenant/interceptor.

View File

@ -34,7 +34,7 @@ public class SubscriptionUtil {
RequestPartitionId requestPartitionId =
new PartitionablePartitionId(theSubscription.getRequestPartitionId(), null).toPartitionId();
if (theSubscription.getCrossPartitionEnabled()) {
if (theSubscription.isCrossPartitionEnabled()) {
requestPartitionId = RequestPartitionId.allPartitions();
}

View File

@ -1,43 +1,37 @@
package ca.uhn.fhir.jpa.subscription.match.registry;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.model.config.SubscriptionSettings;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.CanonicalTopicSubscriptionFilter;
import ca.uhn.fhir.jpa.model.config.SubscriptionSettings;
import ca.uhn.fhir.model.api.ExtensionDt;
import ca.uhn.fhir.model.api.IFhirVersion;
import ca.uhn.fhir.model.dstu2.FhirDstu2;
import ca.uhn.fhir.model.primitive.BooleanDt;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import ca.uhn.fhir.subscription.SubscriptionTestDataHelper;
import ca.uhn.fhir.util.HapiExtensions;
import jakarta.annotation.Nonnull;
import org.hl7.fhir.dstu3.hapi.ctx.FhirDstu3;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.hapi.ctx.FhirR4;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Extension;
import org.hl7.fhir.r4.model.Subscription;
import org.hl7.fhir.r4b.hapi.ctx.FhirR4B;
import org.hl7.fhir.r5.hapi.ctx.FhirR5;
import org.hl7.fhir.r5.model.Coding;
import org.hl7.fhir.r5.model.Enumerations;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.stream.Stream;
import static ca.uhn.fhir.rest.api.Constants.CT_FHIR_JSON_NEW;
import static ca.uhn.fhir.rest.api.Constants.RESOURCE_PARTITION_ID;
import static ca.uhn.fhir.util.HapiExtensions.EX_SEND_DELETE_MESSAGES;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
class SubscriptionCanonicalizerTest {
@ -172,44 +166,158 @@ class SubscriptionCanonicalizerTest {
verifyChannelParameters(canonical, thePayloadContent);
}
private static Stream<Arguments> crossPartitionParams() {
return Stream.of(
arguments(true, FhirContext.forDstu2Cached()),
arguments(false, FhirContext.forDstu2Cached()),
arguments(true, FhirContext.forDstu3Cached()),
arguments(false, FhirContext.forDstu3Cached()),
arguments(true, FhirContext.forR4Cached()),
arguments(false, FhirContext.forR4Cached()),
arguments(true, FhirContext.forR4BCached()),
arguments(false, FhirContext.forR4BCached()),
arguments(true, FhirContext.forR5Cached()),
arguments(false, FhirContext.forR5Cached())
);
private static Stream<RequestPartitionId> crossPartitionParams() {
return Stream.of(null, RequestPartitionId.fromPartitionId(1), RequestPartitionId.defaultPartition()) ;
}
@ParameterizedTest
@MethodSource("crossPartitionParams")
void testCrossPartition(boolean theCrossPartitionSubscriptionEnabled, FhirContext theFhirContext) {
final IFhirVersion version = theFhirContext.getVersion();
IBaseResource subscription = null;
if (version instanceof FhirDstu2){
subscription = new ca.uhn.fhir.model.dstu2.resource.Subscription();
} else if (version instanceof FhirDstu3){
subscription = new org.hl7.fhir.dstu3.model.Subscription();
} else if (version instanceof FhirR4){
subscription = new Subscription();
} else if (version instanceof FhirR4B){
subscription = new org.hl7.fhir.r4b.model.Subscription();
} else if (version instanceof FhirR5){
subscription = new org.hl7.fhir.r5.model.Subscription();
void testSubscriptionCrossPartitionEnableProperty_forDstu2WithExtensionAndPartitions(RequestPartitionId theRequestPartitionId) {
final SubscriptionSettings subscriptionSettings = new SubscriptionSettings();
subscriptionSettings.setCrossPartitionSubscriptionEnabled(true);
final SubscriptionCanonicalizer subscriptionCanonicalizer = new SubscriptionCanonicalizer(FhirContext.forDstu2(), subscriptionSettings);
ca.uhn.fhir.model.dstu2.resource.Subscription subscriptionWithoutExtension = new ca.uhn.fhir.model.dstu2.resource.Subscription();
subscriptionWithoutExtension.setUserData(RESOURCE_PARTITION_ID, theRequestPartitionId);
final CanonicalSubscription canonicalSubscriptionWithoutExtension = subscriptionCanonicalizer.canonicalize(subscriptionWithoutExtension);
assertThat(canonicalSubscriptionWithoutExtension.isCrossPartitionEnabled()).isFalse();
}
@ParameterizedTest
@MethodSource("crossPartitionParams")
void testSubscriptionCrossPartitionEnableProperty_forDstu3WithExtensionAndPartitions(RequestPartitionId theRequestPartitionId) {
final SubscriptionSettings subscriptionSettings = new SubscriptionSettings();
subscriptionSettings.setCrossPartitionSubscriptionEnabled(true);
final SubscriptionCanonicalizer subscriptionCanonicalizer = new SubscriptionCanonicalizer(FhirContext.forDstu3(), subscriptionSettings);
org.hl7.fhir.dstu3.model.Subscription subscriptionWithoutExtension = new org.hl7.fhir.dstu3.model.Subscription();
subscriptionWithoutExtension.setUserData(RESOURCE_PARTITION_ID, theRequestPartitionId);
org.hl7.fhir.dstu3.model.Subscription subscriptionWithExtensionCrossPartitionTrue = new org.hl7.fhir.dstu3.model.Subscription();
subscriptionWithExtensionCrossPartitionTrue.setUserData(RESOURCE_PARTITION_ID, theRequestPartitionId);
subscriptionWithExtensionCrossPartitionTrue.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new org.hl7.fhir.dstu3.model.BooleanType().setValue(true));
org.hl7.fhir.dstu3.model.Subscription subscriptionWithExtensionCrossPartitionFalse = new org.hl7.fhir.dstu3.model.Subscription();
subscriptionWithExtensionCrossPartitionFalse.setUserData(RESOURCE_PARTITION_ID, theRequestPartitionId);
subscriptionWithExtensionCrossPartitionFalse.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new org.hl7.fhir.dstu3.model.BooleanType().setValue(false));
final CanonicalSubscription canonicalSubscriptionWithoutExtension = subscriptionCanonicalizer.canonicalize(subscriptionWithoutExtension);
final CanonicalSubscription canonicalSubscriptionWithExtensionCrossPartitionTrue = subscriptionCanonicalizer.canonicalize(subscriptionWithExtensionCrossPartitionTrue);
final CanonicalSubscription canonicalSubscriptionWithExtensionCrossPartitionFalse = subscriptionCanonicalizer.canonicalize(subscriptionWithExtensionCrossPartitionFalse);
if(RequestPartitionId.isDefaultPartition(theRequestPartitionId)){
assertThat(canonicalSubscriptionWithoutExtension.isCrossPartitionEnabled()).isFalse();
assertThat(canonicalSubscriptionWithExtensionCrossPartitionTrue.isCrossPartitionEnabled()).isTrue();
assertThat(canonicalSubscriptionWithExtensionCrossPartitionFalse.isCrossPartitionEnabled()).isFalse();
} else {
assertThat(canonicalSubscriptionWithoutExtension.isCrossPartitionEnabled()).isFalse();
assertThat(canonicalSubscriptionWithExtensionCrossPartitionTrue.isCrossPartitionEnabled()).isFalse();
assertThat(canonicalSubscriptionWithExtensionCrossPartitionFalse.isCrossPartitionEnabled()).isFalse();
}
}
@ParameterizedTest
@MethodSource("crossPartitionParams")
void testSubscriptionCrossPartitionEnableProperty_forR4WithExtensionAndPartitions(RequestPartitionId theRequestPartitionId) {
final SubscriptionSettings subscriptionSettings = new SubscriptionSettings();
subscriptionSettings.setCrossPartitionSubscriptionEnabled(true);
final SubscriptionCanonicalizer subscriptionCanonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4Cached(), subscriptionSettings);
Subscription subscriptionWithoutExtension = new Subscription();
subscriptionWithoutExtension.setUserData(RESOURCE_PARTITION_ID, theRequestPartitionId);
Subscription subscriptionWithExtensionCrossPartitionTrue = new Subscription();
subscriptionWithExtensionCrossPartitionTrue.setUserData(RESOURCE_PARTITION_ID, theRequestPartitionId);
subscriptionWithExtensionCrossPartitionTrue.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new org.hl7.fhir.r4.model.BooleanType().setValue(true));
Subscription subscriptionWithExtensionCrossPartitionFalse = new Subscription();
subscriptionWithExtensionCrossPartitionFalse.setUserData(RESOURCE_PARTITION_ID, theRequestPartitionId);
subscriptionWithExtensionCrossPartitionFalse.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new org.hl7.fhir.r4.model.BooleanType().setValue(false));
final CanonicalSubscription canonicalSubscriptionWithoutExtension = subscriptionCanonicalizer.canonicalize(subscriptionWithoutExtension);
final CanonicalSubscription canonicalSubscriptionWithExtensionCrossPartitionTrue = subscriptionCanonicalizer.canonicalize(subscriptionWithExtensionCrossPartitionTrue);
final CanonicalSubscription canonicalSubscriptionWithExtensionCrossPartitionFalse = subscriptionCanonicalizer.canonicalize(subscriptionWithExtensionCrossPartitionFalse);
if(RequestPartitionId.isDefaultPartition(theRequestPartitionId)){
assertThat(canonicalSubscriptionWithoutExtension.isCrossPartitionEnabled()).isFalse();
assertThat(canonicalSubscriptionWithExtensionCrossPartitionTrue.isCrossPartitionEnabled()).isTrue();
assertThat(canonicalSubscriptionWithExtensionCrossPartitionFalse.isCrossPartitionEnabled()).isFalse();
} else {
assertThat(canonicalSubscriptionWithoutExtension.isCrossPartitionEnabled()).isFalse();
assertThat(canonicalSubscriptionWithExtensionCrossPartitionTrue.isCrossPartitionEnabled()).isFalse();
assertThat(canonicalSubscriptionWithExtensionCrossPartitionFalse.isCrossPartitionEnabled()).isFalse();
}
final SubscriptionSettings subscriptionSettings = new SubscriptionSettings();
subscriptionSettings.setCrossPartitionSubscriptionEnabled(theCrossPartitionSubscriptionEnabled);
final SubscriptionCanonicalizer subscriptionCanonicalizer = new SubscriptionCanonicalizer(theFhirContext, subscriptionSettings);
final CanonicalSubscription canonicalSubscription = subscriptionCanonicalizer.canonicalize(subscription);
}
assertEquals(theCrossPartitionSubscriptionEnabled, canonicalSubscription.getCrossPartitionEnabled());
@ParameterizedTest
@MethodSource("crossPartitionParams")
void testSubscriptionCrossPartitionEnableProperty_forR4BWithExtensionAndPartitions(RequestPartitionId theRequestPartitionId) {
final SubscriptionSettings subscriptionSettings = new SubscriptionSettings();
subscriptionSettings.setCrossPartitionSubscriptionEnabled(true);
final SubscriptionCanonicalizer subscriptionCanonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4BCached(), subscriptionSettings);
org.hl7.fhir.r4b.model.Subscription subscriptionWithoutExtension = new org.hl7.fhir.r4b.model.Subscription();
subscriptionWithoutExtension.setUserData(RESOURCE_PARTITION_ID, theRequestPartitionId);
org.hl7.fhir.r4b.model.Subscription subscriptionWithExtensionCrossPartitionTrue = new org.hl7.fhir.r4b.model.Subscription();
subscriptionWithExtensionCrossPartitionTrue.setUserData(RESOURCE_PARTITION_ID, theRequestPartitionId);
subscriptionWithExtensionCrossPartitionTrue.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new org.hl7.fhir.r4b.model.BooleanType().setValue(true));
org.hl7.fhir.r4b.model.Subscription subscriptionWithExtensionCrossPartitionFalse = new org.hl7.fhir.r4b.model.Subscription();
subscriptionWithExtensionCrossPartitionFalse.setUserData(RESOURCE_PARTITION_ID, theRequestPartitionId);
subscriptionWithExtensionCrossPartitionFalse.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new org.hl7.fhir.r4b.model.BooleanType().setValue(false));
final CanonicalSubscription canonicalSubscriptionWithoutExtension = subscriptionCanonicalizer.canonicalize(subscriptionWithoutExtension);
final CanonicalSubscription canonicalSubscriptionWithExtensionCrossPartitionTrue = subscriptionCanonicalizer.canonicalize(subscriptionWithExtensionCrossPartitionTrue);
final CanonicalSubscription canonicalSubscriptionWithExtensionCrossPartitionFalse = subscriptionCanonicalizer.canonicalize(subscriptionWithExtensionCrossPartitionFalse);
if(RequestPartitionId.isDefaultPartition(theRequestPartitionId)){
assertThat(canonicalSubscriptionWithoutExtension.isCrossPartitionEnabled()).isFalse();
assertThat(canonicalSubscriptionWithExtensionCrossPartitionTrue.isCrossPartitionEnabled()).isTrue();
assertThat(canonicalSubscriptionWithExtensionCrossPartitionFalse.isCrossPartitionEnabled()).isFalse();
} else {
assertThat(canonicalSubscriptionWithoutExtension.isCrossPartitionEnabled()).isFalse();
assertThat(canonicalSubscriptionWithExtensionCrossPartitionTrue.isCrossPartitionEnabled()).isFalse();
assertThat(canonicalSubscriptionWithExtensionCrossPartitionFalse.isCrossPartitionEnabled()).isFalse();
}
}
@ParameterizedTest
@MethodSource("crossPartitionParams")
void testSubscriptionCrossPartitionEnableProperty_forR5WithExtensionAndPartitions(RequestPartitionId theRequestPartitionId) {
final SubscriptionSettings subscriptionSettings = new SubscriptionSettings();
subscriptionSettings.setCrossPartitionSubscriptionEnabled(true);
final SubscriptionCanonicalizer subscriptionCanonicalizer = new SubscriptionCanonicalizer(FhirContext.forR5Cached(), subscriptionSettings);
org.hl7.fhir.r5.model.Subscription subscriptionWithoutExtension = new org.hl7.fhir.r5.model.Subscription();
subscriptionWithoutExtension.setUserData(RESOURCE_PARTITION_ID, theRequestPartitionId);
org.hl7.fhir.r5.model.Subscription subscriptionWithExtensionCrossPartitionTrue = new org.hl7.fhir.r5.model.Subscription();
subscriptionWithExtensionCrossPartitionTrue.setUserData(RESOURCE_PARTITION_ID, theRequestPartitionId);
subscriptionWithExtensionCrossPartitionTrue.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new org.hl7.fhir.r5.model.BooleanType().setValue(true));
org.hl7.fhir.r5.model.Subscription subscriptionWithExtensionCrossPartitionFalse = new org.hl7.fhir.r5.model.Subscription();
subscriptionWithExtensionCrossPartitionFalse.setUserData(RESOURCE_PARTITION_ID, theRequestPartitionId);
subscriptionWithExtensionCrossPartitionFalse.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new org.hl7.fhir.r5.model.BooleanType().setValue(false));
final CanonicalSubscription canonicalSubscriptionWithoutExtension = subscriptionCanonicalizer.canonicalize(subscriptionWithoutExtension);
final CanonicalSubscription canonicalSubscriptionWithExtensionCrossPartitionTrue = subscriptionCanonicalizer.canonicalize(subscriptionWithExtensionCrossPartitionTrue);
final CanonicalSubscription canonicalSubscriptionWithExtensionCrossPartitionFalse = subscriptionCanonicalizer.canonicalize(subscriptionWithExtensionCrossPartitionFalse);
if(RequestPartitionId.isDefaultPartition(theRequestPartitionId)){
assertThat(canonicalSubscriptionWithoutExtension.isCrossPartitionEnabled()).isFalse();
assertThat(canonicalSubscriptionWithExtensionCrossPartitionTrue.isCrossPartitionEnabled()).isTrue();
assertThat(canonicalSubscriptionWithExtensionCrossPartitionFalse.isCrossPartitionEnabled()).isFalse();
} else {
assertThat(canonicalSubscriptionWithoutExtension.isCrossPartitionEnabled()).isFalse();
assertThat(canonicalSubscriptionWithExtensionCrossPartitionTrue.isCrossPartitionEnabled()).isFalse();
assertThat(canonicalSubscriptionWithExtensionCrossPartitionFalse.isCrossPartitionEnabled()).isFalse();
}
}
private org.hl7.fhir.r4b.model.Subscription buildR4BSubscription(String thePayloadContent) {

View File

@ -1,22 +1,23 @@
package ca.uhn.fhir.jpa.subscription.module;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.model.config.SubscriptionSettings;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.model.config.SubscriptionSettings;
import ca.uhn.fhir.util.HapiExtensions;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.Nonnull;
import org.assertj.core.util.Lists;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.StringType;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -24,11 +25,13 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Stream;
import static ca.uhn.fhir.rest.api.Constants.RESOURCE_PARTITION_ID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class CanonicalSubscriptionTest {
@ -71,7 +74,7 @@ public class CanonicalSubscriptionTest {
}
@Test
public void testCanonicalSubscriptionRetainsMetaTags() throws IOException {
public void testCanonicalSubscriptionRetainsMetaTags() {
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), new SubscriptionSettings());
CanonicalSubscription sub1 = canonicalizer.canonicalize(makeMdmSubscription());
assertThat(sub1.getTags()).containsKey(TAG_SYSTEM);
@ -86,40 +89,48 @@ public class CanonicalSubscriptionTest {
assertTrue(sub1.equals(sub2));
}
@Test
public void testSerializeMultiPartitionSubscription(){
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), new SubscriptionSettings());
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionCrossPartitionEnabled_basedOnGlobalFlagAndExtensionFalse(boolean theIsCrossPartitionEnabled){
final SubscriptionSettings subscriptionSettings = buildSubscriptionSettings(theIsCrossPartitionEnabled);
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), subscriptionSettings);
Subscription subscriptionWithExtensionSetToBooleanFalse = makeEmailSubscription();
subscriptionWithExtensionSetToBooleanFalse.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new BooleanType().setValue(false));
CanonicalSubscription canonicalSubscriptionExtensionSetToBooleanFalse = canonicalizer.canonicalize(subscriptionWithExtensionSetToBooleanFalse);
assertEquals(canonicalSubscriptionExtensionSetToBooleanFalse.isCrossPartitionEnabled(), false);
}
static Stream<Arguments> requestPartitionIds() {
return Stream.of(
Arguments.of(null, false),
Arguments.of(RequestPartitionId.allPartitions(), false),
Arguments.of(RequestPartitionId.fromPartitionIds(1), false),
Arguments.of(RequestPartitionId.defaultPartition(), true)
);
}
@ParameterizedTest
@MethodSource("requestPartitionIds")
public void testSubscriptionCrossPartitionEnabled_basedOnGlobalFlagAndExtensionAndPartitionId(RequestPartitionId theRequestPartitionId, boolean theExpectedIsCrossPartitionEnabled){
final boolean globalIsCrossPartitionEnabled = true; // not required but to help understand what the test is doing.
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), buildSubscriptionSettings(globalIsCrossPartitionEnabled));
Subscription subscription = makeEmailSubscription();
subscription.setUserData(RESOURCE_PARTITION_ID,theRequestPartitionId);
subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new BooleanType().setValue(true));
CanonicalSubscription canonicalSubscription = canonicalizer.canonicalize(subscription);
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), true);
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSerializeIncorrectMultiPartitionSubscription(boolean theIsCrossPartitionEnabled){
final SubscriptionSettings subscriptionSettings = buildSubscriptionSettings(theIsCrossPartitionEnabled);
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), subscriptionSettings);
Subscription subscription = makeEmailSubscription();
subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new StringType().setValue("false"));
CanonicalSubscription canonicalSubscription = canonicalizer.canonicalize(subscription);
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), theIsCrossPartitionEnabled);
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSerializeNonMultiPartitionSubscription(boolean theIsCrossPartitionEnabled){
final SubscriptionSettings subscriptionSettings = buildSubscriptionSettings(theIsCrossPartitionEnabled);
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), subscriptionSettings);
Subscription subscription = makeEmailSubscription();
subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new BooleanType().setValue(false));
CanonicalSubscription canonicalSubscription = canonicalizer.canonicalize(subscription);
System.out.print(canonicalSubscription);
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), theIsCrossPartitionEnabled);
// for a Subscription to be a cross-partition subscription, 3 things are required:
// - The Subs need to be created on the default partition
// - The Subs need to have extension EXTENSION_SUBSCRIPTION_CROSS_PARTITION set to true
// - Global flag CrossPartitionSubscriptionEnabled needs to be true
assertThat(canonicalSubscription.isCrossPartitionEnabled()).isEqualTo(theExpectedIsCrossPartitionEnabled);
}
@Test
@ -130,7 +141,7 @@ public class CanonicalSubscriptionTest {
CanonicalSubscription payload = resourceDeliveryMessage.getPayload().getSubscription();
assertFalse(payload.getCrossPartitionEnabled());
assertFalse(payload.isCrossPartitionEnabled());
}
private Subscription makeEmailSubscription() {

View File

@ -4,8 +4,8 @@ import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.model.config.SubscriptionSettings;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionCriteriaParser;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchDeliverer;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber;
@ -14,7 +14,6 @@ import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.standalone.BaseBlockingQueueSubscribableChannelDstu3Test;
import ca.uhn.fhir.jpa.model.config.SubscriptionSettings;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage;
@ -192,35 +191,16 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0));
}
@Test
public void testSubscriptionAndResourceOnTheSamePartitionMatch() throws InterruptedException {
@ParameterizedTest
@ValueSource(ints = {0,1})
public void testSubscriptionAndResourceOnTheSamePartitionMatch(int thePartitionId) throws InterruptedException {
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
String code = "1000000050";
String criteria = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
RequestPartitionId requestPartitionId = RequestPartitionId.fromPartitionId(0);
Subscription subscription = makeActiveSubscription(criteria, payload, ourListenerServerBase);
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
ourObservationListener.setExpectedCount(1);
mySubscriptionResourceMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT", requestPartitionId);
mySubscriptionResourceMatched.awaitExpected();
ourObservationListener.awaitExpected();
}
@Test
public void testSubscriptionAndResourceOnTheSamePartitionMatchPart2() throws InterruptedException {
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
String code = "1000000050";
String criteria = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
RequestPartitionId requestPartitionId = RequestPartitionId.fromPartitionId(1);
RequestPartitionId requestPartitionId = RequestPartitionId.fromPartitionId(thePartitionId);
Subscription subscription = makeActiveSubscription(criteria, payload, ourListenerServerBase);
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
@ -234,7 +214,7 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionAndResourceOnDiffPartitionNotMatch(boolean theIsCrossPartitionEnabled) throws InterruptedException {
public void testSubscriptionCrossPartitionMatching_whenSubscriptionAndResourceOnDiffPartition_withGlobalFlagCrossPartitionSubscriptionEnable(boolean theIsCrossPartitionEnabled) throws InterruptedException {
mySubscriptionSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
@ -242,8 +222,9 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
String code = "1000000050";
String criteria = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
RequestPartitionId requestPartitionId = RequestPartitionId.fromPartitionId(1);
RequestPartitionId requestPartitionId = RequestPartitionId.defaultPartition();
Subscription subscription = makeActiveSubscription(criteria, payload, ourListenerServerBase);
subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new org.hl7.fhir.dstu3.model.BooleanType().setValue(true));
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
@ -255,80 +236,8 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionAndResourceOnDiffPartitionNotMatchPart2(boolean theIsCrossPartitionEnabled) throws InterruptedException {
mySubscriptionSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
String code = "1000000050";
String criteria = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
RequestPartitionId requestPartitionId = RequestPartitionId.fromPartitionId(0);
Subscription subscription = makeActiveSubscription(criteria, payload, ourListenerServerBase);
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
final ThrowsInterrupted throwsInterrupted = () -> sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(1));
if (theIsCrossPartitionEnabled) {
runWithinLatchLogicExpectSuccess(throwsInterrupted);
} else {
runWithLatchLogicExpectFailure(throwsInterrupted);
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionOnDefaultPartitionAndResourceOnDiffPartitionNotMatch(boolean theIsCrossPartitionEnabled) throws InterruptedException {
mySubscriptionSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
String code = "1000000050";
String criteria = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
RequestPartitionId requestPartitionId = RequestPartitionId.defaultPartition();
Subscription subscription = makeActiveSubscription(criteria, payload, ourListenerServerBase);
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
final ThrowsInterrupted throwsInterrupted = () -> sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(1));
if (theIsCrossPartitionEnabled) {
runWithinLatchLogicExpectSuccess(throwsInterrupted);
} else {
runWithLatchLogicExpectFailure(throwsInterrupted);
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionOnAPartitionAndResourceOnDefaultPartitionNotMatch(boolean theIsCrossPartitionEnabled) throws InterruptedException {
mySubscriptionSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
String code = "1000000050";
String criteria = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
RequestPartitionId requestPartitionId = RequestPartitionId.fromPartitionId(1);
Subscription subscription = makeActiveSubscription(criteria, payload, ourListenerServerBase);
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
final ThrowsInterrupted throwsInterrupted = () -> sendObservation(code, "SNOMED-CT", RequestPartitionId.defaultPartition());
if (theIsCrossPartitionEnabled) {
runWithinLatchLogicExpectSuccess(throwsInterrupted);
} else {
runWithLatchLogicExpectFailure(throwsInterrupted);
}
}
@Test
public void testSubscriptionOnOnePartitionMatchResourceOnMultiplePartitions() throws InterruptedException {
public void testSubscriptionOnOnePartition_whenResourceCreatedOnMultiplePartitions_matchesOnlyResourceCreatedOnSamePartition() throws InterruptedException {
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
@ -350,7 +259,7 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionOnOnePartitionDoNotMatchResourceOnMultiplePartitions(boolean theIsCrossPartitionEnabled) throws InterruptedException {
public void testSubscriptionCrossPartitionMatching_whenSubscriptionAndResourceOnDiffPartition_withGlobalFlagCrossPartitionSubscriptionEnable2(boolean theIsCrossPartitionEnabled) throws InterruptedException {
mySubscriptionSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
@ -358,8 +267,9 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
String code = "1000000050";
String criteria = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
RequestPartitionId requestPartitionId = RequestPartitionId.fromPartitionId(1);
RequestPartitionId requestPartitionId = RequestPartitionId.defaultPartition();
Subscription subscription = makeActiveSubscription(criteria, payload, ourListenerServerBase);
subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new org.hl7.fhir.dstu3.model.BooleanType().setValue(true));
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);

View File

@ -11,9 +11,9 @@ import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.dao.r4.BasePartitioningR4Test;
import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.model.config.SubscriptionSettings;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
import ca.uhn.fhir.jpa.subscription.resthook.RestHookTestR4Test;
import ca.uhn.fhir.jpa.model.config.SubscriptionSettings;
import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc;
import ca.uhn.fhir.jpa.subscription.triggering.SubscriptionTriggeringSvcImpl;
import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber;
@ -26,7 +26,12 @@ import jakarta.servlet.ServletException;
import org.awaitility.core.ConditionTimeoutException;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.*;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Resource;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -43,9 +48,9 @@ import java.util.List;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(RestHookTestR4Test.class);
@ -60,6 +65,7 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
public static final RequestPartitionId REQ_PART_1 = RequestPartitionId.fromPartitionNames(PARTITION_1);
static final String PARTITION_2 = "PART-2";
public static final RequestPartitionId REQ_PART_2 = RequestPartitionId.fromPartitionNames(PARTITION_2);
public static final RequestPartitionId REQ_PART_DEFAULT = RequestPartitionId.defaultPartition();
protected MyReadWriteInterceptor myPartitionInterceptor;
protected LocalDate myPartitionDate;
@ -127,7 +133,7 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
Subscription subscription = newSubscription(criteria1, payload);
assertEquals(mySrdInterceptorService.getAllRegisteredInterceptors().size(), 1);
assertThat(mySrdInterceptorService.getAllRegisteredInterceptors()).hasSize(1);
myDaoRegistry.getResourceDao("Subscription").create(subscription, mySrd);
@ -150,13 +156,13 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
mySubscriptionSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
String payload = "application/fhir+json";
String code = "1000000050";
String criteria1 = "Patient?active=true";
Subscription subscription = newSubscription(criteria1, payload);
subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new org.hl7.fhir.r4.model.BooleanType().setValue(true));
assertEquals(mySrdInterceptorService.getAllRegisteredInterceptors().size(), 1);
assertThat(mySrdInterceptorService.getAllRegisteredInterceptors()).hasSize(1);
myDaoRegistry.getResourceDao("Subscription").create(subscription, mySrd);
myDaoRegistry.getResourceDao("Subscription").create(subscription, new SystemRequestDetails().setRequestPartitionId(RequestPartitionId.defaultPartition()));
waitForActivatedSubscriptionCount(1);
@ -184,10 +190,8 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testManualTriggeredSubscriptionDoesNotCheckOutsideOfPartition(boolean theIsCrossPartitionEnabled) throws Exception {
mySubscriptionSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
@Test
public void testManualTriggeredSubscriptionDoesNotMatchOnAllPartitions() throws Exception {
String payload = "application/fhir+json";
String code = "1000000050";
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
@ -201,8 +205,9 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
myPartitionInterceptor.setRequestPartitionId(REQ_PART_1);
IIdType observationIdPartitionOne = observation.create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
//Given: We create a subscrioption on Partition 1
IIdType subscriptionId = myDaoRegistry.getResourceDao("Subscription").create(newSubscription(criteria1, payload), mySrd).getId();
//Given: We create a subscription on partition 1
Subscription subscription = newSubscription(criteria1, payload);
IIdType subscriptionId = myDaoRegistry.getResourceDao("Subscription").create(subscription, mySrd).getId();
waitForActivatedSubscriptionCount(1);
ArrayList<IPrimitiveType<String>> searchUrlList = new ArrayList<>();
@ -213,14 +218,49 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
waitForQueueToDrain();
List<Observation> resourceUpdates = BaseSubscriptionsR4Test.ourObservationProvider.getResourceUpdates();
if (theIsCrossPartitionEnabled) {
assertEquals(2, resourceUpdates.size());
assertEquals(Stream.of(observationIdPartitionOne, observationIdPartitionTwo).map(Object::toString).sorted().toList(),
resourceUpdates.stream().map(Resource::getId).sorted().toList());
} else {
assertEquals(1, resourceUpdates.size());
assertEquals(observationIdPartitionOne.toString(), resourceUpdates.get(0).getId());
}
assertEquals(1, resourceUpdates.size());
assertEquals(observationIdPartitionOne.toString(), resourceUpdates.get(0).getId());
String responseValue = resultParameters.getParameter().get(0).getValue().primitiveValue();
assertThat(responseValue).contains("Subscription triggering job submitted as JOB ID");
}
@Test
public void testManualTriggeredCrossPartitinedSubscriptionDoesMatchOnAllPartitions() throws Exception {
mySubscriptionSettings.setCrossPartitionSubscriptionEnabled(true);
String payload = "application/fhir+json";
String code = "1000000050";
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
//Given: We store a resource in partition 2
myPartitionInterceptor.setRequestPartitionId(REQ_PART_2);
final IFhirResourceDao observation = myDaoRegistry.getResourceDao("Observation");
IIdType observationIdPartitionTwo = observation.create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
//Given: We store a similar resource in partition 1
myPartitionInterceptor.setRequestPartitionId(REQ_PART_1);
IIdType observationIdPartitionOne = observation.create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
//Given: We create a subscription on the default partition
myPartitionInterceptor.setRequestPartitionId(REQ_PART_DEFAULT);
Subscription subscription = newSubscription(criteria1, payload);
subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new org.hl7.fhir.r4.model.BooleanType().setValue(true));
IIdType subscriptionId = myDaoRegistry.getResourceDao("Subscription").create(subscription, mySrd).getId();
waitForActivatedSubscriptionCount(1);
ArrayList<IPrimitiveType<String>> searchUrlList = new ArrayList<>();
searchUrlList.add(new StringDt("Observation?"));
Parameters resultParameters = (Parameters) mySubscriptionTriggeringSvc.triggerSubscription(null, searchUrlList, subscriptionId, mySrd);
mySubscriptionTriggeringSvc.runDeliveryPass();
waitForQueueToDrain();
List<Observation> resourceUpdates = BaseSubscriptionsR4Test.ourObservationProvider.getResourceUpdates();
assertEquals(2, resourceUpdates.size());
assertEquals(Stream.of(observationIdPartitionOne, observationIdPartitionTwo).map(Object::toString).sorted().toList(),
resourceUpdates.stream().map(Resource::getId).sorted().toList());
String responseValue = resultParameters.getParameter().get(0).getValue().primitiveValue();
assertThat(responseValue).contains("Subscription triggering job submitted as JOB ID");
@ -240,17 +280,17 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
myPartitionInterceptor.setRequestPartitionId(REQ_PART_1);
myDaoRegistry.getResourceDao("Observation").create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
//Given: We create a subscription on Partition 1
//Given: We create a subscription on default partition
Subscription theResource = newSubscription(criteria1, payload);
theResource.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new BooleanType(Boolean.TRUE));
myPartitionInterceptor.setRequestPartitionId(RequestPartitionId.defaultPartition());
myPartitionInterceptor.setRequestPartitionId(REQ_PART_DEFAULT);
IIdType subscriptionId = myDaoRegistry.getResourceDao("Subscription").create(theResource, mySrd).getId();
waitForActivatedSubscriptionCount(1);
ArrayList<IPrimitiveType<String>> searchUrlList = new ArrayList<>();
searchUrlList.add(new StringDt("Observation?"));
myPartitionInterceptor.setRequestPartitionId(RequestPartitionId.defaultPartition());
myPartitionInterceptor.setRequestPartitionId(REQ_PART_DEFAULT);
mySubscriptionTriggeringSvc.triggerSubscription(null, searchUrlList, subscriptionId, mySrd);
mySubscriptionTriggeringSvc.runDeliveryPass();
@ -273,7 +313,7 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
// Create the subscription now
DaoMethodOutcome subscriptionOutcome = myDaoRegistry.getResourceDao("Subscription").create(newSubscription(criteria1, payload), mySrd);
assertEquals(mySrdInterceptorService.getAllRegisteredInterceptors().size(), 1);
assertThat(mySrdInterceptorService.getAllRegisteredInterceptors()).hasSize(1);
Subscription subscription = (Subscription) subscriptionOutcome.getResource();

View File

@ -123,7 +123,7 @@ public class SubscriptionCanonicalizer {
retVal.setIdElement(subscription.getIdElement());
retVal.setPayloadString(channel.getPayload());
retVal.setTags(extractTags(subscription));
handleCrossPartition(theSubscription, retVal);
retVal.setCrossPartitionEnabled(handleCrossPartition(theSubscription));
retVal.setSendDeleteMessages(extractDeleteExtensionDstu2(subscription));
} catch (FHIRException theE) {
throw new InternalErrorException(Msg.code(557) + theE);
@ -134,7 +134,7 @@ public class SubscriptionCanonicalizer {
private boolean extractDeleteExtensionDstu2(ca.uhn.fhir.model.dstu2.resource.Subscription theSubscription) {
return theSubscription.getChannel().getUndeclaredExtensionsByUrl(EX_SEND_DELETE_MESSAGES).stream()
.map(ExtensionDt::getValue)
.map(value -> (BooleanDt) value)
.map(BooleanDt.class::cast)
.map(BasePrimitive::getValue)
.findFirst()
.orElse(false);
@ -175,7 +175,7 @@ public class SubscriptionCanonicalizer {
retVal.setPayloadSearchCriteria(
getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA));
retVal.setTags(extractTags(subscription));
handleCrossPartition(theSubscription, retVal);
retVal.setCrossPartitionEnabled(handleCrossPartition(theSubscription));
if (retVal.getChannelType() == CanonicalSubscriptionChannelType.EMAIL) {
String from;
@ -306,7 +306,7 @@ public class SubscriptionCanonicalizer {
getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA));
retVal.setTags(extractTags(subscription));
setPartitionIdOnReturnValue(theSubscription, retVal);
handleCrossPartition(theSubscription, retVal);
retVal.setCrossPartitionEnabled(handleCrossPartition(theSubscription));
List<org.hl7.fhir.r4.model.CanonicalType> profiles =
subscription.getMeta().getProfile();
@ -401,7 +401,7 @@ public class SubscriptionCanonicalizer {
}
List<Extension> topicExts = subscription.getExtensionsByUrl("http://hl7.org/fhir/subscription/topics");
if (topicExts.size() > 0) {
if (!topicExts.isEmpty()) {
IBaseReference ref = (IBaseReference) topicExts.get(0).getValueAsPrimitive();
if (!"EventDefinition".equals(ref.getReferenceElement().getResourceType())) {
throw new PreconditionFailedException(Msg.code(563) + "Topic reference must be an EventDefinition");
@ -499,7 +499,7 @@ public class SubscriptionCanonicalizer {
List<org.hl7.fhir.r4b.model.Extension> topicExts =
subscription.getExtensionsByUrl("http://hl7.org/fhir/subscription/topics");
if (topicExts.size() > 0) {
if (!topicExts.isEmpty()) {
IBaseReference ref = (IBaseReference) topicExts.get(0).getValueAsPrimitive();
if (!"EventDefinition".equals(ref.getReferenceElement().getResourceType())) {
throw new PreconditionFailedException(Msg.code(566) + "Topic reference must be an EventDefinition");
@ -511,7 +511,7 @@ public class SubscriptionCanonicalizer {
retVal.setSendDeleteMessages(extension.getValueBooleanType().booleanValue());
}
handleCrossPartition(theSubscription, retVal);
retVal.setCrossPartitionEnabled(handleCrossPartition(theSubscription));
return retVal;
}
@ -531,7 +531,7 @@ public class SubscriptionCanonicalizer {
List<org.hl7.fhir.r5.model.Extension> topicExts =
subscription.getExtensionsByUrl("http://hl7.org/fhir/subscription/topics");
if (topicExts.size() > 0) {
if (!topicExts.isEmpty()) {
IBaseReference ref = (IBaseReference) topicExts.get(0).getValueAsPrimitive();
if (!"EventDefinition".equals(ref.getReferenceElement().getResourceType())) {
throw new PreconditionFailedException(Msg.code(2325) + "Topic reference must be an EventDefinition");
@ -568,16 +568,15 @@ public class SubscriptionCanonicalizer {
retVal.getTopicSubscription().setTopic(subscription.getTopic());
retVal.setChannelType(getChannelType(subscription));
subscription.getFilterBy().forEach(filter -> {
retVal.getTopicSubscription().addFilter(convertFilter(filter));
});
subscription.getFilterBy().forEach(filter -> retVal.getTopicSubscription()
.addFilter(convertFilter(filter)));
retVal.getTopicSubscription().setHeartbeatPeriod(subscription.getHeartbeatPeriod());
retVal.getTopicSubscription().setMaxCount(subscription.getMaxCount());
setR5FlagsBasedOnChannelType(subscription, retVal);
handleCrossPartition(theSubscription, retVal);
retVal.setCrossPartitionEnabled(handleCrossPartition(theSubscription));
return retVal;
}
@ -781,11 +780,23 @@ public class SubscriptionCanonicalizer {
return status.getValueAsString();
}
private void handleCrossPartition(IBaseResource theSubscription, CanonicalSubscription retVal) {
if (mySubscriptionSettings.isCrossPartitionSubscriptionEnabled()) {
retVal.setCrossPartitionEnabled(true);
} else {
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
private boolean handleCrossPartition(IBaseResource theSubscription) {
RequestPartitionId requestPartitionId =
(RequestPartitionId) theSubscription.getUserData(Constants.RESOURCE_PARTITION_ID);
boolean isSubscriptionCreatedOnDefaultPartition = false;
if (nonNull(requestPartitionId)) {
isSubscriptionCreatedOnDefaultPartition = requestPartitionId.isDefaultPartition();
}
boolean isSubscriptionDefinededAsCrossPartitionSubscription =
SubscriptionUtil.isDefinedAsCrossPartitionSubcription(theSubscription);
boolean isGlobalSettingCrossPartitionSubscriptionEnabled =
mySubscriptionSettings.isCrossPartitionSubscriptionEnabled();
return isSubscriptionCreatedOnDefaultPartition
&& isSubscriptionDefinededAsCrossPartitionSubscription
&& isGlobalSettingCrossPartitionSubscriptionEnabled;
}
}

View File

@ -261,7 +261,7 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso
myPartitionId = thePartitionId;
}
public boolean getCrossPartitionEnabled() {
public boolean isCrossPartitionEnabled() {
return myCrossPartitionEnabled;
}