Work on dynamic subscriptions

This commit is contained in:
jamesagnew 2015-09-29 17:55:49 -04:00
parent 79047ef6ab
commit bc910a1d4c
6 changed files with 405 additions and 73 deletions

View File

@ -111,6 +111,7 @@ import ca.uhn.fhir.rest.method.QualifiedParamList;
import ca.uhn.fhir.rest.method.RestSearchParameterTypeEnum;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.rest.server.Constants;
import ca.uhn.fhir.rest.server.EncodingEnum;
import ca.uhn.fhir.rest.server.IBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
@ -715,7 +716,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao {
return ids;
}
static SearchParameterMap translateMatchUrl(String theMatchUrl, RuntimeResourceDefinition resourceDef) {
public static SearchParameterMap translateMatchUrl(String theMatchUrl, RuntimeResourceDefinition resourceDef) {
SearchParameterMap paramMap = new SearchParameterMap();
List<NameValuePair> parameters;
try {
@ -779,6 +780,10 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao {
continue;
}
if (nextParamName.startsWith("_")) {
continue;
}
RuntimeSearchParam paramDef = resourceDef.getSearchParam(nextParamName);
if (paramDef == null) {
throw new InvalidRequestException("Failed to parse match URL[" + theMatchUrl + "] - Resource type " + resourceDef.getName() + " does not have a parameter with name: " + nextParamName);

View File

@ -2013,6 +2013,9 @@ public abstract class BaseHapiFhirResourceDao<T extends IResource> extends BaseH
for (Tuple next : query.getResultList()) {
loadPids.add(next.get(0, Long.class));
}
if (loadPids.isEmpty()) {
return new SimpleBundleProvider();
}
} else {
loadPids = searchForIdsWithAndOr(theParams);
if (loadPids.isEmpty()) {

View File

@ -46,6 +46,7 @@ public class SearchParameterMap extends LinkedHashMap<String, List<List<? extend
private Set<Include> myIncludes;
private DateRangeParam myLastUpdated;
private Set<Include> myRevIncludes;
private SortSpec mySort;
public void add(String theName, IQueryParameterAnd<?> theAnd) {

View File

@ -27,6 +27,8 @@ import java.util.concurrent.ScheduledFuture;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URLEncodedUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.springframework.beans.factory.annotation.Autowired;
@ -36,10 +38,16 @@ import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.IFhirResourceDaoSubscription;
import ca.uhn.fhir.model.dstu2.resource.Subscription;
import ca.uhn.fhir.model.dstu2.valueset.SubscriptionChannelTypeEnum;
import ca.uhn.fhir.model.dstu2.valueset.SubscriptionStatusEnum;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.server.Constants;
import ca.uhn.fhir.rest.server.EncodingEnum;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
public class SubscriptionWebsocketHandler extends TextWebSocketHandler implements ISubscriptionWebsocketHandler, Runnable {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionWebsocketHandler.class);
@ -90,6 +98,10 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
public void preDescroy() {
ourLog.info("Cancelling scheduled task for subscription websocket connection");
myScheduleFuture.cancel(true);
IState state = myState;
if (state != null) {
state.closing();
}
}
@Override
@ -135,6 +147,59 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
}
}
@Override
public void closing() {
// nothing
}
}
@Autowired
private FhirContext myCtx;
private class ResourceBoundState implements IState {
private WebSocketSession mySession;
private EncodingEnum myEncoding;
public ResourceBoundState(WebSocketSession theSession, EncodingEnum theEncoding) {
mySession = theSession;
myEncoding = theEncoding;
}
@Override
public void deliver(List<IBaseResource> theResults) {
try {
for (IBaseResource nextResource : theResults) {
ourLog.info("Sending WebSocket message for resource: {}", nextResource.getIdElement());
String encoded = myEncoding.newParser(myCtx).encodeResourceToString(nextResource);
String payload = "add " + mySubscriptionId.getIdPart() + '\n' + encoded;
mySession.sendMessage(new TextMessage(payload));
}
} catch (IOException e) {
handleFailure(e);
}
}
@Override
public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) {
try {
theSession.sendMessage(new TextMessage("Unexpected client message: " + theMessage.getPayload()));
} catch (IOException e) {
handleFailure(e);
}
}
@Override
public void closing() {
ourLog.info("Deleting subscription {}", mySubscriptionId);
try {
mySubscriptionDao.delete(mySubscriptionId);
} catch (Exception e) {
handleFailure(e);
}
}
}
private class InitialState implements IState {
@ -148,7 +213,73 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) {
String message = theMessage.getPayload();
if (message.startsWith("bind ")) {
IdDt id = new IdDt(message.substring("bind ".length()));
String remaining = message.substring("bind ".length());
IIdType subscriptionId;
if (remaining.contains("?")) {
subscriptionId = bingSearch(theSession, remaining);
} else {
subscriptionId = bindSimple(theSession, remaining);
if (subscriptionId == null) {
return;
}
}
try {
theSession.sendMessage(new TextMessage("bound " + subscriptionId.getIdPart()));
} catch (IOException e) {
handleFailure(e);
}
}
}
private IIdType bingSearch(WebSocketSession theSession, String theRemaining) {
Subscription subscription = new Subscription();
subscription.getChannel().setType(SubscriptionChannelTypeEnum.WEBSOCKET);
subscription.setStatus(SubscriptionStatusEnum.ACTIVE);
subscription.setCriteria(theRemaining);
try {
String params = theRemaining.substring(theRemaining.indexOf('?'));
List<NameValuePair> paramValues = URLEncodedUtils.parse("http://example.com" + params, Constants.CHARSET_UTF8);
EncodingEnum encoding = EncodingEnum.JSON;
for (NameValuePair nameValuePair : paramValues) {
if (Constants.PARAM_FORMAT.equals(nameValuePair)) {
EncodingEnum nextEncoding = EncodingEnum.forContentType(nameValuePair.getValue());
if (nextEncoding != null) {
encoding = nextEncoding;
}
}
}
IIdType id = mySubscriptionDao.create(subscription).getId();
mySubscriptionPid = mySubscriptionDao.getSubscriptionTablePidForSubscriptionResource(id);
mySubscriptionId = subscription.getIdElement();
myState = new ResourceBoundState(theSession, encoding);
return id;
} catch (UnprocessableEntityException e) {
ourLog.warn("Failed to bind subscription: " + e.getMessage());
try {
theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - " + e.getMessage()));
} catch (IOException e2) {
handleFailure(e2);
}
} catch (Exception e) {
handleFailure(e);
try {
theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - No ID included"));
} catch (IOException e2) {
handleFailure(e2);
}
}
return null;
}
private IIdType bindSimple(WebSocketSession theSession, String theBindString) {
IdDt id = new IdDt(theBindString);
if (!id.hasIdPart() || !id.isIdPartValid()) {
try {
@ -156,7 +287,7 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
} catch (IOException e) {
handleFailure(e);
}
return;
return null;
}
if (id.hasResourceType() == false) {
@ -174,16 +305,15 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
} catch (IOException e1) {
handleFailure(e);
}
return;
return null;
}
try {
theSession.sendMessage(new TextMessage("bound " + id.getIdPart()));
} catch (IOException e) {
handleFailure(e);
return id;
}
}
@Override
public void closing() {
// nothing
}
}
@ -192,6 +322,8 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
void deliver(List<IBaseResource> theResults);
void closing();
void handleTextMessage(WebSocketSession theSession, TextMessage theMessage);
}

