Merge remote-tracking branch 'origin/master' into mm-20240318-support-for-contained-true-search-parameter

This commit is contained in:
Martha 2024-04-09 12:40:10 -07:00
commit adc28cc9fd
13 changed files with 251 additions and 73 deletions

View File

@ -97,7 +97,7 @@ public class StorageSettings {
private boolean myDefaultSearchParamsCanBeOverridden = true;
private Set<Subscription.SubscriptionChannelType> mySupportedSubscriptionTypes = new HashSet<>();
private boolean myAutoCreatePlaceholderReferenceTargets;
private boolean myCrossPartitionSubscriptionEnabled = false;
private boolean myCrossPartitionSubscriptionEnabled = true;
private Integer myBundleBatchPoolSize = DEFAULT_BUNDLE_BATCH_POOL_SIZE;
private Integer myBundleBatchMaxPoolSize = DEFAULT_BUNDLE_BATCH_MAX_POOL_SIZE;
private boolean myEnableInMemorySubscriptionMatching = true;

View File

@ -20,6 +20,7 @@
package ca.uhn.fhir.jpa.subscription.model.config;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import org.springframework.context.annotation.Bean;
@ -29,8 +30,9 @@ import org.springframework.context.annotation.Configuration;
public class SubscriptionModelConfig {
@Bean
public SubscriptionCanonicalizer subscriptionCanonicalizer(FhirContext theFhirContext) {
return new SubscriptionCanonicalizer(theFhirContext);
public SubscriptionCanonicalizer subscriptionCanonicalizer(
FhirContext theFhirContext, StorageSettings theStorageSettings) {
return new SubscriptionCanonicalizer(theFhirContext, theStorageSettings);
}
@Bean

View File

@ -4,6 +4,7 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
@ -50,7 +51,7 @@ public class SubscriptionRegisteringSubscriberTest {
@Mock
private SubscriptionRegistry mySubscriptionRegistry;
@Spy
private SubscriptionCanonicalizer mySubscriptionCanonicalizer = new SubscriptionCanonicalizer(myFhirContext);
private SubscriptionCanonicalizer mySubscriptionCanonicalizer = new SubscriptionCanonicalizer(myFhirContext, new StorageSettings());
@Mock
private DaoRegistry myDaoRegistry;
@Mock

View File

@ -2,6 +2,7 @@ package ca.uhn.fhir.jpa.subscription.match.registry;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
@ -33,7 +34,7 @@ public class SubscriptionRegistryTest {
static FhirContext ourFhirContext = FhirContext.forR4Cached();
@Spy
SubscriptionCanonicalizer mySubscriptionCanonicalizer = new SubscriptionCanonicalizer(ourFhirContext);
SubscriptionCanonicalizer mySubscriptionCanonicalizer = new SubscriptionCanonicalizer(ourFhirContext, new StorageSettings());
@Spy
ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer = new TestChannelNamer();

View File

@ -1,6 +1,7 @@
package ca.uhn.fhir.jpa.subscription.module;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
@ -8,12 +9,15 @@ import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.util.HapiExtensions;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.Nonnull;
import org.assertj.core.util.Lists;
import org.hamcrest.Matchers;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.StringType;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -66,7 +70,7 @@ public class CanonicalSubscriptionTest {
@Test
public void testCanonicalSubscriptionRetainsMetaTags() throws IOException {
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4());
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), new StorageSettings());
CanonicalSubscription sub1 = canonicalizer.canonicalize(makeMdmSubscription());
assertTrue(sub1.getTags().keySet().contains(TAG_SYSTEM));
assertEquals(sub1.getTags().get(TAG_SYSTEM), TAG_VALUE);
@ -74,7 +78,7 @@ public class CanonicalSubscriptionTest {
@Test
public void emailDetailsEquals() {
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4());
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), new StorageSettings());
CanonicalSubscription sub1 = canonicalizer.canonicalize(makeEmailSubscription());
CanonicalSubscription sub2 = canonicalizer.canonicalize(makeEmailSubscription());
assertTrue(sub1.equals(sub2));
@ -82,7 +86,7 @@ public class CanonicalSubscriptionTest {
@Test
public void testSerializeMultiPartitionSubscription(){
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4());
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), new StorageSettings());
Subscription subscription = makeEmailSubscription();
subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new BooleanType().setValue(true));
CanonicalSubscription canonicalSubscription = canonicalizer.canonicalize(subscription);
@ -90,28 +94,30 @@ public class CanonicalSubscriptionTest {
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), true);
}
@Test
public void testSerializeIncorrectMultiPartitionSubscription(){
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4());
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSerializeIncorrectMultiPartitionSubscription(boolean theIsCrossPartitionEnabled){
final StorageSettings storageSettings = buildStorageSettings(theIsCrossPartitionEnabled);
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), storageSettings);
Subscription subscription = makeEmailSubscription();
subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new StringType().setValue("false"));
CanonicalSubscription canonicalSubscription = canonicalizer.canonicalize(subscription);
System.out.print(canonicalSubscription);
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), false);
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), theIsCrossPartitionEnabled);
}
@Test
public void testSerializeNonMultiPartitionSubscription(){
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4());
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSerializeNonMultiPartitionSubscription(boolean theIsCrossPartitionEnabled){
final StorageSettings storageSettings = buildStorageSettings(theIsCrossPartitionEnabled);
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), storageSettings);
Subscription subscription = makeEmailSubscription();
subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new BooleanType().setValue(false));
CanonicalSubscription canonicalSubscription = canonicalizer.canonicalize(subscription);
System.out.print(canonicalSubscription);
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), false);
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), theIsCrossPartitionEnabled);
}
@Test
@ -154,4 +160,11 @@ public class CanonicalSubscriptionTest {
ResourceDeliveryMessage payload = resourceDeliveryMessage.getPayload();
return payload.getSubscription();
}
@Nonnull
private static StorageSettings buildStorageSettings(boolean theIsCrossPartitionEnabled) {
final StorageSettings storageSettings = new StorageSettings();
storageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
return storageSettings;
}
}

