Restore subscription loading at startup
This commit is contained in:
parent
99b9ec0b1c
commit
ccda668d0c
|
@ -236,6 +236,34 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
|
||||||
waitForSize(100, ourUpdatedObservations);
|
waitForSize(100, ourUpdatedObservations);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSubscriptionRegistryLoadsSubscriptionsFromDatabase() throws Exception {
|
||||||
|
String payload = "application/fhir+json";
|
||||||
|
|
||||||
|
String code = "1000000050";
|
||||||
|
String criteria1 = "Observation?";
|
||||||
|
|
||||||
|
createSubscription(criteria1, payload);
|
||||||
|
waitForActivatedSubscriptionCount(1);
|
||||||
|
|
||||||
|
// Manually unregister all subscriptions
|
||||||
|
mySubscriptionRegistry.unregisterAllSubscriptions();
|
||||||
|
waitForActivatedSubscriptionCount(0);
|
||||||
|
|
||||||
|
// Force a reload
|
||||||
|
mySubscriptionLoader.doSyncSubscriptionsForUnitTest();
|
||||||
|
|
||||||
|
// Send a matching observation
|
||||||
|
Observation observation = new Observation();
|
||||||
|
observation.getIdentifierFirstRep().setSystem("foo").setValue("ID");
|
||||||
|
observation.getCode().addCoding().setCode(code).setSystem("SNOMED-CT");
|
||||||
|
observation.setStatus(Observation.ObservationStatus.FINAL);
|
||||||
|
myObservationDao.create(observation);
|
||||||
|
|
||||||
|
waitForSize(1, ourUpdatedObservations);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testActiveSubscriptionShouldntReActivate() throws Exception {
|
public void testActiveSubscriptionShouldntReActivate() throws Exception {
|
||||||
String criteria = "Observation?code=111111111&_format=xml";
|
String criteria = "Observation?code=111111111&_format=xml";
|
||||||
|
|
|
@ -23,7 +23,6 @@ package ca.uhn.fhir.jpa.subscription.match.config;
|
||||||
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
|
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
|
||||||
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionDeliveryChannelNamer;
|
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionDeliveryChannelNamer;
|
||||||
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionDeliveryHandlerFactory;
|
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionDeliveryHandlerFactory;
|
||||||
import ca.uhn.fhir.jpa.subscription.model.config.SubscriptionModelConfig;
|
|
||||||
import ca.uhn.fhir.jpa.subscription.match.deliver.email.IEmailSender;
|
import ca.uhn.fhir.jpa.subscription.match.deliver.email.IEmailSender;
|
||||||
import ca.uhn.fhir.jpa.subscription.match.deliver.email.SubscriptionDeliveringEmailSubscriber;
|
import ca.uhn.fhir.jpa.subscription.match.deliver.email.SubscriptionDeliveringEmailSubscriber;
|
||||||
import ca.uhn.fhir.jpa.subscription.match.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
|
import ca.uhn.fhir.jpa.subscription.match.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
|
||||||
|
@ -35,10 +34,9 @@ import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.MatchingQueueSubscr
|
||||||
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber;
|
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber;
|
||||||
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber;
|
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber;
|
||||||
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionRegisteringSubscriber;
|
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionRegisteringSubscriber;
|
||||||
import ca.uhn.fhir.jpa.subscription.match.registry.DaoSubscriptionProvider;
|
|
||||||
import ca.uhn.fhir.jpa.subscription.match.registry.ISubscriptionProvider;
|
|
||||||
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader;
|
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader;
|
||||||
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
|
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
|
||||||
|
import ca.uhn.fhir.jpa.subscription.model.config.SubscriptionModelConfig;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Import;
|
import org.springframework.context.annotation.Import;
|
||||||
import org.springframework.context.annotation.Primary;
|
import org.springframework.context.annotation.Primary;
|
||||||
|
@ -81,11 +79,6 @@ public class SubscriptionProcessorConfig {
|
||||||
return new SubscriptionDeliveryChannelNamer();
|
return new SubscriptionDeliveryChannelNamer();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
|
||||||
public ISubscriptionProvider subscriptionProvider() {
|
|
||||||
return new DaoSubscriptionProvider();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public SubscriptionLoader subscriptionLoader() {
|
public SubscriptionLoader subscriptionLoader() {
|
||||||
return new SubscriptionLoader();
|
return new SubscriptionLoader();
|
||||||
|
|
|
@ -1,48 +0,0 @@
|
||||||
package ca.uhn.fhir.jpa.subscription.match.registry;
|
|
||||||
|
|
||||||
/*-
|
|
||||||
* #%L
|
|
||||||
* HAPI FHIR Subscription Server
|
|
||||||
* %%
|
|
||||||
* Copyright (C) 2014 - 2020 University Health Network
|
|
||||||
* %%
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
* #L%
|
|
||||||
*/
|
|
||||||
|
|
||||||
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
|
||||||
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
|
|
||||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
|
||||||
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber;
|
|
||||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
|
||||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
|
|
||||||
public class DaoSubscriptionProvider implements ISubscriptionProvider {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
DaoRegistry myDaoRegistry;
|
|
||||||
@Autowired
|
|
||||||
private SubscriptionActivatingSubscriber mySubscriptionActivatingInterceptor;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public IBundleProvider search(SearchParameterMap theMap) {
|
|
||||||
IFhirResourceDao subscriptionDao = myDaoRegistry.getSubscriptionDao();
|
|
||||||
return subscriptionDao.search(theMap);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean loadSubscription(IBaseResource theResource) {
|
|
||||||
return mySubscriptionActivatingInterceptor.activateOrRegisterSubscriptionIfRequired(theResource);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,31 +0,0 @@
|
||||||
package ca.uhn.fhir.jpa.subscription.match.registry;
|
|
||||||
|
|
||||||
/*-
|
|
||||||
* #%L
|
|
||||||
* HAPI FHIR Subscription Server
|
|
||||||
* %%
|
|
||||||
* Copyright (C) 2014 - 2020 University Health Network
|
|
||||||
* %%
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
* #L%
|
|
||||||
*/
|
|
||||||
|
|
||||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
|
||||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
|
||||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
|
||||||
|
|
||||||
public interface ISubscriptionProvider {
|
|
||||||
IBundleProvider search(SearchParameterMap theMap);
|
|
||||||
|
|
||||||
boolean loadSubscription(IBaseResource theResource);
|
|
||||||
}
|
|
|
@ -21,11 +21,14 @@ package ca.uhn.fhir.jpa.subscription.match.registry;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import ca.uhn.fhir.jpa.api.IDaoRegistry;
|
import ca.uhn.fhir.jpa.api.IDaoRegistry;
|
||||||
|
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
||||||
|
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
|
||||||
import ca.uhn.fhir.jpa.model.sched.HapiJob;
|
import ca.uhn.fhir.jpa.model.sched.HapiJob;
|
||||||
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
|
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
|
||||||
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
|
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
|
||||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
||||||
import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
|
import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
|
||||||
|
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber;
|
||||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||||
import ca.uhn.fhir.rest.param.TokenOrListParam;
|
import ca.uhn.fhir.rest.param.TokenOrListParam;
|
||||||
import ca.uhn.fhir.rest.param.TokenParam;
|
import ca.uhn.fhir.rest.param.TokenParam;
|
||||||
|
@ -50,14 +53,14 @@ public class SubscriptionLoader {
|
||||||
private static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes
|
private static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes
|
||||||
private final Object mySyncSubscriptionsLock = new Object();
|
private final Object mySyncSubscriptionsLock = new Object();
|
||||||
@Autowired
|
@Autowired
|
||||||
private ISubscriptionProvider mySubscriptionProvider;
|
|
||||||
@Autowired
|
|
||||||
private SubscriptionRegistry mySubscriptionRegistry;
|
private SubscriptionRegistry mySubscriptionRegistry;
|
||||||
@Autowired(required = false)
|
@Autowired(required = false)
|
||||||
private IDaoRegistry myDaoRegistry;
|
private DaoRegistry myDaoRegistry;
|
||||||
private Semaphore mySyncSubscriptionsSemaphore = new Semaphore(1);
|
private Semaphore mySyncSubscriptionsSemaphore = new Semaphore(1);
|
||||||
@Autowired
|
@Autowired
|
||||||
private ISchedulerService mySchedulerService;
|
private ISchedulerService mySchedulerService;
|
||||||
|
@Autowired
|
||||||
|
private SubscriptionActivatingSubscriber mySubscriptionActivatingInterceptor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
|
@ -120,15 +123,12 @@ public class SubscriptionLoader {
|
||||||
ourLog.debug("Starting sync subscriptions");
|
ourLog.debug("Starting sync subscriptions");
|
||||||
SearchParameterMap map = new SearchParameterMap();
|
SearchParameterMap map = new SearchParameterMap();
|
||||||
map.add(Subscription.SP_STATUS, new TokenOrListParam()
|
map.add(Subscription.SP_STATUS, new TokenOrListParam()
|
||||||
// TODO KHS Ideally we should only be pulling ACTIVE subscriptions here, but this class is overloaded so that
|
|
||||||
// the @Scheduled task also activates requested subscriptions if their type was enabled after they were requested
|
|
||||||
// There should be a separate @Scheduled task that looks for requested subscriptions that need to be activated
|
|
||||||
// independent of the registry loading process.
|
|
||||||
.addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode()))
|
.addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode()))
|
||||||
.addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode())));
|
.addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode())));
|
||||||
map.setLoadSynchronousUpTo(SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS);
|
map.setLoadSynchronousUpTo(SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS);
|
||||||
|
|
||||||
IBundleProvider subscriptionBundleList = mySubscriptionProvider.search(map);
|
IFhirResourceDao subscriptionDao = myDaoRegistry.getSubscriptionDao();
|
||||||
|
IBundleProvider subscriptionBundleList = subscriptionDao.search(map);
|
||||||
|
|
||||||
Integer subscriptionCount = subscriptionBundleList.size();
|
Integer subscriptionCount = subscriptionBundleList.size();
|
||||||
assert subscriptionCount != null;
|
assert subscriptionCount != null;
|
||||||
|
@ -139,20 +139,28 @@ public class SubscriptionLoader {
|
||||||
List<IBaseResource> resourceList = subscriptionBundleList.getResources(0, subscriptionCount);
|
List<IBaseResource> resourceList = subscriptionBundleList.getResources(0, subscriptionCount);
|
||||||
|
|
||||||
Set<String> allIds = new HashSet<>();
|
Set<String> allIds = new HashSet<>();
|
||||||
int changesCount = 0;
|
int activatedCount = 0;
|
||||||
|
int registeredCount = 0;
|
||||||
|
|
||||||
for (IBaseResource resource : resourceList) {
|
for (IBaseResource resource : resourceList) {
|
||||||
String nextId = resource.getIdElement().getIdPart();
|
String nextId = resource.getIdElement().getIdPart();
|
||||||
allIds.add(nextId);
|
allIds.add(nextId);
|
||||||
boolean changed = mySubscriptionProvider.loadSubscription(resource);
|
|
||||||
if (changed) {
|
boolean activated = mySubscriptionActivatingInterceptor.activateOrRegisterSubscriptionIfRequired(resource);
|
||||||
changesCount++;
|
if (activated) {
|
||||||
|
activatedCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean registered = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource);
|
||||||
|
if (registered) {
|
||||||
|
registeredCount++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mySubscriptionRegistry.unregisterAllSubscriptionsNotInCollection(allIds);
|
mySubscriptionRegistry.unregisterAllSubscriptionsNotInCollection(allIds);
|
||||||
ourLog.debug("Finished sync subscriptions - found {}", resourceList.size());
|
ourLog.debug("Finished sync subscriptions - activated {} and registered {}", resourceList.size(), registeredCount);
|
||||||
|
|
||||||
return changesCount;
|
return activatedCount;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -80,8 +80,7 @@ public class SubscriptionRegistry {
|
||||||
return activeSubscription.map(ActiveSubscription::getSubscription);
|
return activeSubscription.map(ActiveSubscription::getSubscription);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("UnusedReturnValue")
|
private void registerSubscription(IIdType theId, IBaseResource theSubscription) {
|
||||||
private CanonicalSubscription registerSubscription(IIdType theId, IBaseResource theSubscription) {
|
|
||||||
Validate.notNull(theId);
|
Validate.notNull(theId);
|
||||||
String subscriptionId = theId.getIdPart();
|
String subscriptionId = theId.getIdPart();
|
||||||
Validate.notBlank(subscriptionId);
|
Validate.notBlank(subscriptionId);
|
||||||
|
@ -101,7 +100,6 @@ public class SubscriptionRegistry {
|
||||||
.add(CanonicalSubscription.class, canonicalized);
|
.add(CanonicalSubscription.class, canonicalized);
|
||||||
myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params);
|
myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params);
|
||||||
|
|
||||||
return canonicalized;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void unregisterSubscriptionIfRegistered(String theSubscriptionId) {
|
public void unregisterSubscriptionIfRegistered(String theSubscriptionId) {
|
||||||
|
@ -174,15 +172,6 @@ public class SubscriptionRegistry {
|
||||||
return theExistingSubscription.getChannelType().equals(theNewSubscription.getChannelType());
|
return theExistingSubscription.getChannelType().equals(theNewSubscription.getChannelType());
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) {
|
|
||||||
if (hasSubscription(theSubscription.getIdElement()).isPresent()) {
|
|
||||||
ourLog.info("Removing {} subscription {}", theStatusString, theSubscription.getIdElement().toUnqualified().getValue());
|
|
||||||
unregisterSubscriptionIfRegistered(theSubscription.getIdElement().getIdPart());
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int size() {
|
public int size() {
|
||||||
return myActiveSubscriptionCache.size();
|
return myActiveSubscriptionCache.size();
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,13 +9,11 @@ import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannelFactory;
|
||||||
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
|
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
|
||||||
import ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig;
|
import ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSearchParamProvider;
|
import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSearchParamProvider;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSubscriptionProvider;
|
|
||||||
import ca.uhn.fhir.jpa.subscription.module.config.TestSubscriptionConfig;
|
import ca.uhn.fhir.jpa.subscription.module.config.TestSubscriptionConfig;
|
||||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.test.mock.mockito.MockBean;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.test.context.ContextConfiguration;
|
import org.springframework.test.context.ContextConfiguration;
|
||||||
|
@ -32,8 +30,6 @@ public abstract class BaseSubscriptionTest {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
protected IInterceptorService myInterceptorRegistry;
|
protected IInterceptorService myInterceptorRegistry;
|
||||||
@Autowired
|
|
||||||
MockFhirClientSubscriptionProvider myMockFhirClientSubscriptionProvider;
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
ISearchParamRegistry mySearchParamRegistry;
|
ISearchParamRegistry mySearchParamRegistry;
|
||||||
|
|
|
@ -1,55 +1,13 @@
|
||||||
package ca.uhn.fhir.jpa.subscription.module.cache;
|
package ca.uhn.fhir.jpa.subscription.module.cache;
|
||||||
|
|
||||||
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader;
|
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSubscriptionProvider;
|
|
||||||
import ca.uhn.fhir.jpa.subscription.module.standalone.BaseBlockingQueueSubscribableChannelDstu3Test;
|
import ca.uhn.fhir.jpa.subscription.module.standalone.BaseBlockingQueueSubscribableChannelDstu3Test;
|
||||||
import org.hl7.fhir.dstu3.model.Subscription;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
|
|
||||||
public class SubscriptionLoaderTest extends BaseBlockingQueueSubscribableChannelDstu3Test {
|
public class SubscriptionLoaderTest extends BaseBlockingQueueSubscribableChannelDstu3Test {
|
||||||
private static final int MOCK_FHIR_CLIENT_FAILURES = 3;
|
|
||||||
@Autowired
|
|
||||||
private MockFhirClientSubscriptionProvider myMockFhirClientSubscriptionProvider;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setFailCount() {
|
|
||||||
myMockFhirClientSubscriptionProvider.setFailCount(MOCK_FHIR_CLIENT_FAILURES);
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void restoreFailCount() {
|
|
||||||
myMockFhirClientSubscriptionProvider.setFailCount(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
@Ignore
|
|
||||||
public void testSubscriptionLoaderFhirClientDown() throws Exception {
|
|
||||||
String payload = "application/fhir+json";
|
|
||||||
|
|
||||||
String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml";
|
|
||||||
String criteria2 = "Observation?code=SNOMED-CT|" + myCode + "111&_format=xml";
|
|
||||||
|
|
||||||
List<Subscription> subs = new ArrayList<>();
|
|
||||||
subs.add(makeActiveSubscription(criteria1, payload, ourListenerServerBase));
|
|
||||||
subs.add(makeActiveSubscription(criteria2, payload, ourListenerServerBase));
|
|
||||||
|
|
||||||
mySubscriptionActivatedPost.setExpectedCount(2);
|
|
||||||
initSubscriptionLoader(subs, "uuid");
|
|
||||||
mySubscriptionActivatedPost.awaitExpected();
|
|
||||||
assertEquals(0, myMockFhirClientSubscriptionProvider.getFailCount());
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMultipleThreadsDontBlock() throws InterruptedException {
|
public void testMultipleThreadsDontBlock() throws InterruptedException {
|
||||||
|
|
|
@ -1,26 +0,0 @@
|
||||||
package ca.uhn.fhir.jpa.subscription.module.config;
|
|
||||||
|
|
||||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
|
||||||
import ca.uhn.fhir.jpa.subscription.match.registry.ISubscriptionProvider;
|
|
||||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
|
||||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
|
||||||
|
|
||||||
public class MockFhirClientSubscriptionProvider implements ISubscriptionProvider {
|
|
||||||
private final MockProvider myMockProvider = new MockProvider();
|
|
||||||
|
|
||||||
public MockFhirClientSubscriptionProvider() {
|
|
||||||
super();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setBundleProvider(IBundleProvider theBundleProvider) { myMockProvider.setBundleProvider(theBundleProvider); }
|
|
||||||
public void setFailCount(int theFailCount) { myMockProvider.setFailCount(theFailCount); }
|
|
||||||
public int getFailCount() { return myMockProvider.getFailCount(); }
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public IBundleProvider search(SearchParameterMap theParams) { return myMockProvider.search(theParams); }
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean loadSubscription(IBaseResource theResource) {
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -5,7 +5,6 @@ import ca.uhn.fhir.context.support.IValidationSupport;
|
||||||
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
||||||
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
|
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
|
||||||
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamProvider;
|
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamProvider;
|
||||||
import ca.uhn.fhir.jpa.subscription.match.registry.ISubscriptionProvider;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.context.annotation.Import;
|
import org.springframework.context.annotation.Import;
|
||||||
|
@ -33,12 +32,6 @@ public class TestSubscriptionDstu3Config {
|
||||||
return new MockFhirClientSearchParamProvider();
|
return new MockFhirClientSearchParamProvider();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
|
||||||
@Primary
|
|
||||||
public ISubscriptionProvider subscriptionProvider() {
|
|
||||||
return new MockFhirClientSubscriptionProvider();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public ISchedulerService schedulerService() {
|
public ISchedulerService schedulerService() {
|
||||||
return mock(ISchedulerService.class);
|
return mock(ISchedulerService.class);
|
||||||
|
|
|
@ -14,7 +14,6 @@ import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
|
||||||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
|
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
|
||||||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
|
import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSubscriptionProvider;
|
|
||||||
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriberTest;
|
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriberTest;
|
||||||
import ca.uhn.fhir.model.primitive.IdDt;
|
import ca.uhn.fhir.model.primitive.IdDt;
|
||||||
import ca.uhn.fhir.rest.annotation.Create;
|
import ca.uhn.fhir.rest.annotation.Create;
|
||||||
|
@ -24,7 +23,6 @@ import ca.uhn.fhir.rest.api.Constants;
|
||||||
import ca.uhn.fhir.rest.api.MethodOutcome;
|
import ca.uhn.fhir.rest.api.MethodOutcome;
|
||||||
import ca.uhn.fhir.rest.server.IResourceProvider;
|
import ca.uhn.fhir.rest.server.IResourceProvider;
|
||||||
import ca.uhn.fhir.rest.server.RestfulServer;
|
import ca.uhn.fhir.rest.server.RestfulServer;
|
||||||
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
|
|
||||||
import ca.uhn.fhir.test.utilities.JettyUtil;
|
import ca.uhn.fhir.test.utilities.JettyUtil;
|
||||||
import ca.uhn.test.concurrency.IPointcutLatch;
|
import ca.uhn.test.concurrency.IPointcutLatch;
|
||||||
import ca.uhn.test.concurrency.PointcutLatch;
|
import ca.uhn.test.concurrency.PointcutLatch;
|
||||||
|
@ -81,8 +79,6 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
|
||||||
@Autowired
|
@Autowired
|
||||||
protected SubscriptionRegistry mySubscriptionRegistry;
|
protected SubscriptionRegistry mySubscriptionRegistry;
|
||||||
@Autowired
|
@Autowired
|
||||||
private MockFhirClientSubscriptionProvider myMockFhirClientSubscriptionProvider;
|
|
||||||
@Autowired
|
|
||||||
private SubscriptionLoader mySubscriptionLoader;
|
private SubscriptionLoader mySubscriptionLoader;
|
||||||
@Autowired
|
@Autowired
|
||||||
private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer;
|
private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer;
|
||||||
|
@ -136,7 +132,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void initSubscriptionLoader(List<Subscription> subscriptions, String uuid) throws InterruptedException {
|
protected void initSubscriptionLoader(List<Subscription> subscriptions, String uuid) throws InterruptedException {
|
||||||
myMockFhirClientSubscriptionProvider.setBundleProvider(new SimpleBundleProvider(new ArrayList<>(subscriptions), uuid));
|
// myMockFhirClientSubscriptionProvider.setBundleProvider(new SimpleBundleProvider(new ArrayList<>(subscriptions), uuid));
|
||||||
mySubscriptionLoader.doSyncSubscriptionsForUnitTest();
|
mySubscriptionLoader.doSyncSubscriptionsForUnitTest();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue