From a24cbd7da5df0d84d717054e4822d9a22d5a599f Mon Sep 17 00:00:00 2001 From: James Date: Thu, 31 Aug 2017 06:46:26 -0400 Subject: [PATCH] uto register subscriptions on startup --- .../BaseSubscriptionInterceptor.java | 31 ++-- .../BaseSubscriptionSubscriber.java | 5 +- .../SubscriptionActivatingSubscriber.java | 41 ++--- .../r4/BaseResourceProviderR4Test.java | 82 +++++---- ...tivatesPreExistingSubscriptionsR4Test.java | 172 ++++++++++++++++++ .../subscription/r4/RestHookTestR4Test.java | 6 +- ...nterceptorRegisteredToDaoConfigR4Test.java | 10 +- .../derby_maintenance.txt | 1 + 8 files changed, 270 insertions(+), 78 deletions(-) create mode 100644 hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/r4/RestHookActivatesPreExistingSubscriptionsR4Test.java diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java index 4f11c874220..7612f714bb0 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java @@ -26,6 +26,7 @@ import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails; import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.RequestDetails; +import ca.uhn.fhir.rest.param.TokenOrListParam; import ca.uhn.fhir.rest.param.TokenParam; import ca.uhn.fhir.rest.server.interceptor.ServerOperationInterceptorAdapter; import org.apache.commons.lang3.concurrent.BasicThreadFactory; @@ -61,9 +62,8 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce private SubscribableChannel myProcessingChannel; private SubscribableChannel myDeliveryChannel; private ExecutorService myExecutor; - private boolean myAutoActivateSubscriptions = true; private int myExecutorThreadCount = 1; - private MessageHandler mySubscriptionActivatingSubscriber; + private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber; private MessageHandler mySubscriptionCheckingSubscriber; private ConcurrentHashMap myIdToSubscription = new ConcurrentHashMap<>(); private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class); @@ -97,6 +97,13 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce protected abstract IFhirResourceDao getSubscriptionDao(); + /** + * Constructor + */ + public BaseSubscriptionInterceptor() { + super(); + } + /** * Read the existing subscriptions from the database */ @@ -105,7 +112,9 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce public void initSubscriptions() { SearchParameterMap map = new SearchParameterMap(); map.add(Subscription.SP_TYPE, new TokenParam(null, getChannelType().toCode())); - map.add(Subscription.SP_STATUS, new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode())); + map.add(Subscription.SP_STATUS, new TokenOrListParam() + .addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode())) + .addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode()))); map.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS); RequestDetails req = new ServletSubRequestDetails(); @@ -122,12 +131,13 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce for (IBaseResource resource : resourceList) { String nextId = resource.getIdElement().getIdPart(); allIds.add(nextId); - myIdToSubscription.put(nextId, resource); + mySubscriptionActivatingSubscriber.activateAndRegisterSubscriptionIfRequired(resource); } for (Enumeration keyEnum = myIdToSubscription.keys(); keyEnum.hasMoreElements(); ) { String next = keyEnum.nextElement(); if (!allIds.contains(next)) { + ourLog.info("Unregistering Subscription/{} as it no longer exists", next); myIdToSubscription.remove(next); } } @@ -160,12 +170,10 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce setDeliveryChannel(new ExecutorSubscribableChannel(myExecutor)); } - if (myAutoActivateSubscriptions) { - if (mySubscriptionActivatingSubscriber == null) { - mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), myIdToSubscription, getChannelType(), this); - } - getProcessingChannel().subscribe(mySubscriptionActivatingSubscriber); + if (mySubscriptionActivatingSubscriber == null) { + mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), myIdToSubscription, getChannelType(), this); } + getProcessingChannel().subscribe(mySubscriptionActivatingSubscriber); if (mySubscriptionCheckingSubscriber == null) { mySubscriptionCheckingSubscriber = new SubscriptionCheckingSubscriber(getSubscriptionDao(), myIdToSubscription, getChannelType(), this); @@ -174,14 +182,13 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce registerDeliverySubscriber(); + initSubscriptions(); } @SuppressWarnings("unused") @PreDestroy public void preDestroy() { - if (myAutoActivateSubscriptions) { - getProcessingChannel().unsubscribe(mySubscriptionActivatingSubscriber); - } + getProcessingChannel().unsubscribe(mySubscriptionActivatingSubscriber); getProcessingChannel().unsubscribe(mySubscriptionCheckingSubscriber); unregisterDeliverySubscriber(); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionSubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionSubscriber.java index 27a690ad4d2..5ae4a504e47 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionSubscriber.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionSubscriber.java @@ -69,10 +69,9 @@ public abstract class BaseSubscriptionSubscriber implements MessageHandler { /** * Does this subscription type (e.g. rest hook, websocket, etc) apply to this interceptor? */ - protected boolean subscriptionTypeApplies(ResourceModifiedMessage theMsg) { + protected boolean subscriptionTypeApplies(IBaseResource theSubscription) { FhirContext ctx = mySubscriptionDao.getContext(); - IBaseResource subscription = theMsg.getNewPayload(); - return subscriptionTypeApplies(ctx, subscription); + return subscriptionTypeApplies(ctx, theSubscription); } /** diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java index b5b62b0a5ed..541ffbeb5f6 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java @@ -30,7 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; import org.springframework.messaging.MessagingException; -import org.springframework.messaging.SubscribableChannel; import java.util.concurrent.ConcurrentHashMap; @@ -46,39 +45,44 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriptionSubscriber } private void activateAndRegisterSubscriptionIfRequired(ResourceModifiedMessage theMsg) { - FhirContext ctx = getSubscriptionDao().getContext(); IBaseResource subscription = theMsg.getNewPayload(); - IPrimitiveType status = ctx.newTerser().getSingleValueOrNull(subscription, BaseSubscriptionInterceptor.SUBSCRIPTION_STATUS, IPrimitiveType.class); + activateAndRegisterSubscriptionIfRequired(subscription); + } + + public void activateAndRegisterSubscriptionIfRequired(IBaseResource theSubscription) { + boolean subscriptionTypeApplies = subscriptionTypeApplies(theSubscription); + if (subscriptionTypeApplies == false) { + return; + } + + FhirContext ctx = getSubscriptionDao().getContext(); + IPrimitiveType status = ctx.newTerser().getSingleValueOrNull(theSubscription, BaseSubscriptionInterceptor.SUBSCRIPTION_STATUS, IPrimitiveType.class); String statusString = status.getValueAsString(); String requestedStatus = Subscription.SubscriptionStatus.REQUESTED.toCode(); String activeStatus = Subscription.SubscriptionStatus.ACTIVE.toCode(); if (requestedStatus.equals(statusString)) { status.setValueAsString(activeStatus); - ourLog.info("Activating and registering subscription {} from status {} to {}", subscription.getIdElement().toUnqualified().getValue(), requestedStatus, activeStatus); - getSubscriptionDao().update(subscription); - getIdToSubscription().put(subscription.getIdElement().getIdPart(), subscription); + ourLog.info("Activating and registering subscription {} from status {} to {}", theSubscription.getIdElement().toUnqualified().getValue(), requestedStatus, activeStatus); + getSubscriptionDao().update(theSubscription); + getIdToSubscription().put(theSubscription.getIdElement().getIdPart(), theSubscription); } else if (activeStatus.equals(statusString)) { - ourLog.info("Registering active subscription {}", subscription.getIdElement().toUnqualified().getValue()); - getIdToSubscription().put(subscription.getIdElement().getIdPart(), subscription); + ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue()); + getIdToSubscription().put(theSubscription.getIdElement().getIdPart(), theSubscription); } else { - if (getIdToSubscription().containsKey(subscription.getIdElement().getIdPart())) { - ourLog.info("Removing {} subscription {}", statusString, subscription.getIdElement().toUnqualified().getValue()); + if (getIdToSubscription().containsKey(theSubscription.getIdElement().getIdPart())) { + ourLog.info("Removing {} subscription {}", statusString, theSubscription.getIdElement().toUnqualified().getValue()); } - getIdToSubscription().remove(subscription.getIdElement().getIdPart()); + getIdToSubscription().remove(theSubscription.getIdElement().getIdPart()); } } + private void handleCreate(ResourceModifiedMessage theMsg) { if (!theMsg.getId().getResourceType().equals("Subscription")) { return; } - boolean subscriptionTypeApplies = subscriptionTypeApplies(theMsg); - if (subscriptionTypeApplies == false) { - return; - } - activateAndRegisterSubscriptionIfRequired(theMsg); } @@ -111,11 +115,6 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriptionSubscriber return; } - boolean subscriptionTypeApplies = subscriptionTypeApplies(theMsg); - if (subscriptionTypeApplies == false) { - return; - } - activateAndRegisterSubscriptionIfRequired(theMsg); } } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java index 16cf764826a..24241390292 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java @@ -1,37 +1,13 @@ package ca.uhn.fhir.jpa.provider.r4; -import static org.apache.commons.lang3.StringUtils.isNotBlank; - -import java.util.*; -import java.util.concurrent.TimeUnit; - -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.hl7.fhir.r4.hapi.rest.server.GraphQLProvider; -import org.hl7.fhir.r4.model.Bundle; -import org.hl7.fhir.r4.model.Bundle.BundleEntryComponent; -import org.hl7.fhir.r4.model.Patient; -import org.junit.*; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Lazy; -import org.springframework.web.context.ContextLoader; -import org.springframework.web.context.WebApplicationContext; -import org.springframework.web.context.support.*; -import org.springframework.web.cors.CorsConfiguration; -import org.springframework.web.servlet.DispatcherServlet; - import ca.uhn.fhir.jpa.config.r4.WebsocketR4Config; import ca.uhn.fhir.jpa.config.r4.WebsocketR4DispatcherConfig; import ca.uhn.fhir.jpa.dao.data.ISearchDao; import ca.uhn.fhir.jpa.dao.r4.BaseJpaR4Test; import ca.uhn.fhir.jpa.dao.r4.SearchParamRegistryR4; -import ca.uhn.fhir.jpa.subscription.r4.RestHookSubscriptionR4Interceptor; import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider; import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc; +import ca.uhn.fhir.jpa.subscription.r4.RestHookSubscriptionR4Interceptor; import ca.uhn.fhir.jpa.validation.JpaValidationSupportChainR4; import ca.uhn.fhir.narrative.DefaultThymeleafNarrativeGenerator; import ca.uhn.fhir.parser.StrictErrorHandler; @@ -42,6 +18,32 @@ import ca.uhn.fhir.rest.server.RestfulServer; import ca.uhn.fhir.rest.server.interceptor.CorsInterceptor; import ca.uhn.fhir.util.PortUtil; import ca.uhn.fhir.util.TestUtil; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.hl7.fhir.r4.model.Bundle; +import org.hl7.fhir.r4.model.Bundle.BundleEntryComponent; +import org.hl7.fhir.r4.model.Patient; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.springframework.web.context.ContextLoader; +import org.springframework.web.context.WebApplicationContext; +import org.springframework.web.context.support.AnnotationConfigWebApplicationContext; +import org.springframework.web.context.support.GenericWebApplicationContext; +import org.springframework.web.context.support.WebApplicationContextUtils; +import org.springframework.web.cors.CorsConfiguration; +import org.springframework.web.servlet.DispatcherServlet; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.commons.lang3.StringUtils.isNotBlank; public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test { @@ -50,16 +52,16 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test { protected static CloseableHttpClient ourHttpClient; protected static int ourPort; protected static RestfulServer ourRestServer; - private static Server ourServer; protected static String ourServerBase; - private static GenericWebApplicationContext ourWebApplicationContext; - private TerminologyUploaderProviderR4 myTerminologyUploaderProvider; protected static SearchParamRegistryR4 ourSearchParamRegistry; protected static DatabaseBackedPagingProvider ourPagingProvider; - protected static RestHookSubscriptionR4Interceptor ourRestHookSubscriptionInterceptor; protected static ISearchDao mySearchEntityDao; protected static ISearchCoordinatorSvc mySearchCoordinatorSvc; + private static Server ourServer; + private static GenericWebApplicationContext ourWebApplicationContext; + private TerminologyUploaderProviderR4 myTerminologyUploaderProvider; private Object ourGraphQLProvider; + private boolean ourRestHookSubscriptionInterceptorRequested; public BaseResourceProviderR4Test() { super(); @@ -70,7 +72,7 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test { myFhirCtx.getRestfulClientFactory().setServerValidationMode(ServerValidationModeEnum.ONCE); } - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings({"unchecked", "rawtypes"}) @Before public void before() throws Exception { myFhirCtx.getRestfulClientFactory().setServerValidationMode(ServerValidationModeEnum.NEVER); @@ -119,8 +121,8 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test { ServletHolder subsServletHolder = new ServletHolder(); subsServletHolder.setServlet(dispatcherServlet); subsServletHolder.setInitParameter( - ContextLoader.CONFIG_LOCATION_PARAM, - WebsocketR4Config.class.getName() + "\n" + + ContextLoader.CONFIG_LOCATION_PARAM, + WebsocketR4Config.class.getName() + "\n" + WebsocketR4DispatcherConfig.class.getName()); proxyHandler.addServlet(subsServletHolder, "/*"); @@ -147,7 +149,6 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test { myValidationSupport = wac.getBean(JpaValidationSupportChainR4.class); mySearchCoordinatorSvc = wac.getBean(ISearchCoordinatorSvc.class); mySearchEntityDao = wac.getBean(ISearchDao.class); - ourRestHookSubscriptionInterceptor = wac.getBean(RestHookSubscriptionR4Interceptor.class); ourSearchParamRegistry = wac.getBean(SearchParamRegistryR4.class); myFhirCtx.getRestfulClientFactory().setSocketTimeout(5000000); @@ -155,7 +156,7 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test { if (shouldLogClient()) { ourClient.registerInterceptor(new LoggingInterceptor()); } - + PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(5000, TimeUnit.MILLISECONDS); HttpClientBuilder builder = HttpClientBuilder.create(); builder.setConnectionManager(connectionManager); @@ -168,6 +169,19 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test { ourRestServer.setPagingProvider(ourPagingProvider); } + /** + * This is lazy created so we only ask for it if its needed + */ + protected RestHookSubscriptionR4Interceptor getRestHookSubscriptionInterceptor() { + RestHookSubscriptionR4Interceptor retVal = ourWebApplicationContext.getBean(RestHookSubscriptionR4Interceptor.class); + ourRestHookSubscriptionInterceptorRequested = true; + return retVal; + } + + protected boolean hasRestHookSubscriptionInterceptor() { + return ourRestHookSubscriptionInterceptorRequested; + } + protected boolean shouldLogClient() { return true; } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/r4/RestHookActivatesPreExistingSubscriptionsR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/r4/RestHookActivatesPreExistingSubscriptionsR4Test.java new file mode 100644 index 00000000000..27b6935593e --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/r4/RestHookActivatesPreExistingSubscriptionsR4Test.java @@ -0,0 +1,172 @@ +package ca.uhn.fhir.jpa.subscription.r4; + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.provider.r4.BaseResourceProviderR4Test; +import ca.uhn.fhir.jpa.subscription.RestHookTestDstu2Test; +import ca.uhn.fhir.rest.annotation.ResourceParam; +import ca.uhn.fhir.rest.annotation.Update; +import ca.uhn.fhir.rest.api.Constants; +import ca.uhn.fhir.rest.api.MethodOutcome; +import ca.uhn.fhir.rest.server.IResourceProvider; +import ca.uhn.fhir.rest.server.RestfulServer; +import ca.uhn.fhir.util.PortUtil; +import com.google.common.collect.Lists; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.r4.model.*; +import org.junit.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.http.HttpServletRequest; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; + +import static org.junit.Assert.*; + +public class RestHookActivatesPreExistingSubscriptionsR4Test extends BaseResourceProviderR4Test { + + private static final Logger ourLog = LoggerFactory.getLogger(RestHookActivatesPreExistingSubscriptionsR4Test.class); + private static int ourListenerPort; + private static RestfulServer ourListenerRestServer; + private static String ourListenerServerBase; + private static Server ourListenerServer; + private static List ourUpdatedObservations = Lists.newArrayList(); + private static List ourContentTypes = new ArrayList<>(); + private static List ourHeaders = new ArrayList<>(); + + @After + public void afterUnregisterRestHookListener() { + ourRestServer.unregisterInterceptor(getRestHookSubscriptionInterceptor()); + } + + private Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException { + Subscription subscription = new Subscription(); + subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)"); + subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED); + subscription.setCriteria(theCriteria); + + Subscription.SubscriptionChannelComponent channel = subscription.getChannel(); + channel.setType(Subscription.SubscriptionChannelType.RESTHOOK); + channel.setPayload(thePayload); + channel.setEndpoint(theEndpoint); + + MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute(); + subscription.setId(methodOutcome.getId().getIdPart()); + + waitForQueueToDrain(); + return subscription; + } + + private Observation sendObservation(String code, String system) { + Observation observation = new Observation(); + CodeableConcept codeableConcept = new CodeableConcept(); + observation.setCode(codeableConcept); + Coding coding = codeableConcept.addCoding(); + coding.setCode(code); + coding.setSystem(system); + + observation.setStatus(Observation.ObservationStatus.FINAL); + + MethodOutcome methodOutcome = ourClient.create().resource(observation).execute(); + + String observationId = methodOutcome.getId().getIdPart(); + observation.setId(observationId); + + return observation; + } + + @Test + public void testSubscriptionInterceptorRegisteredAfterSubscriptionCreated() throws Exception { + String payload = "application/fhir+json"; + + String code = "1000000050"; + String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; + String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml"; + + createSubscription(criteria1, payload, ourListenerServerBase); + createSubscription(criteria2, payload, ourListenerServerBase); + + assertFalse(hasRestHookSubscriptionInterceptor()); + + ourRestServer.registerInterceptor(getRestHookSubscriptionInterceptor()); + + assertTrue(hasRestHookSubscriptionInterceptor()); + + + sendObservation(code, "SNOMED-CT"); + + // Should see 1 subscription notification + waitForQueueToDrain(); + waitForSize(1, ourUpdatedObservations); + assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); + } + + private void waitForQueueToDrain() throws InterruptedException { + if (hasRestHookSubscriptionInterceptor()) { + RestHookTestDstu2Test.waitForQueueToDrain(getRestHookSubscriptionInterceptor()); + } + } + + @BeforeClass + public static void startListenerServer() throws Exception { + ourListenerPort = PortUtil.findFreePort(); + ourListenerRestServer = new RestfulServer(FhirContext.forR4()); + ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context"; + + ObservationListener obsListener = new ObservationListener(); + ourListenerRestServer.setResourceProviders(obsListener); + + ourListenerServer = new Server(ourListenerPort); + + ServletContextHandler proxyHandler = new ServletContextHandler(); + proxyHandler.setContextPath("/"); + + ServletHolder servletHolder = new ServletHolder(); + servletHolder.setServlet(ourListenerRestServer); + proxyHandler.addServlet(servletHolder, "/fhir/context/*"); + + ourListenerServer.setHandler(proxyHandler); + ourListenerServer.start(); + } + + @AfterClass + public static void stopListenerServer() throws Exception { + ourListenerServer.stop(); + } + + public static class ObservationListener implements IResourceProvider { + + + private void extractHeaders(HttpServletRequest theRequest) { + java.util.Enumeration headerNamesEnum = theRequest.getHeaderNames(); + while (headerNamesEnum.hasMoreElements()) { + String nextName = headerNamesEnum.nextElement(); + Enumeration valueEnum = theRequest.getHeaders(nextName); + while (valueEnum.hasMoreElements()) { + String nextValue = valueEnum.nextElement(); + ourHeaders.add(nextName + ": " + nextValue); + } + } + } + + @Override + public Class getResourceType() { + return Observation.class; + } + + @Update + public MethodOutcome update(@ResourceParam Observation theObservation, HttpServletRequest theRequest) { + ourLog.info("Received Listener Update"); + ourUpdatedObservations.add(theObservation); + ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", "")); + extractHeaders(theRequest); + return new MethodOutcome(new IdType("Observation/1"), false); + } + + } + +} diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/r4/RestHookTestR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/r4/RestHookTestR4Test.java index 3f05c58164a..515e85d82e4 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/r4/RestHookTestR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/r4/RestHookTestR4Test.java @@ -62,12 +62,12 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test { ourLog.info("Done deleting all subscriptions"); myDaoConfig.setAllowMultipleDelete(new DaoConfig().isAllowMultipleDelete()); - ourRestServer.unregisterInterceptor(ourRestHookSubscriptionInterceptor); + ourRestServer.unregisterInterceptor(getRestHookSubscriptionInterceptor()); } @Before public void beforeRegisterRestHookListener() { - ourRestServer.registerInterceptor(ourRestHookSubscriptionInterceptor); + ourRestServer.registerInterceptor(getRestHookSubscriptionInterceptor()); } @Before @@ -376,7 +376,7 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test { } private void waitForQueueToDrain() throws InterruptedException { - RestHookTestDstu2Test.waitForQueueToDrain(ourRestHookSubscriptionInterceptor); + RestHookTestDstu2Test.waitForQueueToDrain(getRestHookSubscriptionInterceptor()); } @BeforeClass diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/r4/RestHookTestWithInterceptorRegisteredToDaoConfigR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/r4/RestHookTestWithInterceptorRegisteredToDaoConfigR4Test.java index 7d70a3fb644..240f26ec3a5 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/r4/RestHookTestWithInterceptorRegisteredToDaoConfigR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/r4/RestHookTestWithInterceptorRegisteredToDaoConfigR4Test.java @@ -50,12 +50,12 @@ public class RestHookTestWithInterceptorRegisteredToDaoConfigR4Test extends Base ourLog.info("Done deleting all subscriptions"); myDaoConfig.setAllowMultipleDelete(new DaoConfig().isAllowMultipleDelete()); - myDaoConfig.getInterceptors().remove(ourRestHookSubscriptionInterceptor); + myDaoConfig.getInterceptors().remove(getRestHookSubscriptionInterceptor()); } @Before public void beforeRegisterRestHookListener() { - myDaoConfig.getInterceptors().add(ourRestHookSubscriptionInterceptor); + myDaoConfig.getInterceptors().add(getRestHookSubscriptionInterceptor()); } @Before @@ -84,11 +84,11 @@ public class RestHookTestWithInterceptorRegisteredToDaoConfigR4Test extends Base } private void waitForQueueToDrain() throws InterruptedException { - ourLog.info("QUEUE HAS {} ITEMS", ourRestHookSubscriptionInterceptor.getExecutorQueueForUnitTests().size()); - while (ourRestHookSubscriptionInterceptor.getExecutorQueueForUnitTests().size() > 0) { + ourLog.info("QUEUE HAS {} ITEMS", getRestHookSubscriptionInterceptor().getExecutorQueueForUnitTests().size()); + while (getRestHookSubscriptionInterceptor().getExecutorQueueForUnitTests().size() > 0) { Thread.sleep(250); } - ourLog.info("QUEUE HAS {} ITEMS", ourRestHookSubscriptionInterceptor.getExecutorQueueForUnitTests().size()); + ourLog.info("QUEUE HAS {} ITEMS", getRestHookSubscriptionInterceptor().getExecutorQueueForUnitTests().size()); } private Observation sendObservation(String code, String system) throws InterruptedException { diff --git a/hapi-fhir-jpaserver-uhnfhirtest/derby_maintenance.txt b/hapi-fhir-jpaserver-uhnfhirtest/derby_maintenance.txt index b931f5f0f50..3646c2ac1b7 100644 --- a/hapi-fhir-jpaserver-uhnfhirtest/derby_maintenance.txt +++ b/hapi-fhir-jpaserver-uhnfhirtest/derby_maintenance.txt @@ -25,6 +25,7 @@ delete from hfj_spidx_token where res_id in (select res_id from hfj_resource whe delete from hfj_spidx_uri where res_id in (select res_id from hfj_resource where sp_index_status = 2); delete from hfj_res_tag where res_id in (select res_id from hfj_resource where sp_index_status = 2); delete from hfj_search_result where resource_pid in (select res_id from hfj_resource where sp_index_status = 2); +delete from hfj_res_param_present where res_id in (select res_id from hfj_resource where sp_index_status = 2); delete from hfj_resource where res_id in (select res_id from hfj_resource where sp_index_status = 2);