View File

@ -27,6 +27,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Answers;
import org.mockito.InjectMocks;
import org.mockito.Mock;
@ -38,6 +40,7 @@ import java.util.Optional;
import static ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionCriteriaParser.TypeEnum.STARTYPE_EXPRESSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
@ -227,8 +230,10 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
ourObservationListener.awaitExpected();
}
@Test
public void testSubscriptionAndResourceOnDiffPartitionNotMatch() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionAndResourceOnDiffPartitionNotMatch(boolean theIsCrossPartitionEnabled) throws InterruptedException {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
@ -240,13 +245,18 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
mySubscriptionResourceNotMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(0));
mySubscriptionResourceNotMatched.awaitExpected();
final ThrowsInterrupted throwsInterrupted = () -> sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(0));
if (theIsCrossPartitionEnabled) {
runWithinLatchLogicExpectSuccess(throwsInterrupted);
} else {
runWithLatchLogicExpectFailure(throwsInterrupted);
}
}
@Test
public void testSubscriptionAndResourceOnDiffPartitionNotMatchPart2() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionAndResourceOnDiffPartitionNotMatchPart2(boolean theIsCrossPartitionEnabled) throws InterruptedException {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
@ -258,13 +268,19 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
mySubscriptionResourceNotMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(1));
mySubscriptionResourceNotMatched.awaitExpected();
final ThrowsInterrupted throwsInterrupted = () -> sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(1));
if (theIsCrossPartitionEnabled) {
runWithinLatchLogicExpectSuccess(throwsInterrupted);
} else {
runWithLatchLogicExpectFailure(throwsInterrupted);
}
}
@Test
public void testSubscriptionOnDefaultPartitionAndResourceOnDiffPartitionNotMatch() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionOnDefaultPartitionAndResourceOnDiffPartitionNotMatch(boolean theIsCrossPartitionEnabled) throws InterruptedException {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
@ -276,13 +292,19 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
mySubscriptionResourceNotMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(1));
mySubscriptionResourceNotMatched.awaitExpected();
final ThrowsInterrupted throwsInterrupted = () -> sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(1));
if (theIsCrossPartitionEnabled) {
runWithinLatchLogicExpectSuccess(throwsInterrupted);
} else {
runWithLatchLogicExpectFailure(throwsInterrupted);
}
}
@Test
public void testSubscriptionOnAPartitionAndResourceOnDefaultPartitionNotMatch() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionOnAPartitionAndResourceOnDefaultPartitionNotMatch(boolean theIsCrossPartitionEnabled) throws InterruptedException {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
@ -294,9 +316,13 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
mySubscriptionResourceNotMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT", RequestPartitionId.defaultPartition());
mySubscriptionResourceNotMatched.awaitExpected();
final ThrowsInterrupted throwsInterrupted = () -> sendObservation(code, "SNOMED-CT", RequestPartitionId.defaultPartition());
if (theIsCrossPartitionEnabled) {
runWithinLatchLogicExpectSuccess(throwsInterrupted);
} else {
runWithLatchLogicExpectFailure(throwsInterrupted);
}
}
@Test
@ -320,8 +346,10 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
ourObservationListener.awaitExpected();
}
@Test
public void testSubscriptionOnOnePartitionDoNotMatchResourceOnMultiplePartitions() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionOnOnePartitionDoNotMatchResourceOnMultiplePartitions(boolean theIsCrossPartitionEnabled) throws InterruptedException {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
@ -333,10 +361,13 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
mySubscriptionResourceNotMatched.setExpectedCount(1);
List<Integer> partitionId = Collections.synchronizedList(Lists.newArrayList(0, 2));
sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionIds(partitionId));
mySubscriptionResourceNotMatched.awaitExpected();
final ThrowsInterrupted throwsInterrupted = () -> sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionIds(Collections.synchronizedList(Lists.newArrayList(0, 2))));
if (theIsCrossPartitionEnabled) {
runWithinLatchLogicExpectSuccess(throwsInterrupted);
} else {
runWithLatchLogicExpectFailure(throwsInterrupted);
}
}
@Test
@ -519,4 +550,31 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
verify(message, atLeastOnce()).getPayloadId(null);
}
}
private interface ThrowsInterrupted {
void runOrThrow() throws InterruptedException;
}
private void runWithLatchLogicExpectFailure(ThrowsInterrupted theRunnable) {
try {
mySubscriptionResourceNotMatched.setExpectedCount(1);
theRunnable.runOrThrow();
mySubscriptionResourceNotMatched.awaitExpected();
} catch (InterruptedException exception) {
Thread.currentThread().interrupt();
}
}
private void runWithinLatchLogicExpectSuccess(ThrowsInterrupted theRunnable) {
try {
ourObservationListener.setExpectedCount(1);
mySubscriptionResourceMatched.setExpectedCount(1);
theRunnable.runOrThrow();
mySubscriptionResourceMatched.awaitExpected();
ourObservationListener.awaitExpected();
} catch (InterruptedException exception) {
fail();
Thread.currentThread().interrupt();
}
}
}

