Merge remote-tracking branch 'origin/5745-ready-state-batch2' into 5818-update-batch2-framework-with-gate_waiting-state

This commit is contained in:
tyner 2024-04-12 11:08:06 -04:00
commit 7cf197c875
17 changed files with 298 additions and 86 deletions

View File

@ -609,12 +609,14 @@ public abstract class BaseParser implements IParser {
private boolean isStripVersionsFromReferences(
CompositeChildElement theCompositeChildElement, IBaseResource theResource) {
Set<String> autoVersionReferencesAtPathExtensions =
MetaUtil.getAutoVersionReferencesAtPath(theResource.getMeta(), myContext.getResourceType(theResource));
if (theResource != null) {
Set<String> autoVersionReferencesAtPathExtensions = MetaUtil.getAutoVersionReferencesAtPath(
theResource.getMeta(), myContext.getResourceType(theResource));
if (!autoVersionReferencesAtPathExtensions.isEmpty()
&& theCompositeChildElement.anyPathMatches(autoVersionReferencesAtPathExtensions)) {
return false;
if (!autoVersionReferencesAtPathExtensions.isEmpty()
&& theCompositeChildElement.anyPathMatches(autoVersionReferencesAtPathExtensions)) {
return false;
}
}
Boolean stripVersionsFromReferences = myStripVersionsFromReferences;
@ -622,21 +624,20 @@ public abstract class BaseParser implements IParser {
return stripVersionsFromReferences;
}
if (myContext.getParserOptions().isStripVersionsFromReferences() == false) {
if (!myContext.getParserOptions().isStripVersionsFromReferences()) {
return false;
}
Set<String> dontStripVersionsFromReferencesAtPaths = myDontStripVersionsFromReferencesAtPaths;
if (dontStripVersionsFromReferencesAtPaths != null) {
if (dontStripVersionsFromReferencesAtPaths.isEmpty() == false
&& theCompositeChildElement.anyPathMatches(dontStripVersionsFromReferencesAtPaths)) {
return false;
}
if (dontStripVersionsFromReferencesAtPaths != null
&& !dontStripVersionsFromReferencesAtPaths.isEmpty()
&& theCompositeChildElement.anyPathMatches(dontStripVersionsFromReferencesAtPaths)) {
return false;
}
dontStripVersionsFromReferencesAtPaths =
myContext.getParserOptions().getDontStripVersionsFromReferencesAtPaths();
return dontStripVersionsFromReferencesAtPaths.isEmpty() != false
return dontStripVersionsFromReferencesAtPaths.isEmpty()
|| !theCompositeChildElement.anyPathMatches(dontStripVersionsFromReferencesAtPaths);
}

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 5842
title: "Fixed a bug where an NPE was being thrown when trying to serialize a FHIR fragment (e.g., backboneElement
, compound datatype) to a string representation if the fragment contains a `Reference`."

View File

@ -127,7 +127,7 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
Builder version = forVersion(VersionEnum.V7_2_0);
// allow null codes in concept map targets
// allow null codes in concept map targets (see comment on "20190722.27" if you are going to change this)
version.onTable("TRM_CONCEPT_MAP_GRP_ELM_TGT")
.modifyColumn("20240327.1", "TARGET_CODE")
.nullable()
@ -2497,10 +2497,26 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
.modifyColumn("20190722.26", "SYSTEM_VERSION")
.nullable()
.withType(ColumnTypeEnum.STRING, 200);
/*
DISABLED THIS STEP (20190722.27) ON PURPOSE BECAUSE IT STARTED CAUSING FAILURES ON MSSQL FOR A FRESH DB.
I left it here for historical purposes.
The reason for the failure is as follows. The TARGET_CODE column was originally 'not nullable' when it was
first introduced. And in 7_2_0, it is being changed to a nullable column (see 20240327.1 in init720()).
Starting with 7_2_0, on a fresh db, we create the table with nullable TARGET_CODE (as it is made nullable now).
Since we run all migration steps on fresh db, this step will try to convert the column which is created as nullable
to not nullable (which will then need to be coverted back to nullable in 7_2_0 migration).
Changing a nullable column to not nullable is not allowed in
MSSQL if there is an index on the column, which is the case here, as there is IDX_CNCPT_MP_GRP_ELM_TGT_CD
on this column. Since init720() has the right migration
step, where the column is set to nullable and has the right type and length, this statement is also
not necessary anymore even for not fresh dbs.
version.onTable("TRM_CONCEPT_MAP_GRP_ELM_TGT")
.modifyColumn("20190722.27", "TARGET_CODE")
.nonNullable()
.withType(ColumnTypeEnum.STRING, 500);
*/
version.onTable("TRM_CONCEPT_MAP_GRP_ELM_TGT")
.modifyColumn("20190722.28", "VALUESET_URL")
.nullable()

View File

@ -97,7 +97,7 @@ public class StorageSettings {
private boolean myDefaultSearchParamsCanBeOverridden = true;
private Set<Subscription.SubscriptionChannelType> mySupportedSubscriptionTypes = new HashSet<>();
private boolean myAutoCreatePlaceholderReferenceTargets;
private boolean myCrossPartitionSubscriptionEnabled = false;
private boolean myCrossPartitionSubscriptionEnabled = true;
private Integer myBundleBatchPoolSize = DEFAULT_BUNDLE_BATCH_POOL_SIZE;
private Integer myBundleBatchMaxPoolSize = DEFAULT_BUNDLE_BATCH_MAX_POOL_SIZE;
private boolean myEnableInMemorySubscriptionMatching = true;

View File

@ -20,6 +20,7 @@
package ca.uhn.fhir.jpa.subscription.model.config;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import org.springframework.context.annotation.Bean;
@ -29,8 +30,9 @@ import org.springframework.context.annotation.Configuration;
public class SubscriptionModelConfig {
@Bean
public SubscriptionCanonicalizer subscriptionCanonicalizer(FhirContext theFhirContext) {
return new SubscriptionCanonicalizer(theFhirContext);
public SubscriptionCanonicalizer subscriptionCanonicalizer(
FhirContext theFhirContext, StorageSettings theStorageSettings) {
return new SubscriptionCanonicalizer(theFhirContext, theStorageSettings);
}
@Bean

View File

@ -4,6 +4,7 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
@ -50,7 +51,7 @@ public class SubscriptionRegisteringSubscriberTest {
@Mock
private SubscriptionRegistry mySubscriptionRegistry;
@Spy
private SubscriptionCanonicalizer mySubscriptionCanonicalizer = new SubscriptionCanonicalizer(myFhirContext);
private SubscriptionCanonicalizer mySubscriptionCanonicalizer = new SubscriptionCanonicalizer(myFhirContext, new StorageSettings());
@Mock
private DaoRegistry myDaoRegistry;
@Mock

View File

@ -2,6 +2,7 @@ package ca.uhn.fhir.jpa.subscription.match.registry;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
@ -33,7 +34,7 @@ public class SubscriptionRegistryTest {
static FhirContext ourFhirContext = FhirContext.forR4Cached();
@Spy
SubscriptionCanonicalizer mySubscriptionCanonicalizer = new SubscriptionCanonicalizer(ourFhirContext);
SubscriptionCanonicalizer mySubscriptionCanonicalizer = new SubscriptionCanonicalizer(ourFhirContext, new StorageSettings());
@Spy
ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer = new TestChannelNamer();

View File

@ -1,6 +1,7 @@
package ca.uhn.fhir.jpa.subscription.module;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
@ -8,12 +9,15 @@ import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.util.HapiExtensions;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.Nonnull;
import org.assertj.core.util.Lists;
import org.hamcrest.Matchers;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.StringType;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -66,7 +70,7 @@ public class CanonicalSubscriptionTest {
@Test
public void testCanonicalSubscriptionRetainsMetaTags() throws IOException {
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4());
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), new StorageSettings());
CanonicalSubscription sub1 = canonicalizer.canonicalize(makeMdmSubscription());
assertTrue(sub1.getTags().keySet().contains(TAG_SYSTEM));
assertEquals(sub1.getTags().get(TAG_SYSTEM), TAG_VALUE);
@ -74,7 +78,7 @@ public class CanonicalSubscriptionTest {
@Test
public void emailDetailsEquals() {
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4());
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), new StorageSettings());
CanonicalSubscription sub1 = canonicalizer.canonicalize(makeEmailSubscription());
CanonicalSubscription sub2 = canonicalizer.canonicalize(makeEmailSubscription());
assertTrue(sub1.equals(sub2));
@ -82,7 +86,7 @@ public class CanonicalSubscriptionTest {
@Test
public void testSerializeMultiPartitionSubscription(){
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4());
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), new StorageSettings());
Subscription subscription = makeEmailSubscription();
subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new BooleanType().setValue(true));
CanonicalSubscription canonicalSubscription = canonicalizer.canonicalize(subscription);
@ -90,28 +94,30 @@ public class CanonicalSubscriptionTest {
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), true);
}
@Test
public void testSerializeIncorrectMultiPartitionSubscription(){
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4());
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSerializeIncorrectMultiPartitionSubscription(boolean theIsCrossPartitionEnabled){
final StorageSettings storageSettings = buildStorageSettings(theIsCrossPartitionEnabled);
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), storageSettings);
Subscription subscription = makeEmailSubscription();
subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new StringType().setValue("false"));
CanonicalSubscription canonicalSubscription = canonicalizer.canonicalize(subscription);
System.out.print(canonicalSubscription);
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), false);
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), theIsCrossPartitionEnabled);
}
@Test
public void testSerializeNonMultiPartitionSubscription(){
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4());
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSerializeNonMultiPartitionSubscription(boolean theIsCrossPartitionEnabled){
final StorageSettings storageSettings = buildStorageSettings(theIsCrossPartitionEnabled);
SubscriptionCanonicalizer canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4(), storageSettings);
Subscription subscription = makeEmailSubscription();
subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new BooleanType().setValue(false));
CanonicalSubscription canonicalSubscription = canonicalizer.canonicalize(subscription);
System.out.print(canonicalSubscription);
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), false);
assertEquals(canonicalSubscription.getCrossPartitionEnabled(), theIsCrossPartitionEnabled);
}
@Test
@ -154,4 +160,11 @@ public class CanonicalSubscriptionTest {
ResourceDeliveryMessage payload = resourceDeliveryMessage.getPayload();
return payload.getSubscription();
}
@Nonnull
private static StorageSettings buildStorageSettings(boolean theIsCrossPartitionEnabled) {
final StorageSettings storageSettings = new StorageSettings();
storageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
return storageSettings;
}
}

