Ensure CanonicalizedSubscription reflects StorageSettings.isCrossPartitionSubscriptionEnabled if it's true. Make that setting is true by default (#5810)

* First commit with TODOs and logging.

* Try to add cross partition config at startup to subscription module.

* Barely working solution with JpaStorageSettings injected into the Subscription module with the correct config for cross partition enabled.

* Implement agreed upon solution where StorageSettings used in the subscription module uses the JpaStorageSettings cross partition enabled setting.  Fix all compile errors.  TODOs for tests to add and known test failures.

* Fix test errors caused by bad log code.   Ensure all modules use StorageSettings for canonicalizer.

* Cleanup.

* Reintroduce old SubscriptionCanonicalizer constructor, but add a StorageSettings and deprecate it.  Cleanup logs and TODOs.

* Deprecate FHIR_PATCH.  More cleanup.

* Deprecate FHIR_PATCH correctly.

* Small fix.

* Set myCrossPartitionSubscriptionEnabled to true by default.

* Fix test failures.

* Fix another test.

* Code review feedback.

* Resolve static analysis warnings.
This commit is contained in:
Luke deGruchy 2024-04-09 10:27:15 -04:00 committed by GitHub
parent 107de2cfba
commit 4d8427ad60
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 251 additions and 73 deletions

View File