View File

@ -6,6 +6,7 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
@ -226,7 +227,7 @@ public class SubscriptionValidatingInterceptorTest {
@Bean
SubscriptionCanonicalizer subscriptionCanonicalizer(FhirContext theFhirContext) {
return new SubscriptionCanonicalizer(theFhirContext);
return new SubscriptionCanonicalizer(theFhirContext, new StorageSettings());
}
@Bean

View File

@ -5,6 +5,7 @@ import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.dao.r4.BasePartitioningR4Test;
@ -24,15 +25,13 @@ import jakarta.servlet.ServletException;
import org.awaitility.core.ConditionTimeoutException;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Subscription;
import org.hl7.fhir.r4.model.*;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -41,6 +40,7 @@ import java.time.LocalDate;
import java.time.Month;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.containsString;
@ -146,8 +146,10 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
Assertions.assertEquals(Constants.CT_FHIR_JSON_NEW, BaseSubscriptionsR4Test.ourRestfulServer.getRequestContentTypes().get(0));
}
@Test
public void testCreateSubscriptionInPartitionAndResourceInDifferentPartition() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCreateSubscriptionInPartitionAndResourceInDifferentPartition(boolean theIsCrossPartitionEnabled) throws Exception {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
String payload = "application/fhir+json";
String code = "1000000050";
@ -169,31 +171,40 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
Assertions.assertEquals(0, BaseSubscriptionsR4Test.ourPatientProvider.getCountCreate());
try {
// Should have 0 matching subscription, if we get 1 update count then the test fails
BaseSubscriptionsR4Test.ourPatientProvider.waitForUpdateCount(1);
fail();
if (!theIsCrossPartitionEnabled) {
fail("Expecting a timeout and 0 matching subscriptions and thus a timeout if cross partition is DISabled");
}
Assertions.assertEquals(1, BaseSubscriptionsR4Test.ourRestfulServer.getRequestContentTypes().size());
} catch (ConditionTimeoutException e) {
if (theIsCrossPartitionEnabled) {
fail("Expecting no timeout and 1 matching subscriptions and thus a timeout if cross partition is enabled");
} else {
// Should have 0 matching subscription, if we get 1 update count then the test fails
Assertions.assertEquals(0, BaseSubscriptionsR4Test.ourRestfulServer.getRequestContentTypes().size());
}
}
}
@Test
public void testManualTriggeredSubscriptionDoesNotCheckOutsideOfPartition() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testManualTriggeredSubscriptionDoesNotCheckOutsideOfPartition(boolean theIsCrossPartitionEnabled) throws Exception {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
String payload = "application/fhir+json";
String code = "1000000050";
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
//Given: We store a resource in partition 2
myPartitionInterceptor.setRequestPartitionId(REQ_PART_2);
IIdType observationIdPartitionTwo = myDaoRegistry.getResourceDao("Observation").create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
final IFhirResourceDao observation = myDaoRegistry.getResourceDao("Observation");
IIdType observationIdPartitionTwo = observation.create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
//Given: We store a similar resource in partition 1
myPartitionInterceptor.setRequestPartitionId(REQ_PART_1);
IIdType observationIdPartitionOne = myDaoRegistry.getResourceDao("Observation").create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
IIdType observationIdPartitionOne = observation.create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
//Given: We create a subscrioption on Partition 1
IIdType subscriptionId= myDaoRegistry.getResourceDao("Subscription").create(newSubscription(criteria1, payload), mySrd).getId();
IIdType subscriptionId = myDaoRegistry.getResourceDao("Subscription").create(newSubscription(criteria1, payload), mySrd).getId();
waitForActivatedSubscriptionCount(1);
ArrayList<IPrimitiveType<String>> searchUrlList = new ArrayList<>();
@ -204,8 +215,14 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
waitForQueueToDrain();
List<Observation> resourceUpdates = BaseSubscriptionsR4Test.ourObservationProvider.getResourceUpdates();
if (theIsCrossPartitionEnabled) {
assertThat(resourceUpdates.size(), is(equalTo(2)));
assertThat(resourceUpdates.stream().map(Resource::getId).sorted().toList(),
is(equalTo(Stream.of(observationIdPartitionOne, observationIdPartitionTwo).map(Object::toString).sorted().toList())));
} else {
assertThat(resourceUpdates.size(), is(equalTo(1)));
assertThat(resourceUpdates.get(0).getId(), is(equalTo(observationIdPartitionOne.toString())));
}
String responseValue = resultParameters.getParameter().get(0).getValue().primitiveValue();
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));

