uto register subscriptions on startup
This commit is contained in:
parent
cae21f4898
commit
a24cbd7da5
|
@ -26,6 +26,7 @@ import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails;
|
||||||
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
|
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
|
||||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
|
import ca.uhn.fhir.rest.param.TokenOrListParam;
|
||||||
import ca.uhn.fhir.rest.param.TokenParam;
|
import ca.uhn.fhir.rest.param.TokenParam;
|
||||||
import ca.uhn.fhir.rest.server.interceptor.ServerOperationInterceptorAdapter;
|
import ca.uhn.fhir.rest.server.interceptor.ServerOperationInterceptorAdapter;
|
||||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||||
|
@ -61,9 +62,8 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
|
||||||
private SubscribableChannel myProcessingChannel;
|
private SubscribableChannel myProcessingChannel;
|
||||||
private SubscribableChannel myDeliveryChannel;
|
private SubscribableChannel myDeliveryChannel;
|
||||||
private ExecutorService myExecutor;
|
private ExecutorService myExecutor;
|
||||||
private boolean myAutoActivateSubscriptions = true;
|
|
||||||
private int myExecutorThreadCount = 1;
|
private int myExecutorThreadCount = 1;
|
||||||
private MessageHandler mySubscriptionActivatingSubscriber;
|
private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber;
|
||||||
private MessageHandler mySubscriptionCheckingSubscriber;
|
private MessageHandler mySubscriptionCheckingSubscriber;
|
||||||
private ConcurrentHashMap<String, IBaseResource> myIdToSubscription = new ConcurrentHashMap<>();
|
private ConcurrentHashMap<String, IBaseResource> myIdToSubscription = new ConcurrentHashMap<>();
|
||||||
private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class);
|
private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class);
|
||||||
|
@ -97,6 +97,13 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
|
||||||
|
|
||||||
protected abstract IFhirResourceDao<?> getSubscriptionDao();
|
protected abstract IFhirResourceDao<?> getSubscriptionDao();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
*/
|
||||||
|
public BaseSubscriptionInterceptor() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read the existing subscriptions from the database
|
* Read the existing subscriptions from the database
|
||||||
*/
|
*/
|
||||||
|
@ -105,7 +112,9 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
|
||||||
public void initSubscriptions() {
|
public void initSubscriptions() {
|
||||||
SearchParameterMap map = new SearchParameterMap();
|
SearchParameterMap map = new SearchParameterMap();
|
||||||
map.add(Subscription.SP_TYPE, new TokenParam(null, getChannelType().toCode()));
|
map.add(Subscription.SP_TYPE, new TokenParam(null, getChannelType().toCode()));
|
||||||
map.add(Subscription.SP_STATUS, new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode()));
|
map.add(Subscription.SP_STATUS, new TokenOrListParam()
|
||||||
|
.addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode()))
|
||||||
|
.addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode())));
|
||||||
map.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS);
|
map.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS);
|
||||||
|
|
||||||
RequestDetails req = new ServletSubRequestDetails();
|
RequestDetails req = new ServletSubRequestDetails();
|
||||||
|
@ -122,12 +131,13 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
|
||||||
for (IBaseResource resource : resourceList) {
|
for (IBaseResource resource : resourceList) {
|
||||||
String nextId = resource.getIdElement().getIdPart();
|
String nextId = resource.getIdElement().getIdPart();
|
||||||
allIds.add(nextId);
|
allIds.add(nextId);
|
||||||
myIdToSubscription.put(nextId, resource);
|
mySubscriptionActivatingSubscriber.activateAndRegisterSubscriptionIfRequired(resource);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (Enumeration<String> keyEnum = myIdToSubscription.keys(); keyEnum.hasMoreElements(); ) {
|
for (Enumeration<String> keyEnum = myIdToSubscription.keys(); keyEnum.hasMoreElements(); ) {
|
||||||
String next = keyEnum.nextElement();
|
String next = keyEnum.nextElement();
|
||||||
if (!allIds.contains(next)) {
|
if (!allIds.contains(next)) {
|
||||||
|
ourLog.info("Unregistering Subscription/{} as it no longer exists", next);
|
||||||
myIdToSubscription.remove(next);
|
myIdToSubscription.remove(next);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -160,12 +170,10 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
|
||||||
setDeliveryChannel(new ExecutorSubscribableChannel(myExecutor));
|
setDeliveryChannel(new ExecutorSubscribableChannel(myExecutor));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (myAutoActivateSubscriptions) {
|
if (mySubscriptionActivatingSubscriber == null) {
|
||||||
if (mySubscriptionActivatingSubscriber == null) {
|
mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), myIdToSubscription, getChannelType(), this);
|
||||||
mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), myIdToSubscription, getChannelType(), this);
|
|
||||||
}
|
|
||||||
getProcessingChannel().subscribe(mySubscriptionActivatingSubscriber);
|
|
||||||
}
|
}
|
||||||
|
getProcessingChannel().subscribe(mySubscriptionActivatingSubscriber);
|
||||||
|
|
||||||
if (mySubscriptionCheckingSubscriber == null) {
|
if (mySubscriptionCheckingSubscriber == null) {
|
||||||
mySubscriptionCheckingSubscriber = new SubscriptionCheckingSubscriber(getSubscriptionDao(), myIdToSubscription, getChannelType(), this);
|
mySubscriptionCheckingSubscriber = new SubscriptionCheckingSubscriber(getSubscriptionDao(), myIdToSubscription, getChannelType(), this);
|
||||||
|
@ -174,14 +182,13 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
|
||||||
|
|
||||||
registerDeliverySubscriber();
|
registerDeliverySubscriber();
|
||||||
|
|
||||||
|
initSubscriptions();
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
@PreDestroy
|
@PreDestroy
|
||||||
public void preDestroy() {
|
public void preDestroy() {
|
||||||
if (myAutoActivateSubscriptions) {
|
getProcessingChannel().unsubscribe(mySubscriptionActivatingSubscriber);
|
||||||
getProcessingChannel().unsubscribe(mySubscriptionActivatingSubscriber);
|
|
||||||
}
|
|
||||||
getProcessingChannel().unsubscribe(mySubscriptionCheckingSubscriber);
|
getProcessingChannel().unsubscribe(mySubscriptionCheckingSubscriber);
|
||||||
|
|
||||||
unregisterDeliverySubscriber();
|
unregisterDeliverySubscriber();
|
||||||
|
|
|
@ -69,10 +69,9 @@ public abstract class BaseSubscriptionSubscriber implements MessageHandler {
|
||||||
/**
|
/**
|
||||||
* Does this subscription type (e.g. rest hook, websocket, etc) apply to this interceptor?
|
* Does this subscription type (e.g. rest hook, websocket, etc) apply to this interceptor?
|
||||||
*/
|
*/
|
||||||
protected boolean subscriptionTypeApplies(ResourceModifiedMessage theMsg) {
|
protected boolean subscriptionTypeApplies(IBaseResource theSubscription) {
|
||||||
FhirContext ctx = mySubscriptionDao.getContext();
|
FhirContext ctx = mySubscriptionDao.getContext();
|
||||||
IBaseResource subscription = theMsg.getNewPayload();
|
return subscriptionTypeApplies(ctx, theSubscription);
|
||||||
return subscriptionTypeApplies(ctx, subscription);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -30,7 +30,6 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.messaging.Message;
|
import org.springframework.messaging.Message;
|
||||||
import org.springframework.messaging.MessagingException;
|
import org.springframework.messaging.MessagingException;
|
||||||
import org.springframework.messaging.SubscribableChannel;
|
|
||||||
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
@ -46,39 +45,44 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriptionSubscriber
|
||||||
}
|
}
|
||||||
|
|
||||||
private void activateAndRegisterSubscriptionIfRequired(ResourceModifiedMessage theMsg) {
|
private void activateAndRegisterSubscriptionIfRequired(ResourceModifiedMessage theMsg) {
|
||||||
FhirContext ctx = getSubscriptionDao().getContext();
|
|
||||||
IBaseResource subscription = theMsg.getNewPayload();
|
IBaseResource subscription = theMsg.getNewPayload();
|
||||||
IPrimitiveType<?> status = ctx.newTerser().getSingleValueOrNull(subscription, BaseSubscriptionInterceptor.SUBSCRIPTION_STATUS, IPrimitiveType.class);
|
activateAndRegisterSubscriptionIfRequired(subscription);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void activateAndRegisterSubscriptionIfRequired(IBaseResource theSubscription) {
|
||||||
|
boolean subscriptionTypeApplies = subscriptionTypeApplies(theSubscription);
|
||||||
|
if (subscriptionTypeApplies == false) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
FhirContext ctx = getSubscriptionDao().getContext();
|
||||||
|
IPrimitiveType<?> status = ctx.newTerser().getSingleValueOrNull(theSubscription, BaseSubscriptionInterceptor.SUBSCRIPTION_STATUS, IPrimitiveType.class);
|
||||||
String statusString = status.getValueAsString();
|
String statusString = status.getValueAsString();
|
||||||
|
|
||||||
String requestedStatus = Subscription.SubscriptionStatus.REQUESTED.toCode();
|
String requestedStatus = Subscription.SubscriptionStatus.REQUESTED.toCode();
|
||||||
String activeStatus = Subscription.SubscriptionStatus.ACTIVE.toCode();
|
String activeStatus = Subscription.SubscriptionStatus.ACTIVE.toCode();
|
||||||
if (requestedStatus.equals(statusString)) {
|
if (requestedStatus.equals(statusString)) {
|
||||||
status.setValueAsString(activeStatus);
|
status.setValueAsString(activeStatus);
|
||||||
ourLog.info("Activating and registering subscription {} from status {} to {}", subscription.getIdElement().toUnqualified().getValue(), requestedStatus, activeStatus);
|
ourLog.info("Activating and registering subscription {} from status {} to {}", theSubscription.getIdElement().toUnqualified().getValue(), requestedStatus, activeStatus);
|
||||||
getSubscriptionDao().update(subscription);
|
getSubscriptionDao().update(theSubscription);
|
||||||
getIdToSubscription().put(subscription.getIdElement().getIdPart(), subscription);
|
getIdToSubscription().put(theSubscription.getIdElement().getIdPart(), theSubscription);
|
||||||
} else if (activeStatus.equals(statusString)) {
|
} else if (activeStatus.equals(statusString)) {
|
||||||
ourLog.info("Registering active subscription {}", subscription.getIdElement().toUnqualified().getValue());
|
ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
|
||||||
getIdToSubscription().put(subscription.getIdElement().getIdPart(), subscription);
|
getIdToSubscription().put(theSubscription.getIdElement().getIdPart(), theSubscription);
|
||||||
} else {
|
} else {
|
||||||
if (getIdToSubscription().containsKey(subscription.getIdElement().getIdPart())) {
|
if (getIdToSubscription().containsKey(theSubscription.getIdElement().getIdPart())) {
|
||||||
ourLog.info("Removing {} subscription {}", statusString, subscription.getIdElement().toUnqualified().getValue());
|
ourLog.info("Removing {} subscription {}", statusString, theSubscription.getIdElement().toUnqualified().getValue());
|
||||||
}
|
}
|
||||||
getIdToSubscription().remove(subscription.getIdElement().getIdPart());
|
getIdToSubscription().remove(theSubscription.getIdElement().getIdPart());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private void handleCreate(ResourceModifiedMessage theMsg) {
|
private void handleCreate(ResourceModifiedMessage theMsg) {
|
||||||
if (!theMsg.getId().getResourceType().equals("Subscription")) {
|
if (!theMsg.getId().getResourceType().equals("Subscription")) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean subscriptionTypeApplies = subscriptionTypeApplies(theMsg);
|
|
||||||
if (subscriptionTypeApplies == false) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
activateAndRegisterSubscriptionIfRequired(theMsg);
|
activateAndRegisterSubscriptionIfRequired(theMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,11 +115,6 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriptionSubscriber
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean subscriptionTypeApplies = subscriptionTypeApplies(theMsg);
|
|
||||||
if (subscriptionTypeApplies == false) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
activateAndRegisterSubscriptionIfRequired(theMsg);
|
activateAndRegisterSubscriptionIfRequired(theMsg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,37 +1,13 @@
|
||||||
package ca.uhn.fhir.jpa.provider.r4;
|
package ca.uhn.fhir.jpa.provider.r4;
|
||||||
|
|
||||||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
|
||||||
|
|
||||||
import java.util.*;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import org.apache.http.impl.client.CloseableHttpClient;
|
|
||||||
import org.apache.http.impl.client.HttpClientBuilder;
|
|
||||||
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
|
|
||||||
import org.eclipse.jetty.server.Server;
|
|
||||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
|
||||||
import org.eclipse.jetty.servlet.ServletHolder;
|
|
||||||
import org.hl7.fhir.r4.hapi.rest.server.GraphQLProvider;
|
|
||||||
import org.hl7.fhir.r4.model.Bundle;
|
|
||||||
import org.hl7.fhir.r4.model.Bundle.BundleEntryComponent;
|
|
||||||
import org.hl7.fhir.r4.model.Patient;
|
|
||||||
import org.junit.*;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
|
||||||
import org.springframework.context.annotation.Lazy;
|
|
||||||
import org.springframework.web.context.ContextLoader;
|
|
||||||
import org.springframework.web.context.WebApplicationContext;
|
|
||||||
import org.springframework.web.context.support.*;
|
|
||||||
import org.springframework.web.cors.CorsConfiguration;
|
|
||||||
import org.springframework.web.servlet.DispatcherServlet;
|
|
||||||
|
|
||||||
import ca.uhn.fhir.jpa.config.r4.WebsocketR4Config;
|
import ca.uhn.fhir.jpa.config.r4.WebsocketR4Config;
|
||||||
import ca.uhn.fhir.jpa.config.r4.WebsocketR4DispatcherConfig;
|
import ca.uhn.fhir.jpa.config.r4.WebsocketR4DispatcherConfig;
|
||||||
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
|
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
|
||||||
import ca.uhn.fhir.jpa.dao.r4.BaseJpaR4Test;
|
import ca.uhn.fhir.jpa.dao.r4.BaseJpaR4Test;
|
||||||
import ca.uhn.fhir.jpa.dao.r4.SearchParamRegistryR4;
|
import ca.uhn.fhir.jpa.dao.r4.SearchParamRegistryR4;
|
||||||
import ca.uhn.fhir.jpa.subscription.r4.RestHookSubscriptionR4Interceptor;
|
|
||||||
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
|
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
|
||||||
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
|
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
|
||||||
|
import ca.uhn.fhir.jpa.subscription.r4.RestHookSubscriptionR4Interceptor;
|
||||||
import ca.uhn.fhir.jpa.validation.JpaValidationSupportChainR4;
|
import ca.uhn.fhir.jpa.validation.JpaValidationSupportChainR4;
|
||||||
import ca.uhn.fhir.narrative.DefaultThymeleafNarrativeGenerator;
|
import ca.uhn.fhir.narrative.DefaultThymeleafNarrativeGenerator;
|
||||||
import ca.uhn.fhir.parser.StrictErrorHandler;
|
import ca.uhn.fhir.parser.StrictErrorHandler;
|
||||||
|
@ -42,6 +18,32 @@ import ca.uhn.fhir.rest.server.RestfulServer;
|
||||||
import ca.uhn.fhir.rest.server.interceptor.CorsInterceptor;
|
import ca.uhn.fhir.rest.server.interceptor.CorsInterceptor;
|
||||||
import ca.uhn.fhir.util.PortUtil;
|
import ca.uhn.fhir.util.PortUtil;
|
||||||
import ca.uhn.fhir.util.TestUtil;
|
import ca.uhn.fhir.util.TestUtil;
|
||||||
|
import org.apache.http.impl.client.CloseableHttpClient;
|
||||||
|
import org.apache.http.impl.client.HttpClientBuilder;
|
||||||
|
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
|
||||||
|
import org.eclipse.jetty.server.Server;
|
||||||
|
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||||
|
import org.eclipse.jetty.servlet.ServletHolder;
|
||||||
|
import org.hl7.fhir.r4.model.Bundle;
|
||||||
|
import org.hl7.fhir.r4.model.Bundle.BundleEntryComponent;
|
||||||
|
import org.hl7.fhir.r4.model.Patient;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.springframework.web.context.ContextLoader;
|
||||||
|
import org.springframework.web.context.WebApplicationContext;
|
||||||
|
import org.springframework.web.context.support.AnnotationConfigWebApplicationContext;
|
||||||
|
import org.springframework.web.context.support.GenericWebApplicationContext;
|
||||||
|
import org.springframework.web.context.support.WebApplicationContextUtils;
|
||||||
|
import org.springframework.web.cors.CorsConfiguration;
|
||||||
|
import org.springframework.web.servlet.DispatcherServlet;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||||
|
|
||||||
public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
|
public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
|
||||||
|
|
||||||
|
@ -50,16 +52,16 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
|
||||||
protected static CloseableHttpClient ourHttpClient;
|
protected static CloseableHttpClient ourHttpClient;
|
||||||
protected static int ourPort;
|
protected static int ourPort;
|
||||||
protected static RestfulServer ourRestServer;
|
protected static RestfulServer ourRestServer;
|
||||||
private static Server ourServer;
|
|
||||||
protected static String ourServerBase;
|
protected static String ourServerBase;
|
||||||
private static GenericWebApplicationContext ourWebApplicationContext;
|
|
||||||
private TerminologyUploaderProviderR4 myTerminologyUploaderProvider;
|
|
||||||
protected static SearchParamRegistryR4 ourSearchParamRegistry;
|
protected static SearchParamRegistryR4 ourSearchParamRegistry;
|
||||||
protected static DatabaseBackedPagingProvider ourPagingProvider;
|
protected static DatabaseBackedPagingProvider ourPagingProvider;
|
||||||
protected static RestHookSubscriptionR4Interceptor ourRestHookSubscriptionInterceptor;
|
|
||||||
protected static ISearchDao mySearchEntityDao;
|
protected static ISearchDao mySearchEntityDao;
|
||||||
protected static ISearchCoordinatorSvc mySearchCoordinatorSvc;
|
protected static ISearchCoordinatorSvc mySearchCoordinatorSvc;
|
||||||
|
private static Server ourServer;
|
||||||
|
private static GenericWebApplicationContext ourWebApplicationContext;
|
||||||
|
private TerminologyUploaderProviderR4 myTerminologyUploaderProvider;
|
||||||
private Object ourGraphQLProvider;
|
private Object ourGraphQLProvider;
|
||||||
|
private boolean ourRestHookSubscriptionInterceptorRequested;
|
||||||
|
|
||||||
public BaseResourceProviderR4Test() {
|
public BaseResourceProviderR4Test() {
|
||||||
super();
|
super();
|
||||||
|
@ -70,7 +72,7 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
|
||||||
myFhirCtx.getRestfulClientFactory().setServerValidationMode(ServerValidationModeEnum.ONCE);
|
myFhirCtx.getRestfulClientFactory().setServerValidationMode(ServerValidationModeEnum.ONCE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||||
@Before
|
@Before
|
||||||
public void before() throws Exception {
|
public void before() throws Exception {
|
||||||
myFhirCtx.getRestfulClientFactory().setServerValidationMode(ServerValidationModeEnum.NEVER);
|
myFhirCtx.getRestfulClientFactory().setServerValidationMode(ServerValidationModeEnum.NEVER);
|
||||||
|
@ -119,8 +121,8 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
|
||||||
ServletHolder subsServletHolder = new ServletHolder();
|
ServletHolder subsServletHolder = new ServletHolder();
|
||||||
subsServletHolder.setServlet(dispatcherServlet);
|
subsServletHolder.setServlet(dispatcherServlet);
|
||||||
subsServletHolder.setInitParameter(
|
subsServletHolder.setInitParameter(
|
||||||
ContextLoader.CONFIG_LOCATION_PARAM,
|
ContextLoader.CONFIG_LOCATION_PARAM,
|
||||||
WebsocketR4Config.class.getName() + "\n" +
|
WebsocketR4Config.class.getName() + "\n" +
|
||||||
WebsocketR4DispatcherConfig.class.getName());
|
WebsocketR4DispatcherConfig.class.getName());
|
||||||
proxyHandler.addServlet(subsServletHolder, "/*");
|
proxyHandler.addServlet(subsServletHolder, "/*");
|
||||||
|
|
||||||
|
@ -147,7 +149,6 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
|
||||||
myValidationSupport = wac.getBean(JpaValidationSupportChainR4.class);
|
myValidationSupport = wac.getBean(JpaValidationSupportChainR4.class);
|
||||||
mySearchCoordinatorSvc = wac.getBean(ISearchCoordinatorSvc.class);
|
mySearchCoordinatorSvc = wac.getBean(ISearchCoordinatorSvc.class);
|
||||||
mySearchEntityDao = wac.getBean(ISearchDao.class);
|
mySearchEntityDao = wac.getBean(ISearchDao.class);
|
||||||
ourRestHookSubscriptionInterceptor = wac.getBean(RestHookSubscriptionR4Interceptor.class);
|
|
||||||
ourSearchParamRegistry = wac.getBean(SearchParamRegistryR4.class);
|
ourSearchParamRegistry = wac.getBean(SearchParamRegistryR4.class);
|
||||||
|
|
||||||
myFhirCtx.getRestfulClientFactory().setSocketTimeout(5000000);
|
myFhirCtx.getRestfulClientFactory().setSocketTimeout(5000000);
|
||||||
|
@ -168,6 +169,19 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
|
||||||
ourRestServer.setPagingProvider(ourPagingProvider);
|
ourRestServer.setPagingProvider(ourPagingProvider);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is lazy created so we only ask for it if its needed
|
||||||
|
*/
|
||||||
|
protected RestHookSubscriptionR4Interceptor getRestHookSubscriptionInterceptor() {
|
||||||
|
RestHookSubscriptionR4Interceptor retVal = ourWebApplicationContext.getBean(RestHookSubscriptionR4Interceptor.class);
|
||||||
|
ourRestHookSubscriptionInterceptorRequested = true;
|
||||||
|
return retVal;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean hasRestHookSubscriptionInterceptor() {
|
||||||
|
return ourRestHookSubscriptionInterceptorRequested;
|
||||||
|
}
|
||||||
|
|
||||||
protected boolean shouldLogClient() {
|
protected boolean shouldLogClient() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,172 @@
|
||||||
|
package ca.uhn.fhir.jpa.subscription.r4;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
|
import ca.uhn.fhir.jpa.provider.r4.BaseResourceProviderR4Test;
|
||||||
|
import ca.uhn.fhir.jpa.subscription.RestHookTestDstu2Test;
|
||||||
|
import ca.uhn.fhir.rest.annotation.ResourceParam;
|
||||||
|
import ca.uhn.fhir.rest.annotation.Update;
|
||||||
|
import ca.uhn.fhir.rest.api.Constants;
|
||||||
|
import ca.uhn.fhir.rest.api.MethodOutcome;
|
||||||
|
import ca.uhn.fhir.rest.server.IResourceProvider;
|
||||||
|
import ca.uhn.fhir.rest.server.RestfulServer;
|
||||||
|
import ca.uhn.fhir.util.PortUtil;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import org.eclipse.jetty.server.Server;
|
||||||
|
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||||
|
import org.eclipse.jetty.servlet.ServletHolder;
|
||||||
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
|
import org.hl7.fhir.r4.model.*;
|
||||||
|
import org.junit.*;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Enumeration;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
public class RestHookActivatesPreExistingSubscriptionsR4Test extends BaseResourceProviderR4Test {
|
||||||
|
|
||||||
|
private static final Logger ourLog = LoggerFactory.getLogger(RestHookActivatesPreExistingSubscriptionsR4Test.class);
|
||||||
|
private static int ourListenerPort;
|
||||||
|
private static RestfulServer ourListenerRestServer;
|
||||||
|
private static String ourListenerServerBase;
|
||||||
|
private static Server ourListenerServer;
|
||||||
|
private static List<Observation> ourUpdatedObservations = Lists.newArrayList();
|
||||||
|
private static List<String> ourContentTypes = new ArrayList<>();
|
||||||
|
private static List<String> ourHeaders = new ArrayList<>();
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void afterUnregisterRestHookListener() {
|
||||||
|
ourRestServer.unregisterInterceptor(getRestHookSubscriptionInterceptor());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException {
|
||||||
|
Subscription subscription = new Subscription();
|
||||||
|
subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)");
|
||||||
|
subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED);
|
||||||
|
subscription.setCriteria(theCriteria);
|
||||||
|
|
||||||
|
Subscription.SubscriptionChannelComponent channel = subscription.getChannel();
|
||||||
|
channel.setType(Subscription.SubscriptionChannelType.RESTHOOK);
|
||||||
|
channel.setPayload(thePayload);
|
||||||
|
channel.setEndpoint(theEndpoint);
|
||||||
|
|
||||||
|
MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute();
|
||||||
|
subscription.setId(methodOutcome.getId().getIdPart());
|
||||||
|
|
||||||
|
waitForQueueToDrain();
|
||||||
|
return subscription;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Observation sendObservation(String code, String system) {
|
||||||
|
Observation observation = new Observation();
|
||||||
|
CodeableConcept codeableConcept = new CodeableConcept();
|
||||||
|
observation.setCode(codeableConcept);
|
||||||
|
Coding coding = codeableConcept.addCoding();
|
||||||
|
coding.setCode(code);
|
||||||
|
coding.setSystem(system);
|
||||||
|
|
||||||
|
observation.setStatus(Observation.ObservationStatus.FINAL);
|
||||||
|
|
||||||
|
MethodOutcome methodOutcome = ourClient.create().resource(observation).execute();
|
||||||
|
|
||||||
|
String observationId = methodOutcome.getId().getIdPart();
|
||||||
|
observation.setId(observationId);
|
||||||
|
|
||||||
|
return observation;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSubscriptionInterceptorRegisteredAfterSubscriptionCreated() throws Exception {
|
||||||
|
String payload = "application/fhir+json";
|
||||||
|
|
||||||
|
String code = "1000000050";
|
||||||
|
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
|
||||||
|
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml";
|
||||||
|
|
||||||
|
createSubscription(criteria1, payload, ourListenerServerBase);
|
||||||
|
createSubscription(criteria2, payload, ourListenerServerBase);
|
||||||
|
|
||||||
|
assertFalse(hasRestHookSubscriptionInterceptor());
|
||||||
|
|
||||||
|
ourRestServer.registerInterceptor(getRestHookSubscriptionInterceptor());
|
||||||
|
|
||||||
|
assertTrue(hasRestHookSubscriptionInterceptor());
|
||||||
|
|
||||||
|
|
||||||
|
sendObservation(code, "SNOMED-CT");
|
||||||
|
|
||||||
|
// Should see 1 subscription notification
|
||||||
|
waitForQueueToDrain();
|
||||||
|
waitForSize(1, ourUpdatedObservations);
|
||||||
|
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void waitForQueueToDrain() throws InterruptedException {
|
||||||
|
if (hasRestHookSubscriptionInterceptor()) {
|
||||||
|
RestHookTestDstu2Test.waitForQueueToDrain(getRestHookSubscriptionInterceptor());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void startListenerServer() throws Exception {
|
||||||
|
ourListenerPort = PortUtil.findFreePort();
|
||||||
|
ourListenerRestServer = new RestfulServer(FhirContext.forR4());
|
||||||
|
ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context";
|
||||||
|
|
||||||
|
ObservationListener obsListener = new ObservationListener();
|
||||||
|
ourListenerRestServer.setResourceProviders(obsListener);
|
||||||
|
|
||||||
|
ourListenerServer = new Server(ourListenerPort);
|
||||||
|
|
||||||
|
ServletContextHandler proxyHandler = new ServletContextHandler();
|
||||||
|
proxyHandler.setContextPath("/");
|
||||||
|
|
||||||
|
ServletHolder servletHolder = new ServletHolder();
|
||||||
|
servletHolder.setServlet(ourListenerRestServer);
|
||||||
|
proxyHandler.addServlet(servletHolder, "/fhir/context/*");
|
||||||
|
|
||||||
|
ourListenerServer.setHandler(proxyHandler);
|
||||||
|
ourListenerServer.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void stopListenerServer() throws Exception {
|
||||||
|
ourListenerServer.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class ObservationListener implements IResourceProvider {
|
||||||
|
|
||||||
|
|
||||||
|
private void extractHeaders(HttpServletRequest theRequest) {
|
||||||
|
java.util.Enumeration<String> headerNamesEnum = theRequest.getHeaderNames();
|
||||||
|
while (headerNamesEnum.hasMoreElements()) {
|
||||||
|
String nextName = headerNamesEnum.nextElement();
|
||||||
|
Enumeration<String> valueEnum = theRequest.getHeaders(nextName);
|
||||||
|
while (valueEnum.hasMoreElements()) {
|
||||||
|
String nextValue = valueEnum.nextElement();
|
||||||
|
ourHeaders.add(nextName + ": " + nextValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Class<? extends IBaseResource> getResourceType() {
|
||||||
|
return Observation.class;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Update
|
||||||
|
public MethodOutcome update(@ResourceParam Observation theObservation, HttpServletRequest theRequest) {
|
||||||
|
ourLog.info("Received Listener Update");
|
||||||
|
ourUpdatedObservations.add(theObservation);
|
||||||
|
ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", ""));
|
||||||
|
extractHeaders(theRequest);
|
||||||
|
return new MethodOutcome(new IdType("Observation/1"), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -62,12 +62,12 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
|
||||||
ourLog.info("Done deleting all subscriptions");
|
ourLog.info("Done deleting all subscriptions");
|
||||||
myDaoConfig.setAllowMultipleDelete(new DaoConfig().isAllowMultipleDelete());
|
myDaoConfig.setAllowMultipleDelete(new DaoConfig().isAllowMultipleDelete());
|
||||||
|
|
||||||
ourRestServer.unregisterInterceptor(ourRestHookSubscriptionInterceptor);
|
ourRestServer.unregisterInterceptor(getRestHookSubscriptionInterceptor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void beforeRegisterRestHookListener() {
|
public void beforeRegisterRestHookListener() {
|
||||||
ourRestServer.registerInterceptor(ourRestHookSubscriptionInterceptor);
|
ourRestServer.registerInterceptor(getRestHookSubscriptionInterceptor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -376,7 +376,7 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void waitForQueueToDrain() throws InterruptedException {
|
private void waitForQueueToDrain() throws InterruptedException {
|
||||||
RestHookTestDstu2Test.waitForQueueToDrain(ourRestHookSubscriptionInterceptor);
|
RestHookTestDstu2Test.waitForQueueToDrain(getRestHookSubscriptionInterceptor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
|
|
|
@ -50,12 +50,12 @@ public class RestHookTestWithInterceptorRegisteredToDaoConfigR4Test extends Base
|
||||||
ourLog.info("Done deleting all subscriptions");
|
ourLog.info("Done deleting all subscriptions");
|
||||||
myDaoConfig.setAllowMultipleDelete(new DaoConfig().isAllowMultipleDelete());
|
myDaoConfig.setAllowMultipleDelete(new DaoConfig().isAllowMultipleDelete());
|
||||||
|
|
||||||
myDaoConfig.getInterceptors().remove(ourRestHookSubscriptionInterceptor);
|
myDaoConfig.getInterceptors().remove(getRestHookSubscriptionInterceptor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void beforeRegisterRestHookListener() {
|
public void beforeRegisterRestHookListener() {
|
||||||
myDaoConfig.getInterceptors().add(ourRestHookSubscriptionInterceptor);
|
myDaoConfig.getInterceptors().add(getRestHookSubscriptionInterceptor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -84,11 +84,11 @@ public class RestHookTestWithInterceptorRegisteredToDaoConfigR4Test extends Base
|
||||||
}
|
}
|
||||||
|
|
||||||
private void waitForQueueToDrain() throws InterruptedException {
|
private void waitForQueueToDrain() throws InterruptedException {
|
||||||
ourLog.info("QUEUE HAS {} ITEMS", ourRestHookSubscriptionInterceptor.getExecutorQueueForUnitTests().size());
|
ourLog.info("QUEUE HAS {} ITEMS", getRestHookSubscriptionInterceptor().getExecutorQueueForUnitTests().size());
|
||||||
while (ourRestHookSubscriptionInterceptor.getExecutorQueueForUnitTests().size() > 0) {
|
while (getRestHookSubscriptionInterceptor().getExecutorQueueForUnitTests().size() > 0) {
|
||||||
Thread.sleep(250);
|
Thread.sleep(250);
|
||||||
}
|
}
|
||||||
ourLog.info("QUEUE HAS {} ITEMS", ourRestHookSubscriptionInterceptor.getExecutorQueueForUnitTests().size());
|
ourLog.info("QUEUE HAS {} ITEMS", getRestHookSubscriptionInterceptor().getExecutorQueueForUnitTests().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
private Observation sendObservation(String code, String system) throws InterruptedException {
|
private Observation sendObservation(String code, String system) throws InterruptedException {
|
||||||
|
|
|
@ -25,6 +25,7 @@ delete from hfj_spidx_token where res_id in (select res_id from hfj_resource whe
|
||||||
delete from hfj_spidx_uri where res_id in (select res_id from hfj_resource where sp_index_status = 2);
|
delete from hfj_spidx_uri where res_id in (select res_id from hfj_resource where sp_index_status = 2);
|
||||||
delete from hfj_res_tag where res_id in (select res_id from hfj_resource where sp_index_status = 2);
|
delete from hfj_res_tag where res_id in (select res_id from hfj_resource where sp_index_status = 2);
|
||||||
delete from hfj_search_result where resource_pid in (select res_id from hfj_resource where sp_index_status = 2);
|
delete from hfj_search_result where resource_pid in (select res_id from hfj_resource where sp_index_status = 2);
|
||||||
|
delete from hfj_res_param_present where res_id in (select res_id from hfj_resource where sp_index_status = 2);
|
||||||
delete from hfj_resource where res_id in (select res_id from hfj_resource where sp_index_status = 2);
|
delete from hfj_resource where res_id in (select res_id from hfj_resource where sp_index_status = 2);
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue