Subscription refactor started

This commit is contained in:
James Agnew 2017-09-09 14:25:05 -07:00
parent 736e037b1a
commit 3bd7810bd7
17 changed files with 514 additions and 75 deletions

View File

@ -33,6 +33,7 @@ import ca.uhn.fhir.rest.server.interceptor.ServerOperationInterceptorAdapter;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,7 +50,7 @@ import javax.annotation.PreDestroy;
import java.util.*;
import java.util.concurrent.*;
public abstract class BaseSubscriptionInterceptor extends ServerOperationInterceptorAdapter {
public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> extends ServerOperationInterceptorAdapter {
static final String SUBSCRIPTION_STATUS = "Subscription.status";
static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
@ -64,7 +65,7 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
private int myExecutorThreadCount;
private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber;
private MessageHandler mySubscriptionCheckingSubscriber;
private ConcurrentHashMap<String, IBaseResource> myIdToSubscription = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, CanonicalSubscription> myIdToSubscription = new ConcurrentHashMap<>();
private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class);
private ThreadPoolExecutor myDeliveryExecutor;
private LinkedBlockingQueue<Runnable> myProcessingExecutorQueue;
@ -78,6 +79,8 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
setExecutorThreadCount(5);
}
protected abstract CanonicalSubscription canonicalize(S theSubscription);
public abstract Subscription.SubscriptionChannelType getChannelType();
public SubscribableChannel getDeliveryChannel() {
@ -101,8 +104,8 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
myExecutorThreadCount = theExecutorThreadCount;
}
public ConcurrentHashMap<String, IBaseResource> getIdToSubscription() {
return myIdToSubscription;
public Map<String, CanonicalSubscription> getIdToSubscription() {
return Collections.unmodifiableMap(myIdToSubscription);
}
public SubscribableChannel getProcessingChannel() {
@ -115,6 +118,16 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
protected abstract IFhirResourceDao<?> getSubscriptionDao();
public List<CanonicalSubscription> getSubscriptions() {
return new ArrayList<>(myIdToSubscription.values());
}
public boolean hasSubscription(IIdType theId) {
Validate.notNull(theId);
Validate.notBlank(theId.getIdPart());
return myIdToSubscription.containsKey(theId.getIdPart());
}
/**
* Read the existing subscriptions from the database
*/
@ -226,12 +239,12 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
}
if (mySubscriptionActivatingSubscriber == null) {
mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), myIdToSubscription, getChannelType(), this);
mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), getChannelType(), this);
}
getProcessingChannel().subscribe(mySubscriptionActivatingSubscriber);
if (mySubscriptionCheckingSubscriber == null) {
mySubscriptionCheckingSubscriber = new SubscriptionCheckingSubscriber(getSubscriptionDao(), myIdToSubscription, getChannelType(), this);
mySubscriptionCheckingSubscriber = new SubscriptionCheckingSubscriber(getSubscriptionDao(), getChannelType(), this);
}
getProcessingChannel().subscribe(mySubscriptionCheckingSubscriber);
@ -251,6 +264,14 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
protected abstract void registerDeliverySubscriber();
public void registerSubscription(IIdType theId, S theSubscription) {
Validate.notNull(theId);
Validate.notBlank(theId.getIdPart());
Validate.notNull(theSubscription);
myIdToSubscription.put(theId.getIdPart(), canonicalize(theSubscription));
}
@Override
public void resourceCreated(RequestDetails theRequest, IBaseResource theResource) {
ResourceModifiedMessage msg = new ResourceModifiedMessage();
@ -290,4 +311,11 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
}
protected abstract void unregisterDeliverySubscriber();
public void unregisterSubscription(IIdType theId) {
Validate.notNull(theId);
Validate.notBlank(theId.getIdPart());
myIdToSubscription.remove(theId.getIdPart());
}
}

View File

@ -26,7 +26,7 @@ public abstract class BaseSubscriptionRestHookInterceptor extends BaseSubscripti
@Override
protected void registerDeliverySubscriber() {
if (mySubscriptionDeliverySubscriber == null) {
mySubscriptionDeliverySubscriber = new SubscriptionDeliveringRestHookSubscriber(getSubscriptionDao(), getIdToSubscription(), getChannelType(), this);
mySubscriptionDeliverySubscriber = new SubscriptionDeliveringRestHookSubscriber(getSubscriptionDao(), getChannelType(), this);
}
getDeliveryChannel().subscribe(mySubscriptionDeliverySubscriber);
}

View File