View File

@ -65,6 +65,8 @@ import ca.uhn.fhir.model.primitive.DateTimeDt;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.model.primitive.InstantDt;
import ca.uhn.fhir.model.primitive.StringDt;
import ca.uhn.fhir.rest.api.SortOrderEnum;
import ca.uhn.fhir.rest.api.SortSpec;
import ca.uhn.fhir.rest.param.CompositeParam;
import ca.uhn.fhir.rest.param.DateParam;
import ca.uhn.fhir.rest.param.DateRangeParam;
@ -78,6 +80,7 @@ import ca.uhn.fhir.rest.param.TokenAndListParam;
import ca.uhn.fhir.rest.param.TokenOrListParam;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.param.UriParam;
import ca.uhn.fhir.rest.server.Constants;
import ca.uhn.fhir.rest.server.IBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
@ -85,6 +88,19 @@ import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
public class FhirResourceDaoDstu2SearchTest extends BaseJpaDstu2Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(FhirResourceDaoDstu2SearchTest.class);
@Test
public void testSearchWithEmptySort() {
SearchParameterMap criteriaUrl = new SearchParameterMap();
DateRangeParam range = new DateRangeParam();
range.setLowerBound(new DateParam(QuantityCompararatorEnum.GREATERTHAN, 1000000));
range.setUpperBound(new DateParam(QuantityCompararatorEnum.LESSTHAN, 2000000));
criteriaUrl.setLastUpdated(range);
criteriaUrl.setSort(new SortSpec(Constants.PARAM_LASTUPDATED, SortOrderEnum.ASC));
IBundleProvider results = myObservationDao.search(criteriaUrl);
assertEquals(0, results.size());
}
@Test
public void testIndexNoDuplicatesString() {
Patient p = new Patient();

View File

@ -5,6 +5,8 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
@ -12,6 +14,7 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.Test;
@ -22,10 +25,20 @@ import ca.uhn.fhir.model.dstu2.resource.Subscription;
import ca.uhn.fhir.model.dstu2.valueset.ObservationStatusEnum;
import ca.uhn.fhir.model.dstu2.valueset.SubscriptionChannelTypeEnum;
import ca.uhn.fhir.model.dstu2.valueset.SubscriptionStatusEnum;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.server.EncodingEnum;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
public class SubscriptionsDstu2Test extends BaseResourceProviderDstu2Test {
public class BaseSocket {
protected String myError;
protected boolean myGotBound;
protected int myPingCount;
protected String mySubsId;
}
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionsDstu2Test.class);
@Override
@ -37,7 +50,7 @@ public class SubscriptionsDstu2Test extends BaseResourceProviderDstu2Test {
myDaoConfig.getInterceptors().add(interceptor);
}
private void sleepUntilPingCount(SimpleEchoSocket socket, int wantPingCount) throws InterruptedException {
private void sleepUntilPingCount(BaseSocket socket, int wantPingCount) throws InterruptedException {
ourLog.info("Entering loop");
for (long start = System.currentTimeMillis(), now = System.currentTimeMillis(); now - start <= 20000; now = System.currentTimeMillis()) {
ourLog.debug("Starting");
@ -111,7 +124,6 @@ public class SubscriptionsDstu2Test extends BaseResourceProviderDstu2Test {
ourClient.update().resource(subs).execute();
}
@Test
public void testCreateWithPopulatedButInvalidStatue() {
Subscription subs = new Subscription();
@ -149,6 +161,135 @@ public class SubscriptionsDstu2Test extends BaseResourceProviderDstu2Test {
}
}
@Test
public void testSubscriptionDynamic() throws Exception {
myDaoConfig.setSubscriptionEnabled(true);
myDaoConfig.setSubscriptionPollDelay(0);
String methodName = "testSubscriptionDynamic";
Patient p = new Patient();
p.addName().addFamily(methodName);
IIdType pId = myPatientDao.create(p).getId().toUnqualifiedVersionless();
String criteria = "Observation?subject=Patient/" + pId.getIdPart();
DynamicEchoSocket socket = new DynamicEchoSocket(criteria, EncodingEnum.JSON);
WebSocketClient client = new WebSocketClient();
try {
client.start();
URI echoUri = new URI("ws://localhost:" + ourPort + "/baseDstu2/websocket");
client.connect(socket, echoUri, new ClientUpgradeRequest());
ourLog.info("Connecting to : {}", echoUri);
sleepUntilPingCount(socket, 1);
mySubscriptionDao.read(new IdDt("Subscription", socket.mySubsId));
Observation obs = new Observation();
obs.getSubject().setReference(pId);
obs.setStatus(ObservationStatusEnum.FINAL);
IIdType afterId1 = myObservationDao.create(obs).getId().toUnqualifiedVersionless();
obs = new Observation();
obs.getSubject().setReference(pId);
obs.setStatus(ObservationStatusEnum.FINAL);
IIdType afterId2 = myObservationDao.create(obs).getId().toUnqualifiedVersionless();
Thread.sleep(100);
sleepUntilPingCount(socket, 3);
obs = (Observation) socket.myReceived.get(0);
assertEquals(afterId1.getValue(), obs.getIdElement().toUnqualifiedVersionless().getValue());
obs = (Observation) socket.myReceived.get(1);
assertEquals(afterId2.getValue(), obs.getIdElement().toUnqualifiedVersionless().getValue());
obs = new Observation();
obs.getSubject().setReference(pId);
obs.setStatus(ObservationStatusEnum.FINAL);
IIdType afterId3 = myObservationDao.create(obs).getId().toUnqualifiedVersionless();
sleepUntilPingCount(socket, 4);
obs = (Observation) socket.myReceived.get(2);
assertEquals(afterId3.getValue(), obs.getIdElement().toUnqualifiedVersionless().getValue());
} finally {
try {
client.stop();
} catch (Exception e) {
ourLog.error("Failure", e);
fail(e.getMessage());
}
}
}
@Test
public void testSubscriptionDynamicXml() throws Exception {
myDaoConfig.setSubscriptionEnabled(true);
myDaoConfig.setSubscriptionPollDelay(0);
String methodName = "testSubscriptionDynamic";
Patient p = new Patient();
p.addName().addFamily(methodName);
IIdType pId = myPatientDao.create(p).getId().toUnqualifiedVersionless();
String criteria = "Observation?subject=Patient/" + pId.getIdPart() + "&_format=xml";
DynamicEchoSocket socket = new DynamicEchoSocket(criteria, EncodingEnum.XML);
WebSocketClient client = new WebSocketClient();
try {
client.start();
URI echoUri = new URI("ws://localhost:" + ourPort + "/baseDstu2/websocket");
client.connect(socket, echoUri, new ClientUpgradeRequest());
ourLog.info("Connecting to : {}", echoUri);
sleepUntilPingCount(socket, 1);
mySubscriptionDao.read(new IdDt("Subscription", socket.mySubsId));
Observation obs = new Observation();
obs.getSubject().setReference(pId);
obs.setStatus(ObservationStatusEnum.FINAL);
IIdType afterId1 = myObservationDao.create(obs).getId().toUnqualifiedVersionless();
obs = new Observation();
obs.getSubject().setReference(pId);
obs.setStatus(ObservationStatusEnum.FINAL);
IIdType afterId2 = myObservationDao.create(obs).getId().toUnqualifiedVersionless();
Thread.sleep(100);
sleepUntilPingCount(socket, 3);
obs = (Observation) socket.myReceived.get(0);
assertEquals(afterId1.getValue(), obs.getIdElement().toUnqualifiedVersionless().getValue());
obs = (Observation) socket.myReceived.get(1);
assertEquals(afterId2.getValue(), obs.getIdElement().toUnqualifiedVersionless().getValue());
obs = new Observation();
obs.getSubject().setReference(pId);
obs.setStatus(ObservationStatusEnum.FINAL);
IIdType afterId3 = myObservationDao.create(obs).getId().toUnqualifiedVersionless();
sleepUntilPingCount(socket, 4);
obs = (Observation) socket.myReceived.get(2);
assertEquals(afterId3.getValue(), obs.getIdElement().toUnqualifiedVersionless().getValue());
} finally {
try {
client.stop();
} catch (Exception e) {
ourLog.error("Failure", e);
fail(e.getMessage());
}
}
}
@Test
public void testSubscriptionSimple() throws Exception {
myDaoConfig.setSubscriptionEnabled(true);
@ -241,28 +382,15 @@ public class SubscriptionsDstu2Test extends BaseResourceProviderDstu2Test {
* Basic Echo Client Socket
*/
@WebSocket(maxTextMessageSize = 64 * 1024)
public static class SimpleEchoSocket {
private String myError;
private boolean myGotBound;
private int myPingCount;
// @OnWebSocketClose
// public void onClose(int statusCode, String reason) {
// ourLog.info("Connection closed: {} - {}", statusCode, reason);
// this.session = null;
// this.closeLatch.countDown();
// }
private String mySubsId;
public class SimpleEchoSocket extends BaseSocket {
@SuppressWarnings("unused")
private Session session;
public SimpleEchoSocket(String theSubsId) {
mySubsId = theSubsId;
}
@OnWebSocketConnect
public void onConnect(Session session) {
ourLog.info("Got connect: {}", session);
@ -289,4 +417,51 @@ public class SubscriptionsDstu2Test extends BaseResourceProviderDstu2Test {
}
}
/**
* Basic Echo Client Socket
*/
@WebSocket(maxTextMessageSize = 64 * 1024)
public class DynamicEchoSocket extends BaseSocket {
private List<IBaseResource> myReceived = new ArrayList<IBaseResource>();
@SuppressWarnings("unused")
private Session session;
private String myCriteria;
private EncodingEnum myEncoding;
public DynamicEchoSocket(String theCriteria, EncodingEnum theEncoding) {
myCriteria = theCriteria;
myEncoding = theEncoding;
}
@OnWebSocketConnect
public void onConnect(Session session) {
ourLog.info("Got connect: {}", session);
this.session = session;
try {
String sending = "bind " + myCriteria;
ourLog.info("Sending: {}", sending);
session.getRemote().sendString(sending);
} catch (Throwable t) {
ourLog.error("Failure", t);
}
}
@OnWebSocketMessage
public void onMessage(String theMsg) {
ourLog.info("Got msg: {}", theMsg);
if (theMsg.startsWith("bound ")) {
myGotBound = true;
mySubsId = (theMsg.substring("bound ".length()));
myPingCount++;
} else if (myGotBound && theMsg.startsWith("add " + mySubsId + "\n")) {
String text = theMsg.substring(("add " + mySubsId + "\n").length());
IBaseResource res = myEncoding.newParser(myFhirCtx).parseResource(text);
myReceived.add(res);
myPingCount++;
} else {
myError = "Unexpected message: " + theMsg;
}
}
}
}