@ -97,7 +97,7 @@ public class StorageSettings {
private boolean myDefaultSearchParamsCanBeOverridden = true;
private Set<Subscription.SubscriptionChannelType> mySupportedSubscriptionTypes = new HashSet<>();
private boolean myAutoCreatePlaceholderReferenceTargets;
private boolean myCrossPartitionSubscriptionEnabled = false;
private boolean myCrossPartitionSubscriptionEnabled = true;
private Integer myBundleBatchPoolSize = DEFAULT_BUNDLE_BATCH_POOL_SIZE;
private Integer myBundleBatchMaxPoolSize = DEFAULT_BUNDLE_BATCH_MAX_POOL_SIZE;
private boolean myEnableInMemorySubscriptionMatching = true;

View File

@ -20,6 +20,7 @@
package ca.uhn.fhir.jpa.subscription.model.config;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import org.springframework.context.annotation.Bean;
@ -29,8 +30,9 @@ import org.springframework.context.annotation.Configuration;
public class SubscriptionModelConfig {
@Bean
public SubscriptionCanonicalizer subscriptionCanonicalizer(FhirContext theFhirContext) {
return new SubscriptionCanonicalizer(theFhirContext);
public SubscriptionCanonicalizer subscriptionCanonicalizer(
FhirContext theFhirContext, StorageSettings theStorageSettings) {
return new SubscriptionCanonicalizer(theFhirContext, theStorageSettings);
}
@Bean

View File

@ -4,6 +4,7 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
@ -50,7 +51,7 @@ public class SubscriptionRegisteringSubscriberTest {
@Mock
private SubscriptionRegistry mySubscriptionRegistry;
@Spy
private SubscriptionCanonicalizer mySubscriptionCanonicalizer = new SubscriptionCanonicalizer(myFhirContext);
private SubscriptionCanonicalizer mySubscriptionCanonicalizer = new SubscriptionCanonicalizer(myFhirContext, new StorageSettings());
@Mock
private DaoRegistry myDaoRegistry;
@Mock

View File

@ -2,6 +2,7 @@ package ca.uhn.fhir.jpa.subscription.match.registry;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
@ -33,7 +34,7 @@ public class SubscriptionRegistryTest {
static FhirContext ourFhirContext = FhirContext.forR4Cached();
@Spy
SubscriptionCanonicalizer mySubscriptionCanonicalizer = new SubscriptionCanonicalizer(ourFhirContext);
SubscriptionCanonicalizer mySubscriptionCanonicalizer = new SubscriptionCanonicalizer(ourFhirContext, new StorageSettings());
@Spy
ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer = new TestChannelNamer();

View File

@ -1,6 +1,7 @@
package ca.uhn.fhir.jpa.subscription.module;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
@ -8,12 +9,15 @@ import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.util.HapiExtensions;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.Nonnull;
import org.assertj.core.util.Lists;
import org.hamcrest.Matchers;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.StringType;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -66,7 +70,7 @@ public class CanonicalSubscriptionTest {
@Test
public void testCanonicalSubscriptionRetainsMetaTags() throws IOException {
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4());
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), new StorageSettings());
CanonicalSubscription sub1 = canonicalizer.canonicalize(makeMdmSubscription());
assertTrue(sub1.getTags().keySet().contains(TAG_SYSTEM));
assertEquals(sub1.getTags().get(TAG_SYSTEM), TAG_VALUE);
@ -74,7 +78,7 @@ public class CanonicalSubscriptionTest {
@Test
public void emailDetailsEquals() {
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4());
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), new StorageSettings());
CanonicalSubscription sub1 = canonicalizer.canonicalize(makeEmailSubscription());
CanonicalSubscription sub2 = canonicalizer.canonicalize(makeEmailSubscription());
assertTrue(sub1.equals(sub2));
@ -82,7 +86,7 @@ public class CanonicalSubscriptionTest {
@Test
public void testSerializeMultiPartitionSubscription(){
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4());
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), new StorageSettings());
Subscription subscription = makeEmailSubscription();
subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new BooleanType().setValue(true));
CanonicalSubscription canonicalSubscription = canonicalizer.canonicalize(subscription);
@ -90,28 +94,30 @@ public class CanonicalSubscriptionTest {
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), true);
}
@Test
public void testSerializeIncorrectMultiPartitionSubscription(){
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4());
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSerializeIncorrectMultiPartitionSubscription(boolean theIsCrossPartitionEnabled){
final StorageSettings storageSettings = buildStorageSettings(theIsCrossPartitionEnabled);
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), storageSettings);
Subscription subscription = makeEmailSubscription();
subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new StringType().setValue("false"));
CanonicalSubscription canonicalSubscription = canonicalizer.canonicalize(subscription);
System.out.print(canonicalSubscription);
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), false);
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), theIsCrossPartitionEnabled);
}
@Test
public void testSerializeNonMultiPartitionSubscription(){
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4());
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSerializeNonMultiPartitionSubscription(boolean theIsCrossPartitionEnabled){
final StorageSettings storageSettings = buildStorageSettings(theIsCrossPartitionEnabled);
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), storageSettings);
Subscription subscription = makeEmailSubscription();
subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new BooleanType().setValue(false));
CanonicalSubscription canonicalSubscription = canonicalizer.canonicalize(subscription);
System.out.print(canonicalSubscription);
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), false);
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), theIsCrossPartitionEnabled);
}
@Test
@ -154,4 +160,11 @@ public class CanonicalSubscriptionTest {
ResourceDeliveryMessage payload = resourceDeliveryMessage.getPayload();
return payload.getSubscription();
}
@Nonnull
private static StorageSettings buildStorageSettings(boolean theIsCrossPartitionEnabled) {
final StorageSettings storageSettings = new StorageSettings();
storageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
return storageSettings;
}
}

View File

