diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDao.java index 3e9e9fae6cf..75128bbfd60 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDao.java @@ -111,6 +111,7 @@ import ca.uhn.fhir.rest.method.QualifiedParamList; import ca.uhn.fhir.rest.method.RestSearchParameterTypeEnum; import ca.uhn.fhir.rest.param.DateRangeParam; import ca.uhn.fhir.rest.server.Constants; +import ca.uhn.fhir.rest.server.EncodingEnum; import ca.uhn.fhir.rest.server.IBundleProvider; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; @@ -715,7 +716,7 @@ public abstract class BaseHapiFhirDao implements IDao { return ids; } - static SearchParameterMap translateMatchUrl(String theMatchUrl, RuntimeResourceDefinition resourceDef) { + public static SearchParameterMap translateMatchUrl(String theMatchUrl, RuntimeResourceDefinition resourceDef) { SearchParameterMap paramMap = new SearchParameterMap(); List parameters; try { @@ -778,7 +779,11 @@ public abstract class BaseHapiFhirDao implements IDao { } continue; } - + + if (nextParamName.startsWith("_")) { + continue; + } + RuntimeSearchParam paramDef = resourceDef.getSearchParam(nextParamName); if (paramDef == null) { throw new InvalidRequestException("Failed to parse match URL[" + theMatchUrl + "] - Resource type " + resourceDef.getName() + " does not have a parameter with name: " + nextParamName); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java index d0b46cfa3e8..615d3c22adb 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java @@ -2013,6 +2013,9 @@ public abstract class BaseHapiFhirResourceDao extends BaseH for (Tuple next : query.getResultList()) { loadPids.add(next.get(0, Long.class)); } + if (loadPids.isEmpty()) { + return new SimpleBundleProvider(); + } } else { loadPids = searchForIdsWithAndOr(theParams); if (loadPids.isEmpty()) { @@ -2028,7 +2031,7 @@ public abstract class BaseHapiFhirResourceDao extends BaseH CriteriaQuery cq = builder.createQuery(Long.class); Root from = cq.from(ResourceTable.class); cq.select(from.get("myId").as(Long.class)); - + Predicate predicateIds = (from.get("myId").in(loadPids)); Predicate predicateLower = lu.getLowerBoundAsInstant() != null ? builder.greaterThanOrEqualTo(from. get("myUpdated"), lu.getLowerBoundAsInstant()) : null; Predicate predicateUpper = lu.getUpperBoundAsInstant() != null ? builder.lessThanOrEqualTo(from. get("myUpdated"), lu.getUpperBoundAsInstant()) : null; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/SearchParameterMap.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/SearchParameterMap.java index 7e29eafef26..c01ef3c185d 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/SearchParameterMap.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/SearchParameterMap.java @@ -46,6 +46,7 @@ public class SearchParameterMap extends LinkedHashMap myIncludes; private DateRangeParam myLastUpdated; private Set myRevIncludes; + private SortSpec mySort; public void add(String theName, IQueryParameterAnd theAnd) { @@ -76,8 +77,8 @@ public class SearchParameterMap extends LinkedHashMap mySubscriptionDao; - + private IIdType mySubscriptionId; private Long mySubscriptionPid; - + @Autowired private TaskScheduler myTaskScheduler; - + @Override public void afterConnectionClosed(WebSocketSession theSession, CloseStatus theStatus) throws Exception { super.afterConnectionClosed(theSession, theStatus); @@ -68,15 +76,15 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement super.afterConnectionEstablished(theSession); ourLog.info("Incoming WebSocket connection from {}", theSession.getRemoteAddress()); } - + protected void handleFailure(Exception theE) { ourLog.error("Failure during communication", theE); } - + @Override protected void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) throws Exception { ourLog.info("Textmessage: " + theMessage.getPayload()); - + myState.handleTextMessage(theSession, theMessage); } @@ -85,22 +93,26 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement ourLog.info("Creating scheduled task for subscription websocket connection"); myScheduleFuture = myTaskScheduler.scheduleWithFixedDelay(this, 1000); } - + @PreDestroy public void preDescroy() { ourLog.info("Cancelling scheduled task for subscription websocket connection"); myScheduleFuture.cancel(true); + IState state = myState; + if (state != null) { + state.closing(); + } } - + @Override public void run() { Long subscriptionPid = mySubscriptionPid; if (subscriptionPid == null) { return; } - + ourLog.debug("Subscription {} websocket handler polling", subscriptionPid); - + List results = mySubscriptionDao.getUndeliveredResourcesAndPurge(subscriptionPid); if (results.isEmpty() == false) { myState.deliver(results); @@ -134,7 +146,60 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement handleFailure(e); } } - + + @Override + public void closing() { + // nothing + } + + } + + @Autowired + private FhirContext myCtx; + + private class ResourceBoundState implements IState { + + private WebSocketSession mySession; + private EncodingEnum myEncoding; + + public ResourceBoundState(WebSocketSession theSession, EncodingEnum theEncoding) { + mySession = theSession; + myEncoding = theEncoding; + } + + @Override + public void deliver(List theResults) { + try { + for (IBaseResource nextResource : theResults) { + ourLog.info("Sending WebSocket message for resource: {}", nextResource.getIdElement()); + String encoded = myEncoding.newParser(myCtx).encodeResourceToString(nextResource); + String payload = "add " + mySubscriptionId.getIdPart() + '\n' + encoded; + mySession.sendMessage(new TextMessage(payload)); + } + } catch (IOException e) { + handleFailure(e); + } + } + + @Override + public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) { + try { + theSession.sendMessage(new TextMessage("Unexpected client message: " + theMessage.getPayload())); + } catch (IOException e) { + handleFailure(e); + } + } + + @Override + public void closing() { + ourLog.info("Deleting subscription {}", mySubscriptionId); + try { + mySubscriptionDao.delete(mySubscriptionId); + } catch (Exception e) { + handleFailure(e); + } + } + } private class InitialState implements IState { @@ -148,52 +213,119 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) { String message = theMessage.getPayload(); if (message.startsWith("bind ")) { - IdDt id = new IdDt(message.substring("bind ".length())); - - if (!id.hasIdPart() || !id.isIdPartValid()) { - try { - theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - No ID included")); - } catch (IOException e) { - handleFailure(e); + String remaining = message.substring("bind ".length()); + + IIdType subscriptionId; + if (remaining.contains("?")) { + subscriptionId = bingSearch(theSession, remaining); + } else { + subscriptionId = bindSimple(theSession, remaining); + if (subscriptionId == null) { + return; } - return; } - - if (id.hasResourceType()==false) { - id = id.withResourceType("Subscription"); - } - + try { - Subscription subscription = mySubscriptionDao.read(id); - mySubscriptionPid = mySubscriptionDao.getSubscriptionTablePidForSubscriptionResource(id); - mySubscriptionId = subscription.getIdElement(); - myState = new SimpleBoundState(theSession); - } catch (ResourceNotFoundException e) { - try { - theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - Unknown subscription: " + id.getValue())); - } catch (IOException e1) { - handleFailure(e); - } - return; - } - - try { - theSession.sendMessage(new TextMessage("bound " + id.getIdPart())); + theSession.sendMessage(new TextMessage("bound " + subscriptionId.getIdPart())); } catch (IOException e) { handleFailure(e); } - + } } - + + private IIdType bingSearch(WebSocketSession theSession, String theRemaining) { + Subscription subscription = new Subscription(); + subscription.getChannel().setType(SubscriptionChannelTypeEnum.WEBSOCKET); + subscription.setStatus(SubscriptionStatusEnum.ACTIVE); + subscription.setCriteria(theRemaining); + + try { + String params = theRemaining.substring(theRemaining.indexOf('?')); + List paramValues = URLEncodedUtils.parse("http://example.com" + params, Constants.CHARSET_UTF8); + EncodingEnum encoding = EncodingEnum.JSON; + for (NameValuePair nameValuePair : paramValues) { + if (Constants.PARAM_FORMAT.equals(nameValuePair)) { + EncodingEnum nextEncoding = EncodingEnum.forContentType(nameValuePair.getValue()); + if (nextEncoding != null) { + encoding = nextEncoding; + } + } + } + + IIdType id = mySubscriptionDao.create(subscription).getId(); + + mySubscriptionPid = mySubscriptionDao.getSubscriptionTablePidForSubscriptionResource(id); + mySubscriptionId = subscription.getIdElement(); + myState = new ResourceBoundState(theSession, encoding); + + return id; + } catch (UnprocessableEntityException e) { + ourLog.warn("Failed to bind subscription: " + e.getMessage()); + try { + theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - " + e.getMessage())); + } catch (IOException e2) { + handleFailure(e2); + } + } catch (Exception e) { + handleFailure(e); + try { + theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - No ID included")); + } catch (IOException e2) { + handleFailure(e2); + } + } + return null; + } + + private IIdType bindSimple(WebSocketSession theSession, String theBindString) { + IdDt id = new IdDt(theBindString); + + if (!id.hasIdPart() || !id.isIdPartValid()) { + try { + theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - No ID included")); + } catch (IOException e) { + handleFailure(e); + } + return null; + } + + if (id.hasResourceType() == false) { + id = id.withResourceType("Subscription"); + } + + try { + Subscription subscription = mySubscriptionDao.read(id); + mySubscriptionPid = mySubscriptionDao.getSubscriptionTablePidForSubscriptionResource(id); + mySubscriptionId = subscription.getIdElement(); + myState = new SimpleBoundState(theSession); + } catch (ResourceNotFoundException e) { + try { + theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - Unknown subscription: " + id.getValue())); + } catch (IOException e1) { + handleFailure(e); + } + return null; + } + + return id; + } + + @Override + public void closing() { + // nothing + } + } - private interface IState{ + private interface IState { void deliver(List theResults); + void closing(); + void handleTextMessage(WebSocketSession theSession, TextMessage theMessage); - + } - + } \ No newline at end of file diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/FhirResourceDaoDstu2SearchTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/FhirResourceDaoDstu2SearchTest.java index 9137609fc73..8bf241731ec 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/FhirResourceDaoDstu2SearchTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/FhirResourceDaoDstu2SearchTest.java @@ -65,6 +65,8 @@ import ca.uhn.fhir.model.primitive.DateTimeDt; import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.model.primitive.InstantDt; import ca.uhn.fhir.model.primitive.StringDt; +import ca.uhn.fhir.rest.api.SortOrderEnum; +import ca.uhn.fhir.rest.api.SortSpec; import ca.uhn.fhir.rest.param.CompositeParam; import ca.uhn.fhir.rest.param.DateParam; import ca.uhn.fhir.rest.param.DateRangeParam; @@ -78,6 +80,7 @@ import ca.uhn.fhir.rest.param.TokenAndListParam; import ca.uhn.fhir.rest.param.TokenOrListParam; import ca.uhn.fhir.rest.param.TokenParam; import ca.uhn.fhir.rest.param.UriParam; +import ca.uhn.fhir.rest.server.Constants; import ca.uhn.fhir.rest.server.IBundleProvider; import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; @@ -85,6 +88,19 @@ import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; public class FhirResourceDaoDstu2SearchTest extends BaseJpaDstu2Test { private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(FhirResourceDaoDstu2SearchTest.class); + @Test + public void testSearchWithEmptySort() { + SearchParameterMap criteriaUrl = new SearchParameterMap(); + DateRangeParam range = new DateRangeParam(); + range.setLowerBound(new DateParam(QuantityCompararatorEnum.GREATERTHAN, 1000000)); + range.setUpperBound(new DateParam(QuantityCompararatorEnum.LESSTHAN, 2000000)); + criteriaUrl.setLastUpdated(range); + criteriaUrl.setSort(new SortSpec(Constants.PARAM_LASTUPDATED, SortOrderEnum.ASC)); + IBundleProvider results = myObservationDao.search(criteriaUrl); + assertEquals(0, results.size()); + } + + @Test public void testIndexNoDuplicatesString() { Patient p = new Patient(); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/SubscriptionsDstu2Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/SubscriptionsDstu2Test.java index 7bb2c171c3e..a1321654967 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/SubscriptionsDstu2Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/SubscriptionsDstu2Test.java @@ -5,6 +5,8 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; import java.net.URI; +import java.util.ArrayList; +import java.util.List; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; @@ -12,6 +14,7 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; import org.eclipse.jetty.websocket.api.annotations.WebSocket; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.junit.Test; @@ -22,10 +25,20 @@ import ca.uhn.fhir.model.dstu2.resource.Subscription; import ca.uhn.fhir.model.dstu2.valueset.ObservationStatusEnum; import ca.uhn.fhir.model.dstu2.valueset.SubscriptionChannelTypeEnum; import ca.uhn.fhir.model.dstu2.valueset.SubscriptionStatusEnum; +import ca.uhn.fhir.model.primitive.IdDt; +import ca.uhn.fhir.rest.server.EncodingEnum; import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; public class SubscriptionsDstu2Test extends BaseResourceProviderDstu2Test { + public class BaseSocket { + protected String myError; + protected boolean myGotBound; + protected int myPingCount; + protected String mySubsId; + + } + private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionsDstu2Test.class); @Override @@ -37,7 +50,7 @@ public class SubscriptionsDstu2Test extends BaseResourceProviderDstu2Test { myDaoConfig.getInterceptors().add(interceptor); } - private void sleepUntilPingCount(SimpleEchoSocket socket, int wantPingCount) throws InterruptedException { + private void sleepUntilPingCount(BaseSocket socket, int wantPingCount) throws InterruptedException { ourLog.info("Entering loop"); for (long start = System.currentTimeMillis(), now = System.currentTimeMillis(); now - start <= 20000; now = System.currentTimeMillis()) { ourLog.debug("Starting"); @@ -90,7 +103,7 @@ public class SubscriptionsDstu2Test extends BaseResourceProviderDstu2Test { subs.setStatus(SubscriptionStatusEnum.REQUESTED); IIdType id = ourClient.create().resource(subs).execute().getId(); subs.setId(id); - + try { subs.setStatus(SubscriptionStatusEnum.ACTIVE); ourClient.update().resource(subs).execute(); @@ -100,7 +113,7 @@ public class SubscriptionsDstu2Test extends BaseResourceProviderDstu2Test { } try { - subs.setStatus((SubscriptionStatusEnum)null); + subs.setStatus((SubscriptionStatusEnum) null); ourClient.update().resource(subs).execute(); fail(); } catch (UnprocessableEntityException e) { @@ -111,14 +124,13 @@ public class SubscriptionsDstu2Test extends BaseResourceProviderDstu2Test { ourClient.update().resource(subs).execute(); } - @Test public void testCreateWithPopulatedButInvalidStatue() { Subscription subs = new Subscription(); subs.getChannel().setType(SubscriptionChannelTypeEnum.WEBSOCKET); subs.setCriteria("Observation?identifier=123"); subs.getStatusElement().setValue("aaaaa"); - + try { ourClient.create().resource(subs).execute(); fail(); @@ -149,6 +161,135 @@ public class SubscriptionsDstu2Test extends BaseResourceProviderDstu2Test { } } + @Test + public void testSubscriptionDynamic() throws Exception { + myDaoConfig.setSubscriptionEnabled(true); + myDaoConfig.setSubscriptionPollDelay(0); + + String methodName = "testSubscriptionDynamic"; + Patient p = new Patient(); + p.addName().addFamily(methodName); + IIdType pId = myPatientDao.create(p).getId().toUnqualifiedVersionless(); + + String criteria = "Observation?subject=Patient/" + pId.getIdPart(); + DynamicEchoSocket socket = new DynamicEchoSocket(criteria, EncodingEnum.JSON); + WebSocketClient client = new WebSocketClient(); + try { + client.start(); + URI echoUri = new URI("ws://localhost:" + ourPort + "/baseDstu2/websocket"); + client.connect(socket, echoUri, new ClientUpgradeRequest()); + ourLog.info("Connecting to : {}", echoUri); + + sleepUntilPingCount(socket, 1); + + mySubscriptionDao.read(new IdDt("Subscription", socket.mySubsId)); + + Observation obs = new Observation(); + obs.getSubject().setReference(pId); + obs.setStatus(ObservationStatusEnum.FINAL); + IIdType afterId1 = myObservationDao.create(obs).getId().toUnqualifiedVersionless(); + + obs = new Observation(); + obs.getSubject().setReference(pId); + obs.setStatus(ObservationStatusEnum.FINAL); + IIdType afterId2 = myObservationDao.create(obs).getId().toUnqualifiedVersionless(); + + Thread.sleep(100); + + sleepUntilPingCount(socket, 3); + + obs = (Observation) socket.myReceived.get(0); + assertEquals(afterId1.getValue(), obs.getIdElement().toUnqualifiedVersionless().getValue()); + + obs = (Observation) socket.myReceived.get(1); + assertEquals(afterId2.getValue(), obs.getIdElement().toUnqualifiedVersionless().getValue()); + + obs = new Observation(); + obs.getSubject().setReference(pId); + obs.setStatus(ObservationStatusEnum.FINAL); + IIdType afterId3 = myObservationDao.create(obs).getId().toUnqualifiedVersionless(); + + sleepUntilPingCount(socket, 4); + + obs = (Observation) socket.myReceived.get(2); + assertEquals(afterId3.getValue(), obs.getIdElement().toUnqualifiedVersionless().getValue()); + + } finally { + try { + client.stop(); + } catch (Exception e) { + ourLog.error("Failure", e); + fail(e.getMessage()); + } + } + + } + + + @Test + public void testSubscriptionDynamicXml() throws Exception { + myDaoConfig.setSubscriptionEnabled(true); + myDaoConfig.setSubscriptionPollDelay(0); + + String methodName = "testSubscriptionDynamic"; + Patient p = new Patient(); + p.addName().addFamily(methodName); + IIdType pId = myPatientDao.create(p).getId().toUnqualifiedVersionless(); + + String criteria = "Observation?subject=Patient/" + pId.getIdPart() + "&_format=xml"; + DynamicEchoSocket socket = new DynamicEchoSocket(criteria, EncodingEnum.XML); + WebSocketClient client = new WebSocketClient(); + try { + client.start(); + URI echoUri = new URI("ws://localhost:" + ourPort + "/baseDstu2/websocket"); + client.connect(socket, echoUri, new ClientUpgradeRequest()); + ourLog.info("Connecting to : {}", echoUri); + + sleepUntilPingCount(socket, 1); + + mySubscriptionDao.read(new IdDt("Subscription", socket.mySubsId)); + + Observation obs = new Observation(); + obs.getSubject().setReference(pId); + obs.setStatus(ObservationStatusEnum.FINAL); + IIdType afterId1 = myObservationDao.create(obs).getId().toUnqualifiedVersionless(); + + obs = new Observation(); + obs.getSubject().setReference(pId); + obs.setStatus(ObservationStatusEnum.FINAL); + IIdType afterId2 = myObservationDao.create(obs).getId().toUnqualifiedVersionless(); + + Thread.sleep(100); + + sleepUntilPingCount(socket, 3); + + obs = (Observation) socket.myReceived.get(0); + assertEquals(afterId1.getValue(), obs.getIdElement().toUnqualifiedVersionless().getValue()); + + obs = (Observation) socket.myReceived.get(1); + assertEquals(afterId2.getValue(), obs.getIdElement().toUnqualifiedVersionless().getValue()); + + obs = new Observation(); + obs.getSubject().setReference(pId); + obs.setStatus(ObservationStatusEnum.FINAL); + IIdType afterId3 = myObservationDao.create(obs).getId().toUnqualifiedVersionless(); + + sleepUntilPingCount(socket, 4); + + obs = (Observation) socket.myReceived.get(2); + assertEquals(afterId3.getValue(), obs.getIdElement().toUnqualifiedVersionless().getValue()); + + } finally { + try { + client.stop(); + } catch (Exception e) { + ourLog.error("Failure", e); + fail(e.getMessage()); + } + } + + } + @Test public void testSubscriptionSimple() throws Exception { myDaoConfig.setSubscriptionEnabled(true); @@ -196,7 +337,7 @@ public class SubscriptionsDstu2Test extends BaseResourceProviderDstu2Test { IIdType afterId3 = myObservationDao.create(obs).getId().toUnqualifiedVersionless(); sleepUntilPingCount(socket, 2); - + } finally { try { client.stop(); @@ -241,28 +382,15 @@ public class SubscriptionsDstu2Test extends BaseResourceProviderDstu2Test { * Basic Echo Client Socket */ @WebSocket(maxTextMessageSize = 64 * 1024) - public static class SimpleEchoSocket { - - private String myError; - - private boolean myGotBound; - - private int myPingCount; - - // @OnWebSocketClose - // public void onClose(int statusCode, String reason) { - // ourLog.info("Connection closed: {} - {}", statusCode, reason); - // this.session = null; - // this.closeLatch.countDown(); - // } - - private String mySubsId; + public class SimpleEchoSocket extends BaseSocket { @SuppressWarnings("unused") private Session session; + public SimpleEchoSocket(String theSubsId) { mySubsId = theSubsId; } + @OnWebSocketConnect public void onConnect(Session session) { ourLog.info("Got connect: {}", session); @@ -275,13 +403,13 @@ public class SubscriptionsDstu2Test extends BaseResourceProviderDstu2Test { ourLog.error("Failure", t); } } - + @OnWebSocketMessage public void onMessage(String theMsg) { ourLog.info("Got msg: {}", theMsg); if (theMsg.equals("bound " + mySubsId)) { myGotBound = true; - } else if (myGotBound && theMsg.startsWith("ping " + mySubsId)){ + } else if (myGotBound && theMsg.startsWith("ping " + mySubsId)) { myPingCount++; } else { myError = "Unexpected message: " + theMsg; @@ -289,4 +417,51 @@ public class SubscriptionsDstu2Test extends BaseResourceProviderDstu2Test { } } + /** + * Basic Echo Client Socket + */ + @WebSocket(maxTextMessageSize = 64 * 1024) + public class DynamicEchoSocket extends BaseSocket { + + private List myReceived = new ArrayList(); + @SuppressWarnings("unused") + private Session session; + private String myCriteria; + private EncodingEnum myEncoding; + + public DynamicEchoSocket(String theCriteria, EncodingEnum theEncoding) { + myCriteria = theCriteria; + myEncoding = theEncoding; + } + + @OnWebSocketConnect + public void onConnect(Session session) { + ourLog.info("Got connect: {}", session); + this.session = session; + try { + String sending = "bind " + myCriteria; + ourLog.info("Sending: {}", sending); + session.getRemote().sendString(sending); + } catch (Throwable t) { + ourLog.error("Failure", t); + } + } + + @OnWebSocketMessage + public void onMessage(String theMsg) { + ourLog.info("Got msg: {}", theMsg); + if (theMsg.startsWith("bound ")) { + myGotBound = true; + mySubsId = (theMsg.substring("bound ".length())); + myPingCount++; + } else if (myGotBound && theMsg.startsWith("add " + mySubsId + "\n")) { + String text = theMsg.substring(("add " + mySubsId + "\n").length()); + IBaseResource res = myEncoding.newParser(myFhirCtx).parseResource(text); + myReceived.add(res); + myPingCount++; + } else { + myError = "Unexpected message: " + theMsg; + } + } + } }