diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/WebsocketDstu21Config.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/WebsocketDstu21Config.java index 30da2db629f..3211add6073 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/WebsocketDstu21Config.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/WebsocketDstu21Config.java @@ -31,7 +31,7 @@ import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; import org.springframework.web.socket.handler.PerConnectionWebSocketHandler; -import ca.uhn.fhir.jpa.subscription.SubscriptionWebsocketHandler; +import ca.uhn.fhir.jpa.subscription.SubscriptionWebsocketHandlerDstu21; @Configuration @EnableWebSocket() @@ -44,7 +44,7 @@ public class WebsocketDstu21Config implements WebSocketConfigurer { @Bean(autowire = Autowire.BY_TYPE) public WebSocketHandler subscriptionWebSocketHandler() { - return new PerConnectionWebSocketHandler(SubscriptionWebsocketHandler.class); + return new PerConnectionWebSocketHandler(SubscriptionWebsocketHandlerDstu21.class); } @Bean diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/WebsocketDstu2Config.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/WebsocketDstu2Config.java index 2e4ea229d30..df0b01ed122 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/WebsocketDstu2Config.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/WebsocketDstu2Config.java @@ -31,7 +31,7 @@ import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; import org.springframework.web.socket.handler.PerConnectionWebSocketHandler; -import ca.uhn.fhir.jpa.subscription.SubscriptionWebsocketHandler; +import ca.uhn.fhir.jpa.subscription.SubscriptionWebsocketHandlerDstu2; @Configuration @EnableWebSocket() @@ -44,7 +44,7 @@ public class WebsocketDstu2Config implements WebSocketConfigurer { @Bean(autowire = Autowire.BY_TYPE) public WebSocketHandler subscriptionWebSocketHandler() { - return new PerConnectionWebSocketHandler(SubscriptionWebsocketHandler.class); + return new PerConnectionWebSocketHandler(SubscriptionWebsocketHandlerDstu2.class); } @Bean diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionWebsocketHandler.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionWebsocketHandlerDstu2.java similarity index 98% rename from hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionWebsocketHandler.java rename to hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionWebsocketHandlerDstu2.java index 39f4ee4abdb..6accdd5ea52 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionWebsocketHandler.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionWebsocketHandlerDstu2.java @@ -50,8 +50,8 @@ import ca.uhn.fhir.rest.server.EncodingEnum; import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; -public class SubscriptionWebsocketHandler extends TextWebSocketHandler implements ISubscriptionWebsocketHandler, Runnable { - private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionWebsocketHandler.class); +public class SubscriptionWebsocketHandlerDstu2 extends TextWebSocketHandler implements ISubscriptionWebsocketHandler, Runnable { + private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionWebsocketHandlerDstu2.class); @Autowired @Qualifier("myFhirContextDstu2") diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionWebsocketHandlerDstu21.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionWebsocketHandlerDstu21.java new file mode 100644 index 00000000000..efeb5e013b2 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionWebsocketHandlerDstu21.java @@ -0,0 +1,344 @@ +package ca.uhn.fhir.jpa.subscription; + +/* + * #%L + * HAPI FHIR JPA Server + * %% + * Copyright (C) 2014 - 2015 University Health Network + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ScheduledFuture; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import org.apache.http.NameValuePair; +import org.apache.http.client.utils.URLEncodedUtils; +import org.hl7.fhir.dstu21.model.IdType; +import org.hl7.fhir.dstu21.model.Subscription; +import org.hl7.fhir.dstu21.model.Subscription.SubscriptionChannelType; +import org.hl7.fhir.dstu21.model.Subscription.SubscriptionStatus; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IIdType; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.dao.IFhirResourceDaoSubscription; +import ca.uhn.fhir.rest.server.Constants; +import ca.uhn.fhir.rest.server.EncodingEnum; +import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; +import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; + +public class SubscriptionWebsocketHandlerDstu21 extends TextWebSocketHandler implements ISubscriptionWebsocketHandler, Runnable { + private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionWebsocketHandlerDstu21.class); + + @Autowired + @Qualifier("myFhirContextDstu21") + private FhirContext myCtx; + + private ScheduledFuture myScheduleFuture; + + private IState myState = new InitialState(); + + @Autowired + private IFhirResourceDaoSubscription mySubscriptionDao; + + private IIdType mySubscriptionId; + private Long mySubscriptionPid; + + @Autowired + @Qualifier("websocketTaskScheduler") + private TaskScheduler myTaskScheduler; + + @Override + public void afterConnectionClosed(WebSocketSession theSession, CloseStatus theStatus) throws Exception { + super.afterConnectionClosed(theSession, theStatus); + ourLog.info("Closing WebSocket connection from {}", theSession.getRemoteAddress()); + } + + @Override + public void afterConnectionEstablished(WebSocketSession theSession) throws Exception { + super.afterConnectionEstablished(theSession); + ourLog.info("Incoming WebSocket connection from {}", theSession.getRemoteAddress()); + } + + protected void handleFailure(Exception theE) { + ourLog.error("Failure during communication", theE); + } + + @Override + protected void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) throws Exception { + ourLog.info("Textmessage: " + theMessage.getPayload()); + + myState.handleTextMessage(theSession, theMessage); + } + + @Override + public void handleTransportError(WebSocketSession theSession, Throwable theException) throws Exception { + super.handleTransportError(theSession, theException); + ourLog.error("Transport error", theException); + } + + @PostConstruct + public void postConstruct() { + ourLog.info("Creating scheduled task for subscription websocket connection"); + myScheduleFuture = myTaskScheduler.scheduleWithFixedDelay(this, 1000); + } + + @PreDestroy + public void preDescroy() { + ourLog.info("Cancelling scheduled task for subscription websocket connection"); + myScheduleFuture.cancel(true); + IState state = myState; + if (state != null) { + state.closing(); + } + } + + @Override + public void run() { + Long subscriptionPid = mySubscriptionPid; + if (subscriptionPid == null) { + return; + } + + ourLog.debug("Subscription {} websocket handler polling", subscriptionPid); + + List results = mySubscriptionDao.getUndeliveredResourcesAndPurge(subscriptionPid); + if (results.isEmpty() == false) { + myState.deliver(results); + } + } + + private class BoundDynamicSubscriptionState implements IState { + + private EncodingEnum myEncoding; + private WebSocketSession mySession; + + public BoundDynamicSubscriptionState(WebSocketSession theSession, EncodingEnum theEncoding) { + mySession = theSession; + myEncoding = theEncoding; + } + + @Override + public void closing() { + ourLog.info("Deleting subscription {}", mySubscriptionId); + try { + mySubscriptionDao.delete(mySubscriptionId); + } catch (Exception e) { + handleFailure(e); + } + } + + @Override + public void deliver(List theResults) { + try { + for (IBaseResource nextResource : theResults) { + ourLog.info("Sending WebSocket message for resource: {}", nextResource.getIdElement()); + String encoded = myEncoding.newParser(myCtx).encodeResourceToString(nextResource); + String payload = "add " + mySubscriptionId.getIdPart() + '\n' + encoded; + mySession.sendMessage(new TextMessage(payload)); + } + } catch (IOException e) { + handleFailure(e); + } + } + + @Override + public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) { + try { + theSession.sendMessage(new TextMessage("Unexpected client message: " + theMessage.getPayload())); + } catch (IOException e) { + handleFailure(e); + } + } + + } + + private class BoundStaticSubscipriptionState implements IState { + + private WebSocketSession mySession; + + public BoundStaticSubscipriptionState(WebSocketSession theSession) { + mySession = theSession; + } + + @Override + public void closing() { + // nothing + } + + @Override + public void deliver(List theResults) { + try { + String payload = "ping " + mySubscriptionId.getIdPart(); + ourLog.info("Sending WebSocket message: {}", payload); + mySession.sendMessage(new TextMessage(payload)); + } catch (IOException e) { + handleFailure(e); + } + } + + @Override + public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) { + try { + theSession.sendMessage(new TextMessage("Unexpected client message: " + theMessage.getPayload())); + } catch (IOException e) { + handleFailure(e); + } + } + + } + + private class InitialState implements IState { + + private IIdType bindSimple(WebSocketSession theSession, String theBindString) { + IdType id = new IdType(theBindString); + + if (!id.hasIdPart() || !id.isIdPartValid()) { + try { + String message = "Invalid bind request - No ID included"; + ourLog.warn(message); + theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), message)); + } catch (IOException e) { + handleFailure(e); + } + return null; + } + + if (id.hasResourceType() == false) { + id = id.withResourceType("Subscription"); + } + + try { + Subscription subscription = mySubscriptionDao.read(id); + mySubscriptionPid = mySubscriptionDao.getSubscriptionTablePidForSubscriptionResource(id); + mySubscriptionId = subscription.getIdElement(); + myState = new BoundStaticSubscipriptionState(theSession); + } catch (ResourceNotFoundException e) { + try { + String message = "Invalid bind request - Unknown subscription: " + id.getValue(); + ourLog.warn(message); + theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), message)); + } catch (IOException e1) { + handleFailure(e); + } + return null; + } + + return id; + } + + private IIdType bingSearch(WebSocketSession theSession, String theRemaining) { + Subscription subscription = new Subscription(); + subscription.getChannel().setType(SubscriptionChannelType.WEBSOCKET); + subscription.setStatus(SubscriptionStatus.ACTIVE); + subscription.setCriteria(theRemaining); + + try { + String params = theRemaining.substring(theRemaining.indexOf('?')+1); + List paramValues = URLEncodedUtils.parse(params, Constants.CHARSET_UTF8, '&'); + EncodingEnum encoding = EncodingEnum.JSON; + for (NameValuePair nameValuePair : paramValues) { + if (Constants.PARAM_FORMAT.equals(nameValuePair.getName())) { + EncodingEnum nextEncoding = Constants.FORMAT_VAL_TO_ENCODING.get(nameValuePair.getValue()); + if (nextEncoding != null) { + encoding = nextEncoding; + } + } + } + + IIdType id = mySubscriptionDao.create(subscription).getId(); + + mySubscriptionPid = mySubscriptionDao.getSubscriptionTablePidForSubscriptionResource(id); + mySubscriptionId = subscription.getIdElement(); + myState = new BoundDynamicSubscriptionState(theSession, encoding); + + return id; + } catch (UnprocessableEntityException e) { + ourLog.warn("Failed to bind subscription: " + e.getMessage()); + try { + theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - " + e.getMessage())); + } catch (IOException e2) { + handleFailure(e2); + } + } catch (Exception e) { + handleFailure(e); + try { + theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - No ID included")); + } catch (IOException e2) { + handleFailure(e2); + } + } + return null; + } + + @Override + public void closing() { + // nothing + } + + @Override + public void deliver(List theResults) { + throw new IllegalStateException(); + } + + @Override + public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) { + String message = theMessage.getPayload(); + if (message.startsWith("bind ")) { + String remaining = message.substring("bind ".length()); + + IIdType subscriptionId; + if (remaining.contains("?")) { + subscriptionId = bingSearch(theSession, remaining); + } else { + subscriptionId = bindSimple(theSession, remaining); + if (subscriptionId == null) { + return; + } + } + + try { + theSession.sendMessage(new TextMessage("bound " + subscriptionId.getIdPart())); + } catch (IOException e) { + handleFailure(e); + } + + } + } + + } + + private interface IState { + + void closing(); + + void deliver(List theResults); + + void handleTextMessage(WebSocketSession theSession, TextMessage theMessage); + + } + +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionWebsocketHandlerFactory.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionWebsocketHandlerFactoryDstu2.java similarity index 83% rename from hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionWebsocketHandlerFactory.java rename to hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionWebsocketHandlerFactoryDstu2.java index 11799795ddf..77c1cc3a44d 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionWebsocketHandlerFactory.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionWebsocketHandlerFactoryDstu2.java @@ -22,12 +22,12 @@ package ca.uhn.fhir.jpa.subscription; import org.springframework.beans.factory.FactoryBean; -public class SubscriptionWebsocketHandlerFactory implements FactoryBean { - static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionWebsocketHandler.class); +public class SubscriptionWebsocketHandlerFactoryDstu2 implements FactoryBean { + static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionWebsocketHandlerDstu2.class); @Override public ISubscriptionWebsocketHandler getObject() throws Exception { - return new SubscriptionWebsocketHandler(); + return new SubscriptionWebsocketHandlerDstu2(); } @Override diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionWebsocketHandlerFactoryDstu21.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionWebsocketHandlerFactoryDstu21.java new file mode 100644 index 00000000000..70f5657be67 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionWebsocketHandlerFactoryDstu21.java @@ -0,0 +1,42 @@ +package ca.uhn.fhir.jpa.subscription; + +/* + * #%L + * HAPI FHIR JPA Server + * %% + * Copyright (C) 2014 - 2015 University Health Network + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.springframework.beans.factory.FactoryBean; + +public class SubscriptionWebsocketHandlerFactoryDstu21 implements FactoryBean { + static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionWebsocketHandlerDstu21.class); + + @Override + public ISubscriptionWebsocketHandler getObject() throws Exception { + return new SubscriptionWebsocketHandlerDstu21(); + } + + @Override + public Class getObjectType() { + return ISubscriptionWebsocketHandler.class; + } + + @Override + public boolean isSingleton() { + return false; + } +} diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu2/BaseJpaDstu2Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu2/BaseJpaDstu2Test.java index 1149b1548d4..abfa5310619 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu2/BaseJpaDstu2Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu2/BaseJpaDstu2Test.java @@ -82,7 +82,7 @@ import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor; //@formatter:off @RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes= {TestDstu2Config.class, ca.uhn.fhir.jpa.config.WebsocketDstu2Config.class}) +@ContextConfiguration(classes= {TestDstu2Config.class/*, ca.uhn.fhir.jpa.config.WebsocketDstu2Config.class*/ }) //@formatter:on public abstract class BaseJpaDstu2Test extends BaseJpaTest { diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu21/BaseJpaDstu21Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu21/BaseJpaDstu21Test.java index bca6a3f2356..7fda8695b58 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu21/BaseJpaDstu21Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu21/BaseJpaDstu21Test.java @@ -83,7 +83,7 @@ import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor; //@formatter:off @RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes= {TestDstu21Config.class, ca.uhn.fhir.jpa.config.WebsocketDstu2Config.class}) +@ContextConfiguration(classes= {TestDstu21Config.class /*, ca.uhn.fhir.jpa.config.WebsocketDstu21Config.class*/ }) //@formatter:on public abstract class BaseJpaDstu21Test extends BaseJpaTest { diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/BaseResourceProviderDstu2Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/BaseResourceProviderDstu2Test.java index 6f7ed2dcf50..e4e99ac3f3d 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/BaseResourceProviderDstu2Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/BaseResourceProviderDstu2Test.java @@ -15,11 +15,13 @@ import org.eclipse.jetty.servlet.ServletHolder; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; +import org.springframework.web.context.ContextLoader; import org.springframework.web.context.WebApplicationContext; import org.springframework.web.context.support.AnnotationConfigWebApplicationContext; import org.springframework.web.context.support.GenericWebApplicationContext; import org.springframework.web.servlet.DispatcherServlet; +import ca.uhn.fhir.jpa.config.WebsocketDstu2Config; import ca.uhn.fhir.jpa.dao.dstu2.BaseJpaDstu2Test; import ca.uhn.fhir.jpa.testutil.RandomServerPortProvider; import ca.uhn.fhir.model.api.Bundle; @@ -115,16 +117,14 @@ public abstract class BaseResourceProviderDstu2Test extends BaseJpaDstu2Test { GenericWebApplicationContext webApplicationContext = new GenericWebApplicationContext(); webApplicationContext.setParent(myAppCtx); webApplicationContext.refresh(); -// ContextLoaderListener loaderListener = new ContextLoaderListener(webApplicationContext); -// loaderListener.initWebApplicationContext(mock(ServletContext.class)); -// + proxyHandler.getServletContext().setAttribute(WebApplicationContext.ROOT_WEB_APPLICATION_CONTEXT_ATTRIBUTE, webApplicationContext); DispatcherServlet dispatcherServlet = new DispatcherServlet(); -// dispatcherServlet.setApplicationContext(webApplicationContext); dispatcherServlet.setContextClass(AnnotationConfigWebApplicationContext.class); ServletHolder subsServletHolder = new ServletHolder(); subsServletHolder.setServlet(dispatcherServlet); + subsServletHolder.setInitParameter(ContextLoader.CONFIG_LOCATION_PARAM, WebsocketDstu2Config.class.getName()); proxyHandler.addServlet(subsServletHolder, "/*"); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu21/BaseResourceProviderDstu21Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu21/BaseResourceProviderDstu21Test.java index 6bef64b68bb..ddd30332a04 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu21/BaseResourceProviderDstu21Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu21/BaseResourceProviderDstu21Test.java @@ -18,17 +18,16 @@ import org.hl7.fhir.dstu21.model.Patient; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; +import org.springframework.web.context.ContextLoader; import org.springframework.web.context.WebApplicationContext; import org.springframework.web.context.support.AnnotationConfigWebApplicationContext; import org.springframework.web.context.support.GenericWebApplicationContext; import org.springframework.web.servlet.DispatcherServlet; +import ca.uhn.fhir.jpa.config.WebsocketDstu21Config; import ca.uhn.fhir.jpa.dao.dstu21.BaseJpaDstu21Test; -import ca.uhn.fhir.jpa.provider.JpaConformanceProviderDstu2; import ca.uhn.fhir.jpa.provider.JpaConformanceProviderDstu21; import ca.uhn.fhir.jpa.testutil.RandomServerPortProvider; -import ca.uhn.fhir.model.api.BundleEntry; -import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.narrative.DefaultThymeleafNarrativeGenerator; import ca.uhn.fhir.rest.client.IGenericClient; import ca.uhn.fhir.rest.client.ServerValidationModeEnum; @@ -120,6 +119,7 @@ public abstract class BaseResourceProviderDstu21Test extends BaseJpaDstu21Test { dispatcherServlet.setContextClass(AnnotationConfigWebApplicationContext.class); ServletHolder subsServletHolder = new ServletHolder(); subsServletHolder.setServlet(dispatcherServlet); + subsServletHolder.setInitParameter(ContextLoader.CONFIG_LOCATION_PARAM, WebsocketDstu21Config.class.getName()); proxyHandler.addServlet(subsServletHolder, "/*"); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu21/SubscriptionsDstu21Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu21/SubscriptionsDstu21Test.java new file mode 100644 index 00000000000..e9d70455e79 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu21/SubscriptionsDstu21Test.java @@ -0,0 +1,463 @@ +package ca.uhn.fhir.jpa.provider.dstu21; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.hl7.fhir.dstu21.model.Observation; +import org.hl7.fhir.dstu21.model.Observation.ObservationStatus; +import org.hl7.fhir.dstu21.model.Patient; +import org.hl7.fhir.dstu21.model.Subscription; +import org.hl7.fhir.dstu21.model.Subscription.SubscriptionChannelType; +import org.hl7.fhir.dstu21.model.Subscription.SubscriptionStatus; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IIdType; +import org.junit.Before; +import org.junit.Test; + +import ca.uhn.fhir.jpa.util.SubscriptionsRequireManualActivationInterceptorDstu21; +import ca.uhn.fhir.model.primitive.IdDt; +import ca.uhn.fhir.rest.server.EncodingEnum; +import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; + +public class SubscriptionsDstu21Test extends BaseResourceProviderDstu21Test { + + private static final String WEBSOCKET_PATH = "/websocket/dstu2.1"; + + public class BaseSocket { + protected String myError; + protected boolean myGotBound; + protected int myPingCount; + protected String mySubsId; + + } + + private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionsDstu21Test.class); + + @Before + public void beforeEnableScheduling() { + myDaoConfig.setSchedulingDisabled(false); + } + + @Override + public void beforeCreateInterceptor() { + super.beforeCreateInterceptor(); + + SubscriptionsRequireManualActivationInterceptorDstu21 interceptor = new SubscriptionsRequireManualActivationInterceptorDstu21(); + interceptor.setDao(mySubscriptionDao); + myDaoConfig.getInterceptors().add(interceptor); + } + + private void sleepUntilPingCount(BaseSocket socket, int wantPingCount) throws InterruptedException { + ourLog.info("Entering loop"); + for (long start = System.currentTimeMillis(), now = System.currentTimeMillis(); now - start <= 20000; now = System.currentTimeMillis()) { + ourLog.debug("Starting"); + if (socket.myError != null) { + fail(socket.myError); + } + if (socket.myPingCount >= wantPingCount) { + ourLog.info("Breaking loop"); + break; + } + ourLog.debug("Sleeping"); + Thread.sleep(100); + } + + ourLog.info("Out of loop, pingcount {} error {}", socket.myPingCount, socket.myError); + + assertNull(socket.myError, socket.myError); + assertEquals(wantPingCount, socket.myPingCount); + } + + @Test + public void testCreateInvalidNoStatus() { + Subscription subs = new Subscription(); + subs.getChannel().setType(SubscriptionChannelType.RESTHOOK); + subs.setCriteria("Observation?identifier=123"); + try { + ourClient.create().resource(subs).execute(); + fail(); + } catch (UnprocessableEntityException e) { + assertEquals("HTTP 422 Unprocessable Entity: Can not create resource: Subscription.status must be populated", e.getMessage()); + } + + subs.setId("ABC"); + try { + ourClient.update().resource(subs).execute(); + fail(); + } catch (UnprocessableEntityException e) { + assertEquals("HTTP 422 Unprocessable Entity: Can not create resource: Subscription.status must be populated", e.getMessage()); + } + + subs.setStatus(SubscriptionStatus.REQUESTED); + ourClient.update().resource(subs).execute(); + } + + @Test + public void testUpdateToInvalidStatus() { + Subscription subs = new Subscription(); + subs.getChannel().setType(SubscriptionChannelType.RESTHOOK); + subs.setCriteria("Observation?identifier=123"); + subs.setStatus(SubscriptionStatus.REQUESTED); + IIdType id = ourClient.create().resource(subs).execute().getId(); + subs.setId(id); + + try { + subs.setStatus(SubscriptionStatus.ACTIVE); + ourClient.update().resource(subs).execute(); + fail(); + } catch (UnprocessableEntityException e) { + assertEquals("HTTP 422 Unprocessable Entity: Subscription.status can not be changed from 'requested' to 'active'", e.getMessage()); + } + + try { + subs.setStatus((SubscriptionStatus) null); + ourClient.update().resource(subs).execute(); + fail(); + } catch (UnprocessableEntityException e) { + assertEquals("HTTP 422 Unprocessable Entity: Can not update resource: Subscription.status must be populated", e.getMessage()); + } + + subs.setStatus(SubscriptionStatus.OFF); + ourClient.update().resource(subs).execute(); + } + + + @Test + public void testCreateInvalidWrongStatus() { + Subscription subs = new Subscription(); + subs.getChannel().setType(SubscriptionChannelType.RESTHOOK); + subs.setStatus(SubscriptionStatus.ACTIVE); + subs.setCriteria("Observation?identifier=123"); + try { + ourClient.create().resource(subs).execute(); + fail(); + } catch (UnprocessableEntityException e) { + assertEquals("HTTP 422 Unprocessable Entity: Subscription.status must be 'off' or 'requested' on a newly created subscription", e.getMessage()); + } + + subs.setId("ABC"); + try { + ourClient.update().resource(subs).execute(); + fail(); + } catch (UnprocessableEntityException e) { + assertEquals("HTTP 422 Unprocessable Entity: Subscription.status must be 'off' or 'requested' on a newly created subscription", e.getMessage()); + } + } + + @Test + public void testSubscriptionDynamic() throws Exception { + myDaoConfig.setSubscriptionEnabled(true); + myDaoConfig.setSubscriptionPollDelay(0); + + String methodName = "testSubscriptionDynamic"; + Patient p = new Patient(); + p.addName().addFamily(methodName); + IIdType pId = myPatientDao.create(p).getId().toUnqualifiedVersionless(); + + String criteria = "Observation?subject=Patient/" + pId.getIdPart(); + DynamicEchoSocket socket = new DynamicEchoSocket(criteria, EncodingEnum.JSON); + WebSocketClient client = new WebSocketClient(); + try { + client.start(); + URI echoUri = new URI("ws://localhost:" + ourPort + WEBSOCKET_PATH); + client.connect(socket, echoUri, new ClientUpgradeRequest()); + ourLog.info("Connecting to : {}", echoUri); + + sleepUntilPingCount(socket, 1); + + mySubscriptionDao.read(new IdDt("Subscription", socket.mySubsId)); + + Observation obs = new Observation(); + obs.getSubject().setReferenceElement(pId); + obs.setStatus(ObservationStatus.FINAL); + IIdType afterId1 = myObservationDao.create(obs).getId().toUnqualifiedVersionless(); + + obs = new Observation(); + obs.getSubject().setReferenceElement(pId); + obs.setStatus(ObservationStatus.FINAL); + IIdType afterId2 = myObservationDao.create(obs).getId().toUnqualifiedVersionless(); + + Thread.sleep(100); + + sleepUntilPingCount(socket, 3); + + obs = (Observation) socket.myReceived.get(0); + assertEquals(afterId1.getValue(), obs.getIdElement().toUnqualifiedVersionless().getValue()); + + obs = (Observation) socket.myReceived.get(1); + assertEquals(afterId2.getValue(), obs.getIdElement().toUnqualifiedVersionless().getValue()); + + obs = new Observation(); + obs.getSubject().setReferenceElement(pId); + obs.setStatus(ObservationStatus.FINAL); + IIdType afterId3 = myObservationDao.create(obs).getId().toUnqualifiedVersionless(); + + sleepUntilPingCount(socket, 4); + + obs = (Observation) socket.myReceived.get(2); + assertEquals(afterId3.getValue(), obs.getIdElement().toUnqualifiedVersionless().getValue()); + + } finally { + try { + client.stop(); + } catch (Exception e) { + ourLog.error("Failure", e); + fail(e.getMessage()); + } + } + + } + + + @Test + public void testSubscriptionDynamicXml() throws Exception { + myDaoConfig.setSubscriptionEnabled(true); + myDaoConfig.setSubscriptionPollDelay(0); + + String methodName = "testSubscriptionDynamic"; + Patient p = new Patient(); + p.addName().addFamily(methodName); + IIdType pId = myPatientDao.create(p).getId().toUnqualifiedVersionless(); + + String criteria = "Observation?subject=Patient/" + pId.getIdPart() + "&_format=xml"; + DynamicEchoSocket socket = new DynamicEchoSocket(criteria, EncodingEnum.XML); + WebSocketClient client = new WebSocketClient(); + try { + client.start(); + URI echoUri = new URI("ws://localhost:" + ourPort + WEBSOCKET_PATH); + client.connect(socket, echoUri, new ClientUpgradeRequest()); + ourLog.info("Connecting to : {}", echoUri); + + sleepUntilPingCount(socket, 1); + + mySubscriptionDao.read(new IdDt("Subscription", socket.mySubsId)); + + Observation obs = new Observation(); + obs.getMeta().addProfile("http://foo"); + obs.getSubject().setReferenceElement(pId); + obs.setStatus(ObservationStatus.FINAL); + IIdType afterId1 = myObservationDao.create(obs).getId().toUnqualifiedVersionless(); + + obs = new Observation(); + obs.getSubject().setReferenceElement(pId); + obs.setStatus(ObservationStatus.FINAL); + IIdType afterId2 = myObservationDao.create(obs).getId().toUnqualifiedVersionless(); + + Thread.sleep(100); + + sleepUntilPingCount(socket, 3); + + obs = (Observation) socket.myReceived.get(0); + assertEquals(afterId1.getValue(), obs.getIdElement().toUnqualifiedVersionless().getValue()); + + obs = (Observation) socket.myReceived.get(1); + assertEquals(afterId2.getValue(), obs.getIdElement().toUnqualifiedVersionless().getValue()); + + obs = new Observation(); + obs.getSubject().setReferenceElement(pId); + obs.setStatus(ObservationStatus.FINAL); + IIdType afterId3 = myObservationDao.create(obs).getId().toUnqualifiedVersionless(); + + sleepUntilPingCount(socket, 4); + + obs = (Observation) socket.myReceived.get(2); + assertEquals(afterId3.getValue(), obs.getIdElement().toUnqualifiedVersionless().getValue()); + + } finally { + try { + client.stop(); + } catch (Exception e) { + ourLog.error("Failure", e); + fail(e.getMessage()); + } + } + + } + + @Test + public void testSubscriptionSimple() throws Exception { + myDaoConfig.setSubscriptionEnabled(true); + myDaoConfig.setSubscriptionPollDelay(0); + + String methodName = "testSubscriptionResourcesAppear"; + Patient p = new Patient(); + p.addName().addFamily(methodName); + IIdType pId = myPatientDao.create(p).getId().toUnqualifiedVersionless(); + + Subscription subs = new Subscription(); + subs.getMeta().addProfile("http://foo"); + subs.getChannel().setType(SubscriptionChannelType.WEBSOCKET); + subs.setCriteria("Observation?subject=Patient/" + pId.getIdPart()); + subs.setStatus(SubscriptionStatus.ACTIVE); + String subsId = mySubscriptionDao.create(subs).getId().getIdPart(); + + Thread.sleep(100); + + Observation obs = new Observation(); + obs.getSubject().setReferenceElement(pId); + obs.setStatus(ObservationStatus.FINAL); + IIdType afterId1 = myObservationDao.create(obs).getId().toUnqualifiedVersionless(); + + obs = new Observation(); + obs.getSubject().setReferenceElement(pId); + obs.setStatus(ObservationStatus.FINAL); + IIdType afterId2 = myObservationDao.create(obs).getId().toUnqualifiedVersionless(); + + Thread.sleep(100); + + WebSocketClient client = new WebSocketClient(); + SimpleEchoSocket socket = new SimpleEchoSocket(subsId); + try { + client.start(); + URI echoUri = new URI("ws://localhost:" + ourPort + WEBSOCKET_PATH); + ClientUpgradeRequest request = new ClientUpgradeRequest(); + client.connect(socket, echoUri, request); + ourLog.info("Connecting to : {}", echoUri); + + sleepUntilPingCount(socket, 1); + + obs = new Observation(); + obs.getSubject().setReferenceElement(pId); + obs.setStatus(ObservationStatus.FINAL); + IIdType afterId3 = myObservationDao.create(obs).getId().toUnqualifiedVersionless(); + + sleepUntilPingCount(socket, 2); + + } finally { + try { + client.stop(); + } catch (Exception e) { + ourLog.error("Failure", e); + fail(e.getMessage()); + } + } + + } + + @Test + public void testUpdateFails() { + Subscription subs = new Subscription(); + subs.getChannel().setType(SubscriptionChannelType.RESTHOOK); + subs.setStatus(SubscriptionStatus.REQUESTED); + subs.setCriteria("Observation?identifier=123"); + IIdType id = ourClient.create().resource(subs).execute().getId().toUnqualifiedVersionless(); + + subs.setId(id); + + try { + subs.setStatus(SubscriptionStatus.ACTIVE); + ourClient.update().resource(subs).execute(); + fail(); + } catch (UnprocessableEntityException e) { + assertEquals("HTTP 422 Unprocessable Entity: Subscription.status can not be changed from 'requested' to 'active'", e.getMessage()); + } + + try { + subs.setStatus((SubscriptionStatus) null); + ourClient.update().resource(subs).execute(); + fail(); + } catch (UnprocessableEntityException e) { + assertEquals("HTTP 422 Unprocessable Entity: Can not update resource: Subscription.status must be populated", e.getMessage()); + } + + subs.setStatus(SubscriptionStatus.OFF); + } + + /** + * Basic Echo Client Socket + */ + @WebSocket(maxTextMessageSize = 64 * 1024) + public class SimpleEchoSocket extends BaseSocket { + + @SuppressWarnings("unused") + private Session session; + + public SimpleEchoSocket(String theSubsId) { + mySubsId = theSubsId; + } + + @OnWebSocketConnect + public void onConnect(Session session) { + ourLog.info("Got connect: {}", session); + this.session = session; + try { + String sending = "bind " + mySubsId; + ourLog.info("Sending: {}", sending); + session.getRemote().sendString(sending); + } catch (Throwable t) { + ourLog.error("Failure", t); + } + } + + @OnWebSocketMessage + public void onMessage(String theMsg) { + ourLog.info("Got msg: {}", theMsg); + if (theMsg.equals("bound " + mySubsId)) { + myGotBound = true; + } else if (myGotBound && theMsg.startsWith("ping " + mySubsId)) { + myPingCount++; + } else { + myError = "Unexpected message: " + theMsg; + } + } + } + + /** + * Basic Echo Client Socket + */ + @WebSocket(maxTextMessageSize = 64 * 1024) + public class DynamicEchoSocket extends BaseSocket { + + private List myReceived = new ArrayList(); + @SuppressWarnings("unused") + private Session session; + private String myCriteria; + private EncodingEnum myEncoding; + + public DynamicEchoSocket(String theCriteria, EncodingEnum theEncoding) { + myCriteria = theCriteria; + myEncoding = theEncoding; + } + + @OnWebSocketConnect + public void onConnect(Session session) { + ourLog.info("Got connect: {}", session); + this.session = session; + try { + String sending = "bind " + myCriteria; + ourLog.info("Sending: {}", sending); + session.getRemote().sendString(sending); + } catch (Throwable t) { + ourLog.error("Failure", t); + } + } + + @OnWebSocketMessage + public void onMessage(String theMsg) { + ourLog.info("Got msg: {}", theMsg); + if (theMsg.startsWith("bound ")) { + myGotBound = true; + mySubsId = (theMsg.substring("bound ".length())); + myPingCount++; + } else if (myGotBound && theMsg.startsWith("add " + mySubsId + "\n")) { + String text = theMsg.substring(("add " + mySubsId + "\n").length()); + IBaseResource res = myEncoding.newParser(myFhirCtx).parseResource(text); + myReceived.add(res); + myPingCount++; + } else { + myError = "Unexpected message: " + theMsg; + } + } + } +} diff --git a/pom.xml b/pom.xml index f7922d01791..1648afd80df 100644 --- a/pom.xml +++ b/pom.xml @@ -235,25 +235,23 @@ ${user.home}/sites/scm/hapi-fhir - 4.4 - 4.4 10.12.1.1 2.22.1 - 9.2.14.v20151106 + + 9.3.6.v20151106 - 5.0.5.Final + 5.0.6.Final 5.2.2.Final 2.5.3 2.18.1 1.8 - 2.8 3.4 2.4 1.1.8 2.7.1 - 4.3.6 - 4.2.3.RELEASE + 4.4.4 + 4.2.4.RELEASE 2.1.4.RELEASE 1.0.1 1.6 @@ -324,7 +322,7 @@ javax.mail javax.mail-api - 1.5.4 + 1.5.5 javax.servlet @@ -344,7 +342,7 @@ javax.ws.rs javax.ws.rs-api - 2.0 + 2.0.1 lt.velykis.maven.skins @@ -379,7 +377,7 @@ org.apache.httpcomponents httpclient - 4.4 + 4.5.1 org.apache.httpcomponents @@ -389,7 +387,7 @@ org.apache.httpcomponents httpcore - 4.4 + 4.4.4 org.apache.lucene @@ -687,7 +685,7 @@ com.google.errorprone error_prone_core - 2.0.6 + 2.0.7 org.codehaus.plexus @@ -755,7 +753,7 @@ org.codehaus.mojo build-helper-maven-plugin - 1.9.1 + 1.10 org.codehaus.mojo @@ -1331,7 +1329,7 @@ org.apache.maven.plugins maven-project-info-reports-plugin - ${maven_project_info_plugin_version} + 2.8.1 false @@ -1410,7 +1408,7 @@ org.codehaus.mojo findbugs-maven-plugin - 3.0.2 + 3.0.3 ./hapi-fhir-base/target/classes diff --git a/restful-server-example-test/.classpath b/restful-server-example-test/.classpath index fd7ad7fbda7..afcbba556d2 100644 --- a/restful-server-example-test/.classpath +++ b/restful-server-example-test/.classpath @@ -12,6 +12,7 @@ + diff --git a/restful-server-example-test/src/test/java/ca/uhn/example/ExampleTest.java b/restful-server-example-test/src/test/java/ca/uhn/example/ExampleTest.java index 86d2e7200f2..9d7a943dfa6 100644 --- a/restful-server-example-test/src/test/java/ca/uhn/example/ExampleTest.java +++ b/restful-server-example-test/src/test/java/ca/uhn/example/ExampleTest.java @@ -2,6 +2,8 @@ package ca.uhn.example; import static org.junit.Assert.*; +import java.io.File; + import org.eclipse.jetty.server.Server; import org.eclipse.jetty.webapp.WebAppContext; import org.hamcrest.core.StringContains; @@ -77,7 +79,7 @@ public class ExampleTest { WebAppContext root = new WebAppContext(); root.setAllowDuplicateFragmentNames(true); - root.setWar("file:../restful-server-example/target/restful-server-example.war"); + root.setWar(new File("../restful-server-example/target/restful-server-example.war").toURI().toString()); root.setContextPath("/"); root.setAttribute(WebAppContext.BASETEMPDIR, "target/tempextrtact"); root.setParentLoaderPriority(false); diff --git a/restful-server-example-test/src/test/resources/logback-test.xml b/restful-server-example-test/src/test/resources/logback-test.xml new file mode 100644 index 00000000000..e5cbbb9c22e --- /dev/null +++ b/restful-server-example-test/src/test/resources/logback-test.xml @@ -0,0 +1,30 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} [%file:%line] - %msg%n + + + + + + + + + + + + + + + + + + + + + diff --git a/src/changes/changes.xml b/src/changes/changes.xml index c918556b385..b03d4491a55 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -12,8 +12,11 @@ latest versions (dependent HAPI modules listed in brackets): -
  • Hibernate (JPA, Web Tester): 5.0.3 -> 5.0.5
  • -
  • Springframework (JPA, Web Tester): 4.2.2 -> 4.2.3
  • +
  • Hibernate (JPA, Web Tester): 5.0.3 -> 5.0.6
  • +
  • Springframework (JPA, Web Tester): 4.2.2 -> 4.2.4
  • +
  • Phloc-Commons (Schematron Validator): 4.3.6 -> 4.4.4
  • +
  • Apache httpclient (Client): 4.4 -> 4.5.1
  • +
  • Apache httpcore (Client): 4.4 -> 4.4.4
  • ]]>