@ -27,6 +27,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Answers;
import org.mockito.InjectMocks;
import org.mockito.Mock;
@ -38,6 +40,7 @@ import java.util.Optional;
import static ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionCriteriaParser.TypeEnum.STARTYPE_EXPRESSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
@ -227,8 +230,10 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
ourObservationListener.awaitExpected();
}
@Test
public void testSubscriptionAndResourceOnDiffPartitionNotMatch() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionAndResourceOnDiffPartitionNotMatch(boolean theIsCrossPartitionEnabled) throws InterruptedException {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
@ -240,13 +245,18 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
mySubscriptionResourceNotMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(0));
mySubscriptionResourceNotMatched.awaitExpected();
final ThrowsInterrupted throwsInterrupted = () -> sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(0));
if (theIsCrossPartitionEnabled) {
runWithinLatchLogicExpectSuccess(throwsInterrupted);
} else {
runWithLatchLogicExpectFailure(throwsInterrupted);
}
}
@Test
public void testSubscriptionAndResourceOnDiffPartitionNotMatchPart2() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionAndResourceOnDiffPartitionNotMatchPart2(boolean theIsCrossPartitionEnabled) throws InterruptedException {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
@ -258,13 +268,19 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
mySubscriptionResourceNotMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(1));
mySubscriptionResourceNotMatched.awaitExpected();
final ThrowsInterrupted throwsInterrupted = () -> sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(1));
if (theIsCrossPartitionEnabled) {
runWithinLatchLogicExpectSuccess(throwsInterrupted);
} else {
runWithLatchLogicExpectFailure(throwsInterrupted);
}
}
@Test
public void testSubscriptionOnDefaultPartitionAndResourceOnDiffPartitionNotMatch() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionOnDefaultPartitionAndResourceOnDiffPartitionNotMatch(boolean theIsCrossPartitionEnabled) throws InterruptedException {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
@ -276,13 +292,19 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
mySubscriptionResourceNotMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(1));
mySubscriptionResourceNotMatched.awaitExpected();
final ThrowsInterrupted throwsInterrupted = () -> sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(1));
if (theIsCrossPartitionEnabled) {
runWithinLatchLogicExpectSuccess(throwsInterrupted);
} else {
runWithLatchLogicExpectFailure(throwsInterrupted);
}
}
@Test
public void testSubscriptionOnAPartitionAndResourceOnDefaultPartitionNotMatch() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionOnAPartitionAndResourceOnDefaultPartitionNotMatch(boolean theIsCrossPartitionEnabled) throws InterruptedException {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
@ -294,9 +316,13 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
mySubscriptionResourceNotMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT", RequestPartitionId.defaultPartition());
mySubscriptionResourceNotMatched.awaitExpected();
final ThrowsInterrupted throwsInterrupted = () -> sendObservation(code, "SNOMED-CT", RequestPartitionId.defaultPartition());
if (theIsCrossPartitionEnabled) {
runWithinLatchLogicExpectSuccess(throwsInterrupted);
} else {
runWithLatchLogicExpectFailure(throwsInterrupted);
}
}
@Test
@ -320,8 +346,10 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
ourObservationListener.awaitExpected();
}
@Test
public void testSubscriptionOnOnePartitionDoNotMatchResourceOnMultiplePartitions() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionOnOnePartitionDoNotMatchResourceOnMultiplePartitions(boolean theIsCrossPartitionEnabled) throws InterruptedException {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
@ -333,10 +361,13 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
mySubscriptionResourceNotMatched.setExpectedCount(1);
List<Integer> partitionId = Collections.synchronizedList(Lists.newArrayList(0, 2));
sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionIds(partitionId));
mySubscriptionResourceNotMatched.awaitExpected();
final ThrowsInterrupted throwsInterrupted = () -> sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionIds(Collections.synchronizedList(Lists.newArrayList(0, 2))));
if (theIsCrossPartitionEnabled) {
runWithinLatchLogicExpectSuccess(throwsInterrupted);
} else {
runWithLatchLogicExpectFailure(throwsInterrupted);
}
}
@Test
@ -519,4 +550,31 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
verify(message, atLeastOnce()).getPayloadId(null);
}
}
private interface ThrowsInterrupted {
void runOrThrow() throws InterruptedException;
}
private void runWithLatchLogicExpectFailure(ThrowsInterrupted theRunnable) {
try {
mySubscriptionResourceNotMatched.setExpectedCount(1);
theRunnable.runOrThrow();
mySubscriptionResourceNotMatched.awaitExpected();
} catch (InterruptedException exception) {
Thread.currentThread().interrupt();
}
}
private void runWithinLatchLogicExpectSuccess(ThrowsInterrupted theRunnable) {
try {
ourObservationListener.setExpectedCount(1);
mySubscriptionResourceMatched.setExpectedCount(1);
theRunnable.runOrThrow();
mySubscriptionResourceMatched.awaitExpected();
ourObservationListener.awaitExpected();
} catch (InterruptedException exception) {
fail();
Thread.currentThread().interrupt();
}
}
}

