Subscription module startup changes (#1154)
* Reorganizing packages and dependencies to support standalone subscription running within a CDR container where all hapi modules are on the classpath. * EXPERIMENTAL: Moved Subscription registry out of interceptor and introduced ISubscriptionLoader that will be either a Database or FhirClient loader. 5 tests fail. Looks like we're getting too many matches--likely because there is now just one list of subscriptions instead of one list per interceptor. * Created ActiveSubscription and moved cache bits into it * Compiles. Next step is get app context to load. * Application context loads. Now fix NullPointer. * All subscription tests pass * FIXME cleanup * jpa-subscription tests * fixed config so other module tests work * MAJOR MILESTONE: All hapi-fhir tests pass. updated READMEs in example projects * Moved ExecutorQueue stuff out into its own class * Organize Imports * FIXME cleanup * Null check -> Optional * Add test and supporting code to validate SubscriptionConstants.EXT_SUBSCRIPTION_RESTHOOK_DELIVER_LATEST_VERSION behaviour * Added SubscriptionCheckingSubscriber test that works without a database * Moved a few beans to @ComponentScan * Replaced use of beanFactory with concrete factory classes * Switched test to use subscribablechannel * Added SubscriptionLoaderFhirClientTest * Undid changes that caused SearchParamProviderFhirClientTest to revert to the Database version. It's now calling the FhirClient version again. (oops) * Confirm that our SubscriptionProviderFhirClient works with a live fhir client * Organize imports * Organize imports * Register interceptors with DaoConfig instead of RestServer. Also, Rename @VisibleForTesting methods with ForUnitTest * Ready to go * organize imports * add processing queue * Fixed interface implementation names * Fix triggering service so it uses new subscriptionmatcherinterceptor * fixed example * Renamed "Database" classes to "Dao" * Tightened up StoppableSubscriber API * final code review * processing -> matching naming change * fix required by CDR * oops * Updated changes.xml * Renamed subscriptioncheckingsubscriber to subscriptionmatchingsubscriber * Renamed subscriptioncheckingsubscriber to subscriptionmatchingsubscriber * CDR integration now works * Changing subscribable channel abstraction layer in preparation for supporting more types of subscribable channels * Add interface over both types of messages put on queues * cosmetic change * added subscription support for r4 * fixed again * oops * self code-review
This commit is contained in:
parent
298cf96084
commit
fedc59a8d6
|
@ -10,8 +10,8 @@ import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
|
||||||
import ca.uhn.fhir.jpa.search.reindex.ResourceReindexingSvcImpl;
|
import ca.uhn.fhir.jpa.search.reindex.ResourceReindexingSvcImpl;
|
||||||
import ca.uhn.fhir.jpa.subscription.dbmatcher.CompositeInMemoryDaoSubscriptionMatcher;
|
import ca.uhn.fhir.jpa.subscription.dbmatcher.CompositeInMemoryDaoSubscriptionMatcher;
|
||||||
import ca.uhn.fhir.jpa.subscription.dbmatcher.DaoSubscriptionMatcher;
|
import ca.uhn.fhir.jpa.subscription.dbmatcher.DaoSubscriptionMatcher;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.cache.ISubscriptionChannelFactory;
|
import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannelFactory;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.cache.BlockingQueueSubscriptionChannelFactory;
|
import ca.uhn.fhir.jpa.subscription.module.cache.LinkedBlockingQueueSubscribableChannelFactory;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher;
|
import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.matcher.InMemorySubscriptionMatcher;
|
import ca.uhn.fhir.jpa.subscription.module.matcher.InMemorySubscriptionMatcher;
|
||||||
import org.hibernate.jpa.HibernatePersistenceProvider;
|
import org.hibernate.jpa.HibernatePersistenceProvider;
|
||||||
|
@ -146,8 +146,8 @@ public abstract class BaseConfig implements SchedulingConfigurer {
|
||||||
* Create a @Primary @Bean if you need a different implementation
|
* Create a @Primary @Bean if you need a different implementation
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
public ISubscriptionChannelFactory blockingQueueSubscriptionDeliveryChannelFactory() {
|
public ISubscribableChannelFactory linkedBlockingQueueSubscribableChannelFactory() {
|
||||||
return new BlockingQueueSubscriptionChannelFactory();
|
return new LinkedBlockingQueueSubscribableChannelFactory();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.context.annotation.Lazy;
|
||||||
import org.springframework.core.task.AsyncTaskExecutor;
|
import org.springframework.core.task.AsyncTaskExecutor;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.PlatformTransactionManager;
|
import org.springframework.transaction.PlatformTransactionManager;
|
||||||
|
@ -66,6 +67,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
* Also validates criteria. If invalid, rejects the subscription without persisting the subscription.
|
* Also validates criteria. If invalid, rejects the subscription without persisting the subscription.
|
||||||
*/
|
*/
|
||||||
@Service
|
@Service
|
||||||
|
@Lazy
|
||||||
public class SubscriptionActivatingInterceptor extends ServerOperationInterceptorAdapter {
|
public class SubscriptionActivatingInterceptor extends ServerOperationInterceptorAdapter {
|
||||||
private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingInterceptor.class);
|
private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingInterceptor.class);
|
||||||
|
|
||||||
|
|
|
@ -21,31 +21,59 @@ package ca.uhn.fhir.jpa.subscription;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import ca.uhn.fhir.jpa.dao.DaoConfig;
|
import ca.uhn.fhir.jpa.dao.DaoConfig;
|
||||||
|
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader;
|
||||||
|
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.hl7.fhir.instance.model.Subscription;
|
import org.hl7.fhir.instance.model.Subscription;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.context.ApplicationContext;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
public class SubscriptionInterceptorLoader {
|
public class SubscriptionInterceptorLoader {
|
||||||
|
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionInterceptorLoader.class);
|
||||||
|
|
||||||
|
private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
|
||||||
|
private SubscriptionActivatingInterceptor mySubscriptionActivatingInterceptor;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
DaoConfig myDaoConfig;
|
DaoConfig myDaoConfig;
|
||||||
@Autowired
|
@Autowired
|
||||||
SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
|
private ApplicationContext myAppicationContext;
|
||||||
@Autowired
|
@Autowired
|
||||||
SubscriptionActivatingInterceptor mySubscriptionActivatingInterceptor;
|
private SubscriptionRegistry mySubscriptionRegistry;
|
||||||
|
|
||||||
public void registerInterceptors() {
|
public void registerInterceptors() {
|
||||||
Set<Subscription.SubscriptionChannelType> supportedSubscriptionTypes = myDaoConfig.getSupportedSubscriptionTypes();
|
Set<Subscription.SubscriptionChannelType> supportedSubscriptionTypes = myDaoConfig.getSupportedSubscriptionTypes();
|
||||||
|
|
||||||
if (!supportedSubscriptionTypes.isEmpty()) {
|
if (!supportedSubscriptionTypes.isEmpty()) {
|
||||||
|
loadSubscriptions();
|
||||||
|
|
||||||
|
ourLog.info("Registering subscription interceptors");
|
||||||
myDaoConfig.registerInterceptor(mySubscriptionActivatingInterceptor);
|
myDaoConfig.registerInterceptor(mySubscriptionActivatingInterceptor);
|
||||||
myDaoConfig.registerInterceptor(mySubscriptionMatcherInterceptor);
|
myDaoConfig.registerInterceptor(mySubscriptionMatcherInterceptor);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void loadSubscriptions() {
|
||||||
|
ourLog.info("Loading subscriptions into the SubscriptionRegistry...");
|
||||||
|
// Load subscriptions into the SubscriptionRegistry
|
||||||
|
myAppicationContext.getBean(SubscriptionLoader.class);
|
||||||
|
ourLog.info("...{} subscriptions loaded", mySubscriptionRegistry.size());
|
||||||
|
|
||||||
|
// Once subscriptions have been loaded, now
|
||||||
|
if (mySubscriptionActivatingInterceptor == null) {
|
||||||
|
mySubscriptionActivatingInterceptor = myAppicationContext.getBean(SubscriptionActivatingInterceptor.class);
|
||||||
|
}
|
||||||
|
if (mySubscriptionMatcherInterceptor == null) {
|
||||||
|
mySubscriptionMatcherInterceptor = myAppicationContext.getBean(SubscriptionMatcherInterceptor.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public void unregisterInterceptorsForUnitTest() {
|
public void unregisterInterceptorsForUnitTest() {
|
||||||
myDaoConfig.unregisterInterceptor(mySubscriptionActivatingInterceptor);
|
myDaoConfig.unregisterInterceptor(mySubscriptionActivatingInterceptor);
|
||||||
|
|
|
@ -1,11 +1,11 @@
|
||||||
package ca.uhn.fhir.jpa.subscription;
|
package ca.uhn.fhir.jpa.subscription;
|
||||||
|
|
||||||
import ca.uhn.fhir.context.FhirContext;
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
|
import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
|
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.SubscriptionChannel;
|
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.cache.ISubscriptionChannelFactory;
|
|
||||||
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
|
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionCheckingSubscriber;
|
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber;
|
||||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
import ca.uhn.fhir.rest.server.interceptor.ServerOperationInterceptorAdapter;
|
import ca.uhn.fhir.rest.server.interceptor.ServerOperationInterceptorAdapter;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
@ -13,6 +13,7 @@ import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.context.annotation.Lazy;
|
||||||
import org.springframework.messaging.SubscribableChannel;
|
import org.springframework.messaging.SubscribableChannel;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@ -40,6 +41,7 @@ import javax.annotation.PreDestroy;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
|
@Lazy
|
||||||
public class SubscriptionMatcherInterceptor extends ServerOperationInterceptorAdapter {
|
public class SubscriptionMatcherInterceptor extends ServerOperationInterceptorAdapter {
|
||||||
private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatcherInterceptor.class);
|
private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatcherInterceptor.class);
|
||||||
|
|
||||||
|
@ -51,9 +53,9 @@ public class SubscriptionMatcherInterceptor extends ServerOperationInterceptorAd
|
||||||
@Autowired
|
@Autowired
|
||||||
private FhirContext myFhirContext;
|
private FhirContext myFhirContext;
|
||||||
@Autowired
|
@Autowired
|
||||||
private SubscriptionCheckingSubscriber mySubscriptionCheckingSubscriber;
|
private SubscriptionMatchingSubscriber mySubscriptionMatchingSubscriber;
|
||||||
@Autowired
|
@Autowired
|
||||||
private ISubscriptionChannelFactory mySubscriptionChannelFactory;
|
private SubscriptionChannelFactory mySubscriptionChannelFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
|
@ -67,13 +69,13 @@ public class SubscriptionMatcherInterceptor extends ServerOperationInterceptorAd
|
||||||
if (myProcessingChannel == null) {
|
if (myProcessingChannel == null) {
|
||||||
myProcessingChannel = mySubscriptionChannelFactory.newMatchingChannel("subscription-matching");
|
myProcessingChannel = mySubscriptionChannelFactory.newMatchingChannel("subscription-matching");
|
||||||
}
|
}
|
||||||
myProcessingChannel.subscribe(mySubscriptionCheckingSubscriber);
|
myProcessingChannel.subscribe(mySubscriptionMatchingSubscriber);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
@PreDestroy
|
@PreDestroy
|
||||||
public void preDestroy() {
|
public void preDestroy() {
|
||||||
myProcessingChannel.unsubscribe(mySubscriptionCheckingSubscriber);
|
myProcessingChannel.unsubscribe(mySubscriptionMatchingSubscriber);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -121,7 +123,7 @@ public class SubscriptionMatcherInterceptor extends ServerOperationInterceptorAd
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public SubscriptionChannel getProcessingChannelForUnitTest() {
|
public LinkedBlockingQueueSubscribableChannel getProcessingChannelForUnitTest() {
|
||||||
return (SubscriptionChannel) myProcessingChannel;
|
return (LinkedBlockingQueueSubscribableChannel) myProcessingChannel;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,6 @@ import ca.uhn.fhir.jpa.search.warm.CacheWarmingSvcImpl;
|
||||||
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
|
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
|
||||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
|
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.cache.ISubscriptionChannelFactory;
|
|
||||||
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
|
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
|
||||||
import ca.uhn.fhir.rest.annotation.IdParam;
|
import ca.uhn.fhir.rest.annotation.IdParam;
|
||||||
import ca.uhn.fhir.rest.api.CacheControlDirective;
|
import ca.uhn.fhir.rest.api.CacheControlDirective;
|
||||||
|
@ -41,7 +40,6 @@ import ca.uhn.fhir.rest.param.UriParam;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
|
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
|
||||||
import ca.uhn.fhir.rest.server.interceptor.IServerOperationInterceptor;
|
|
||||||
import ca.uhn.fhir.util.ParametersUtil;
|
import ca.uhn.fhir.util.ParametersUtil;
|
||||||
import ca.uhn.fhir.util.StopWatch;
|
import ca.uhn.fhir.util.StopWatch;
|
||||||
import ca.uhn.fhir.util.ValidateUtil;
|
import ca.uhn.fhir.util.ValidateUtil;
|
||||||
|
|
|
@ -3,7 +3,7 @@ package ca.uhn.fhir.jpa.subscription;
|
||||||
import ca.uhn.fhir.context.FhirContext;
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
import ca.uhn.fhir.jpa.dao.DaoConfig;
|
import ca.uhn.fhir.jpa.dao.DaoConfig;
|
||||||
import ca.uhn.fhir.jpa.provider.r4.BaseResourceProviderR4Test;
|
import ca.uhn.fhir.jpa.provider.r4.BaseResourceProviderR4Test;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.SubscriptionChannel;
|
import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel;
|
||||||
import ca.uhn.fhir.rest.annotation.Create;
|
import ca.uhn.fhir.rest.annotation.Create;
|
||||||
import ca.uhn.fhir.rest.annotation.ResourceParam;
|
import ca.uhn.fhir.rest.annotation.ResourceParam;
|
||||||
import ca.uhn.fhir.rest.annotation.Update;
|
import ca.uhn.fhir.rest.annotation.Update;
|
||||||
|
@ -24,7 +24,6 @@ import org.hl7.fhir.instance.model.api.IIdType;
|
||||||
import org.hl7.fhir.r4.model.*;
|
import org.hl7.fhir.r4.model.*;
|
||||||
import org.junit.*;
|
import org.junit.*;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.messaging.SubscribableChannel;
|
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
@ -100,7 +99,7 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
|
||||||
}
|
}
|
||||||
waitForActivatedSubscriptionCount(0);
|
waitForActivatedSubscriptionCount(0);
|
||||||
|
|
||||||
SubscriptionChannel processingChannel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest();
|
LinkedBlockingQueueSubscribableChannel processingChannel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest();
|
||||||
processingChannel.clearInterceptorsForUnitTest();
|
processingChannel.clearInterceptorsForUnitTest();
|
||||||
myCountingInterceptor = new CountingInterceptor();
|
myCountingInterceptor = new CountingInterceptor();
|
||||||
processingChannel.addInterceptorForUnitTest(myCountingInterceptor);
|
processingChannel.addInterceptorForUnitTest(myCountingInterceptor);
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
package ca.uhn.fhir.jpa.subscription;
|
package ca.uhn.fhir.jpa.subscription;
|
||||||
|
|
||||||
import ca.uhn.fhir.jpa.dao.DaoConfig;
|
import ca.uhn.fhir.jpa.dao.DaoConfig;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscriptionChannel;
|
import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
|
import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
|
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.subscriber.email.IEmailSender;
|
import ca.uhn.fhir.jpa.subscription.module.subscriber.email.IEmailSender;
|
||||||
|
@ -26,7 +26,7 @@ public class SubscriptionTestUtil {
|
||||||
private SubscriptionRegistry mySubscriptionRegistry;
|
private SubscriptionRegistry mySubscriptionRegistry;
|
||||||
|
|
||||||
public int getExecutorQueueSize() {
|
public int getExecutorQueueSize() {
|
||||||
LinkedBlockingQueueSubscriptionChannel channel = (LinkedBlockingQueueSubscriptionChannel) mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest();
|
LinkedBlockingQueueSubscribableChannel channel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest();
|
||||||
return channel.getQueueSizeForUnitTest();
|
return channel.getQueueSizeForUnitTest();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,13 +35,13 @@ import org.springframework.messaging.support.ExecutorSubscribableChannel;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
|
|
||||||
public class SubscriptionChannel implements SubscribableChannel {
|
public class LinkedBlockingQueueSubscribableChannel implements SubscribableChannel {
|
||||||
private Logger ourLog = LoggerFactory.getLogger(SubscriptionChannel.class);
|
private Logger ourLog = LoggerFactory.getLogger(LinkedBlockingQueueSubscribableChannel.class);
|
||||||
|
|
||||||
private final ExecutorSubscribableChannel mySubscribableChannel;
|
private final ExecutorSubscribableChannel mySubscribableChannel;
|
||||||
private final BlockingQueue<Runnable> myQueue;
|
private final BlockingQueue<Runnable> myQueue;
|
||||||
|
|
||||||
public SubscriptionChannel(BlockingQueue<Runnable> theQueue, String theThreadNamingPattern) {
|
public LinkedBlockingQueueSubscribableChannel(BlockingQueue<Runnable> theQueue, String theThreadNamingPattern) {
|
||||||
|
|
||||||
ThreadFactory threadFactory = new BasicThreadFactory.Builder()
|
ThreadFactory threadFactory = new BasicThreadFactory.Builder()
|
||||||
.namingPattern(theThreadNamingPattern)
|
.namingPattern(theThreadNamingPattern)
|
|
@ -1,32 +0,0 @@
|
||||||
package ca.uhn.fhir.jpa.subscription.module;
|
|
||||||
|
|
||||||
/*-
|
|
||||||
* #%L
|
|
||||||
* HAPI FHIR Subscription Server
|
|
||||||
* %%
|
|
||||||
* Copyright (C) 2014 - 2018 University Health Network
|
|
||||||
* %%
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
* #L%
|
|
||||||
*/
|
|
||||||
|
|
||||||
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionConstants;
|
|
||||||
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
|
|
||||||
public class LinkedBlockingQueueSubscriptionChannel extends SubscriptionChannel {
|
|
||||||
|
|
||||||
public LinkedBlockingQueueSubscriptionChannel(String theThreadNamingPattern) {
|
|
||||||
super(new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE), theThreadNamingPattern);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -21,6 +21,7 @@ package ca.uhn.fhir.jpa.subscription.module;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import ca.uhn.fhir.context.FhirContext;
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
|
import ca.uhn.fhir.jpa.subscription.module.subscriber.IResourceMessage;
|
||||||
import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
|
@ -32,7 +33,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||||
|
|
||||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||||
@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
|
@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
|
||||||
public class ResourceModifiedMessage {
|
public class ResourceModifiedMessage implements IResourceMessage {
|
||||||
|
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
|
|
@ -1,42 +0,0 @@
|
||||||
package ca.uhn.fhir.jpa.subscription.module.cache;
|
|
||||||
|
|
||||||
/*-
|
|
||||||
* #%L
|
|
||||||
* HAPI FHIR Subscription Server
|
|
||||||
* %%
|
|
||||||
* Copyright (C) 2014 - 2018 University Health Network
|
|
||||||
* %%
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
* #L%
|
|
||||||
*/
|
|
||||||
|
|
||||||
import ca.uhn.fhir.jpa.subscription.module.SubscriptionChannel;
|
|
||||||
import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscriptionChannel;
|
|
||||||
|
|
||||||
public class BlockingQueueSubscriptionChannelFactory implements ISubscriptionChannelFactory {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public SubscriptionChannel newDeliveryChannel(String theSubscriptionId, String theChannelType) {
|
|
||||||
String threadName = "subscription-delivery-" +
|
|
||||||
theChannelType +
|
|
||||||
"-" +
|
|
||||||
theSubscriptionId +
|
|
||||||
"-%d";
|
|
||||||
return new LinkedBlockingQueueSubscriptionChannel(threadName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public SubscriptionChannel newMatchingChannel(String theChannelName) {
|
|
||||||
return new LinkedBlockingQueueSubscriptionChannel(theChannelName + "-%d");
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,7 @@
|
||||||
|
package ca.uhn.fhir.jpa.subscription.module.cache;
|
||||||
|
|
||||||
|
import org.springframework.messaging.SubscribableChannel;
|
||||||
|
|
||||||
|
public interface ISubscribableChannelFactory {
|
||||||
|
SubscribableChannel createSubscribableChannel(String theChannelName);
|
||||||
|
}
|
|
@ -1,29 +0,0 @@
|
||||||
package ca.uhn.fhir.jpa.subscription.module.cache;
|
|
||||||
|
|
||||||
/*-
|
|
||||||
* #%L
|
|
||||||
* HAPI FHIR Subscription Server
|
|
||||||
* %%
|
|
||||||
* Copyright (C) 2014 - 2018 University Health Network
|
|
||||||
* %%
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
* #L%
|
|
||||||
*/
|
|
||||||
|
|
||||||
import org.springframework.messaging.SubscribableChannel;
|
|
||||||
|
|
||||||
public interface ISubscriptionChannelFactory {
|
|
||||||
SubscribableChannel newDeliveryChannel(String theSubscriptionId, String theChannelType);
|
|
||||||
|
|
||||||
SubscribableChannel newMatchingChannel(String theChannelName);
|
|
||||||
}
|
|
|
@ -0,0 +1,13 @@
|
||||||
|
package ca.uhn.fhir.jpa.subscription.module.cache;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel;
|
||||||
|
import org.springframework.messaging.SubscribableChannel;
|
||||||
|
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
|
public class LinkedBlockingQueueSubscribableChannelFactory implements ISubscribableChannelFactory {
|
||||||
|
@Override
|
||||||
|
public SubscribableChannel createSubscribableChannel(String theChannelName) {
|
||||||
|
return new LinkedBlockingQueueSubscribableChannel(new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE), theChannelName + "-%d");
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,28 @@
|
||||||
|
package ca.uhn.fhir.jpa.subscription.module.cache;
|
||||||
|
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.messaging.SubscribableChannel;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class SubscriptionChannelFactory {
|
||||||
|
|
||||||
|
private ISubscribableChannelFactory mySubscribableChannelFactory;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
public SubscriptionChannelFactory(ISubscribableChannelFactory theSubscribableChannelFactory) {
|
||||||
|
mySubscribableChannelFactory = theSubscribableChannelFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SubscribableChannel newDeliveryChannel(String theSubscriptionId, String theChannelType) {
|
||||||
|
String channelName = "subscription-delivery-" +
|
||||||
|
theChannelType +
|
||||||
|
"-" +
|
||||||
|
theSubscriptionId;
|
||||||
|
return mySubscribableChannelFactory.createSubscribableChannel(channelName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public SubscribableChannel newMatchingChannel(String theChannelName) {
|
||||||
|
return mySubscribableChannelFactory.createSubscribableChannel(theChannelName);
|
||||||
|
}
|
||||||
|
}
|
|
@ -50,7 +50,7 @@ public class SubscriptionRegistry {
|
||||||
@Autowired
|
@Autowired
|
||||||
SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory;
|
SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory;
|
||||||
@Autowired
|
@Autowired
|
||||||
ISubscriptionChannelFactory mySubscriptionDeliveryChannelFactory;
|
SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory;
|
||||||
|
|
||||||
private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache();
|
private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache();
|
||||||
|
|
||||||
|
|
|
@ -21,8 +21,8 @@ package ca.uhn.fhir.jpa.subscription.module.config;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import ca.uhn.fhir.context.FhirContext;
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.cache.ISubscriptionChannelFactory;
|
import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannelFactory;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.cache.BlockingQueueSubscriptionChannelFactory;
|
import ca.uhn.fhir.jpa.subscription.module.cache.LinkedBlockingQueueSubscribableChannelFactory;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.ComponentScan;
|
import org.springframework.context.annotation.ComponentScan;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
@ -33,7 +33,7 @@ public abstract class BaseSubscriptionConfig {
|
||||||
public abstract FhirContext fhirContext();
|
public abstract FhirContext fhirContext();
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public ISubscriptionChannelFactory blockingQueueSubscriptionDeliveryChannelFactory() {
|
public ISubscribableChannelFactory blockingQueueSubscriptionDeliveryChannelFactory() {
|
||||||
return new BlockingQueueSubscriptionChannelFactory();
|
return new LinkedBlockingQueueSubscribableChannelFactory();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,67 @@
|
||||||
|
package ca.uhn.fhir.jpa.subscription.module.config;
|
||||||
|
|
||||||
|
/*-
|
||||||
|
* #%L
|
||||||
|
* HAPI FHIR Subscription Server
|
||||||
|
* %%
|
||||||
|
* Copyright (C) 2014 - 2018 University Health Network
|
||||||
|
* %%
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
* #L%
|
||||||
|
*/
|
||||||
|
|
||||||
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
|
import ca.uhn.fhir.context.ParserOptions;
|
||||||
|
import ca.uhn.fhir.jpa.searchparam.extractor.SearchParamExtractorR4;
|
||||||
|
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
|
||||||
|
import ca.uhn.fhir.jpa.searchparam.registry.SearchParamRegistryR4;
|
||||||
|
import org.hl7.fhir.r4.hapi.ctx.DefaultProfileValidationSupport;
|
||||||
|
import org.hl7.fhir.r4.hapi.ctx.IValidationSupport;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowire;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Primary;
|
||||||
|
|
||||||
|
public class SubscriptionR4Config extends BaseSubscriptionConfig {
|
||||||
|
@Override
|
||||||
|
public FhirContext fhirContext() {
|
||||||
|
return fhirContextR4();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@Primary
|
||||||
|
public FhirContext fhirContextR4() {
|
||||||
|
FhirContext retVal = FhirContext.forR4();
|
||||||
|
|
||||||
|
// Don't strip versions in some places
|
||||||
|
ParserOptions parserOptions = retVal.getParserOptions();
|
||||||
|
parserOptions.setDontStripVersionsFromReferencesAtPaths("AuditEvent.entity.reference");
|
||||||
|
|
||||||
|
return retVal;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ISearchParamRegistry searchParamRegistry() {
|
||||||
|
return new SearchParamRegistryR4();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean(autowire = Autowire.BY_TYPE)
|
||||||
|
public SearchParamExtractorR4 searchParamExtractor() {
|
||||||
|
return new SearchParamExtractorR4();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Primary
|
||||||
|
@Bean(autowire = Autowire.BY_NAME, name = "myJpaValidationSupportChainR4")
|
||||||
|
public IValidationSupport validationSupportChainR4() {
|
||||||
|
return new DefaultProfileValidationSupport();
|
||||||
|
}
|
||||||
|
}
|
|
@ -25,7 +25,7 @@ import ca.uhn.fhir.context.RuntimeResourceDefinition;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
|
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
|
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
|
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionCheckingSubscriber;
|
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber;
|
||||||
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
|
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
|
||||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -43,7 +43,7 @@ public class StandaloneSubscriptionMessageHandler implements MessageHandler {
|
||||||
@Autowired
|
@Autowired
|
||||||
FhirContext myFhirContext;
|
FhirContext myFhirContext;
|
||||||
@Autowired
|
@Autowired
|
||||||
SubscriptionCheckingSubscriber mySubscriptionCheckingSubscriber;
|
SubscriptionMatchingSubscriber mySubscriptionMatchingSubscriber;
|
||||||
@Autowired
|
@Autowired
|
||||||
SubscriptionRegistry mySubscriptionRegistry;
|
SubscriptionRegistry mySubscriptionRegistry;
|
||||||
|
|
||||||
|
@ -53,13 +53,16 @@ public class StandaloneSubscriptionMessageHandler implements MessageHandler {
|
||||||
ourLog.warn("Unexpected message payload type: {}", theMessage);
|
ourLog.warn("Unexpected message payload type: {}", theMessage);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ResourceModifiedMessage resourceModifiedMessage = ((ResourceModifiedJsonMessage) theMessage).getPayload();
|
updateSubscriptionRegistryAndPerformMatching(((ResourceModifiedJsonMessage) theMessage).getPayload());
|
||||||
IBaseResource resource = resourceModifiedMessage.getNewPayload(myFhirContext);
|
}
|
||||||
|
|
||||||
|
public void updateSubscriptionRegistryAndPerformMatching(ResourceModifiedMessage theResourceModifiedMessage) {
|
||||||
|
IBaseResource resource = theResourceModifiedMessage.getNewPayload(myFhirContext);
|
||||||
RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(resource);
|
RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(resource);
|
||||||
|
|
||||||
if (resourceDef.getName().equals(ResourceTypeEnum.SUBSCRIPTION.getCode())) {
|
if (resourceDef.getName().equals(ResourceTypeEnum.SUBSCRIPTION.getCode())) {
|
||||||
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource);
|
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource);
|
||||||
}
|
}
|
||||||
mySubscriptionCheckingSubscriber.handleMessage(theMessage);
|
mySubscriptionMatchingSubscriber.matchActiveSubscriptionsAndDeliver(theResourceModifiedMessage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
package ca.uhn.fhir.jpa.subscription.module.subscriber;
|
||||||
|
|
||||||
|
public interface IResourceMessage {
|
||||||
|
String getPayloadId();
|
||||||
|
}
|
|
@ -35,7 +35,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||||
|
|
||||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||||
@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
|
@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
|
||||||
public class ResourceDeliveryMessage {
|
public class ResourceDeliveryMessage implements IResourceMessage {
|
||||||
|
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
@ -101,6 +101,7 @@ public class ResourceDeliveryMessage {
|
||||||
public void setPayload(FhirContext theCtx, IBaseResource thePayload) {
|
public void setPayload(FhirContext theCtx, IBaseResource thePayload) {
|
||||||
myPayload = thePayload;
|
myPayload = thePayload;
|
||||||
myPayloadString = theCtx.newJsonParser().encodeResourceToString(thePayload);
|
myPayloadString = theCtx.newJsonParser().encodeResourceToString(thePayload);
|
||||||
|
myPayloadId = thePayload.getIdElement().toUnqualified().getValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setPayloadId(IIdType thePayloadId) {
|
public void setPayloadId(IIdType thePayloadId) {
|
||||||
|
@ -109,5 +110,8 @@ public class ResourceDeliveryMessage {
|
||||||
myPayloadId = thePayloadId.getValue();
|
myPayloadId = thePayloadId.getValue();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@Override
|
||||||
|
public String getPayloadId() {
|
||||||
|
return myPayloadId;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,8 +41,8 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
public class SubscriptionCheckingSubscriber implements MessageHandler {
|
public class SubscriptionMatchingSubscriber implements MessageHandler {
|
||||||
private Logger ourLog = LoggerFactory.getLogger(SubscriptionCheckingSubscriber.class);
|
private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchingSubscriber.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private ISubscriptionMatcher mySubscriptionMatcher;
|
private ISubscriptionMatcher mySubscriptionMatcher;
|
||||||
|
@ -61,19 +61,23 @@ public class SubscriptionCheckingSubscriber implements MessageHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload();
|
ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload();
|
||||||
switch (msg.getOperationType()) {
|
matchActiveSubscriptionsAndDeliver(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void matchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg) {
|
||||||
|
switch (theMsg.getOperationType()) {
|
||||||
case CREATE:
|
case CREATE:
|
||||||
case UPDATE:
|
case UPDATE:
|
||||||
case MANUALLY_TRIGGERED:
|
case MANUALLY_TRIGGERED:
|
||||||
break;
|
break;
|
||||||
case DELETE:
|
case DELETE:
|
||||||
default:
|
default:
|
||||||
ourLog.trace("Not processing modified message for {}", msg.getOperationType());
|
ourLog.trace("Not processing modified message for {}", theMsg.getOperationType());
|
||||||
// ignore anything else
|
// ignore anything else
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
IIdType id = msg.getId(myFhirContext);
|
IIdType id = theMsg.getId(myFhirContext);
|
||||||
String resourceType = id.getResourceType();
|
String resourceType = id.getResourceType();
|
||||||
|
|
||||||
Collection<ActiveSubscription> subscriptions = mySubscriptionRegistry.getAll();
|
Collection<ActiveSubscription> subscriptions = mySubscriptionRegistry.getAll();
|
||||||
|
@ -85,9 +89,9 @@ public class SubscriptionCheckingSubscriber implements MessageHandler {
|
||||||
String nextSubscriptionId = nextActiveSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue();
|
String nextSubscriptionId = nextActiveSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue();
|
||||||
String nextCriteriaString = nextActiveSubscription.getCriteriaString();
|
String nextCriteriaString = nextActiveSubscription.getCriteriaString();
|
||||||
|
|
||||||
if (isNotBlank(msg.getSubscriptionId())) {
|
if (isNotBlank(theMsg.getSubscriptionId())) {
|
||||||
if (!msg.getSubscriptionId().equals(nextSubscriptionId)) {
|
if (!theMsg.getSubscriptionId().equals(nextSubscriptionId)) {
|
||||||
ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, msg.getSubscriptionId());
|
ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, theMsg.getSubscriptionId());
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -109,17 +113,17 @@ public class SubscriptionCheckingSubscriber implements MessageHandler {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!mySubscriptionMatcher.match(nextCriteriaString, msg).matched()) {
|
if (!mySubscriptionMatcher.match(nextCriteriaString, theMsg).matched()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
ourLog.debug("Found match: queueing rest-hook notification for resource: {}", id.toUnqualifiedVersionless().getValue());
|
ourLog.debug("Found match: queueing rest-hook notification for resource: {}", id.toUnqualifiedVersionless().getValue());
|
||||||
|
|
||||||
ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
|
ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
|
||||||
deliveryMsg.setPayload(myFhirContext, msg.getNewPayload(myFhirContext));
|
deliveryMsg.setPayload(myFhirContext, theMsg.getNewPayload(myFhirContext));
|
||||||
deliveryMsg.setSubscription(nextActiveSubscription.getSubscription());
|
deliveryMsg.setSubscription(nextActiveSubscription.getSubscription());
|
||||||
deliveryMsg.setOperationType(msg.getOperationType());
|
deliveryMsg.setOperationType(theMsg.getOperationType());
|
||||||
deliveryMsg.setPayloadId(msg.getId(myFhirContext));
|
deliveryMsg.setPayloadId(theMsg.getId(myFhirContext));
|
||||||
|
|
||||||
ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(deliveryMsg);
|
ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(deliveryMsg);
|
||||||
MessageChannel deliveryChannel = nextActiveSubscription.getSubscribableChannel();
|
MessageChannel deliveryChannel = nextActiveSubscription.getSubscribableChannel();
|
|
@ -2,6 +2,7 @@ package ca.uhn.fhir.jpa.subscription.module.config;
|
||||||
|
|
||||||
import ca.uhn.fhir.context.FhirContext;
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
|
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
|
||||||
|
import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.matcher.InMemorySubscriptionMatcher;
|
import ca.uhn.fhir.jpa.subscription.module.matcher.InMemorySubscriptionMatcher;
|
||||||
import ca.uhn.fhir.rest.client.api.IGenericClient;
|
import ca.uhn.fhir.rest.client.api.IGenericClient;
|
||||||
import ca.uhn.fhir.util.PortUtil;
|
import ca.uhn.fhir.util.PortUtil;
|
||||||
|
@ -31,7 +32,7 @@ public class TestSubscriptionConfig {
|
||||||
};
|
};
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public InMemorySubscriptionMatcher inMemorySubscriptionMatcher() {
|
public ISubscriptionMatcher inMemorySubscriptionMatcher() {
|
||||||
return new InMemorySubscriptionMatcher();
|
return new InMemorySubscriptionMatcher();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,9 +3,9 @@ package ca.uhn.fhir.jpa.subscription.module.standalone;
|
||||||
import ca.uhn.fhir.context.FhirContext;
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
|
import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
|
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.cache.ISubscriptionChannelFactory;
|
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
|
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionCheckingSubscriberTest;
|
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriberTest;
|
||||||
import ca.uhn.fhir.rest.annotation.Create;
|
import ca.uhn.fhir.rest.annotation.Create;
|
||||||
import ca.uhn.fhir.rest.annotation.ResourceParam;
|
import ca.uhn.fhir.rest.annotation.ResourceParam;
|
||||||
import ca.uhn.fhir.rest.annotation.Update;
|
import ca.uhn.fhir.rest.annotation.Update;
|
||||||
|
@ -35,15 +35,15 @@ import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public abstract class BaseSubscriptionChannelDstu3Test extends BaseSubscriptionDstu3Test {
|
public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends BaseSubscriptionDstu3Test {
|
||||||
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionCheckingSubscriberTest.class);
|
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchingSubscriberTest.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
FhirContext myFhirContext;
|
FhirContext myFhirContext;
|
||||||
@Autowired
|
@Autowired
|
||||||
StandaloneSubscriptionMessageHandler myStandaloneSubscriptionMessageHandler;
|
StandaloneSubscriptionMessageHandler myStandaloneSubscriptionMessageHandler;
|
||||||
@Autowired
|
@Autowired
|
||||||
ISubscriptionChannelFactory mySubscriptionDeliveryChannelFactory;
|
SubscriptionChannelFactory mySubscriptionChannelFactory;
|
||||||
|
|
||||||
private static int ourListenerPort;
|
private static int ourListenerPort;
|
||||||
private static RestfulServer ourListenerRestServer;
|
private static RestfulServer ourListenerRestServer;
|
||||||
|
@ -67,7 +67,7 @@ public abstract class BaseSubscriptionChannelDstu3Test extends BaseSubscriptionD
|
||||||
ourUpdatedObservations.clear();
|
ourUpdatedObservations.clear();
|
||||||
ourContentTypes.clear();
|
ourContentTypes.clear();
|
||||||
if (ourSubscribableChannel == null) {
|
if (ourSubscribableChannel == null) {
|
||||||
ourSubscribableChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel("test", Subscription.SubscriptionChannelType.RESTHOOK.toCode().toLowerCase());
|
ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel("test", Subscription.SubscriptionChannelType.RESTHOOK.toCode().toLowerCase());
|
||||||
ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler);
|
ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler);
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -12,7 +12,7 @@ import java.util.List;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
public class SubscriptionLoaderFhirClientTest extends BaseSubscriptionChannelDstu3Test {
|
public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscribableChannelDstu3Test {
|
||||||
private String myCode = "1000000050";
|
private String myCode = "1000000050";
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package ca.uhn.fhir.jpa.subscription.module.subscriber;
|
package ca.uhn.fhir.jpa.subscription.module.subscriber;
|
||||||
|
|
||||||
import ca.uhn.fhir.jpa.subscription.module.standalone.BaseSubscriptionChannelDstu3Test;
|
import ca.uhn.fhir.jpa.subscription.module.standalone.BaseBlockingQueueSubscribableChannelDstu3Test;
|
||||||
import ca.uhn.fhir.rest.api.Constants;
|
import ca.uhn.fhir.rest.api.Constants;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -11,7 +11,7 @@ import static org.junit.Assert.assertEquals;
|
||||||
/**
|
/**
|
||||||
* Tests copied from jpa.subscription.resthook.RestHookTestDstu3Test
|
* Tests copied from jpa.subscription.resthook.RestHookTestDstu3Test
|
||||||
*/
|
*/
|
||||||
public class SubscriptionCheckingSubscriberTest extends BaseSubscriptionChannelDstu3Test {
|
public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscribableChannelDstu3Test {
|
||||||
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionCheckingSubscriberTest.class);
|
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionCheckingSubscriberTest.class);
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -0,0 +1,69 @@
|
||||||
|
package ca.uhn.fhir.jpa.subscription.module.subscriber;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.jpa.subscription.module.standalone.BaseBlockingQueueSubscribableChannelDstu3Test;
|
||||||
|
import ca.uhn.fhir.rest.api.Constants;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests copied from jpa.subscription.resthook.RestHookTestDstu3Test
|
||||||
|
*/
|
||||||
|
public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscribableChannelDstu3Test {
|
||||||
|
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchingSubscriberTest.class);
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRestHookSubscriptionApplicationFhirJson() throws Exception {
|
||||||
|
String payload = "application/fhir+json";
|
||||||
|
|
||||||
|
String code = "1000000050";
|
||||||
|
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
|
||||||
|
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml";
|
||||||
|
|
||||||
|
createSubscription(criteria1, payload, ourListenerServerBase);
|
||||||
|
createSubscription(criteria2, payload, ourListenerServerBase);
|
||||||
|
|
||||||
|
sendObservation(code, "SNOMED-CT");
|
||||||
|
|
||||||
|
waitForSize(0, ourCreatedObservations);
|
||||||
|
waitForSize(1, ourUpdatedObservations);
|
||||||
|
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRestHookSubscriptionApplicationXmlJson() throws Exception {
|
||||||
|
String payload = "application/fhir+xml";
|
||||||
|
|
||||||
|
String code = "1000000050";
|
||||||
|
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
|
||||||
|
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml";
|
||||||
|
|
||||||
|
createSubscription(criteria1, payload, ourListenerServerBase);
|
||||||
|
createSubscription(criteria2, payload, ourListenerServerBase);
|
||||||
|
|
||||||
|
sendObservation(code, "SNOMED-CT");
|
||||||
|
|
||||||
|
waitForSize(0, ourCreatedObservations);
|
||||||
|
waitForSize(1, ourUpdatedObservations);
|
||||||
|
assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRestHookSubscriptionWithoutPayload() throws Exception {
|
||||||
|
String payload = "";
|
||||||
|
|
||||||
|
String code = "1000000050";
|
||||||
|
String criteria1 = "Observation?code=SNOMED-CT|" + code;
|
||||||
|
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111";
|
||||||
|
|
||||||
|
createSubscription(criteria1, payload, ourListenerServerBase);
|
||||||
|
createSubscription(criteria2, payload, ourListenerServerBase);
|
||||||
|
|
||||||
|
sendObservation(code, "SNOMED-CT");
|
||||||
|
|
||||||
|
waitForSize(0, ourCreatedObservations);
|
||||||
|
waitForSize(0, ourUpdatedObservations);
|
||||||
|
}
|
||||||
|
}
|
|
@ -39,8 +39,9 @@
|
||||||
and SubscriptionMatchingInterceptor that is responsible for matching incoming resources against activated
|
and SubscriptionMatchingInterceptor that is responsible for matching incoming resources against activated
|
||||||
subscriptions. Call DaoConfig.addSupportedSubscriptionType(type) to configure which subscription types
|
subscriptions. Call DaoConfig.addSupportedSubscriptionType(type) to configure which subscription types
|
||||||
are supported in your environment. The helper method SubscriptionInterceptorLoader.registerInterceptors()
|
are supported in your environment. The helper method SubscriptionInterceptorLoader.registerInterceptors()
|
||||||
will check if any subscription types are supported, and if so then register both the activating and matching
|
will check if any subscription types are supported, and if so then load active subscriptions into the
|
||||||
interceptors. See https://github.com/jamesagnew/hapi-fhir/wiki/Proposed-Subscription-Design-Change for more
|
SubscriptionRegistry and then register both the activating and matching interceptors.
|
||||||
|
See https://github.com/jamesagnew/hapi-fhir/wiki/Proposed-Subscription-Design-Change for more
|
||||||
details.
|
details.
|
||||||
</action>
|
</action>
|
||||||
<action type="change">
|
<action type="change">
|
||||||
|
@ -207,10 +208,10 @@
|
||||||
contains the subscription, it's delivery channel, and a list of delivery handlers.
|
contains the subscription, it's delivery channel, and a list of delivery handlers.
|
||||||
</action>
|
</action>
|
||||||
<action type="change">
|
<action type="change">
|
||||||
Introduced a Spring factory interfaces called ISubscriptionDeliveryChannelFactory and
|
Introduced a new Spring factory interface ISubscribableChannelFactory that is used to create delivery
|
||||||
ISubscriptionDeliveryHandlerFactory that are used to create delivery channels and handlers. By default,
|
channels and handlers. By default, HAPI FHIR ships with a LinkedBlockingQueue implementation of the
|
||||||
HAPI FHIR ships with a LinkedBlockingQueue implementation of the delivery channel factory. If a different
|
delivery channel factory. If a different type of channel factory is required (e.g. JMS or Kafka), add it
|
||||||
type of channel factory is required, add it to your application context and mark it as @Primary.
|
to your application context and mark it as @Primary.
|
||||||
</action>
|
</action>
|
||||||
<action type="fix" issue="980">
|
<action type="fix" issue="980">
|
||||||
When using the HL7.org DSTU2 structures, a QuestionnaireResponse with a
|
When using the HL7.org DSTU2 structures, a QuestionnaireResponse with a
|
||||||
|
|
Loading…
Reference in New Issue