View File

@ -27,6 +27,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Answers;
import org.mockito.InjectMocks;
import org.mockito.Mock;
@ -38,6 +40,7 @@ import java.util.Optional;
import static ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionCriteriaParser.TypeEnum.STARTYPE_EXPRESSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
@ -227,8 +230,10 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
ourObservationListener.awaitExpected();
}
@Test
public void testSubscriptionAndResourceOnDiffPartitionNotMatch() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionAndResourceOnDiffPartitionNotMatch(boolean theIsCrossPartitionEnabled) throws InterruptedException {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
@ -240,13 +245,18 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
mySubscriptionResourceNotMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(0));
mySubscriptionResourceNotMatched.awaitExpected();
final ThrowsInterrupted throwsInterrupted = () -> sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(0));
if (theIsCrossPartitionEnabled) {
runWithinLatchLogicExpectSuccess(throwsInterrupted);
} else {
runWithLatchLogicExpectFailure(throwsInterrupted);
}
}
@Test
public void testSubscriptionAndResourceOnDiffPartitionNotMatchPart2() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionAndResourceOnDiffPartitionNotMatchPart2(boolean theIsCrossPartitionEnabled) throws InterruptedException {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
@ -258,13 +268,19 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
mySubscriptionResourceNotMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(1));
mySubscriptionResourceNotMatched.awaitExpected();
final ThrowsInterrupted throwsInterrupted = () -> sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(1));
if (theIsCrossPartitionEnabled) {
runWithinLatchLogicExpectSuccess(throwsInterrupted);
} else {
runWithLatchLogicExpectFailure(throwsInterrupted);
}
}
@Test
public void testSubscriptionOnDefaultPartitionAndResourceOnDiffPartitionNotMatch() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionOnDefaultPartitionAndResourceOnDiffPartitionNotMatch(boolean theIsCrossPartitionEnabled) throws InterruptedException {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
@ -276,13 +292,19 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
mySubscriptionResourceNotMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(1));
mySubscriptionResourceNotMatched.awaitExpected();
final ThrowsInterrupted throwsInterrupted = () -> sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(1));
if (theIsCrossPartitionEnabled) {
runWithinLatchLogicExpectSuccess(throwsInterrupted);
} else {
runWithLatchLogicExpectFailure(throwsInterrupted);
}
}
@Test
public void testSubscriptionOnAPartitionAndResourceOnDefaultPartitionNotMatch() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionOnAPartitionAndResourceOnDefaultPartitionNotMatch(boolean theIsCrossPartitionEnabled) throws InterruptedException {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
@ -294,9 +316,13 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
mySubscriptionResourceNotMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT", RequestPartitionId.defaultPartition());
mySubscriptionResourceNotMatched.awaitExpected();
final ThrowsInterrupted throwsInterrupted = () -> sendObservation(code, "SNOMED-CT", RequestPartitionId.defaultPartition());
if (theIsCrossPartitionEnabled) {
runWithinLatchLogicExpectSuccess(throwsInterrupted);
} else {
runWithLatchLogicExpectFailure(throwsInterrupted);
}
}
@Test
@ -320,8 +346,10 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
ourObservationListener.awaitExpected();
}
@Test
public void testSubscriptionOnOnePartitionDoNotMatchResourceOnMultiplePartitions() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionOnOnePartitionDoNotMatchResourceOnMultiplePartitions(boolean theIsCrossPartitionEnabled) throws InterruptedException {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
@ -333,10 +361,13 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
mySubscriptionResourceNotMatched.setExpectedCount(1);
List<Integer> partitionId = Collections.synchronizedList(Lists.newArrayList(0, 2));
sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionIds(partitionId));
mySubscriptionResourceNotMatched.awaitExpected();
final ThrowsInterrupted throwsInterrupted = () -> sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionIds(Collections.synchronizedList(Lists.newArrayList(0, 2))));
if (theIsCrossPartitionEnabled) {
runWithinLatchLogicExpectSuccess(throwsInterrupted);
} else {
runWithLatchLogicExpectFailure(throwsInterrupted);
}
}
@Test
@ -519,4 +550,31 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
verify(message, atLeastOnce()).getPayloadId(null);
}
}
private interface ThrowsInterrupted {
void runOrThrow() throws InterruptedException;
}
private void runWithLatchLogicExpectFailure(ThrowsInterrupted theRunnable) {
try {
mySubscriptionResourceNotMatched.setExpectedCount(1);
theRunnable.runOrThrow();
mySubscriptionResourceNotMatched.awaitExpected();
} catch (InterruptedException exception) {
Thread.currentThread().interrupt();
}
}
private void runWithinLatchLogicExpectSuccess(ThrowsInterrupted theRunnable) {
try {
ourObservationListener.setExpectedCount(1);
mySubscriptionResourceMatched.setExpectedCount(1);
theRunnable.runOrThrow();
mySubscriptionResourceMatched.awaitExpected();
ourObservationListener.awaitExpected();
} catch (InterruptedException exception) {
fail();
Thread.currentThread().interrupt();
}
}
}