View File

@ -6,6 +6,7 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
@ -226,7 +227,7 @@ public class SubscriptionValidatingInterceptorTest {
@Bean
SubscriptionCanonicalizer subscriptionCanonicalizer(FhirContext theFhirContext) {
return new SubscriptionCanonicalizer(theFhirContext);
return new SubscriptionCanonicalizer(theFhirContext, new StorageSettings());
}
@Bean

View File

@ -5,6 +5,7 @@ import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.dao.r4.BasePartitioningR4Test;
@ -24,15 +25,13 @@ import jakarta.servlet.ServletException;
import org.awaitility.core.ConditionTimeoutException;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Subscription;
import org.hl7.fhir.r4.model.*;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -41,6 +40,7 @@ import java.time.LocalDate;
import java.time.Month;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.containsString;
@ -146,8 +146,10 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
Assertions.assertEquals(Constants.CT_FHIR_JSON_NEW, BaseSubscriptionsR4Test.ourRestfulServer.getRequestContentTypes().get(0));
}
@Test
public void testCreateSubscriptionInPartitionAndResourceInDifferentPartition() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCreateSubscriptionInPartitionAndResourceInDifferentPartition(boolean theIsCrossPartitionEnabled) throws Exception {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
String payload = "application/fhir+json";
String code = "1000000050";
@ -169,31 +171,40 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
Assertions.assertEquals(0, BaseSubscriptionsR4Test.ourPatientProvider.getCountCreate());
try {
// Should have 0 matching subscription, if we get 1 update count then the test fails
BaseSubscriptionsR4Test.ourPatientProvider.waitForUpdateCount(1);
fail();
if (!theIsCrossPartitionEnabled) {
fail("Expecting a timeout and 0 matching subscriptions and thus a timeout if cross partition is DISabled");
}
Assertions.assertEquals(1, BaseSubscriptionsR4Test.ourRestfulServer.getRequestContentTypes().size());
} catch (ConditionTimeoutException e) {
Assertions.assertEquals(0, BaseSubscriptionsR4Test.ourRestfulServer.getRequestContentTypes().size());
if (theIsCrossPartitionEnabled) {
fail("Expecting no timeout and 1 matching subscriptions and thus a timeout if cross partition is enabled");
} else {
// Should have 0 matching subscription, if we get 1 update count then the test fails
Assertions.assertEquals(0, BaseSubscriptionsR4Test.ourRestfulServer.getRequestContentTypes().size());
}
}
}
@Test
public void testManualTriggeredSubscriptionDoesNotCheckOutsideOfPartition() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testManualTriggeredSubscriptionDoesNotCheckOutsideOfPartition(boolean theIsCrossPartitionEnabled) throws Exception {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
String payload = "application/fhir+json";
String code = "1000000050";
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
//Given: We store a resource in partition 2
myPartitionInterceptor.setRequestPartitionId(REQ_PART_2);
IIdType observationIdPartitionTwo = myDaoRegistry.getResourceDao("Observation").create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
final IFhirResourceDao observation = myDaoRegistry.getResourceDao("Observation");
IIdType observationIdPartitionTwo = observation.create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
//Given: We store a similar resource in partition 1
myPartitionInterceptor.setRequestPartitionId(REQ_PART_1);
IIdType observationIdPartitionOne = myDaoRegistry.getResourceDao("Observation").create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
IIdType observationIdPartitionOne = observation.create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
//Given: We create a subscrioption on Partition 1
IIdType subscriptionId= myDaoRegistry.getResourceDao("Subscription").create(newSubscription(criteria1, payload), mySrd).getId();
IIdType subscriptionId = myDaoRegistry.getResourceDao("Subscription").create(newSubscription(criteria1, payload), mySrd).getId();
waitForActivatedSubscriptionCount(1);
ArrayList<IPrimitiveType<String>> searchUrlList = new ArrayList<>();
@ -204,8 +215,14 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
waitForQueueToDrain();
List<Observation> resourceUpdates = BaseSubscriptionsR4Test.ourObservationProvider.getResourceUpdates();
assertThat(resourceUpdates.size(), is(equalTo(1)));
assertThat(resourceUpdates.get(0).getId(), is(equalTo(observationIdPartitionOne.toString())));
if (theIsCrossPartitionEnabled) {
assertThat(resourceUpdates.size(), is(equalTo(2)));
assertThat(resourceUpdates.stream().map(Resource::getId).sorted().toList(),
is(equalTo(Stream.of(observationIdPartitionOne, observationIdPartitionTwo).map(Object::toString).sorted().toList())));
} else {
assertThat(resourceUpdates.size(), is(equalTo(1)));
assertThat(resourceUpdates.get(0).getId(), is(equalTo(observationIdPartitionOne.toString())));
}
String responseValue = resultParameters.getParameter().get(0).getValue().primitiveValue();
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));

