Subscription cleanup

This commit is contained in:
jamesagnew 2020-04-04 20:04:32 -04:00
parent fef447afee
commit 79da1578cb
8 changed files with 105 additions and 44 deletions

View File

@ -22,22 +22,18 @@ package ca.uhn.fhir.jpa.dao;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.registry.SearchParamRegistryImpl;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamProvider; import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamProvider;
import ca.uhn.fhir.jpa.searchparam.registry.SearchParamRegistryImpl;
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum; import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.IBundleProvider;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary; import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
@Service @Service
@Primary @Primary
public class DaoSearchParamProvider implements ISearchParamProvider { public class DaoSearchParamProvider implements ISearchParamProvider {
@Autowired
private PlatformTransactionManager myTxManager;
@Autowired @Autowired
private DaoRegistry myDaoRegistry; private DaoRegistry myDaoRegistry;
@ -48,7 +44,6 @@ public class DaoSearchParamProvider implements ISearchParamProvider {
@Override @Override
public int refreshCache(SearchParamRegistryImpl theSearchParamRegistry, long theRefreshInterval) { public int refreshCache(SearchParamRegistryImpl theSearchParamRegistry, long theRefreshInterval) {
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager); return theSearchParamRegistry.doRefresh(theRefreshInterval);
return txTemplate.execute(t-> theSearchParamRegistry.doRefresh(theRefreshInterval));
} }
} }

View File

@ -1,8 +1,7 @@
package ca.uhn.fhir.jpa.dao.r4; package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionActivatingSubscriber; import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.subscription.SubscriptionTestUtil; import ca.uhn.fhir.jpa.subscription.SubscriptionTestUtil;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.TestUtil; import ca.uhn.fhir.util.TestUtil;
@ -14,7 +13,8 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import static org.junit.Assert.*; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class FhirResourceDaoR4InvalidSubscriptionTest extends BaseJpaR4Test { public class FhirResourceDaoR4InvalidSubscriptionTest extends BaseJpaR4Test {
@ -23,20 +23,13 @@ public class FhirResourceDaoR4InvalidSubscriptionTest extends BaseJpaR4Test {
@After @After
public void afterResetDao() { public void afterResetDao() {
SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(false);
myDaoConfig.setResourceServerIdStrategy(new DaoConfig().getResourceServerIdStrategy()); myDaoConfig.setResourceServerIdStrategy(new DaoConfig().getResourceServerIdStrategy());
BaseHapiFhirDao.setValidationDisabledForUnitTest(false); BaseHapiFhirDao.setValidationDisabledForUnitTest(false);
} }
@Before
public void before() {
SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(true);
}
@After @After
public void afterUnregisterRestHookListener() { public void afterUnregisterRestHookListener() {
mySubscriptionTestUtil.unregisterSubscriptionInterceptor(); mySubscriptionTestUtil.unregisterSubscriptionInterceptor();
SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(false);
} }
@Before @Before

View File

@ -45,11 +45,6 @@ public class RestHookActivatesPreExistingSubscriptionsR4Test extends BaseResourc
@Autowired @Autowired
private SubscriptionTestUtil mySubscriptionTestUtil; private SubscriptionTestUtil mySubscriptionTestUtil;
@After
public void afterResetSubscriptionActivatingInterceptor() {
SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(false);
}
@After @After
public void afterUnregisterRestHookListener() { public void afterUnregisterRestHookListener() {
mySubscriptionTestUtil.unregisterSubscriptionInterceptor(); mySubscriptionTestUtil.unregisterSubscriptionInterceptor();
@ -57,7 +52,6 @@ public class RestHookActivatesPreExistingSubscriptionsR4Test extends BaseResourc
@Before @Before
public void beforeSetSubscriptionActivatingInterceptor() { public void beforeSetSubscriptionActivatingInterceptor() {
SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(true);
mySubscriptionLoader.doSyncSubscriptionsForUnitTest(); mySubscriptionLoader.doSyncSubscriptionsForUnitTest();
} }

View File

@ -4,7 +4,6 @@ package ca.uhn.fhir.jpa.subscription.resthook;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.provider.dstu3.BaseResourceProviderDstu3Test; import ca.uhn.fhir.jpa.provider.dstu3.BaseResourceProviderDstu3Test;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.jpa.subscription.SubscriptionTestUtil; import ca.uhn.fhir.jpa.subscription.SubscriptionTestUtil;
import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.annotation.Create; import ca.uhn.fhir.rest.annotation.Create;
@ -18,9 +17,18 @@ import com.google.common.collect.Lists;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.servlet.ServletHolder;
import org.hl7.fhir.dstu3.model.*; import org.hl7.fhir.dstu3.model.CodeableConcept;
import org.hl7.fhir.dstu3.model.Coding;
import org.hl7.fhir.dstu3.model.IdType;
import org.hl7.fhir.dstu3.model.Observation;
import org.hl7.fhir.dstu3.model.Subscription;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.junit.*; import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import java.util.Collections; import java.util.Collections;
@ -56,7 +64,6 @@ public class RestHookTestWithInterceptorRegisteredToDaoConfigDstu3Test extends B
myDaoConfig.setAllowMultipleDelete(new DaoConfig().isAllowMultipleDelete()); myDaoConfig.setAllowMultipleDelete(new DaoConfig().isAllowMultipleDelete());
mySubscriptionTestUtil.unregisterSubscriptionInterceptor(); mySubscriptionTestUtil.unregisterSubscriptionInterceptor();
SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(false);
} }
@Before @Before
@ -68,7 +75,6 @@ public class RestHookTestWithInterceptorRegisteredToDaoConfigDstu3Test extends B
public void beforeReset() { public void beforeReset() {
ourCreatedObservations.clear(); ourCreatedObservations.clear();
ourUpdatedObservations.clear(); ourUpdatedObservations.clear();
SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(true);
} }
private Subscription createSubscription(String criteria, String payload, String endpoint) throws InterruptedException { private Subscription createSubscription(String criteria, String payload, String endpoint) throws InterruptedException {

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelFactory;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants; import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
@ -60,10 +61,13 @@ public class SubscriptionChannelFactory {
} }
private static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements MessageHandler { private static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements MessageHandler, DisposableBean {
private final SubscribableChannel myWrappedChannel;
public BroadcastingSubscribableChannelWrapper(SubscribableChannel theChannel) { public BroadcastingSubscribableChannelWrapper(SubscribableChannel theChannel) {
theChannel.subscribe(this); theChannel.subscribe(this);
myWrappedChannel = theChannel;
} }
@ -79,6 +83,13 @@ public class SubscriptionChannelFactory {
public void handleMessage(Message<?> message) throws MessagingException { public void handleMessage(Message<?> message) throws MessagingException {
send(message); send(message);
} }
@Override
public void destroy() throws Exception {
if (myWrappedChannel instanceof DisposableBean) {
((DisposableBean) myWrappedChannel).destroy();
}
}
} }

View File

@ -47,8 +47,6 @@ public class DaoSubscriptionMatcher implements ISubscriptionMatcher {
private Logger ourLog = LoggerFactory.getLogger(DaoSubscriptionMatcher.class); private Logger ourLog = LoggerFactory.getLogger(DaoSubscriptionMatcher.class);
@Autowired @Autowired
private FhirContext myCtx; private FhirContext myCtx;
@Autowired
private PlatformTransactionManager myTxManager;
@Override @Override
public InMemoryMatchResult match(CanonicalSubscription theSubscription, ResourceModifiedMessage theMsg) { public InMemoryMatchResult match(CanonicalSubscription theSubscription, ResourceModifiedMessage theMsg) {
@ -78,10 +76,7 @@ public class DaoSubscriptionMatcher implements ISubscriptionMatcher {
IFhirResourceDao<? extends IBaseResource> responseDao = myDaoRegistry.getResourceDao(responseResourceDef.getImplementingClass()); IFhirResourceDao<? extends IBaseResource> responseDao = myDaoRegistry.getResourceDao(responseResourceDef.getImplementingClass());
responseCriteriaUrl.setLoadSynchronousUpTo(1); responseCriteriaUrl.setLoadSynchronousUpTo(1);
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager); return responseDao.search(responseCriteriaUrl);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
return txTemplate.execute(t -> responseDao.search(responseCriteriaUrl));
} }
} }

View File

@ -57,8 +57,6 @@ import javax.annotation.Nonnull;
public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscriptionResources implements MessageHandler { public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscriptionResources implements MessageHandler {
private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class); private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class);
@Autowired @Autowired
private PlatformTransactionManager myTransactionManager;
@Autowired
private SubscriptionRegistry mySubscriptionRegistry; private SubscriptionRegistry mySubscriptionRegistry;
@Autowired @Autowired
private DaoRegistry myDaoRegistry; private DaoRegistry myDaoRegistry;
@ -150,13 +148,7 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript
} }
private void activateAndRegisterSubscriptionIfRequiredInTransaction(IBaseResource theSubscription) { private void activateAndRegisterSubscriptionIfRequiredInTransaction(IBaseResource theSubscription) {
TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager);
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(@Nonnull TransactionStatus theStatus) {
activateOrRegisterSubscriptionIfRequired(theSubscription); activateOrRegisterSubscriptionIfRequired(theSubscription);
} }
});
}
} }