View File

@ -5,6 +5,7 @@ import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
@ -63,7 +64,7 @@ public class SubscriptionValidatingInterceptorTest {
@BeforeEach
public void before() {
mySvc = new SubscriptionValidatingInterceptor();
mySubscriptionCanonicalizer = spy(new SubscriptionCanonicalizer(myCtx));
mySubscriptionCanonicalizer = spy(new SubscriptionCanonicalizer(myCtx, new StorageSettings()));
mySvc.setSubscriptionCanonicalizerForUnitTest(mySubscriptionCanonicalizer);
mySvc.setDaoRegistryForUnitTest(myDaoRegistry);
mySvc.setSubscriptionStrategyEvaluatorForUnitTest(mySubscriptionStrategyEvaluator);

View File

@ -27,6 +27,11 @@ import org.hl7.fhir.instance.model.api.IIdType;
import java.util.Set;
/**
* @deprecated Users should be instead be granted more granular write permissions that cover PATCH operations.
* @since 7.2.0
*/
@Deprecated
class RuleImplPatch extends BaseRule {
private boolean myAllRequests;

View File

@ -48,6 +48,7 @@ import java.util.TreeSet;
@SuppressWarnings("JavadocLinkAsPlainText")
public class JpaStorageSettings extends StorageSettings {
private static final Logger ourLog = LoggerFactory.getLogger(JpaStorageSettings.class);
/**
* Default value for {@link #getBulkExportFileMaximumSize()}: 100 MB
@ -105,7 +106,6 @@ public class JpaStorageSettings extends StorageSettings {
*/
private static final Integer DEFAULT_MAXIMUM_SEARCH_RESULT_COUNT_IN_TRANSACTION = null;
private static final Logger ourLog = LoggerFactory.getLogger(JpaStorageSettings.class);
private static final int DEFAULT_REINDEX_BATCH_SIZE = 800;
private static final int DEFAULT_MAXIMUM_DELETE_CONFLICT_COUNT = 60;
/**

View File

@ -23,6 +23,7 @@ import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
@ -68,10 +69,23 @@ public class SubscriptionCanonicalizer {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionCanonicalizer.class);
final FhirContext myFhirContext;
private final StorageSettings myStorageSettings;
@Autowired
public SubscriptionCanonicalizer(FhirContext theFhirContext, StorageSettings theStorageSettings) {
myFhirContext = theFhirContext;
myStorageSettings = theStorageSettings;
}
// TODO: LD: remove this constructor once all callers call the 2 arg constructor above
/**
* @deprecated All callers should invoke {@link SubscriptionCanonicalizer()} instead.
*/
@Deprecated
public SubscriptionCanonicalizer(FhirContext theFhirContext) {
myFhirContext = theFhirContext;
myStorageSettings = new StorageSettings();
}
public CanonicalSubscription canonicalize(IBaseResource theSubscription) {
@ -109,7 +123,7 @@ public class SubscriptionCanonicalizer {
retVal.setIdElement(subscription.getIdElement());
retVal.setPayloadString(channel.getPayload());
retVal.setTags(extractTags(subscription));
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
handleCrossPartition(theSubscription, retVal);
retVal.setSendDeleteMessages(extractDeleteExtensionDstu2(subscription));
} catch (FHIRException theE) {
throw new InternalErrorException(Msg.code(557) + theE);
@ -161,7 +175,7 @@ public class SubscriptionCanonicalizer {
retVal.setPayloadSearchCriteria(
getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA));
retVal.setTags(extractTags(subscription));
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
handleCrossPartition(theSubscription, retVal);
if (retVal.getChannelType() == CanonicalSubscriptionChannelType.EMAIL) {
String from;
@ -292,7 +306,7 @@ public class SubscriptionCanonicalizer {
getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA));
retVal.setTags(extractTags(subscription));
setPartitionIdOnReturnValue(theSubscription, retVal);
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
handleCrossPartition(theSubscription, retVal);
List<org.hl7.fhir.r4.model.CanonicalType> profiles =
subscription.getMeta().getProfile();
@ -497,6 +511,8 @@ public class SubscriptionCanonicalizer {
retVal.setSendDeleteMessages(extension.getValueBooleanType().booleanValue());
}
handleCrossPartition(theSubscription, retVal);
return retVal;
}
@ -561,6 +577,8 @@ public class SubscriptionCanonicalizer {
setR5FlagsBasedOnChannelType(subscription, retVal);
handleCrossPartition(theSubscription, retVal);
return retVal;
}
@ -762,4 +780,12 @@ public class SubscriptionCanonicalizer {
}
return status.getValueAsString();
}
private void handleCrossPartition(IBaseResource theSubscription, CanonicalSubscription retVal) {
if (myStorageSettings.isCrossPartitionSubscriptionEnabled()) {
retVal.setCrossPartitionEnabled(true);
} else {
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
}
}
}

View File

@ -1,23 +1,35 @@
package ca.uhn.fhir.jpa.subscription.match.registry;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.CanonicalTopicSubscriptionFilter;
import ca.uhn.fhir.model.api.ExtensionDt;
import ca.uhn.fhir.model.api.IFhirVersion;
import ca.uhn.fhir.model.dstu2.FhirDstu2;
import ca.uhn.fhir.model.primitive.BooleanDt;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import ca.uhn.fhir.subscription.SubscriptionTestDataHelper;
import org.hl7.fhir.dstu3.hapi.ctx.FhirDstu3;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.hapi.ctx.FhirR4;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Extension;
import org.hl7.fhir.r4.model.Subscription;
import org.hl7.fhir.r4b.hapi.ctx.FhirR4B;
import org.hl7.fhir.r5.hapi.ctx.FhirR5;
import org.hl7.fhir.r5.model.Coding;
import org.hl7.fhir.r5.model.Enumerations;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.stream.Stream;
import static ca.uhn.fhir.rest.api.Constants.CT_FHIR_JSON_NEW;
import static ca.uhn.fhir.util.HapiExtensions.EX_SEND_DELETE_MESSAGES;
import static org.hamcrest.MatcherAssert.assertThat;
@ -26,12 +38,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
class SubscriptionCanonicalizerTest {
FhirContext r4Context = FhirContext.forR4();
private final SubscriptionCanonicalizer testedSC = new SubscriptionCanonicalizer(r4Context);
private final SubscriptionCanonicalizer testedSC = new SubscriptionCanonicalizer(r4Context, new StorageSettings());
@Test
void testCanonicalizeR4SendDeleteMessagesSetsExtensionValueNotPresent() {
@ -72,7 +85,7 @@ class SubscriptionCanonicalizerTest {
@Test
public void testCanonicalizeDstu2SendDeleteMessages() {
//setup
SubscriptionCanonicalizer dstu2Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forDstu2Cached());
SubscriptionCanonicalizer dstu2Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forDstu2Cached(), new StorageSettings());
ca.uhn.fhir.model.dstu2.resource.Subscription dstu2Sub = new ca.uhn.fhir.model.dstu2.resource.Subscription();
ExtensionDt extensionDt = new ExtensionDt();
extensionDt.setUrl(EX_SEND_DELETE_MESSAGES);
@ -108,7 +121,7 @@ class SubscriptionCanonicalizerTest {
@ValueSource(strings = {"full-resource", "id-only", "empty"})
public void testR5Canonicalize_returnsCorrectCanonicalSubscription(String thePayloadContent) {
// setup
SubscriptionCanonicalizer r5Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR5Cached());
SubscriptionCanonicalizer r5Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR5Cached(), new StorageSettings());
org.hl7.fhir.r5.model.Subscription.SubscriptionPayloadContent payloadContent =
org.hl7.fhir.r5.model.Subscription.SubscriptionPayloadContent.fromCode(thePayloadContent);
org.hl7.fhir.r5.model.Subscription subscription = buildR5Subscription(payloadContent);
@ -148,7 +161,7 @@ class SubscriptionCanonicalizerTest {
// Example drawn from http://build.fhir.org/ig/HL7/fhir-subscription-backport-ig/Subscription-subscription-zulip.json.html
// setup
SubscriptionCanonicalizer r4bCanonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4BCached());
SubscriptionCanonicalizer r4bCanonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4BCached(), new StorageSettings());
org.hl7.fhir.r4b.model.Subscription subscription = buildR4BSubscription(thePayloadContent);
// execute
@ -160,6 +173,46 @@ class SubscriptionCanonicalizerTest {
verifyChannelParameters(canonical, thePayloadContent);
}
private static Stream<Arguments> crossPartitionParams() {
return Stream.of(
arguments(true, FhirContext.forDstu2Cached()),
arguments(false, FhirContext.forDstu2Cached()),
arguments(true, FhirContext.forDstu3Cached()),
arguments(false, FhirContext.forDstu3Cached()),
arguments(true, FhirContext.forR4Cached()),
arguments(false, FhirContext.forR4Cached()),
arguments(true, FhirContext.forR4BCached()),
arguments(false, FhirContext.forR4BCached()),
arguments(true, FhirContext.forR5Cached()),
arguments(false, FhirContext.forR5Cached())
);
}
@ParameterizedTest
@MethodSource("crossPartitionParams")
void testCrossPartition(boolean theCrossPartitionSubscriptionEnabled, FhirContext theFhirContext) {
final IFhirVersion version = theFhirContext.getVersion();
IBaseResource subscription = null;
if (version instanceof FhirDstu2){
subscription = new ca.uhn.fhir.model.dstu2.resource.Subscription();
} else if (version instanceof FhirDstu3){
subscription = new org.hl7.fhir.dstu3.model.Subscription();
} else if (version instanceof FhirR4){
subscription = new Subscription();
} else if (version instanceof FhirR4B){
subscription = new org.hl7.fhir.r4b.model.Subscription();
} else if (version instanceof FhirR5){
subscription = new org.hl7.fhir.r5.model.Subscription();
}
final StorageSettings storageSettings = new StorageSettings();
storageSettings.setCrossPartitionSubscriptionEnabled(theCrossPartitionSubscriptionEnabled);
final SubscriptionCanonicalizer subscriptionCanonicalizer = new SubscriptionCanonicalizer(theFhirContext, storageSettings);
final CanonicalSubscription canonicalSubscription = subscriptionCanonicalizer.canonicalize(subscription);
assertEquals(theCrossPartitionSubscriptionEnabled, canonicalSubscription.getCrossPartitionEnabled());
}
private org.hl7.fhir.r4b.model.Subscription buildR4BSubscription(String thePayloadContent) {
org.hl7.fhir.r4b.model.Subscription subscription = new org.hl7.fhir.r4b.model.Subscription();
@ -195,7 +248,7 @@ class SubscriptionCanonicalizerTest {
// Example drawn from http://build.fhir.org/ig/HL7/fhir-subscription-backport-ig/Subscription-subscription-zulip.json.html
// setup
SubscriptionCanonicalizer r4Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4Cached());
SubscriptionCanonicalizer r4Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4Cached(), new StorageSettings());
// execute
Subscription subscription = SubscriptionTestDataHelper.buildR4TopicSubscriptionWithContent(thePayloadContent);