View File

@ -6,6 +6,7 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
@ -226,7 +227,7 @@ public class SubscriptionValidatingInterceptorTest {
@Bean
SubscriptionCanonicalizer subscriptionCanonicalizer(FhirContext theFhirContext) {
return new SubscriptionCanonicalizer(theFhirContext);
return new SubscriptionCanonicalizer(theFhirContext, new StorageSettings());
}
@Bean

View File

@ -5,6 +5,7 @@ import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.dao.r4.BasePartitioningR4Test;
@ -24,15 +25,13 @@ import jakarta.servlet.ServletException;
import org.awaitility.core.ConditionTimeoutException;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Subscription;
import org.hl7.fhir.r4.model.*;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -41,6 +40,7 @@ import java.time.LocalDate;
import java.time.Month;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.containsString;
@ -146,8 +146,10 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
Assertions.assertEquals(Constants.CT_FHIR_JSON_NEW, BaseSubscriptionsR4Test.ourRestfulServer.getRequestContentTypes().get(0));
}
@Test
public void testCreateSubscriptionInPartitionAndResourceInDifferentPartition() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCreateSubscriptionInPartitionAndResourceInDifferentPartition(boolean theIsCrossPartitionEnabled) throws Exception {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
String payload = "application/fhir+json";
String code = "1000000050";
@ -169,31 +171,40 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
Assertions.assertEquals(0, BaseSubscriptionsR4Test.ourPatientProvider.getCountCreate());
try {
// Should have 0 matching subscription, if we get 1 update count then the test fails
BaseSubscriptionsR4Test.ourPatientProvider.waitForUpdateCount(1);
fail();
if (!theIsCrossPartitionEnabled) {
fail("Expecting a timeout and 0 matching subscriptions and thus a timeout if cross partition is DISabled");
}
Assertions.assertEquals(1, BaseSubscriptionsR4Test.ourRestfulServer.getRequestContentTypes().size());
} catch (ConditionTimeoutException e) {
Assertions.assertEquals(0, BaseSubscriptionsR4Test.ourRestfulServer.getRequestContentTypes().size());
if (theIsCrossPartitionEnabled) {
fail("Expecting no timeout and 1 matching subscriptions and thus a timeout if cross partition is enabled");
} else {
// Should have 0 matching subscription, if we get 1 update count then the test fails
Assertions.assertEquals(0, BaseSubscriptionsR4Test.ourRestfulServer.getRequestContentTypes().size());
}
}
}
@Test
public void testManualTriggeredSubscriptionDoesNotCheckOutsideOfPartition() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testManualTriggeredSubscriptionDoesNotCheckOutsideOfPartition(boolean theIsCrossPartitionEnabled) throws Exception {
myStorageSettings.setCrossPartitionSubscriptionEnabled(theIsCrossPartitionEnabled);
String payload = "application/fhir+json";
String code = "1000000050";
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
//Given: We store a resource in partition 2
myPartitionInterceptor.setRequestPartitionId(REQ_PART_2);
IIdType observationIdPartitionTwo = myDaoRegistry.getResourceDao("Observation").create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
final IFhirResourceDao observation = myDaoRegistry.getResourceDao("Observation");
IIdType observationIdPartitionTwo = observation.create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
//Given: We store a similar resource in partition 1
myPartitionInterceptor.setRequestPartitionId(REQ_PART_1);
IIdType observationIdPartitionOne = myDaoRegistry.getResourceDao("Observation").create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
IIdType observationIdPartitionOne = observation.create(createBaseObservation(code, "SNOMED-CT"), mySrd).getId();
//Given: We create a subscrioption on Partition 1
IIdType subscriptionId= myDaoRegistry.getResourceDao("Subscription").create(newSubscription(criteria1, payload), mySrd).getId();
IIdType subscriptionId = myDaoRegistry.getResourceDao("Subscription").create(newSubscription(criteria1, payload), mySrd).getId();
waitForActivatedSubscriptionCount(1);
ArrayList<IPrimitiveType<String>> searchUrlList = new ArrayList<>();
@ -204,8 +215,14 @@ public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4
waitForQueueToDrain();
List<Observation> resourceUpdates = BaseSubscriptionsR4Test.ourObservationProvider.getResourceUpdates();
assertThat(resourceUpdates.size(), is(equalTo(1)));
assertThat(resourceUpdates.get(0).getId(), is(equalTo(observationIdPartitionOne.toString())));
if (theIsCrossPartitionEnabled) {
assertThat(resourceUpdates.size(), is(equalTo(2)));
assertThat(resourceUpdates.stream().map(Resource::getId).sorted().toList(),
is(equalTo(Stream.of(observationIdPartitionOne, observationIdPartitionTwo).map(Object::toString).sorted().toList())));
} else {
assertThat(resourceUpdates.size(), is(equalTo(1)));
assertThat(resourceUpdates.get(0).getId(), is(equalTo(observationIdPartitionOne.toString())));
}
String responseValue = resultParameters.getParameter().get(0).getValue().primitiveValue();
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));

