From 79da1578cb45ac6213e15689a2188106421bab29 Mon Sep 17 00:00:00 2001 From: jamesagnew Date: Sat, 4 Apr 2020 20:04:32 -0400 Subject: [PATCH] Subscription cleanup --- .../fhir/jpa/dao/DaoSearchParamProvider.java | 9 +-- ...rResourceDaoR4InvalidSubscriptionTest.java | 13 +--- ...tivatesPreExistingSubscriptionsR4Test.java | 6 -- ...rceptorRegisteredToDaoConfigDstu3Test.java | 16 ++-- .../SubscriptionChannelFactory.java | 13 +++- .../matching/DaoSubscriptionMatcher.java | 7 +- .../SubscriptionActivatingSubscriber.java | 10 +-- .../matching/DaoSubscriptionMatcherTest.java | 75 +++++++++++++++++++ 8 files changed, 105 insertions(+), 44 deletions(-) create mode 100644 hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/DaoSubscriptionMatcherTest.java diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/DaoSearchParamProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/DaoSearchParamProvider.java index ddf4342c62e..07bb9b36c4c 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/DaoSearchParamProvider.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/DaoSearchParamProvider.java @@ -22,22 +22,18 @@ package ca.uhn.fhir.jpa.dao; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; -import ca.uhn.fhir.jpa.searchparam.registry.SearchParamRegistryImpl; import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamProvider; +import ca.uhn.fhir.jpa.searchparam.registry.SearchParamRegistryImpl; import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum; import ca.uhn.fhir.rest.api.server.IBundleProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Service; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.support.TransactionTemplate; @Service @Primary public class DaoSearchParamProvider implements ISearchParamProvider { - @Autowired - private PlatformTransactionManager myTxManager; @Autowired private DaoRegistry myDaoRegistry; @@ -48,7 +44,6 @@ public class DaoSearchParamProvider implements ISearchParamProvider { @Override public int refreshCache(SearchParamRegistryImpl theSearchParamRegistry, long theRefreshInterval) { - TransactionTemplate txTemplate = new TransactionTemplate(myTxManager); - return txTemplate.execute(t-> theSearchParamRegistry.doRefresh(theRefreshInterval)); + return theSearchParamRegistry.doRefresh(theRefreshInterval); } } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4InvalidSubscriptionTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4InvalidSubscriptionTest.java index d41d57f43aa..35c203ed6f4 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4InvalidSubscriptionTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4InvalidSubscriptionTest.java @@ -1,8 +1,7 @@ package ca.uhn.fhir.jpa.dao.r4; -import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao; import ca.uhn.fhir.jpa.api.config.DaoConfig; -import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionActivatingSubscriber; +import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao; import ca.uhn.fhir.jpa.subscription.SubscriptionTestUtil; import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; import ca.uhn.fhir.util.TestUtil; @@ -14,7 +13,8 @@ import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; public class FhirResourceDaoR4InvalidSubscriptionTest extends BaseJpaR4Test { @@ -23,20 +23,13 @@ public class FhirResourceDaoR4InvalidSubscriptionTest extends BaseJpaR4Test { @After public void afterResetDao() { - SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(false); myDaoConfig.setResourceServerIdStrategy(new DaoConfig().getResourceServerIdStrategy()); BaseHapiFhirDao.setValidationDisabledForUnitTest(false); } - @Before - public void before() { - SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(true); - } - @After public void afterUnregisterRestHookListener() { mySubscriptionTestUtil.unregisterSubscriptionInterceptor(); - SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(false); } @Before diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookActivatesPreExistingSubscriptionsR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookActivatesPreExistingSubscriptionsR4Test.java index e60b105c69d..4368f474659 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookActivatesPreExistingSubscriptionsR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookActivatesPreExistingSubscriptionsR4Test.java @@ -45,11 +45,6 @@ public class RestHookActivatesPreExistingSubscriptionsR4Test extends BaseResourc @Autowired private SubscriptionTestUtil mySubscriptionTestUtil; - @After - public void afterResetSubscriptionActivatingInterceptor() { - SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(false); - } - @After public void afterUnregisterRestHookListener() { mySubscriptionTestUtil.unregisterSubscriptionInterceptor(); @@ -57,7 +52,6 @@ public class RestHookActivatesPreExistingSubscriptionsR4Test extends BaseResourc @Before public void beforeSetSubscriptionActivatingInterceptor() { - SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(true); mySubscriptionLoader.doSyncSubscriptionsForUnitTest(); } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestWithInterceptorRegisteredToDaoConfigDstu3Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestWithInterceptorRegisteredToDaoConfigDstu3Test.java index 552d851f3c1..04110bd6db6 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestWithInterceptorRegisteredToDaoConfigDstu3Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestWithInterceptorRegisteredToDaoConfigDstu3Test.java @@ -4,7 +4,6 @@ package ca.uhn.fhir.jpa.subscription.resthook; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.provider.dstu3.BaseResourceProviderDstu3Test; -import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionActivatingSubscriber; import ca.uhn.fhir.jpa.subscription.SubscriptionTestUtil; import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.rest.annotation.Create; @@ -18,9 +17,18 @@ import com.google.common.collect.Lists; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; -import org.hl7.fhir.dstu3.model.*; +import org.hl7.fhir.dstu3.model.CodeableConcept; +import org.hl7.fhir.dstu3.model.Coding; +import org.hl7.fhir.dstu3.model.IdType; +import org.hl7.fhir.dstu3.model.Observation; +import org.hl7.fhir.dstu3.model.Subscription; import org.hl7.fhir.instance.model.api.IBaseResource; -import org.junit.*; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import java.util.Collections; @@ -56,7 +64,6 @@ public class RestHookTestWithInterceptorRegisteredToDaoConfigDstu3Test extends B myDaoConfig.setAllowMultipleDelete(new DaoConfig().isAllowMultipleDelete()); mySubscriptionTestUtil.unregisterSubscriptionInterceptor(); - SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(false); } @Before @@ -68,7 +75,6 @@ public class RestHookTestWithInterceptorRegisteredToDaoConfigDstu3Test extends B public void beforeReset() { ourCreatedObservations.clear(); ourUpdatedObservations.clear(); - SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(true); } private Subscription createSubscription(String criteria, String payload, String endpoint) throws InterruptedException { diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java index 70da786f8a4..30e207b952e 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java @@ -24,6 +24,7 @@ import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelFactory; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants; +import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -60,10 +61,13 @@ public class SubscriptionChannelFactory { } - private static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements MessageHandler { + private static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements MessageHandler, DisposableBean { + + private final SubscribableChannel myWrappedChannel; public BroadcastingSubscribableChannelWrapper(SubscribableChannel theChannel) { theChannel.subscribe(this); + myWrappedChannel = theChannel; } @@ -79,6 +83,13 @@ public class SubscriptionChannelFactory { public void handleMessage(Message message) throws MessagingException { send(message); } + + @Override + public void destroy() throws Exception { + if (myWrappedChannel instanceof DisposableBean) { + ((DisposableBean) myWrappedChannel).destroy(); + } + } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/DaoSubscriptionMatcher.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/DaoSubscriptionMatcher.java index 3c9cb3437aa..d01676ad6f0 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/DaoSubscriptionMatcher.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/DaoSubscriptionMatcher.java @@ -47,8 +47,6 @@ public class DaoSubscriptionMatcher implements ISubscriptionMatcher { private Logger ourLog = LoggerFactory.getLogger(DaoSubscriptionMatcher.class); @Autowired private FhirContext myCtx; - @Autowired - private PlatformTransactionManager myTxManager; @Override public InMemoryMatchResult match(CanonicalSubscription theSubscription, ResourceModifiedMessage theMsg) { @@ -78,10 +76,7 @@ public class DaoSubscriptionMatcher implements ISubscriptionMatcher { IFhirResourceDao responseDao = myDaoRegistry.getResourceDao(responseResourceDef.getImplementingClass()); responseCriteriaUrl.setLoadSynchronousUpTo(1); - TransactionTemplate txTemplate = new TransactionTemplate(myTxManager); - txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); - return txTemplate.execute(t -> responseDao.search(responseCriteriaUrl)); - + return responseDao.search(responseCriteriaUrl); } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionActivatingSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionActivatingSubscriber.java index 8f35ce3b196..f442c873211 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionActivatingSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionActivatingSubscriber.java @@ -57,8 +57,6 @@ import javax.annotation.Nonnull; public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscriptionResources implements MessageHandler { private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class); @Autowired - private PlatformTransactionManager myTransactionManager; - @Autowired private SubscriptionRegistry mySubscriptionRegistry; @Autowired private DaoRegistry myDaoRegistry; @@ -150,13 +148,7 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript } private void activateAndRegisterSubscriptionIfRequiredInTransaction(IBaseResource theSubscription) { - TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager); - txTemplate.execute(new TransactionCallbackWithoutResult() { - @Override - protected void doInTransactionWithoutResult(@Nonnull TransactionStatus theStatus) { - activateOrRegisterSubscriptionIfRequired(theSubscription); - } - }); + activateOrRegisterSubscriptionIfRequired(theSubscription); } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/DaoSubscriptionMatcherTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/DaoSubscriptionMatcherTest.java new file mode 100644 index 00000000000..4d2bc26a60f --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/DaoSubscriptionMatcherTest.java @@ -0,0 +1,75 @@ +package ca.uhn.fhir.jpa.subscription.process.matcher.matching; + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.context.support.IValidationSupport; +import ca.uhn.fhir.interceptor.api.IInterceptorService; +import ca.uhn.fhir.jpa.api.config.DaoConfig; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.model.entity.ModelConfig; +import ca.uhn.fhir.jpa.model.sched.ISchedulerService; +import ca.uhn.fhir.jpa.searchparam.config.SearchParamConfig; +import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamProvider; +import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory; +import ca.uhn.fhir.jpa.subscription.process.config.SubscriptionProcessorConfig; +import ca.uhn.fhir.jpa.subscription.submit.config.SubscriptionSubmitterConfig; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.transaction.PlatformTransactionManager; + +import static org.junit.Assert.*; + +@RunWith(SpringRunner.class) +@ContextConfiguration(classes = { + SubscriptionProcessorConfig.class, + SearchParamConfig.class, + DaoSubscriptionMatcherTest.MyConfig.class +}) +public class DaoSubscriptionMatcherTest { + + @Autowired(required = false) + private PlatformTransactionManager myTxManager; + @Autowired + private DaoSubscriptionMatcher mySvc; + @MockBean + private ModelConfig myModelConfig; + @MockBean + private DaoConfig myDaoConfig; + @MockBean + private ISearchParamProvider mySearchParamProvider; + @MockBean + private ISchedulerService mySchedulerService; + @MockBean + private IInterceptorService myInterceptorService; + @MockBean + private DaoRegistry myDaoRegistry; + @MockBean + private IValidationSupport myValidationSupport; + @MockBean + private SubscriptionChannelFactory mySubscriptionChannelFactory; + + /** + * Make sure that if we're only running the {@link SubscriptionSubmitterConfig}, we don't need + * a transaction manager + */ + @Test + public void testSubmitterCanRunWithoutTransactionManager() { + assertNull(myTxManager); + } + + @Configuration + public static class MyConfig { + + @Bean + public FhirContext fhirContext() { + return FhirContext.forR4(); + } + + } + +}