View File

@ -5,6 +5,7 @@ import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
@ -63,7 +64,7 @@ public class SubscriptionValidatingInterceptorTest {
@BeforeEach
public void before() {
mySvc = new SubscriptionValidatingInterceptor();
mySubscriptionCanonicalizer = spy(new SubscriptionCanonicalizer(myCtx));
mySubscriptionCanonicalizer = spy(new SubscriptionCanonicalizer(myCtx, new StorageSettings()));
mySvc.setSubscriptionCanonicalizerForUnitTest(mySubscriptionCanonicalizer);
mySvc.setDaoRegistryForUnitTest(myDaoRegistry);
mySvc.setSubscriptionStrategyEvaluatorForUnitTest(mySubscriptionStrategyEvaluator);

View File

@ -27,6 +27,11 @@ import org.hl7.fhir.instance.model.api.IIdType;
import java.util.Set;
/**
* @deprecated Users should be instead be granted more granular write permissions that cover PATCH operations.
* @since 7.2.0
*/
@Deprecated
class RuleImplPatch extends BaseRule {
private boolean myAllRequests;

View File

@ -48,6 +48,7 @@ import java.util.TreeSet;
@SuppressWarnings("JavadocLinkAsPlainText")
public class JpaStorageSettings extends StorageSettings {
private static final Logger ourLog = LoggerFactory.getLogger(JpaStorageSettings.class);
/**
* Default value for {@link #getBulkExportFileMaximumSize()}: 100 MB
@ -105,7 +106,6 @@ public class JpaStorageSettings extends StorageSettings {
*/
private static final Integer DEFAULT_MAXIMUM_SEARCH_RESULT_COUNT_IN_TRANSACTION = null;
private static final Logger ourLog = LoggerFactory.getLogger(JpaStorageSettings.class);
private static final int DEFAULT_REINDEX_BATCH_SIZE = 800;
private static final int DEFAULT_MAXIMUM_DELETE_CONFLICT_COUNT = 60;
/**

View File

@ -23,6 +23,7 @@ import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
@ -68,10 +69,23 @@ public class SubscriptionCanonicalizer {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionCanonicalizer.class);
final FhirContext myFhirContext;
private final StorageSettings myStorageSettings;
@Autowired
public SubscriptionCanonicalizer(FhirContext theFhirContext, StorageSettings theStorageSettings) {
myFhirContext = theFhirContext;
myStorageSettings = theStorageSettings;
}
// TODO: LD: remove this constructor once all callers call the 2 arg constructor above
/**
* @deprecated All callers should invoke {@link SubscriptionCanonicalizer()} instead.
*/
@Deprecated
public SubscriptionCanonicalizer(FhirContext theFhirContext) {
myFhirContext = theFhirContext;
myStorageSettings = new StorageSettings();
}
public CanonicalSubscription canonicalize(IBaseResource theSubscription) {
@ -109,7 +123,7 @@ public class SubscriptionCanonicalizer {
retVal.setIdElement(subscription.getIdElement());
retVal.setPayloadString(channel.getPayload());
retVal.setTags(extractTags(subscription));
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
handleCrossPartition(theSubscription, retVal);
retVal.setSendDeleteMessages(extractDeleteExtensionDstu2(subscription));
} catch (FHIRException theE) {
throw new InternalErrorException(Msg.code(557) + theE);
@ -161,7 +175,7 @@ public class SubscriptionCanonicalizer {
retVal.setPayloadSearchCriteria(
getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA));
retVal.setTags(extractTags(subscription));
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
handleCrossPartition(theSubscription, retVal);
if (retVal.getChannelType() == CanonicalSubscriptionChannelType.EMAIL) {
String from;
@ -292,7 +306,7 @@ public class SubscriptionCanonicalizer {
getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA));
retVal.setTags(extractTags(subscription));
setPartitionIdOnReturnValue(theSubscription, retVal);
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
handleCrossPartition(theSubscription, retVal);
List<org.hl7.fhir.r4.model.CanonicalType> profiles =
subscription.getMeta().getProfile();
@ -497,6 +511,8 @@ public class SubscriptionCanonicalizer {
retVal.setSendDeleteMessages(extension.getValueBooleanType().booleanValue());
}
handleCrossPartition(theSubscription, retVal);
return retVal;
}
@ -561,6 +577,8 @@ public class SubscriptionCanonicalizer {
setR5FlagsBasedOnChannelType(subscription, retVal);
handleCrossPartition(theSubscription, retVal);
return retVal;
}
@ -762,4 +780,12 @@ public class SubscriptionCanonicalizer {
}
return status.getValueAsString();
}
private void handleCrossPartition(IBaseResource theSubscription, CanonicalSubscription retVal) {
if (myStorageSettings.isCrossPartitionSubscriptionEnabled()) {
retVal.setCrossPartitionEnabled(true);
} else {
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
}
}
}

View File

@ -1,23 +1,35 @@
package ca.uhn.fhir.jpa.subscription.match.registry;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.CanonicalTopicSubscriptionFilter;
import ca.uhn.fhir.model.api.ExtensionDt;
import ca.uhn.fhir.model.api.IFhirVersion;
import ca.uhn.fhir.model.dstu2.FhirDstu2;
import ca.uhn.fhir.model.primitive.BooleanDt;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import ca.uhn.fhir.subscription.SubscriptionTestDataHelper;
import org.hl7.fhir.dstu3.hapi.ctx.FhirDstu3;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.hapi.ctx.FhirR4;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Extension;
import org.hl7.fhir.r4.model.Subscription;
import org.hl7.fhir.r4b.hapi.ctx.FhirR4B;
import org.hl7.fhir.r5.hapi.ctx.FhirR5;
import org.hl7.fhir.r5.model.Coding;
import org.hl7.fhir.r5.model.Enumerations;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.stream.Stream;
import static ca.uhn.fhir.rest.api.Constants.CT_FHIR_JSON_NEW;
import static ca.uhn.fhir.util.HapiExtensions.EX_SEND_DELETE_MESSAGES;
import static org.hamcrest.MatcherAssert.assertThat;
@ -26,12 +38,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
class SubscriptionCanonicalizerTest {
FhirContext r4Context = FhirContext.forR4();
private final SubscriptionCanonicalizer testedSC = new SubscriptionCanonicalizer(r4Context);
private final SubscriptionCanonicalizer testedSC = new SubscriptionCanonicalizer(r4Context, new StorageSettings());
@Test
void testCanonicalizeR4SendDeleteMessagesSetsExtensionValueNotPresent() {
@ -72,7 +85,7 @@ class SubscriptionCanonicalizerTest {
@Test
public void testCanonicalizeDstu2SendDeleteMessages() {
//setup
SubscriptionCanonicalizer dstu2Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forDstu2Cached());
SubscriptionCanonicalizer dstu2Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forDstu2Cached(), new StorageSettings());
ca.uhn.fhir.model.dstu2.resource.Subscription dstu2Sub = new ca.uhn.fhir.model.dstu2.resource.Subscription();
ExtensionDt extensionDt = new ExtensionDt();
extensionDt.setUrl(EX_SEND_DELETE_MESSAGES);
@ -108,7 +121,7 @@ class SubscriptionCanonicalizerTest {
@ValueSource(strings = {"full-resource", "id-only", "empty"})
public void testR5Canonicalize_returnsCorrectCanonicalSubscription(String thePayloadContent) {
// setup
SubscriptionCanonicalizer r5Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR5Cached());
SubscriptionCanonicalizer r5Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR5Cached(), new StorageSettings());
org.hl7.fhir.r5.model.Subscription.SubscriptionPayloadContent payloadContent =
org.hl7.fhir.r5.model.Subscription.SubscriptionPayloadContent.fromCode(thePayloadContent);
org.hl7.fhir.r5.model.Subscription subscription = buildR5Subscription(payloadContent);
@ -148,7 +161,7 @@ class SubscriptionCanonicalizerTest {
// Example drawn from http://build.fhir.org/ig/HL7/fhir-subscription-backport-ig/Subscription-subscription-zulip.json.html
// setup
SubscriptionCanonicalizer r4bCanonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4BCached());
SubscriptionCanonicalizer r4bCanonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4BCached(), new StorageSettings());
org.hl7.fhir.r4b.model.Subscription subscription = buildR4BSubscription(thePayloadContent);
// execute
@ -160,6 +173,46 @@ class SubscriptionCanonicalizerTest {
verifyChannelParameters(canonical, thePayloadContent);
}
private static Stream<Arguments> crossPartitionParams() {
return Stream.of(
arguments(true, FhirContext.forDstu2Cached()),
arguments(false, FhirContext.forDstu2Cached()),
arguments(true, FhirContext.forDstu3Cached()),
arguments(false, FhirContext.forDstu3Cached()),
arguments(true, FhirContext.forR4Cached()),
arguments(false, FhirContext.forR4Cached()),
arguments(true, FhirContext.forR4BCached()),
arguments(false, FhirContext.forR4BCached()),
arguments(true, FhirContext.forR5Cached()),
arguments(false, FhirContext.forR5Cached())
);
}
@ParameterizedTest
@MethodSource("crossPartitionParams")
void testCrossPartition(boolean theCrossPartitionSubscriptionEnabled, FhirContext theFhirContext) {
final IFhirVersion version = theFhirContext.getVersion();
IBaseResource subscription = null;
if (version instanceof FhirDstu2){
subscription = new ca.uhn.fhir.model.dstu2.resource.Subscription();
} else if (version instanceof FhirDstu3){
subscription = new org.hl7.fhir.dstu3.model.Subscription();
} else if (version instanceof FhirR4){
subscription = new Subscription();
} else if (version instanceof FhirR4B){
subscription = new org.hl7.fhir.r4b.model.Subscription();
} else if (version instanceof FhirR5){
subscription = new org.hl7.fhir.r5.model.Subscription();
}
final StorageSettings storageSettings = new StorageSettings();
storageSettings.setCrossPartitionSubscriptionEnabled(theCrossPartitionSubscriptionEnabled);
final SubscriptionCanonicalizer subscriptionCanonicalizer = new SubscriptionCanonicalizer(theFhirContext, storageSettings);
final CanonicalSubscription canonicalSubscription = subscriptionCanonicalizer.canonicalize(subscription);
assertEquals(theCrossPartitionSubscriptionEnabled, canonicalSubscription.getCrossPartitionEnabled());
}
private org.hl7.fhir.r4b.model.Subscription buildR4BSubscription(String thePayloadContent) {
org.hl7.fhir.r4b.model.Subscription subscription = new org.hl7.fhir.r4b.model.Subscription();
@ -195,7 +248,7 @@ class SubscriptionCanonicalizerTest {
// Example drawn from http://build.fhir.org/ig/HL7/fhir-subscription-backport-ig/Subscription-subscription-zulip.json.html
// setup
SubscriptionCanonicalizer r4Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4Cached());
SubscriptionCanonicalizer r4Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4Cached(), new StorageSettings());
// execute
Subscription subscription = SubscriptionTestDataHelper.buildR4TopicSubscriptionWithContent(thePayloadContent);