remove enabled flag and instead only create the subscription topic beans in the fhir versions that use them (#4737)

Co-authored-by: Ken Stevens <ken@smilecdr.com>
This commit is contained in:
Ken Stevens 2023-04-18 10:20:43 -04:00 committed by GitHub
parent 396007af40
commit 9e741cb145
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 44 additions and 33 deletions

View File

@ -19,6 +19,8 @@
*/ */
package ca.uhn.fhir.jpa.subscription.match.config; package ca.uhn.fhir.jpa.subscription.match.config;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionDeliveryChannelNamer; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionDeliveryChannelNamer;
@ -33,12 +35,12 @@ import ca.uhn.fhir.jpa.subscription.match.matcher.matching.ISubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.InMemorySubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.match.matcher.matching.InMemorySubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.MatchingQueueSubscriberLoader; import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.MatchingQueueSubscriberLoader;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber; import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchDeliverer;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber; import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionRegisteringSubscriber; import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionRegisteringSubscriber;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.config.SubscriptionModelConfig; import ca.uhn.fhir.jpa.subscription.model.config.SubscriptionModelConfig;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicConfig;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary; import org.springframework.context.annotation.Primary;
@ -48,7 +50,7 @@ import org.springframework.context.annotation.Scope;
* This Spring config should be imported by a system that pulls messages off of the * This Spring config should be imported by a system that pulls messages off of the
* matching queue for processing, and handles delivery * matching queue for processing, and handles delivery
*/ */
@Import({SubscriptionModelConfig.class, SubscriptionTopicConfig.class}) @Import(SubscriptionModelConfig.class)
public class SubscriptionProcessorConfig { public class SubscriptionProcessorConfig {
@Bean @Bean
@ -96,6 +98,11 @@ public class SubscriptionProcessorConfig {
return new SubscriptionDeliveryHandlerFactory(); return new SubscriptionDeliveryHandlerFactory();
} }
@Bean
public SubscriptionMatchDeliverer subscriptionMatchDeliverer(FhirContext theFhirContext, IInterceptorBroadcaster theInterceptorBroadcaster, SubscriptionChannelRegistry theSubscriptionChannelRegistry) {
return new SubscriptionMatchDeliverer(theFhirContext, theInterceptorBroadcaster, theSubscriptionChannelRegistry);
}
@Bean @Bean
@Scope("prototype") @Scope("prototype")
public SubscriptionDeliveringRestHookSubscriber subscriptionDeliveringRestHookSubscriber() { public SubscriptionDeliveringRestHookSubscriber subscriptionDeliveringRestHookSubscriber() {

View File

@ -21,7 +21,6 @@ package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
import ca.uhn.fhir.IHapiBootOrder; import ca.uhn.fhir.IHapiBootOrder;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.jpa.model.entity.StorageSettings; import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings; import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
@ -45,7 +44,7 @@ public class MatchingQueueSubscriberLoader {
FhirContext myFhirContext; FhirContext myFhirContext;
@Autowired @Autowired
private SubscriptionMatchingSubscriber mySubscriptionMatchingSubscriber; private SubscriptionMatchingSubscriber mySubscriptionMatchingSubscriber;
@Autowired @Autowired(required = false)
private SubscriptionTopicMatchingSubscriber mySubscriptionTopicMatchingSubscriber; private SubscriptionTopicMatchingSubscriber mySubscriptionTopicMatchingSubscriber;
@Autowired @Autowired
private SubscriptionChannelFactory mySubscriptionChannelFactory; private SubscriptionChannelFactory mySubscriptionChannelFactory;
@ -67,7 +66,7 @@ public class MatchingQueueSubscriberLoader {
myMatchingChannel.subscribe(mySubscriptionActivatingSubscriber); myMatchingChannel.subscribe(mySubscriptionActivatingSubscriber);
myMatchingChannel.subscribe(mySubscriptionRegisteringSubscriber); myMatchingChannel.subscribe(mySubscriptionRegisteringSubscriber);
ourLog.info("Subscription Matching Subscriber subscribed to Matching Channel {} with name {}", myMatchingChannel.getClass().getName(), SUBSCRIPTION_MATCHING_CHANNEL_NAME); ourLog.info("Subscription Matching Subscriber subscribed to Matching Channel {} with name {}", myMatchingChannel.getClass().getName(), SUBSCRIPTION_MATCHING_CHANNEL_NAME);
if (myFhirContext.getVersion().getVersion().isEqualOrNewerThan(FhirVersionEnum.R4B)) { if (mySubscriptionTopicMatchingSubscriber != null) {
ourLog.info("Starting SubscriptionTopic Matching Subscriber"); ourLog.info("Starting SubscriptionTopic Matching Subscriber");
myMatchingChannel.subscribe(mySubscriptionTopicMatchingSubscriber); myMatchingChannel.subscribe(mySubscriptionTopicMatchingSubscriber);
} }

View File

@ -20,19 +20,11 @@
package ca.uhn.fhir.jpa.topic; package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher; import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchDeliverer;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
public class SubscriptionTopicConfig { public class SubscriptionTopicConfig {
@Bean
public SubscriptionMatchDeliverer subscriptionMatchDeliverer(FhirContext theFhirContext, IInterceptorBroadcaster theInterceptorBroadcaster, SubscriptionChannelRegistry theSubscriptionChannelRegistry) {
return new SubscriptionMatchDeliverer(theFhirContext, theInterceptorBroadcaster, theSubscriptionChannelRegistry);
}
@Bean @Bean
public SubscriptionTopicMatchingSubscriber subscriptionTopicMatchingSubscriber(FhirContext theFhirContext) { public SubscriptionTopicMatchingSubscriber subscriptionTopicMatchingSubscriber(FhirContext theFhirContext) {
return new SubscriptionTopicMatchingSubscriber(theFhirContext); return new SubscriptionTopicMatchingSubscriber(theFhirContext);

View File

@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.subscription.NotificationServlet;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy; import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.util.SubscriptionDebugLogInterceptor; import ca.uhn.fhir.jpa.subscription.util.SubscriptionDebugLogInterceptor;
import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil; import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicRegistry;
import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.annotation.Create; import ca.uhn.fhir.rest.annotation.Create;
import ca.uhn.fhir.rest.annotation.Delete; import ca.uhn.fhir.rest.annotation.Delete;
@ -91,6 +92,8 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
private final List<IIdType> mySubscriptionIds = Collections.synchronizedList(new ArrayList<>()); private final List<IIdType> mySubscriptionIds = Collections.synchronizedList(new ArrayList<>());
@Autowired @Autowired
private SubscriptionTestUtil mySubscriptionTestUtil; private SubscriptionTestUtil mySubscriptionTestUtil;
@Autowired(required = false)
SubscriptionTopicRegistry mySubscriptionTopicRegistry;
@AfterEach @AfterEach
public void afterUnregisterRestHookListener() { public void afterUnregisterRestHookListener() {
@ -185,6 +188,12 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
return observation; return observation;
} }
@Test
public void testSubscriptionTopicRegistryBean() {
// This bean should not exist in DSTU3
assertNull(mySubscriptionTopicRegistry);
}
@Test @Test
public void testDatabaseStrategyMeta() throws InterruptedException { public void testDatabaseStrategyMeta() throws InterruptedException {
String databaseCriteria = "Observation?code=17861-6&context.type=IHD"; String databaseCriteria = "Observation?code=17861-6&context.type=IHD";

View File

@ -4,6 +4,7 @@ import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test; import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber; import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicRegistry;
import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.CacheControlDirective; import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.Constants;
@ -59,6 +60,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
@Autowired @Autowired
StoppableSubscriptionDeliveringRestHookSubscriber myStoppableSubscriptionDeliveringRestHookSubscriber; StoppableSubscriptionDeliveringRestHookSubscriber myStoppableSubscriptionDeliveringRestHookSubscriber;
@Autowired(required = false)
SubscriptionTopicRegistry mySubscriptionTopicRegistry;
@AfterEach @AfterEach
public void cleanupStoppableSubscriptionDeliveringRestHookSubscriber() { public void cleanupStoppableSubscriptionDeliveringRestHookSubscriber() {
@ -68,7 +71,11 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
myStorageSettings.setTriggerSubscriptionsForNonVersioningChanges(new JpaStorageSettings().isTriggerSubscriptionsForNonVersioningChanges()); myStorageSettings.setTriggerSubscriptionsForNonVersioningChanges(new JpaStorageSettings().isTriggerSubscriptionsForNonVersioningChanges());
} }
@Test
public void testSubscriptionTopicRegistryBean() {
// This bean should not exist in R4
assertNull(mySubscriptionTopicRegistry);
}
/** /**
* Make sure that if we delete a subscription, then reinstate it with a criteria * Make sure that if we delete a subscription, then reinstate it with a criteria

View File

@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.awaitility.Awaitility.await; import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest { public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTopicR4BTest.class); private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTopicR4BTest.class);
@ -71,6 +72,11 @@ public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest {
super.after(); super.after();
} }
@Test
public void testSubscriptionTopicRegistryBean() {
assertNotNull(mySubscriptionTopicRegistry);
}
@Test @Test
public void testCreate() throws Exception { public void testCreate() throws Exception {
// WIP SR4B test update, delete, etc // WIP SR4B test update, delete, etc

View File

@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.awaitility.Awaitility.await; import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class SubscriptionTopicR5Test extends BaseSubscriptionsR5Test { public class SubscriptionTopicR5Test extends BaseSubscriptionsR5Test {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTopicR5Test.class); private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTopicR5Test.class);
@ -57,6 +58,11 @@ public class SubscriptionTopicR5Test extends BaseSubscriptionsR5Test {
super.after(); super.after();
} }
@Test
public void testSubscriptionTopicRegistryBean() {
assertNotNull(mySubscriptionTopicRegistry);
}
@Test @Test
public void testRestHookSubscriptionTopicApplicationFhirJson() throws Exception { public void testRestHookSubscriptionTopicApplicationFhirJson() throws Exception {
// WIP SR4B test update, delete, etc // WIP SR4B test update, delete, etc

View File

@ -28,6 +28,7 @@ import ca.uhn.fhir.jpa.config.HapiJpaConfig;
import ca.uhn.fhir.jpa.config.r4b.JpaR4BConfig; import ca.uhn.fhir.jpa.config.r4b.JpaR4BConfig;
import ca.uhn.fhir.jpa.config.util.HapiEntityManagerFactoryUtil; import ca.uhn.fhir.jpa.config.util.HapiEntityManagerFactoryUtil;
import ca.uhn.fhir.jpa.model.dialect.HapiFhirH2Dialect; import ca.uhn.fhir.jpa.model.dialect.HapiFhirH2Dialect;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicConfig;
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener; import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener; import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor; import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
@ -64,6 +65,7 @@ import static org.junit.jupiter.api.Assertions.fail;
JpaR4BConfig.class, JpaR4BConfig.class,
HapiJpaConfig.class, HapiJpaConfig.class,
TestJPAConfig.class, TestJPAConfig.class,
SubscriptionTopicConfig.class,
TestHSearchAddInConfig.DefaultLuceneHeap.class, TestHSearchAddInConfig.DefaultLuceneHeap.class,
JpaBatch2Config.class, JpaBatch2Config.class,
Batch2JobsConfig.class Batch2JobsConfig.class

View File

@ -28,6 +28,7 @@ import ca.uhn.fhir.jpa.config.HapiJpaConfig;
import ca.uhn.fhir.jpa.config.r5.JpaR5Config; import ca.uhn.fhir.jpa.config.r5.JpaR5Config;
import ca.uhn.fhir.jpa.config.util.HapiEntityManagerFactoryUtil; import ca.uhn.fhir.jpa.config.util.HapiEntityManagerFactoryUtil;
import ca.uhn.fhir.jpa.model.dialect.HapiFhirH2Dialect; import ca.uhn.fhir.jpa.model.dialect.HapiFhirH2Dialect;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicConfig;
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener; import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener; import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor; import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
@ -57,6 +58,7 @@ import static org.junit.jupiter.api.Assertions.fail;
JpaR5Config.class, JpaR5Config.class,
HapiJpaConfig.class, HapiJpaConfig.class,
TestJPAConfig.class, TestJPAConfig.class,
SubscriptionTopicConfig.class,
JpaBatch2Config.class, JpaBatch2Config.class,
Batch2JobsConfig.class, Batch2JobsConfig.class,
TestHSearchAddInConfig.DefaultLuceneHeap.class TestHSearchAddInConfig.DefaultLuceneHeap.class

View File

@ -66,7 +66,6 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi
private SearchParameterMap mySearchParameterMap; private SearchParameterMap mySearchParameterMap;
private SystemRequestDetails mySystemRequestDetails; private SystemRequestDetails mySystemRequestDetails;
private boolean myStopping; private boolean myStopping;
private boolean myEnabled;
private final Semaphore mySyncResourcesSemaphore = new Semaphore(1); private final Semaphore mySyncResourcesSemaphore = new Semaphore(1);
private final Object mySyncResourcesLock = new Object(); private final Object mySyncResourcesLock = new Object();
@ -83,7 +82,6 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi
mySearchParameterMap = getSearchParameterMap(); mySearchParameterMap = getSearchParameterMap();
mySystemRequestDetails = SystemRequestDetails.forAllPartitions(); mySystemRequestDetails = SystemRequestDetails.forAllPartitions();
myEnabled = true;
IResourceChangeListenerCache resourceCache = myResourceChangeListenerRegistry.registerResourceResourceChangeListener(myResourceName, mySearchParameterMap, this, REFRESH_INTERVAL); IResourceChangeListenerCache resourceCache = myResourceChangeListenerRegistry.registerResourceResourceChangeListener(myResourceName, mySearchParameterMap, this, REFRESH_INTERVAL);
resourceCache.forceRefresh(); resourceCache.forceRefresh();
} }
@ -101,9 +99,6 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi
* Read the existing resources from the database * Read the existing resources from the database
*/ */
public void syncDatabaseToCache() { public void syncDatabaseToCache() {
if (!myEnabled) {
return;
}
if (!resourceDaoExists()) { if (!resourceDaoExists()) {
return; return;
} }
@ -119,17 +114,11 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi
@VisibleForTesting @VisibleForTesting
public void acquireSemaphoreForUnitTest() throws InterruptedException { public void acquireSemaphoreForUnitTest() throws InterruptedException {
if (!myEnabled) {
return;
}
mySyncResourcesSemaphore.acquire(); mySyncResourcesSemaphore.acquire();
} }
@VisibleForTesting @VisibleForTesting
public int doSyncResourcessForUnitTest() { public int doSyncResourcessForUnitTest() {
if (!myEnabled) {
return 0;
}
// Two passes for delete flag to take effect // Two passes for delete flag to take effect
int first = doSyncResourcesWithRetry(); int first = doSyncResourcesWithRetry();
int second = doSyncResourcesWithRetry(); int second = doSyncResourcesWithRetry();
@ -191,10 +180,6 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi
@Override @Override
public void handleInit(Collection<IIdType> theResourceIds) { public void handleInit(Collection<IIdType> theResourceIds) {
if (!myEnabled) {
return;
}
if (!resourceDaoExists()) { if (!resourceDaoExists()) {
ourLog.warn("The resource type {} is enabled on this server, but there is no {} DAO configured.", myResourceName, myResourceName); ourLog.warn("The resource type {} is enabled on this server, but there is no {} DAO configured.", myResourceName, myResourceName);
return; return;
@ -209,10 +194,6 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi
@Override @Override
public void handleChange(IResourceChangeEvent theResourceChangeEvent) { public void handleChange(IResourceChangeEvent theResourceChangeEvent) {
if (!myEnabled) {
return;
}
// For now ignore the contents of theResourceChangeEvent. In the future, consider updating the registry based on // For now ignore the contents of theResourceChangeEvent. In the future, consider updating the registry based on
// known resources that have been created, updated & deleted // known resources that have been created, updated & deleted
syncDatabaseToCache(); syncDatabaseToCache();