View File

@ -0,0 +1,75 @@
package ca.uhn.fhir.jpa.subscription.process.matcher.matching;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.support.IValidationSupport;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.searchparam.config.SearchParamConfig;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamProvider;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.process.config.SubscriptionProcessorConfig;
import ca.uhn.fhir.jpa.subscription.submit.config.SubscriptionSubmitterConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.PlatformTransactionManager;
import static org.junit.Assert.*;
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {
SubscriptionProcessorConfig.class,
SearchParamConfig.class,
DaoSubscriptionMatcherTest.MyConfig.class
})
public class DaoSubscriptionMatcherTest {
@Autowired(required = false)
private PlatformTransactionManager myTxManager;
@Autowired
private DaoSubscriptionMatcher mySvc;
@MockBean
private ModelConfig myModelConfig;
@MockBean
private DaoConfig myDaoConfig;
@MockBean
private ISearchParamProvider mySearchParamProvider;
@MockBean
private ISchedulerService mySchedulerService;
@MockBean
private IInterceptorService myInterceptorService;
@MockBean
private DaoRegistry myDaoRegistry;
@MockBean
private IValidationSupport myValidationSupport;
@MockBean
private SubscriptionChannelFactory mySubscriptionChannelFactory;
/**
* Make sure that if we're only running the {@link SubscriptionSubmitterConfig}, we don't need
* a transaction manager
*/
@Test
public void testSubmitterCanRunWithoutTransactionManager() {
assertNull(myTxManager);
}
@Configuration
public static class MyConfig {
@Bean
public FhirContext fhirContext() {
return FhirContext.forR4();
}
}
}