View File

@ -5,6 +5,7 @@ import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
@ -63,7 +64,7 @@ public class SubscriptionValidatingInterceptorTest {
@BeforeEach
public void before() {
mySvc = new SubscriptionValidatingInterceptor();
mySubscriptionCanonicalizer = spy(new SubscriptionCanonicalizer(myCtx));
mySubscriptionCanonicalizer = spy(new SubscriptionCanonicalizer(myCtx, new StorageSettings()));
mySvc.setSubscriptionCanonicalizerForUnitTest(mySubscriptionCanonicalizer);
mySvc.setDaoRegistryForUnitTest(myDaoRegistry);
mySvc.setSubscriptionStrategyEvaluatorForUnitTest(mySubscriptionStrategyEvaluator);

View File

@ -27,6 +27,11 @@ import org.hl7.fhir.instance.model.api.IIdType;
import java.util.Set;
/**
* @deprecated Users should be instead be granted more granular write permissions that cover PATCH operations.
* @since 7.2.0
*/
@Deprecated
class RuleImplPatch extends BaseRule {
private boolean myAllRequests;

View File

@ -48,6 +48,7 @@ import java.util.TreeSet;
@SuppressWarnings("JavadocLinkAsPlainText")
public class JpaStorageSettings extends StorageSettings {
private static final Logger ourLog = LoggerFactory.getLogger(JpaStorageSettings.class);
/**
* Default value for {@link #getBulkExportFileMaximumSize()}: 100 MB
@ -105,7 +106,6 @@ public class JpaStorageSettings extends StorageSettings {
*/
private static final Integer DEFAULT_MAXIMUM_SEARCH_RESULT_COUNT_IN_TRANSACTION = null;
private static final Logger ourLog = LoggerFactory.getLogger(JpaStorageSettings.class);
private static final int DEFAULT_REINDEX_BATCH_SIZE = 800;
private static final int DEFAULT_MAXIMUM_DELETE_CONFLICT_COUNT = 60;
/**

View File

@ -23,6 +23,7 @@ import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
@ -68,10 +69,23 @@ public class SubscriptionCanonicalizer {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionCanonicalizer.class);
final FhirContext myFhirContext;
private final StorageSettings myStorageSettings;
@Autowired
public SubscriptionCanonicalizer(FhirContext theFhirContext, StorageSettings theStorageSettings) {
myFhirContext = theFhirContext;
myStorageSettings = theStorageSettings;
}
// TODO: LD: remove this constructor once all callers call the 2 arg constructor above
/**
* @deprecated All callers should invoke {@link SubscriptionCanonicalizer()} instead.
*/
@Deprecated
public SubscriptionCanonicalizer(FhirContext theFhirContext) {
myFhirContext = theFhirContext;
myStorageSettings = new StorageSettings();
}
public CanonicalSubscription canonicalize(IBaseResource theSubscription) {
@ -109,7 +123,7 @@ public class SubscriptionCanonicalizer {
retVal.setIdElement(subscription.getIdElement());
retVal.setPayloadString(channel.getPayload());
retVal.setTags(extractTags(subscription));
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
handleCrossPartition(theSubscription, retVal);
retVal.setSendDeleteMessages(extractDeleteExtensionDstu2(subscription));
} catch (FHIRException theE) {
throw new InternalErrorException(Msg.code(557) + theE);
@ -161,7 +175,7 @@ public class SubscriptionCanonicalizer {
retVal.setPayloadSearchCriteria(
getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA));
retVal.setTags(extractTags(subscription));
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
handleCrossPartition(theSubscription, retVal);
if (retVal.getChannelType() == CanonicalSubscriptionChannelType.EMAIL) {
String from;
@ -292,7 +306,7 @@ public class SubscriptionCanonicalizer {
getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA));
retVal.setTags(extractTags(subscription));
setPartitionIdOnReturnValue(theSubscription, retVal);
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
handleCrossPartition(theSubscription, retVal);
List<org.hl7.fhir.r4.model.CanonicalType> profiles =
subscription.getMeta().getProfile();
@ -497,6 +511,8 @@ public class SubscriptionCanonicalizer {
retVal.setSendDeleteMessages(extension.getValueBooleanType().booleanValue());
}
handleCrossPartition(theSubscription, retVal);
return retVal;
}
@ -561,6 +577,8 @@ public class SubscriptionCanonicalizer {
setR5FlagsBasedOnChannelType(subscription, retVal);
handleCrossPartition(theSubscription, retVal);
return retVal;
}
@ -762,4 +780,12 @@ public class SubscriptionCanonicalizer {
}
return status.getValueAsString();
}
private void handleCrossPartition(IBaseResource theSubscription, CanonicalSubscription retVal) {
if (myStorageSettings.isCrossPartitionSubscriptionEnabled()) {
retVal.setCrossPartitionEnabled(true);
} else {
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
}
}
}

View File

@ -1,23 +1,35 @@
package ca.uhn.fhir.jpa.subscription.match.registry;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.CanonicalTopicSubscriptionFilter;
import ca.uhn.fhir.model.api.ExtensionDt;
import ca.uhn.fhir.model.api.IFhirVersion;
import ca.uhn.fhir.model.dstu2.FhirDstu2;
import ca.uhn.fhir.model.primitive.BooleanDt;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import ca.uhn.fhir.subscription.SubscriptionTestDataHelper;
import org.hl7.fhir.dstu3.hapi.ctx.FhirDstu3;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.hapi.ctx.FhirR4;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Extension;
import org.hl7.fhir.r4.model.Subscription;
import org.hl7.fhir.r4b.hapi.ctx.FhirR4B;
import org.hl7.fhir.r5.hapi.ctx.FhirR5;
import org.hl7.fhir.r5.model.Coding;
import org.hl7.fhir.r5.model.Enumerations;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.stream.Stream;
import static ca.uhn.fhir.rest.api.Constants.CT_FHIR_JSON_NEW;
import static ca.uhn.fhir.util.HapiExtensions.EX_SEND_DELETE_MESSAGES;
import static org.hamcrest.MatcherAssert.assertThat;
@ -26,12 +38,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
class SubscriptionCanonicalizerTest {
FhirContext r4Context = FhirContext.forR4();
private final SubscriptionCanonicalizer testedSC = new SubscriptionCanonicalizer(r4Context);
private final SubscriptionCanonicalizer testedSC = new SubscriptionCanonicalizer(r4Context, new StorageSettings());
@Test
void testCanonicalizeR4SendDeleteMessagesSetsExtensionValueNotPresent() {
@ -72,7 +85,7 @@ class SubscriptionCanonicalizerTest {
@Test
public void testCanonicalizeDstu2SendDeleteMessages() {
//setup
SubscriptionCanonicalizer dstu2Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forDstu2Cached());
SubscriptionCanonicalizer dstu2Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forDstu2Cached(), new StorageSettings());
ca.uhn.fhir.model.dstu2.resource.Subscription dstu2Sub = new ca.uhn.fhir.model.dstu2.resource.Subscription();
ExtensionDt extensionDt = new ExtensionDt();
extensionDt.setUrl(EX_SEND_DELETE_MESSAGES);
@ -108,7 +121,7 @@ class SubscriptionCanonicalizerTest {
@ValueSource(strings = {"full-resource", "id-only", "empty"})
public void testR5Canonicalize_returnsCorrectCanonicalSubscription(String thePayloadContent) {
// setup
SubscriptionCanonicalizer r5Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR5Cached());
SubscriptionCanonicalizer r5Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR5Cached(), new StorageSettings());
org.hl7.fhir.r5.model.Subscription.SubscriptionPayloadContent payloadContent =
org.hl7.fhir.r5.model.Subscription.SubscriptionPayloadContent.fromCode(thePayloadContent);
org.hl7.fhir.r5.model.Subscription subscription = buildR5Subscription(payloadContent);
@ -148,7 +161,7 @@ class SubscriptionCanonicalizerTest {
// Example drawn from http://build.fhir.org/ig/HL7/fhir-subscription-backport-ig/Subscription-subscription-zulip.json.html
// setup
SubscriptionCanonicalizer r4bCanonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4BCached());
SubscriptionCanonicalizer r4bCanonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4BCached(), new StorageSettings());
org.hl7.fhir.r4b.model.Subscription subscription = buildR4BSubscription(thePayloadContent);
// execute
@ -160,6 +173,46 @@ class SubscriptionCanonicalizerTest {
verifyChannelParameters(canonical, thePayloadContent);
}
private static Stream<Arguments> crossPartitionParams() {
return Stream.of(
arguments(true, FhirContext.forDstu2Cached()),
arguments(false, FhirContext.forDstu2Cached()),
arguments(true, FhirContext.forDstu3Cached()),
arguments(false, FhirContext.forDstu3Cached()),
arguments(true, FhirContext.forR4Cached()),
arguments(false, FhirContext.forR4Cached()),
arguments(true, FhirContext.forR4BCached()),
arguments(false, FhirContext.forR4BCached()),
arguments(true, FhirContext.forR5Cached()),
arguments(false, FhirContext.forR5Cached())
);
}
@ParameterizedTest
@MethodSource("crossPartitionParams")
void testCrossPartition(boolean theCrossPartitionSubscriptionEnabled, FhirContext theFhirContext) {
final IFhirVersion version = theFhirContext.getVersion();
IBaseResource subscription = null;
if (version instanceof FhirDstu2){
subscription = new ca.uhn.fhir.model.dstu2.resource.Subscription();
} else if (version instanceof FhirDstu3){
subscription = new org.hl7.fhir.dstu3.model.Subscription();
} else if (version instanceof FhirR4){
subscription = new Subscription();
} else if (version instanceof FhirR4B){
subscription = new org.hl7.fhir.r4b.model.Subscription();
} else if (version instanceof FhirR5){
subscription = new org.hl7.fhir.r5.model.Subscription();
}
final StorageSettings storageSettings = new StorageSettings();
storageSettings.setCrossPartitionSubscriptionEnabled(theCrossPartitionSubscriptionEnabled);
final SubscriptionCanonicalizer subscriptionCanonicalizer = new SubscriptionCanonicalizer(theFhirContext, storageSettings);
final CanonicalSubscription canonicalSubscription = subscriptionCanonicalizer.canonicalize(subscription);
assertEquals(theCrossPartitionSubscriptionEnabled, canonicalSubscription.getCrossPartitionEnabled());
}
private org.hl7.fhir.r4b.model.Subscription buildR4BSubscription(String thePayloadContent) {
org.hl7.fhir.r4b.model.Subscription subscription = new org.hl7.fhir.r4b.model.Subscription();
@ -195,7 +248,7 @@ class SubscriptionCanonicalizerTest {
// Example drawn from http://build.fhir.org/ig/HL7/fhir-subscription-backport-ig/Subscription-subscription-zulip.json.html
// setup
SubscriptionCanonicalizer r4Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4Cached());
SubscriptionCanonicalizer r4Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4Cached(), new StorageSettings());
// execute
Subscription subscription = SubscriptionTestDataHelper.buildR4TopicSubscriptionWithContent(thePayloadContent);

View File

@ -1168,6 +1168,18 @@ public class JsonParserR4Test extends BaseTest {
assertEquals(expected, actual);
}
@Test
public void testEncodeToString_CompoundTypeWithReference() {
Identifier identifier = new Identifier();
identifier.setSystem("http://system.org");
identifier.setValue("123");
Reference reference = new Reference("Organization/1");
identifier.setAssigner(reference);
String expected = "{\"system\":\"http://system.org\",\"value\":\"123\",\"assigner\":{\"reference\":\"Organization/1\"}}";
String actual = ourCtx.newJsonParser().encodeToString(identifier);
assertEquals(expected, actual);
}
@Test
public void testEncodeToString_Resource() {
Patient p = new Patient();