From 9e741cb14598ae2c6b61f482d52984a123190a38 Mon Sep 17 00:00:00 2001 From: Ken Stevens Date: Tue, 18 Apr 2023 10:20:43 -0400 Subject: [PATCH] remove enabled flag and instead only create the subscription topic beans in the fhir versions that use them (#4737) Co-authored-by: Ken Stevens --- .../config/SubscriptionProcessorConfig.java | 11 +++++++++-- .../MatchingQueueSubscriberLoader.java | 5 ++--- .../jpa/topic/SubscriptionTopicConfig.java | 8 -------- .../resthook/RestHookTestDstu3Test.java | 9 +++++++++ .../resthook/RestHookTestR4Test.java | 9 ++++++++- .../SubscriptionTopicR4BTest.java | 6 ++++++ .../subscription/SubscriptionTopicR5Test.java | 6 ++++++ .../fhir/jpa/test/config/TestR4BConfig.java | 2 ++ .../fhir/jpa/test/config/TestR5Config.java | 2 ++ .../cache/BaseResourceCacheSynchronizer.java | 19 ------------------- 10 files changed, 44 insertions(+), 33 deletions(-) diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/config/SubscriptionProcessorConfig.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/config/SubscriptionProcessorConfig.java index 3743f5197d7..4e2dd76ed38 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/config/SubscriptionProcessorConfig.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/config/SubscriptionProcessorConfig.java @@ -19,6 +19,8 @@ */ package ca.uhn.fhir.jpa.subscription.match.config; +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionDeliveryChannelNamer; @@ -33,12 +35,12 @@ import ca.uhn.fhir.jpa.subscription.match.matcher.matching.ISubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.match.matcher.matching.InMemorySubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.MatchingQueueSubscriberLoader; import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber; +import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchDeliverer; import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber; import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionRegisteringSubscriber; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.model.config.SubscriptionModelConfig; -import ca.uhn.fhir.jpa.topic.SubscriptionTopicConfig; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Primary; @@ -48,7 +50,7 @@ import org.springframework.context.annotation.Scope; * This Spring config should be imported by a system that pulls messages off of the * matching queue for processing, and handles delivery */ -@Import({SubscriptionModelConfig.class, SubscriptionTopicConfig.class}) +@Import(SubscriptionModelConfig.class) public class SubscriptionProcessorConfig { @Bean @@ -96,6 +98,11 @@ public class SubscriptionProcessorConfig { return new SubscriptionDeliveryHandlerFactory(); } + @Bean + public SubscriptionMatchDeliverer subscriptionMatchDeliverer(FhirContext theFhirContext, IInterceptorBroadcaster theInterceptorBroadcaster, SubscriptionChannelRegistry theSubscriptionChannelRegistry) { + return new SubscriptionMatchDeliverer(theFhirContext, theInterceptorBroadcaster, theSubscriptionChannelRegistry); + } + @Bean @Scope("prototype") public SubscriptionDeliveringRestHookSubscriber subscriptionDeliveringRestHookSubscriber() { diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/MatchingQueueSubscriberLoader.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/MatchingQueueSubscriberLoader.java index 90e91baa7af..4ae38f21577 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/MatchingQueueSubscriberLoader.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/MatchingQueueSubscriberLoader.java @@ -21,7 +21,6 @@ package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber; import ca.uhn.fhir.IHapiBootOrder; import ca.uhn.fhir.context.FhirContext; -import ca.uhn.fhir.context.FhirVersionEnum; import ca.uhn.fhir.jpa.model.entity.StorageSettings; import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; @@ -45,7 +44,7 @@ public class MatchingQueueSubscriberLoader { FhirContext myFhirContext; @Autowired private SubscriptionMatchingSubscriber mySubscriptionMatchingSubscriber; - @Autowired + @Autowired(required = false) private SubscriptionTopicMatchingSubscriber mySubscriptionTopicMatchingSubscriber; @Autowired private SubscriptionChannelFactory mySubscriptionChannelFactory; @@ -67,7 +66,7 @@ public class MatchingQueueSubscriberLoader { myMatchingChannel.subscribe(mySubscriptionActivatingSubscriber); myMatchingChannel.subscribe(mySubscriptionRegisteringSubscriber); ourLog.info("Subscription Matching Subscriber subscribed to Matching Channel {} with name {}", myMatchingChannel.getClass().getName(), SUBSCRIPTION_MATCHING_CHANNEL_NAME); - if (myFhirContext.getVersion().getVersion().isEqualOrNewerThan(FhirVersionEnum.R4B)) { + if (mySubscriptionTopicMatchingSubscriber != null) { ourLog.info("Starting SubscriptionTopic Matching Subscriber"); myMatchingChannel.subscribe(mySubscriptionTopicMatchingSubscriber); } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicConfig.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicConfig.java index 1f9bf66550f..387ef453009 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicConfig.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicConfig.java @@ -20,19 +20,11 @@ package ca.uhn.fhir.jpa.topic; import ca.uhn.fhir.context.FhirContext; -import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher; -import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry; -import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchDeliverer; import org.springframework.context.annotation.Bean; public class SubscriptionTopicConfig { - @Bean - public SubscriptionMatchDeliverer subscriptionMatchDeliverer(FhirContext theFhirContext, IInterceptorBroadcaster theInterceptorBroadcaster, SubscriptionChannelRegistry theSubscriptionChannelRegistry) { - return new SubscriptionMatchDeliverer(theFhirContext, theInterceptorBroadcaster, theSubscriptionChannelRegistry); - } - @Bean public SubscriptionTopicMatchingSubscriber subscriptionTopicMatchingSubscriber(FhirContext theFhirContext) { return new SubscriptionTopicMatchingSubscriber(theFhirContext); diff --git a/hapi-fhir-jpaserver-test-dstu3/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestDstu3Test.java b/hapi-fhir-jpaserver-test-dstu3/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestDstu3Test.java index 1b55c5392bd..fba62e04979 100644 --- a/hapi-fhir-jpaserver-test-dstu3/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestDstu3Test.java +++ b/hapi-fhir-jpaserver-test-dstu3/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestDstu3Test.java @@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.subscription.NotificationServlet; import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy; import ca.uhn.fhir.jpa.subscription.util.SubscriptionDebugLogInterceptor; import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil; +import ca.uhn.fhir.jpa.topic.SubscriptionTopicRegistry; import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.rest.annotation.Create; import ca.uhn.fhir.rest.annotation.Delete; @@ -91,6 +92,8 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test { private final List mySubscriptionIds = Collections.synchronizedList(new ArrayList<>()); @Autowired private SubscriptionTestUtil mySubscriptionTestUtil; + @Autowired(required = false) + SubscriptionTopicRegistry mySubscriptionTopicRegistry; @AfterEach public void afterUnregisterRestHookListener() { @@ -185,6 +188,12 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test { return observation; } + @Test + public void testSubscriptionTopicRegistryBean() { + // This bean should not exist in DSTU3 + assertNull(mySubscriptionTopicRegistry); + } + @Test public void testDatabaseStrategyMeta() throws InterruptedException { String databaseCriteria = "Observation?code=17861-6&context.type=IHD"; diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestR4Test.java index 554eb9591fa..d315d9fadad 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestR4Test.java @@ -4,6 +4,7 @@ import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test; import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber; +import ca.uhn.fhir.jpa.topic.SubscriptionTopicRegistry; import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.rest.api.CacheControlDirective; import ca.uhn.fhir.rest.api.Constants; @@ -59,6 +60,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test { @Autowired StoppableSubscriptionDeliveringRestHookSubscriber myStoppableSubscriptionDeliveringRestHookSubscriber; + @Autowired(required = false) + SubscriptionTopicRegistry mySubscriptionTopicRegistry; @AfterEach public void cleanupStoppableSubscriptionDeliveringRestHookSubscriber() { @@ -68,7 +71,11 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test { myStorageSettings.setTriggerSubscriptionsForNonVersioningChanges(new JpaStorageSettings().isTriggerSubscriptionsForNonVersioningChanges()); } - + @Test + public void testSubscriptionTopicRegistryBean() { + // This bean should not exist in R4 + assertNull(mySubscriptionTopicRegistry); + } /** * Make sure that if we delete a subscription, then reinstate it with a criteria diff --git a/hapi-fhir-jpaserver-test-r4b/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTopicR4BTest.java b/hapi-fhir-jpaserver-test-r4b/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTopicR4BTest.java index 0f1d7237e04..ea3c0f7f6de 100644 --- a/hapi-fhir-jpaserver-test-r4b/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTopicR4BTest.java +++ b/hapi-fhir-jpaserver-test-r4b/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTopicR4BTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest { private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTopicR4BTest.class); @@ -71,6 +72,11 @@ public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest { super.after(); } + @Test + public void testSubscriptionTopicRegistryBean() { + assertNotNull(mySubscriptionTopicRegistry); + } + @Test public void testCreate() throws Exception { // WIP SR4B test update, delete, etc diff --git a/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTopicR5Test.java b/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTopicR5Test.java index 03b2775222e..20918953cfd 100644 --- a/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTopicR5Test.java +++ b/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTopicR5Test.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; public class SubscriptionTopicR5Test extends BaseSubscriptionsR5Test { private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTopicR5Test.class); @@ -57,6 +58,11 @@ public class SubscriptionTopicR5Test extends BaseSubscriptionsR5Test { super.after(); } + @Test + public void testSubscriptionTopicRegistryBean() { + assertNotNull(mySubscriptionTopicRegistry); + } + @Test public void testRestHookSubscriptionTopicApplicationFhirJson() throws Exception { // WIP SR4B test update, delete, etc diff --git a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/config/TestR4BConfig.java b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/config/TestR4BConfig.java index 9ca5f9033ec..5beaab359db 100644 --- a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/config/TestR4BConfig.java +++ b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/config/TestR4BConfig.java @@ -28,6 +28,7 @@ import ca.uhn.fhir.jpa.config.HapiJpaConfig; import ca.uhn.fhir.jpa.config.r4b.JpaR4BConfig; import ca.uhn.fhir.jpa.config.util.HapiEntityManagerFactoryUtil; import ca.uhn.fhir.jpa.model.dialect.HapiFhirH2Dialect; +import ca.uhn.fhir.jpa.topic.SubscriptionTopicConfig; import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener; import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener; import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor; @@ -64,6 +65,7 @@ import static org.junit.jupiter.api.Assertions.fail; JpaR4BConfig.class, HapiJpaConfig.class, TestJPAConfig.class, + SubscriptionTopicConfig.class, TestHSearchAddInConfig.DefaultLuceneHeap.class, JpaBatch2Config.class, Batch2JobsConfig.class diff --git a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/config/TestR5Config.java b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/config/TestR5Config.java index 4dcdf1158ec..cb8d9ed8d74 100644 --- a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/config/TestR5Config.java +++ b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/config/TestR5Config.java @@ -28,6 +28,7 @@ import ca.uhn.fhir.jpa.config.HapiJpaConfig; import ca.uhn.fhir.jpa.config.r5.JpaR5Config; import ca.uhn.fhir.jpa.config.util.HapiEntityManagerFactoryUtil; import ca.uhn.fhir.jpa.model.dialect.HapiFhirH2Dialect; +import ca.uhn.fhir.jpa.topic.SubscriptionTopicConfig; import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener; import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener; import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor; @@ -57,6 +58,7 @@ import static org.junit.jupiter.api.Assertions.fail; JpaR5Config.class, HapiJpaConfig.class, TestJPAConfig.class, + SubscriptionTopicConfig.class, JpaBatch2Config.class, Batch2JobsConfig.class, TestHSearchAddInConfig.DefaultLuceneHeap.class diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/cache/BaseResourceCacheSynchronizer.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/cache/BaseResourceCacheSynchronizer.java index abd7e49f65c..62dbba9da76 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/cache/BaseResourceCacheSynchronizer.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/cache/BaseResourceCacheSynchronizer.java @@ -66,7 +66,6 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi private SearchParameterMap mySearchParameterMap; private SystemRequestDetails mySystemRequestDetails; private boolean myStopping; - private boolean myEnabled; private final Semaphore mySyncResourcesSemaphore = new Semaphore(1); private final Object mySyncResourcesLock = new Object(); @@ -83,7 +82,6 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi mySearchParameterMap = getSearchParameterMap(); mySystemRequestDetails = SystemRequestDetails.forAllPartitions(); - myEnabled = true; IResourceChangeListenerCache resourceCache = myResourceChangeListenerRegistry.registerResourceResourceChangeListener(myResourceName, mySearchParameterMap, this, REFRESH_INTERVAL); resourceCache.forceRefresh(); } @@ -101,9 +99,6 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi * Read the existing resources from the database */ public void syncDatabaseToCache() { - if (!myEnabled) { - return; - } if (!resourceDaoExists()) { return; } @@ -119,17 +114,11 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi @VisibleForTesting public void acquireSemaphoreForUnitTest() throws InterruptedException { - if (!myEnabled) { - return; - } mySyncResourcesSemaphore.acquire(); } @VisibleForTesting public int doSyncResourcessForUnitTest() { - if (!myEnabled) { - return 0; - } // Two passes for delete flag to take effect int first = doSyncResourcesWithRetry(); int second = doSyncResourcesWithRetry(); @@ -191,10 +180,6 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi @Override public void handleInit(Collection theResourceIds) { - if (!myEnabled) { - return; - } - if (!resourceDaoExists()) { ourLog.warn("The resource type {} is enabled on this server, but there is no {} DAO configured.", myResourceName, myResourceName); return; @@ -209,10 +194,6 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi @Override public void handleChange(IResourceChangeEvent theResourceChangeEvent) { - if (!myEnabled) { - return; - } - // For now ignore the contents of theResourceChangeEvent. In the future, consider updating the registry based on // known resources that have been created, updated & deleted syncDatabaseToCache();