@ -27,21 +27,17 @@ import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Subscription;
import org.springframework.messaging.MessageHandler;
import java.util.concurrent.ConcurrentHashMap;
public abstract class BaseSubscriptionSubscriber implements MessageHandler {
private final IFhirResourceDao mySubscriptionDao;
private final ConcurrentHashMap<String, IBaseResource> myIdToSubscription;
private final Subscription.SubscriptionChannelType myChannelType;
private final BaseSubscriptionInterceptor mySubscriptionInterceptor;
/**
* Constructor
*/
public BaseSubscriptionSubscriber(IFhirResourceDao<? extends IBaseResource> theSubscriptionDao, ConcurrentHashMap<String, IBaseResource> theIdToSubscription, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor) {
public BaseSubscriptionSubscriber(IFhirResourceDao<? extends IBaseResource> theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor) {
mySubscriptionDao = theSubscriptionDao;
myIdToSubscription = theIdToSubscription;
myChannelType = theChannelType;
mySubscriptionInterceptor = theSubscriptionInterceptor;
}
@ -54,10 +50,6 @@ public abstract class BaseSubscriptionSubscriber implements MessageHandler {
return getSubscriptionDao().getContext();
}
public ConcurrentHashMap<String, IBaseResource> getIdToSubscription() {
return myIdToSubscription;
}
public IFhirResourceDao getSubscriptionDao() {
return mySubscriptionDao;
}

View File

@ -44,7 +44,7 @@ public abstract class BaseSubscriptionWebsocketInterceptor extends BaseSubscript
@Override
protected void registerDeliverySubscriber() {
if (mySubscriptionDeliverySubscriber == null) {
mySubscriptionDeliverySubscriber = new SubscriptionDeliveringWebsocketSubscriber(getSubscriptionDao(), getIdToSubscription(), getChannelType(), this, myTxManager, mySubscriptionFlaggedResourceDataDao, mySubscriptionTableDao, myResourceTableDao);
mySubscriptionDeliverySubscriber = new SubscriptionDeliveringWebsocketSubscriber(getSubscriptionDao(), getChannelType(), this, myTxManager, mySubscriptionFlaggedResourceDataDao, mySubscriptionTableDao, myResourceTableDao);
}
getDeliveryChannel().subscribe(mySubscriptionDeliverySubscriber);
}

View File

@ -0,0 +1,120 @@
package ca.uhn.fhir.jpa.subscription;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Subscription;
import org.hl7.fhir.r4.model.TriggerDefinition;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class CanonicalSubscription implements Serializable {
private static final long serialVersionUID = 364269017L;
private IIdType myIdElement;
private String myCriteriaString;
private String myEndpointUrl;
private String myPayloadString;
private List<String> myHeaders;
private Subscription.SubscriptionChannelType myChannelType;
private Subscription.SubscriptionStatus myStatus;
private IBaseResource myBackingSubscription;
private TriggerDefinition myTrigger;
/**
* For now we're using the R4 TriggerDefinition, but this
* may change in the future when things stabilize
*/
public void addTrigger(TriggerDefinition theTrigger) {
myTrigger = theTrigger;
}
public IBaseResource getBackingSubscription() {
return myBackingSubscription;
}
public void setBackingSubscription(IBaseResource theBackingSubscription) {
myBackingSubscription = theBackingSubscription;
}
public Subscription.SubscriptionChannelType getChannelType() {
return myChannelType;
}
public void setChannelType(Subscription.SubscriptionChannelType theChannelType) {
myChannelType = theChannelType;
}
public String getCriteriaString() {
return myCriteriaString;
}
public void setCriteriaString(String theCriteriaString) {
myCriteriaString = theCriteriaString;
}
public String getEndpointUrl() {
return myEndpointUrl;
}
public void setEndpointUrl(String theEndpointUrl) {
myEndpointUrl = theEndpointUrl;
}
public List<String> getHeaders() {
return myHeaders;
}
public void setHeaders(String theHeaders) {
myHeaders = new ArrayList<>();
if (isNotBlank(theHeaders)) {
myHeaders.add(theHeaders);
}
}
public IIdType getIdElement() {
return myIdElement;
}
public void setIdElement(IIdType theIdElement) {
myIdElement = theIdElement;
}
public String getPayloadString() {
return myPayloadString;
}
public void setPayloadString(String thePayloadString) {
myPayloadString = thePayloadString;
}
public Subscription.SubscriptionStatus getStatus() {
return myStatus;
}
public void setStatus(Subscription.SubscriptionStatus theStatus) {
myStatus = theStatus;
}
/**
* For now we're using the R4 triggerdefinition, but this
* may change in the future when things stabilize
*/
public TriggerDefinition getTrigger() {
return myTrigger;
}
public void setHeaders(List<? extends IPrimitiveType<String>> theHeader) {
myHeaders = new ArrayList<>();
for (IPrimitiveType<String> next : theHeader) {
if (isNotBlank(next.getValueAsString())) {
myHeaders.add(next.getValueAsString());
}
}
}
}

View File

@ -30,7 +30,7 @@ public class ResourceDeliveryMessage implements Serializable {
private static final long serialVersionUID = 0L;
private IBaseResource mySubscription;
private CanonicalSubscription mySubscription;
private IBaseResource myPayoad;
private IIdType myPayloadId;
private RestOperationTypeEnum myOperationType;
@ -51,19 +51,19 @@ public class ResourceDeliveryMessage implements Serializable {
myPayloadId = thePayloadId;
}
public IBaseResource getPayoad() {
public IBaseResource getPayload() {
return myPayoad;
}
public void setPayoad(IBaseResource thePayoad) {
myPayoad = thePayoad;
public void setPayload(IBaseResource thePayload) {
myPayoad = thePayload;
}
public IBaseResource getSubscription() {
public CanonicalSubscription getSubscription() {
return mySubscription;
}
public void setSubscription(IBaseResource theSubscription) {
public void setSubscription(CanonicalSubscription theSubscription) {
mySubscription = theSubscription;
}

View File

@ -26,6 +26,7 @@ import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Subscription;
import org.hl7.fhir.utilities.ucum.Canonical;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
@ -40,8 +41,8 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriptionSubscriber
/**
* Constructor
*/
public SubscriptionActivatingSubscriber(IFhirResourceDao<? extends IBaseResource> theSubscriptionDao, ConcurrentHashMap<String, IBaseResource> theIdToSubscription, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor) {
super(theSubscriptionDao, theIdToSubscription, theChannelType, theSubscriptionInterceptor);
public SubscriptionActivatingSubscriber(IFhirResourceDao<? extends IBaseResource> theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor) {
super(theSubscriptionDao, theChannelType, theSubscriptionInterceptor);
}
private void activateAndRegisterSubscriptionIfRequired(ResourceModifiedMessage theMsg) {
@ -65,17 +66,17 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriptionSubscriber
status.setValueAsString(activeStatus);
ourLog.info("Activating and registering subscription {} from status {} to {}", theSubscription.getIdElement().toUnqualified().getValue(), requestedStatus, activeStatus);
getSubscriptionDao().update(theSubscription);
getIdToSubscription().put(theSubscription.getIdElement().getIdPart(), theSubscription);
getSubscriptionInterceptor().registerSubscription(theSubscription.getIdElement(), theSubscription);
} else if (activeStatus.equals(statusString)) {
if (!getIdToSubscription().containsKey(theSubscription.getIdElement().getIdPart())) {
if (!getSubscriptionInterceptor().hasSubscription(theSubscription.getIdElement())) {
ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
}
getIdToSubscription().put(theSubscription.getIdElement().getIdPart(), theSubscription);
getSubscriptionInterceptor().registerSubscription(theSubscription.getIdElement(), theSubscription);
} else {
if (getIdToSubscription().containsKey(theSubscription.getIdElement().getIdPart())) {
if (getSubscriptionInterceptor().hasSubscription(theSubscription.getIdElement())) {
ourLog.info("Removing {} subscription {}", statusString, theSubscription.getIdElement().toUnqualified().getValue());
}
getIdToSubscription().remove(theSubscription.getIdElement().getIdPart());
getSubscriptionInterceptor().unregisterSubscription(theSubscription.getIdElement());
}
}
@ -100,7 +101,7 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriptionSubscriber
switch (msg.getOperationType()) {
case DELETE:
getIdToSubscription().remove(id.getIdPart());
getSubscriptionInterceptor().unregisterSubscription(id);
return;
case CREATE:
handleCreate(msg);

View File

@ -25,28 +25,27 @@ import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails;
import ca.uhn.fhir.model.api.IResource;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Subscription;
import org.hl7.fhir.utilities.ucum.Canonical;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Collection;
import java.util.List;
public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
private Logger ourLog = LoggerFactory.getLogger(SubscriptionCheckingSubscriber.class);
public SubscriptionCheckingSubscriber(IFhirResourceDao theSubscriptionDao, ConcurrentHashMap<String, IBaseResource> theIdToSubscription, Subscription.SubscriptionChannelType theChannelType,BaseSubscriptionInterceptor theSubscriptionInterceptor) {
super(theSubscriptionDao, theIdToSubscription, theChannelType, theSubscriptionInterceptor);
public SubscriptionCheckingSubscriber(IFhirResourceDao theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor) {
super(theSubscriptionDao, theChannelType, theSubscriptionInterceptor);
}
@Override
@ -68,11 +67,11 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
String resourceType = msg.getId().getResourceType();
String resourceId = msg.getId().getIdPart();
for (IBaseResource nextSubscription : getIdToSubscription().values()) {
List<CanonicalSubscription> subscriptions = getSubscriptionInterceptor().getSubscriptions();
for (CanonicalSubscription nextSubscription : subscriptions) {
String nextSubscriptionId = nextSubscription.getIdElement().toUnqualifiedVersionless().getValue();
IPrimitiveType<?> nextCriteria = getContext().newTerser().getSingleValueOrNull(nextSubscription, BaseSubscriptionInterceptor.SUBSCRIPTION_CRITERIA, IPrimitiveType.class);
String nextCriteriaString = nextCriteria != null ? nextCriteria.getValueAsString() : null;
String nextCriteriaString = nextSubscription.getCriteriaString();
if (StringUtils.isBlank(nextCriteriaString)) {
continue;
@ -107,7 +106,7 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
ourLog.info("Found match: queueing rest-hook notification for resource: {}", nextBase.getIdElement());
ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
deliveryMsg.setPayoad(nextBase);
deliveryMsg.setPayload(nextBase);
deliveryMsg.setSubscription(nextSubscription);
deliveryMsg.setOperationType(msg.getOperationType());
deliveryMsg.setPayloadId(msg.getId());

View File

@ -45,12 +45,12 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionSubscriber {
private Logger ourLog = LoggerFactory.getLogger(SubscriptionDeliveringRestHookSubscriber.class);
public SubscriptionDeliveringRestHookSubscriber(IFhirResourceDao<?> theSubscriptionDao, ConcurrentHashMap<String, IBaseResource> theIdToSubscription, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor) {
super(theSubscriptionDao, theIdToSubscription, theChannelType, theSubscriptionInterceptor);
public SubscriptionDeliveringRestHookSubscriber(IFhirResourceDao<?> theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor) {
super(theSubscriptionDao, theChannelType, theSubscriptionInterceptor);
}
protected void deliverPayload(ResourceDeliveryMessage theMsg, IBaseResource theSubscription, EncodingEnum thePayloadType, IGenericClient theClient) {
IBaseResource payloadResource = theMsg.getPayoad();
protected void deliverPayload(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, EncodingEnum thePayloadType, IGenericClient theClient) {
IBaseResource payloadResource = theMsg.getPayload();
IClientExecutable<?, ?> operation;
switch (theMsg.getOperationType()) {
@ -83,20 +83,17 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionSu
try {
ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
if (!subscriptionTypeApplies(getContext(), msg.getSubscription())) {
if (!subscriptionTypeApplies(getContext(), msg.getSubscription().getBackingSubscription())) {
return;
}
IBaseResource subscription = msg.getSubscription();
CanonicalSubscription subscription = msg.getSubscription();
// Grab the endpoint from the subscription
IPrimitiveType<?> endpoint = getContext().newTerser().getSingleValueOrNull(subscription, BaseSubscriptionInterceptor.SUBSCRIPTION_ENDPOINT, IPrimitiveType.class);
String endpointUrl = endpoint != null ? endpoint.getValueAsString() : null;
String endpointUrl = subscription.getEndpointUrl();
// Grab the payload type (encoding mimetype) from the subscription
IPrimitiveType<?> payload = getContext().newTerser().getSingleValueOrNull(subscription, BaseSubscriptionInterceptor.SUBSCRIPTION_PAYLOAD, IPrimitiveType.class);
String payloadString = payload != null ? payload.getValueAsString() : null;
String payloadString = subscription.getPayloadString();
payloadString = StringUtils.defaultString(payloadString, Constants.CT_FHIR_XML_NEW);
if (payloadString.contains(";")) {
payloadString = payloadString.substring(0, payloadString.indexOf(';'));
@ -112,10 +109,10 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionSu
client = getContext().newRestfulGenericClient(endpointUrl);
// Additional headers specified in the subscription
List<IPrimitiveType> headers = getContext().newTerser().getValues(subscription, BaseSubscriptionInterceptor.SUBSCRIPTION_HEADER, IPrimitiveType.class);
for (IPrimitiveType next : headers) {
if (isNotBlank(next.getValueAsString())) {
client.registerInterceptor(new SimpleRequestHeaderInterceptor(next.getValueAsString()));
List<String> headers = subscription.getHeaders();
for (String next : headers) {
if (isNotBlank(next)) {
client.registerInterceptor(new SimpleRequestHeaderInterceptor(next));
}
}
}

View File

@ -43,14 +43,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.concurrent.ConcurrentHashMap;
public class SubscriptionDeliveringWebsocketSubscriber extends BaseSubscriptionSubscriber {
private final PlatformTransactionManager myTxManager;
private final ISubscriptionFlaggedResourceDataDao mySubscriptionFlaggedResourceDao;
@ -58,8 +55,8 @@ public class SubscriptionDeliveringWebsocketSubscriber extends BaseSubscriptionS
private final IResourceTableDao myResourceTableDao;
private Logger ourLog = LoggerFactory.getLogger(SubscriptionDeliveringWebsocketSubscriber.class);
public SubscriptionDeliveringWebsocketSubscriber(IFhirResourceDao theSubscriptionDao, ConcurrentHashMap<String, IBaseResource> theIdToSubscription, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor, PlatformTransactionManager theTxManager, ISubscriptionFlaggedResourceDataDao theSubscriptionFlaggedResourceDataDao, ISubscriptionTableDao theSubscriptionTableDao, IResourceTableDao theResourceTableDao) {
super(theSubscriptionDao, theIdToSubscription, theChannelType, theSubscriptionInterceptor);
public SubscriptionDeliveringWebsocketSubscriber(IFhirResourceDao theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor, PlatformTransactionManager theTxManager, ISubscriptionFlaggedResourceDataDao theSubscriptionFlaggedResourceDataDao, ISubscriptionTableDao theSubscriptionTableDao, IResourceTableDao theResourceTableDao) {
super(theSubscriptionDao, theChannelType, theSubscriptionInterceptor);
myTxManager = theTxManager;
mySubscriptionFlaggedResourceDao = theSubscriptionFlaggedResourceDataDao;
@ -76,7 +73,7 @@ public class SubscriptionDeliveringWebsocketSubscriber extends BaseSubscriptionS
final ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
if (!subscriptionTypeApplies(getContext(), msg.getSubscription())) {
if (!subscriptionTypeApplies(getContext(), msg.getSubscription().getBackingSubscription())) {
return;
}
@ -85,11 +82,11 @@ public class SubscriptionDeliveringWebsocketSubscriber extends BaseSubscriptionS
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
IBaseResource payload = msg.getPayoad();
IBaseResource payload = msg.getPayload();
Long payloadPid = extractResourcePid(payload);
ResourceTable payloadTable = myResourceTableDao.findOne(payloadPid);
IBaseResource subscription = msg.getSubscription();
IBaseResource subscription = msg.getSubscription().getBackingSubscription();
Long subscriptionPid = extractResourcePid(subscription);
SubscriptionTable subscriptionTable = mySubscriptionTableDao.findOneByResourcePid(subscriptionPid);
@ -105,15 +102,13 @@ public class SubscriptionDeliveringWebsocketSubscriber extends BaseSubscriptionS
});
RestOperationTypeEnum operationType = msg.getOperationType();
IBaseResource subscription = msg.getSubscription();
CanonicalSubscription subscription = msg.getSubscription();
// Grab the endpoint from the subscription
IPrimitiveType<?> endpoint = getContext().newTerser().getSingleValueOrNull(subscription, BaseSubscriptionInterceptor.SUBSCRIPTION_ENDPOINT, IPrimitiveType.class);
String endpointUrl = endpoint.getValueAsString();
String endpointUrl = subscription.getEndpointUrl();
// Grab the payload type (encoding mimetype ) from the subscription
IPrimitiveType<?> payload = getContext().newTerser().getSingleValueOrNull(subscription, BaseSubscriptionInterceptor.SUBSCRIPTION_PAYLOAD, IPrimitiveType.class);
String payloadString = payload.getValueAsString();
String payloadString = subscription.getPayloadString();
if (payloadString.contains(";")) {
payloadString = payloadString.substring(0, payloadString.indexOf(';'));
}
@ -124,7 +119,7 @@ public class SubscriptionDeliveringWebsocketSubscriber extends BaseSubscriptionS
getContext().getRestfulClientFactory().setServerValidationMode(ServerValidationModeEnum.NEVER);
IGenericClient client = getContext().newRestfulGenericClient(endpointUrl);
IBaseResource payloadResource = msg.getPayoad();
IBaseResource payloadResource = msg.getPayload();
IClientExecutable<?, ?> operation;
switch (operationType) {

View File

@ -22,15 +22,46 @@ package ca.uhn.fhir.jpa.subscription.dstu2;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionRestHookInterceptor;
import ca.uhn.fhir.jpa.subscription.CanonicalSubscription;
import ca.uhn.fhir.model.dstu2.resource.Subscription;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import org.hl7.fhir.exceptions.FHIRException;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import java.util.Arrays;
public class RestHookSubscriptionDstu2Interceptor extends BaseSubscriptionRestHookInterceptor {
@Autowired
@Qualifier("mySubscriptionDaoDstu2")
private IFhirResourceDao<Subscription> mySubscriptionDao;
@Override
protected CanonicalSubscription canonicalize(IBaseResource theSubscription) {
return doCanonicalize(theSubscription);
}
static CanonicalSubscription doCanonicalize(IBaseResource theSubscription) {
Subscription subscription = (Subscription) theSubscription;
CanonicalSubscription retVal = new CanonicalSubscription();
try {
retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus()));
retVal.setBackingSubscription(theSubscription);
retVal.setChannelType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.fromCode(subscription.getStatus()));
retVal.setCriteriaString(subscription.getCriteria());
retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
retVal.setHeaders(subscription.getChannel().getHeader());
retVal.setIdElement(subscription.getIdElement());
retVal.setPayloadString(subscription.getChannel().getPayload());
} catch (FHIRException theE) {
throw new InternalErrorException(theE);
}
return retVal;
}
public org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType getChannelType() {
return org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.RESTHOOK;
}
@ -40,9 +71,5 @@ public class RestHookSubscriptionDstu2Interceptor extends BaseSubscriptionRestHo
return mySubscriptionDao;
}
public void setSubscriptionDao(IFhirResourceDao<Subscription> theSubscriptionDao) {
mySubscriptionDao = theSubscriptionDao;
}
}

View File

@ -1,9 +1,10 @@
package ca.uhn.fhir.jpa.subscription.dstu2;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionWebsocketInterceptor;
import ca.uhn.fhir.jpa.subscription.CanonicalSubscription;
import ca.uhn.fhir.model.dstu2.resource.Subscription;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
@ -33,6 +34,11 @@ public class WebSocketSubscriptionDstu2Interceptor extends BaseSubscriptionWebso
@Qualifier("mySubscriptionDaoDstu2")
private IFhirResourceDao<Subscription> mySubscriptionDao;
@Override
protected CanonicalSubscription canonicalize(IBaseResource theSubscription) {
return RestHookSubscriptionDstu2Interceptor.doCanonicalize(theSubscription);
}
@Override
public org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType getChannelType() {
return org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.WEBSOCKET;

View File

@ -22,10 +22,16 @@ package ca.uhn.fhir.jpa.subscription.dstu3;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionRestHookInterceptor;
import ca.uhn.fhir.jpa.subscription.CanonicalSubscription;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import org.hl7.fhir.dstu3.model.Subscription;
import org.hl7.fhir.exceptions.FHIRException;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import java.util.Arrays;
public class RestHookSubscriptionDstu3Interceptor extends BaseSubscriptionRestHookInterceptor {
@Autowired
@Qualifier("mySubscriptionDaoDstu3")
@ -44,5 +50,30 @@ public class RestHookSubscriptionDstu3Interceptor extends BaseSubscriptionRestHo
return org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.RESTHOOK;
}
@Override
protected CanonicalSubscription canonicalize(IBaseResource theSubscription) {
return doCanonicalize(theSubscription);
}
static CanonicalSubscription doCanonicalize(IBaseResource theSubscription) {
Subscription subscription = (Subscription) theSubscription;
CanonicalSubscription retVal = new CanonicalSubscription();
try {
retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus().toCode()));
retVal.setBackingSubscription(theSubscription);
retVal.setChannelType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.fromCode(subscription.getStatus().toCode()));
retVal.setCriteriaString(subscription.getCriteria());
retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
retVal.setHeaders(subscription.getChannel().getHeader());
retVal.setIdElement(subscription.getIdElement());
retVal.setPayloadString(subscription.getChannel().getPayload());
} catch (FHIRException theE) {
throw new InternalErrorException(theE);
}
return retVal;
}
}

View File

@ -6,6 +6,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionWebsocketInterceptor;
import ca.uhn.fhir.jpa.subscription.CanonicalSubscription;
import org.hl7.fhir.dstu3.model.Subscription;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired;
@ -50,6 +51,12 @@ public class WebSocketSubscriptionDstu3Interceptor extends BaseSubscriptionWebso
return org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.WEBSOCKET;
}
@Override
protected CanonicalSubscription canonicalize(IBaseResource theSubscription) {
return RestHookSubscriptionDstu3Interceptor.doCanonicalize(theSubscription);
}
@Override
protected IFhirResourceDao<?> getSubscriptionDao() {
return mySubscriptionDao;

View File

@ -22,14 +22,31 @@ package ca.uhn.fhir.jpa.subscription.r4;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionRestHookInterceptor;
import ca.uhn.fhir.jpa.subscription.CanonicalSubscription;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import org.hl7.fhir.exceptions.FHIRException;
import org.hl7.fhir.instance.model.api.IBaseReference;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.EventDefinition;
import org.hl7.fhir.r4.model.Extension;
import org.hl7.fhir.r4.model.Subscription;
import org.hl7.fhir.r4.model.TriggerDefinition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import java.util.List;
public class RestHookSubscriptionR4Interceptor extends BaseSubscriptionRestHookInterceptor {
@Autowired
@Qualifier("mySubscriptionDaoR4")
private IFhirResourceDao<org.hl7.fhir.r4.model.Subscription> mySubscriptionDao;
@Autowired
@Qualifier("myEventDefinitionDaoR4")
private IFhirResourceDao<org.hl7.fhir.r4.model.EventDefinition> myEventDefinitionDao;
public org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType getChannelType() {
return org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.RESTHOOK;
}
@ -43,5 +60,41 @@ public class RestHookSubscriptionR4Interceptor extends BaseSubscriptionRestHookI
mySubscriptionDao = theSubscriptionDao;
}
@Override
protected CanonicalSubscription canonicalize(IBaseResource theSubscription) {
return doCanonicalize(theSubscription, myEventDefinitionDao);
}
static CanonicalSubscription doCanonicalize(IBaseResource theSubscription, IFhirResourceDao<org.hl7.fhir.r4.model.EventDefinition> theEventDefinitionDao) {
Subscription subscription = (Subscription) theSubscription;
CanonicalSubscription retVal = new CanonicalSubscription();
retVal.setStatus(subscription.getStatus());
retVal.setBackingSubscription(theSubscription);
retVal.setChannelType(subscription.getChannel().getType());
retVal.setCriteriaString(subscription.getCriteria());
retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
retVal.setHeaders(subscription.getChannel().getHeader());
retVal.setIdElement(subscription.getIdElement());
retVal.setPayloadString(subscription.getChannel().getPayload());
List<Extension> topicExts = subscription.getExtensionsByUrl("http://hl7.org/fhir/subscription/topics");
if (topicExts.size() > 0) {
IBaseReference ref = (IBaseReference) topicExts.get(0).getValueAsPrimitive();
if (!"EventDefinition".equals(ref.getReferenceElement().getResourceType())) {
throw new PreconditionFailedException("Topic reference must be an EventDefinition");
}
EventDefinition def = theEventDefinitionDao.read(ref.getReferenceElement());
for (TriggerDefinition next : def.getTrigger()) {
retVal.addTrigger(next);
}
}
return retVal;
}
}

View File

@ -2,6 +2,8 @@ package ca.uhn.fhir.jpa.subscription.r4;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionWebsocketInterceptor;
import ca.uhn.fhir.jpa.subscription.CanonicalSubscription;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Subscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
@ -32,6 +34,11 @@ public class WebSocketSubscriptionR4Interceptor extends BaseSubscriptionWebsocke
@Qualifier("mySubscriptionDaoR4")
private IFhirResourceDao<org.hl7.fhir.r4.model.Subscription> mySubscriptionDao;
@Override
protected CanonicalSubscription canonicalize(IBaseResource theSubscription) {
return RestHookSubscriptionR4Interceptor.doCanonicalize(theSubscription);
}
@Override
public Subscription.SubscriptionChannelType getChannelType() {
return Subscription.SubscriptionChannelType.WEBSOCKET;

View File

@ -0,0 +1,176 @@
package ca.uhn.fhir.jpa.subscription.r4;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.provider.r4.BaseResourceProviderR4Test;
import ca.uhn.fhir.jpa.subscription.SocketImplementation;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.api.MethodOutcome;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.hl7.fhir.r4.model.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.*;
/**
* Adds a FHIR subscription with criteria through the rest interface. Then creates a websocket with the id of the
* subscription
* <p>
* Note: This test only returns a ping with the subscription id, Check FhirSubscriptionWithSubscriptionIdR4Test for
* a test that returns the xml of the observation
* <p>
* To execute the following test, execute it the following way:
* 0. execute 'clean' test
* 1. Execute the 'createPatient' test
* 2. Update the patient id static variable
* 3. Execute the 'createSubscription' test
* 4. Update the subscription id static variable
* 5. Execute the 'attachWebSocket' test
* 6. Execute the 'sendObservation' test
* 7. Look in the 'attachWebSocket' terminal execution and wait for your ping with the subscription id
*/
public class FhirSubscriptionWithEventDefinitionR4Test extends BaseResourceProviderR4Test {
private static final Logger ourLog = org.slf4j.LoggerFactory.getLogger(FhirSubscriptionWithEventDefinitionR4Test.class);
private String myPatientId;
private String mySubscriptionId;
private WebSocketClient myWebSocketClient;
private SocketImplementation mySocketImplementation;
@Override
@After
public void after() throws Exception {
super.after();
myDaoConfig.setSubscriptionEnabled(new DaoConfig().isSubscriptionEnabled());
myDaoConfig.setSubscriptionPollDelay(new DaoConfig().getSubscriptionPollDelay());
}
@Override
@Before
public void before() throws Exception {
super.before();
myDaoConfig.setSubscriptionEnabled(true);
myDaoConfig.setSubscriptionPollDelay(0L);
/*
* Create patient
*/
Patient patient = FhirR4Util.getPatient();
MethodOutcome methodOutcome = ourClient.create().resource(patient).execute();
myPatientId = methodOutcome.getId().getIdPart();
/*
* Create EventDefinition
*/
EventDefinition eventDef = new EventDefinition();
*/
/*
* Create subscription
*/
Subscription subscription = new Subscription();
subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)");
subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE);
Subscription.SubscriptionChannelComponent channel = new Subscription.SubscriptionChannelComponent();
channel.setType(Subscription.SubscriptionChannelType.WEBSOCKET);
channel.setPayload("application/json");
subscription.setChannel(channel);
methodOutcome = ourClient.create().resource(subscription).execute();
mySubscriptionId = methodOutcome.getId().getIdPart();
/*
* Attach websocket
*/
myWebSocketClient = new WebSocketClient();
mySocketImplementation = new SocketImplementation(mySubscriptionId, EncodingEnum.JSON);
myWebSocketClient.start();
URI echoUri = new URI("ws://localhost:" + ourPort + "/websocket/r4");
ClientUpgradeRequest request = new ClientUpgradeRequest();
ourLog.info("Connecting to : {}", echoUri);
Future<Session> connection = myWebSocketClient.connect(mySocketImplementation, echoUri, request);
Session session = connection.get(2, TimeUnit.SECONDS);
ourLog.info("Connected to WS: {}", session.isOpen());
}
@After
public void afterCloseWebsocket() throws Exception {
ourLog.info("Shutting down websocket client");
myWebSocketClient.stop();
}
@Test
public void createObservation() throws Exception {
Observation observation = new Observation();
CodeableConcept codeableConcept = new CodeableConcept();
observation.setCode(codeableConcept);
Coding coding = codeableConcept.addCoding();
coding.setCode("82313006");
coding.setSystem("SNOMED-CT");
Reference reference = new Reference();
reference.setReference("Patient/" + myPatientId);
observation.setSubject(reference);
observation.setStatus(Observation.ObservationStatus.FINAL);
MethodOutcome methodOutcome2 = ourClient.create().resource(observation).execute();
String observationId = methodOutcome2.getId().getIdPart();
observation.setId(observationId);
ourLog.info("Observation id generated by server is: " + observationId);
int changes = mySubscriptionDao.pollForNewUndeliveredResources();
ourLog.info("Polling showed {}", changes);
assertEquals(1, changes);
Thread.sleep(2000);
ourLog.info("WS Messages: {}", mySocketImplementation.getMessages());
assertThat(mySocketImplementation.getMessages(), contains("bound " + mySubscriptionId, "ping " + mySubscriptionId));
}
@Test
public void createObservationThatDoesNotMatch() throws Exception {
Observation observation = new Observation();
CodeableConcept codeableConcept = new CodeableConcept();
observation.setCode(codeableConcept);
Coding coding = codeableConcept.addCoding();
coding.setCode("8231");
coding.setSystem("SNOMED-CT");
Reference reference = new Reference();
reference.setReference("Patient/" + myPatientId);
observation.setSubject(reference);
observation.setStatus(Observation.ObservationStatus.FINAL);
MethodOutcome methodOutcome2 = ourClient.create().resource(observation).execute();
String observationId = methodOutcome2.getId().getIdPart();
observation.setId(observationId);
ourLog.info("Observation id generated by server is: " + observationId);
int changes = mySubscriptionDao.pollForNewUndeliveredResources();
ourLog.info("Polling showed {}", changes);
assertEquals(0, changes);
Thread.sleep(2000);
ourLog.info("WS Messages: {}", mySocketImplementation.getMessages());
assertThat(mySocketImplementation.getMessages(), contains("bound " + mySubscriptionId));
}
}