Work on subscriptions
This commit is contained in:
parent
12f89a423a
commit
b9dbd64101
|
@ -264,6 +264,10 @@
|
|||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-messaging</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-tx</artifactId>
|
||||
|
|
|
@ -58,10 +58,10 @@ public class RestHookSubscriptionR4Interceptor extends BaseRestHookSubscriptionI
|
|||
private final static int MAX_THREADS = 1;
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(RestHookSubscriptionR4Interceptor.class);
|
||||
|
||||
private final List<Subscription> myRestHookSubscriptions = new ArrayList<>();
|
||||
@Autowired
|
||||
private FhirContext myFhirContext;
|
||||
|
||||
private final List<Subscription> myRestHookSubscriptions = new ArrayList<Subscription>();
|
||||
|
||||
@Autowired
|
||||
@Qualifier("mySubscriptionDaoR4")
|
||||
|
|
|
@ -0,0 +1,152 @@
|
|||
package ca.uhn.fhir.jpa.subscription;
|
||||
|
||||
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
|
||||
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
|
||||
import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails;
|
||||
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
|
||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||
import ca.uhn.fhir.rest.param.TokenParam;
|
||||
import ca.uhn.fhir.rest.server.interceptor.ServerOperationInterceptorAdapter;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.r4.model.Subscription;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.messaging.support.ExecutorSubscribableChannel;
|
||||
import org.springframework.messaging.support.GenericMessage;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
public abstract class BaseSubscriptionInterceptor extends ServerOperationInterceptorAdapter {
|
||||
|
||||
private static final Integer MAX_SUBSCRIPTION_RESULTS = 1000;
|
||||
private SubscribableChannel myProcessingChannel;
|
||||
private ExecutorService myExecutor;
|
||||
private boolean myAutoActivateSubscriptions = true;
|
||||
private int myExecutorThreadCount = 1;
|
||||
private MessageHandler mySubscriptionActivatingSubscriber;
|
||||
private MessageHandler mySubscriptionCheckingSubscriber;
|
||||
private ConcurrentHashMap<String, IBaseResource> myIdToSubscription = new ConcurrentHashMap<>();
|
||||
private Subscription.SubscriptionChannelType myChannelType = Subscription.SubscriptionChannelType.RESTHOOK;
|
||||
private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class);
|
||||
|
||||
protected abstract IFhirResourceDao<?> getSubscriptionDao();
|
||||
|
||||
/**
|
||||
* Read the existing subscriptions from the database
|
||||
*/
|
||||
@SuppressWarnings("unused")
|
||||
@Scheduled(fixedDelay = 10000)
|
||||
public void initSubscriptions() {
|
||||
SearchParameterMap map = new SearchParameterMap();
|
||||
map.add(Subscription.SP_TYPE, new TokenParam(null, myChannelType.toCode()));
|
||||
map.add(Subscription.SP_STATUS, new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode()));
|
||||
map.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS);
|
||||
|
||||
RequestDetails req = new ServletSubRequestDetails();
|
||||
req.setSubRequest(true);
|
||||
|
||||
IBundleProvider subscriptionBundleList = getSubscriptionDao().search(map, req);
|
||||
if (subscriptionBundleList.size() >= MAX_SUBSCRIPTION_RESULTS) {
|
||||
ourLog.error("Currently over " + MAX_SUBSCRIPTION_RESULTS + " subscriptions. Some subscriptions have not been loaded.");
|
||||
}
|
||||
|
||||
List<IBaseResource> resourceList = subscriptionBundleList.getResources(0, subscriptionBundleList.size());
|
||||
|
||||
Set<String> allIds = new HashSet<>();
|
||||
for (IBaseResource resource : resourceList) {
|
||||
String nextId = resource.getIdElement().getIdPart();
|
||||
allIds.add(nextId);
|
||||
myIdToSubscription.put(nextId, resource);
|
||||
}
|
||||
|
||||
for (String next : myIdToSubscription.keySet()) {
|
||||
if (!allIds.contains(next)) {
|
||||
myIdToSubscription.remove(next);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void postConstruct() {
|
||||
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
|
||||
ThreadFactory threadFactory = new BasicThreadFactory.Builder()
|
||||
.namingPattern("subscription-%d")
|
||||
.daemon(false)
|
||||
.priority(Thread.NORM_PRIORITY)
|
||||
.build();
|
||||
myExecutor = new ThreadPoolExecutor(
|
||||
myExecutorThreadCount,
|
||||
myExecutorThreadCount,
|
||||
0L,
|
||||
TimeUnit.MILLISECONDS,
|
||||
new LinkedBlockingQueue<Runnable>(1000),
|
||||
threadFactory,
|
||||
rejectedExecutionHandler);
|
||||
|
||||
|
||||
if (myProcessingChannel == null) {
|
||||
myProcessingChannel = new ExecutorSubscribableChannel(myExecutor);
|
||||
}
|
||||
|
||||
if (myAutoActivateSubscriptions) {
|
||||
if (mySubscriptionActivatingSubscriber == null) {
|
||||
mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), myIdToSubscription, myChannelType, myProcessingChannel);
|
||||
}
|
||||
myProcessingChannel.subscribe(mySubscriptionActivatingSubscriber);
|
||||
}
|
||||
|
||||
if (mySubscriptionCheckingSubscriber == null) {
|
||||
mySubscriptionCheckingSubscriber = new SubscriptionCheckingSubscriber(getSubscriptionDao(), myIdToSubscription, myChannelType, myProcessingChannel);
|
||||
}
|
||||
myProcessingChannel.subscribe(mySubscriptionCheckingSubscriber);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
@PreDestroy
|
||||
public void preDestroy() {
|
||||
if (myAutoActivateSubscriptions) {
|
||||
myProcessingChannel.unsubscribe(mySubscriptionActivatingSubscriber);
|
||||
}
|
||||
myProcessingChannel.unsubscribe(mySubscriptionCheckingSubscriber);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resourceCreated(RequestDetails theRequest, IBaseResource theResource) {
|
||||
ResourceModifiedMessage msg = new ResourceModifiedMessage();
|
||||
msg.setId(theResource.getIdElement());
|
||||
msg.setOperationType(RestOperationTypeEnum.CREATE);
|
||||
msg.setNewPayload(theResource);
|
||||
submitResourceModified(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resourceDeleted(RequestDetails theRequest, IBaseResource theResource) {
|
||||
ResourceModifiedMessage msg = new ResourceModifiedMessage();
|
||||
msg.setId(theResource.getIdElement());
|
||||
msg.setOperationType(RestOperationTypeEnum.DELETE);
|
||||
submitResourceModified(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resourceUpdated(RequestDetails theRequest, IBaseResource theOldResource, IBaseResource theNewResource) {
|
||||
ResourceModifiedMessage msg = new ResourceModifiedMessage();
|
||||
msg.setId(theNewResource.getIdElement());
|
||||
msg.setOperationType(RestOperationTypeEnum.UPDATE);
|
||||
msg.setNewPayload(theNewResource);
|
||||
submitResourceModified(msg);
|
||||
}
|
||||
|
||||
private void submitResourceModified(ResourceModifiedMessage theMsg) {
|
||||
myProcessingChannel.send(new GenericMessage<>(theMsg));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
package ca.uhn.fhir.jpa.subscription;
|
||||
|
||||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.instance.model.api.IPrimitiveType;
|
||||
import org.hl7.fhir.r4.model.Subscription;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public abstract class BaseSubscriptionSubscriber implements MessageHandler {
|
||||
static final String SUBSCRIPTION_STATUS = "Subscription.status";
|
||||
private static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
|
||||
private final IFhirResourceDao mySubscriptionDao;
|
||||
private final ConcurrentHashMap<String, IBaseResource> myIdToSubscription;
|
||||
private final Subscription.SubscriptionChannelType myChannelType;
|
||||
private final SubscribableChannel myProcessingChannel;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*/
|
||||
public BaseSubscriptionSubscriber(IFhirResourceDao<? extends IBaseResource> theSubscriptionDao, ConcurrentHashMap<String, IBaseResource> theIdToSubscription, Subscription.SubscriptionChannelType theChannelType, SubscribableChannel theProcessingChannel) {
|
||||
mySubscriptionDao = theSubscriptionDao;
|
||||
myIdToSubscription = theIdToSubscription;
|
||||
myChannelType = theChannelType;
|
||||
myProcessingChannel = theProcessingChannel;
|
||||
}
|
||||
|
||||
public Subscription.SubscriptionChannelType getChannelType() {
|
||||
return myChannelType;
|
||||
}
|
||||
|
||||
public FhirContext getContext() {
|
||||
return getSubscriptionDao().getContext();
|
||||
}
|
||||
|
||||
public ConcurrentHashMap<String, IBaseResource> getIdToSubscription() {
|
||||
return myIdToSubscription;
|
||||
}
|
||||
|
||||
public SubscribableChannel getProcessingChannel() {
|
||||
return myProcessingChannel;
|
||||
}
|
||||
|
||||
public IFhirResourceDao getSubscriptionDao() {
|
||||
return mySubscriptionDao;
|
||||
}
|
||||
|
||||
/**
|
||||
* Does this subscription type (e.g. rest hook, websocket, etc) apply to this interceptor?
|
||||
*/
|
||||
protected boolean subscriptionTypeApplies(ResourceModifiedMessage theMsg) {
|
||||
FhirContext ctx = mySubscriptionDao.getContext();
|
||||
IBaseResource subscription = theMsg.getNewPayload();
|
||||
return subscriptionTypeApplies(ctx, subscription);
|
||||
}
|
||||
|
||||
/**
|
||||
* Does this subscription type (e.g. rest hook, websocket, etc) apply to this interceptor?
|
||||
*/
|
||||
protected boolean subscriptionTypeApplies(FhirContext theCtx, IBaseResource theSubscription) {
|
||||
IPrimitiveType<?> status = theCtx.newTerser().getSingleValueOrNull(theSubscription, SUBSCRIPTION_TYPE, IPrimitiveType.class);
|
||||
boolean subscriptionTypeApplies = false;
|
||||
if (getChannelType().toCode().equals(status.getValueAsString())) {
|
||||
subscriptionTypeApplies = true;
|
||||
}
|
||||
return subscriptionTypeApplies;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
package ca.uhn.fhir.jpa.subscription;
|
||||
|
||||
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class ResourceDeliveryMessage implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 0L;
|
||||
|
||||
private IBaseResource mySubscription;
|
||||
private IBaseResource myPayoad;
|
||||
private RestOperationTypeEnum myOperationType;
|
||||
|
||||
public RestOperationTypeEnum getOperationType() {
|
||||
return myOperationType;
|
||||
}
|
||||
|
||||
public void setOperationType(RestOperationTypeEnum theOperationType) {
|
||||
myOperationType = theOperationType;
|
||||
}
|
||||
|
||||
public IBaseResource getPayoad() {
|
||||
return myPayoad;
|
||||
}
|
||||
|
||||
public void setPayoad(IBaseResource thePayoad) {
|
||||
myPayoad = thePayoad;
|
||||
}
|
||||
|
||||
public IBaseResource getSubscription() {
|
||||
return mySubscription;
|
||||
}
|
||||
|
||||
public void setSubscription(IBaseResource theSubscription) {
|
||||
mySubscription = theSubscription;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
package ca.uhn.fhir.jpa.subscription;
|
||||
|
||||
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class ResourceModifiedMessage implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 0L;
|
||||
|
||||
private IIdType myId;
|
||||
private RestOperationTypeEnum myOperationType;
|
||||
private IBaseResource myNewPayload;
|
||||
|
||||
public IIdType getId() {
|
||||
return myId;
|
||||
}
|
||||
|
||||
public void setId(IIdType theId) {
|
||||
myId = theId;
|
||||
}
|
||||
|
||||
|
||||
public RestOperationTypeEnum getOperationType() {
|
||||
return myOperationType;
|
||||
}
|
||||
|
||||
public void setOperationType(RestOperationTypeEnum theOperationType) {
|
||||
myOperationType = theOperationType;
|
||||
}
|
||||
|
||||
public IBaseResource getNewPayload() {
|
||||
return myNewPayload;
|
||||
}
|
||||
|
||||
public void setNewPayload(IBaseResource theNewPayload) {
|
||||
myNewPayload = theNewPayload;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,96 @@
|
|||
package ca.uhn.fhir.jpa.subscription;
|
||||
|
||||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
import org.hl7.fhir.instance.model.api.IPrimitiveType;
|
||||
import org.hl7.fhir.r4.model.Subscription;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public class SubscriptionActivatingSubscriber extends BaseSubscriptionSubscriber {
|
||||
private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class);
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*/
|
||||
public SubscriptionActivatingSubscriber(IFhirResourceDao<? extends IBaseResource> theSubscriptionDao, ConcurrentHashMap<String, IBaseResource> theIdToSubscription, Subscription.SubscriptionChannelType theChannelType, SubscribableChannel theProcessingChannel) {
|
||||
super(theSubscriptionDao, theIdToSubscription, theChannelType, theProcessingChannel);
|
||||
}
|
||||
|
||||
private void handleCreate(ResourceModifiedMessage theMsg) {
|
||||
if (!theMsg.getId().getResourceType().equals("Subscription")) {
|
||||
return;
|
||||
}
|
||||
|
||||
boolean subscriptionTypeApplies = subscriptionTypeApplies(theMsg);
|
||||
if (subscriptionTypeApplies == false) {
|
||||
return;
|
||||
}
|
||||
|
||||
FhirContext ctx = getSubscriptionDao().getContext();
|
||||
IBaseResource subscription = theMsg.getNewPayload();
|
||||
IPrimitiveType<?> status = ctx.newTerser().getSingleValueOrNull(subscription, SUBSCRIPTION_STATUS, IPrimitiveType.class);
|
||||
String statusString = status.getValueAsString();
|
||||
|
||||
String oldStatus = Subscription.SubscriptionStatus.REQUESTED.toCode();
|
||||
if (oldStatus.equals(statusString)) {
|
||||
String newStatus = Subscription.SubscriptionStatus.ACTIVE.toCode();
|
||||
status.setValueAsString(newStatus);
|
||||
ourLog.info("Activating subscription {} from status {} to {}", subscription.getIdElement().toUnqualifiedVersionless().getValue(), oldStatus, newStatus);
|
||||
getSubscriptionDao().update(subscription);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleMessage(Message<?> theMessage) throws MessagingException {
|
||||
|
||||
if (!(theMessage.getPayload() instanceof ResourceModifiedMessage)) {
|
||||
return;
|
||||
}
|
||||
|
||||
ResourceModifiedMessage msg = (ResourceModifiedMessage) theMessage.getPayload();
|
||||
IIdType id = msg.getId();
|
||||
|
||||
switch (msg.getOperationType()) {
|
||||
case DELETE:
|
||||
getIdToSubscription().remove(id.getIdPart());
|
||||
return;
|
||||
case CREATE:
|
||||
handleCreate(msg);
|
||||
break;
|
||||
case UPDATE:
|
||||
handleUpdate(msg);
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void handleUpdate(ResourceModifiedMessage theMsg) {
|
||||
if (!theMsg.getId().getResourceType().equals("Subscription")) {
|
||||
return;
|
||||
}
|
||||
|
||||
boolean subscriptionTypeApplies = subscriptionTypeApplies(theMsg);
|
||||
if (subscriptionTypeApplies == false) {
|
||||
return;
|
||||
}
|
||||
|
||||
FhirContext ctx = getSubscriptionDao().getContext();
|
||||
IBaseResource subscription = theMsg.getNewPayload();
|
||||
IPrimitiveType<?> status = ctx.newTerser().getSingleValueOrNull(subscription, SUBSCRIPTION_STATUS, IPrimitiveType.class);
|
||||
String statusString = status.getValueAsString();
|
||||
|
||||
if (Subscription.SubscriptionStatus.ACTIVE.toCode().equals(statusString)) {
|
||||
getIdToSubscription().put(theMsg.getId().getIdPart(), theMsg.getNewPayload());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,125 @@
|
|||
package ca.uhn.fhir.jpa.subscription;
|
||||
|
||||
import ca.uhn.fhir.context.RuntimeResourceDefinition;
|
||||
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
|
||||
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
|
||||
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
|
||||
import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails;
|
||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.hl7.fhir.instance.model.api.IAnyResource;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.instance.model.api.IPrimitiveType;
|
||||
import org.hl7.fhir.r4.model.Subscription;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.messaging.support.GenericMessage;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
|
||||
private Logger ourLog = LoggerFactory.getLogger(SubscriptionCheckingSubscriber.class);
|
||||
|
||||
public SubscriptionCheckingSubscriber(IFhirResourceDao theSubscriptionDao, ConcurrentHashMap<String, IBaseResource> theIdToSubscription, Subscription.SubscriptionChannelType theChannelType, SubscribableChannel theProcessingChannel) {
|
||||
super(theSubscriptionDao, theIdToSubscription, theChannelType, theProcessingChannel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleMessage(Message<?> theMessage) throws MessagingException {
|
||||
if (!(theMessage.getPayload() instanceof ResourceModifiedMessage)) {
|
||||
return;
|
||||
}
|
||||
|
||||
ResourceModifiedMessage msg = (ResourceModifiedMessage) theMessage.getPayload();
|
||||
switch (msg.getOperationType()) {
|
||||
case CREATE:
|
||||
case UPDATE:
|
||||
break;
|
||||
default:
|
||||
// ignore anything else
|
||||
return;
|
||||
}
|
||||
|
||||
String resourceType = msg.getId().getResourceType();
|
||||
String resourceId = msg.getId().getIdPart();
|
||||
|
||||
for (IBaseResource nextSubscription : getIdToSubscription().values()) {
|
||||
|
||||
String nextSubscriptionId = nextSubscription.getIdElement().toUnqualifiedVersionless().getValue();
|
||||
IPrimitiveType<?> nextCriteria = getContext().newTerser().getSingleValueOrNull(nextSubscription, "Subscription.criteria", IPrimitiveType.class);
|
||||
String nextCriteriaString = nextCriteria != null ? nextCriteria.getValueAsString() : null;
|
||||
|
||||
if (StringUtils.isBlank(nextCriteriaString)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// see if the criteria matches the created object
|
||||
ourLog.info("Checking subscription {} for {} with criteria {}", nextSubscriptionId, resourceType, nextCriteriaString);
|
||||
|
||||
String criteriaResource = nextCriteriaString;
|
||||
int index = criteriaResource.indexOf("?");
|
||||
if (index != -1) {
|
||||
criteriaResource = criteriaResource.substring(0, criteriaResource.indexOf("?"));
|
||||
}
|
||||
|
||||
if (resourceType != null && nextCriteriaString != null && !criteriaResource.equals(resourceType)) {
|
||||
ourLog.info("Skipping subscription search for {} because it does not match the criteria {}", resourceType, nextCriteriaString);
|
||||
continue;
|
||||
}
|
||||
|
||||
// run the subscriptions query and look for matches, add the id as part of the criteria to avoid getting matches of previous resources rather than the recent resource
|
||||
String criteria = nextCriteriaString;
|
||||
criteria += "&_id=" + resourceType + "/" + resourceId;
|
||||
criteria = massageCriteria(criteria);
|
||||
|
||||
IBundleProvider results = performSearch(criteria);
|
||||
if (results.size() == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// should just be one resource as it was filtered by the id
|
||||
for (IBaseResource nextBase : results.getResources(0, results.size())) {
|
||||
IAnyResource next = (IAnyResource) nextBase;
|
||||
ourLog.info("Found match: queueing rest-hook notification for resource: {}", next.getIdElement());
|
||||
|
||||
ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
|
||||
deliveryMsg.setPayoad(next);
|
||||
deliveryMsg.setSubscription(nextSubscription);
|
||||
deliveryMsg.setOperationType(msg.getOperationType());
|
||||
|
||||
getProcessingChannel().send(new GenericMessage<>(deliveryMsg));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclasses may override
|
||||
*/
|
||||
protected String massageCriteria(String theCriteria) {
|
||||
return theCriteria;
|
||||
}
|
||||
|
||||
/**
|
||||
* Search based on a query criteria
|
||||
*/
|
||||
protected IBundleProvider performSearch(String theCriteria) {
|
||||
RuntimeResourceDefinition responseResourceDef = getSubscriptionDao().validateCriteriaAndReturnResourceDefinition(theCriteria);
|
||||
SearchParameterMap responseCriteriaUrl = BaseHapiFhirDao.translateMatchUrl(getSubscriptionDao(), getSubscriptionDao().getContext(), theCriteria, responseResourceDef);
|
||||
|
||||
RequestDetails req = new ServletSubRequestDetails();
|
||||
req.setSubRequest(true);
|
||||
|
||||
IFhirResourceDao<? extends IBaseResource> responseDao = getSubscriptionDao().getDao(responseResourceDef.getImplementingClass());
|
||||
responseCriteriaUrl.setLoadSynchronousUpTo(1);
|
||||
|
||||
IBundleProvider responseResults = responseDao.search(responseCriteriaUrl, req);
|
||||
return responseResults;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
package ca.uhn.fhir.jpa.subscription;
|
||||
|
||||
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.r4.model.Subscription;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionSubscriber {
|
||||
|
||||
public SubscriptionDeliveringRestHookSubscriber(IFhirResourceDao theSubscriptionDao, ConcurrentHashMap<String, IBaseResource> theIdToSubscription, Subscription.SubscriptionChannelType theChannelType, SubscribableChannel theProcessingChannel) {
|
||||
super(theSubscriptionDao, theIdToSubscription, theChannelType, theProcessingChannel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleMessage(Message<?> theMessage) throws MessagingException {
|
||||
if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) {
|
||||
return;
|
||||
}
|
||||
|
||||
ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
|
||||
|
||||
if (!subscriptionTypeApplies(getContext(), msg.getSubscription())) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
}
|
|
@ -57,11 +57,8 @@ public class SubscriptionWebsocketHandlerR4 extends TextWebSocketHandler impleme
|
|||
private static IFhirResourceDaoSubscription<Subscription> ourSubscriptionDao;
|
||||
|
||||
private ScheduledFuture<?> myScheduleFuture;
|
||||
|
||||
private IState myState = new InitialState();
|
||||
|
||||
private IIdType mySubscriptionId;
|
||||
|
||||
private Long mySubscriptionPid;
|
||||
|
||||
@Autowired
|
||||
|
|
|
@ -1,138 +0,0 @@
|
|||
package org.hl7.fhir.r4.model.api;
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* #%L
|
||||
* HAPI FHIR - Core Library
|
||||
* %%
|
||||
* Copyright (C) 2014 - 2015 University Health Network
|
||||
* %%
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* #L%
|
||||
*/
|
||||
|
||||
/**
|
||||
* Base interface for ID datatype.
|
||||
*
|
||||
* <p>
|
||||
* <b>Concrete Implementations:</b> This interface is often returned and/or accepted by methods in HAPI's API
|
||||
* where either {@link ca.uhn.fhir.model.primitive.IdDt} (the HAPI structure ID type) or
|
||||
* <code>org.hl7.fhir.instance.model.IdType</code> (the RI structure ID type) will be used, depending on
|
||||
* which version of the strctures your application is using.
|
||||
* </p>
|
||||
*/
|
||||
public interface IIdType {
|
||||
|
||||
void applyTo(IBaseResource theResource);
|
||||
|
||||
/**
|
||||
* Returns the server base URL if this ID contains one. For example, the base URL is
|
||||
* the 'http://example.com/fhir' in the following ID: <code>http://example.com/fhir/Patient/123/_history/55</code>
|
||||
*/
|
||||
String getBaseUrl();
|
||||
|
||||
/**
|
||||
* Returns only the logical ID part of this ID. For example, given the ID
|
||||
* "http://example,.com/fhir/Patient/123/_history/456", this method would
|
||||
* return "123".
|
||||
*/
|
||||
String getIdPart();
|
||||
|
||||
/**
|
||||
* Returns the ID part of this ID (e.g. in the ID http://example.com/Patient/123/_history/456 this would be the
|
||||
* part "123") parsed as a {@link Long}.
|
||||
*
|
||||
* @throws NumberFormatException If the value can't be parsed as a long
|
||||
*/
|
||||
Long getIdPartAsLong();
|
||||
|
||||
String getResourceType();
|
||||
|
||||
/**
|
||||
* Returns the value of this ID. Note that this value may be a fully qualified URL, a relative/partial URL, or a simple ID. Use {@link #getIdPart()} to get just the ID portion.
|
||||
*
|
||||
* @see #getIdPart()
|
||||
*/
|
||||
String getValue();
|
||||
|
||||
String getVersionIdPart();
|
||||
|
||||
/**
|
||||
* Returns the version ID part of this ID (e.g. in the ID http://example.com/Patient/123/_history/456 this would be the
|
||||
* part "456") parsed as a {@link Long}.
|
||||
*
|
||||
* @throws NumberFormatException If the value can't be parsed as a long
|
||||
*/
|
||||
Long getVersionIdPartAsLong();
|
||||
|
||||
boolean hasBaseUrl();
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> if this ID contains an actual ID part. For example, the ID part is
|
||||
* the '123' in the following ID: <code>http://example.com/fhir/Patient/123/_history/55</code>
|
||||
*/
|
||||
boolean hasIdPart();
|
||||
|
||||
boolean hasResourceType();
|
||||
|
||||
boolean hasVersionIdPart();
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> if this ID contains an absolute URL (in other words, a URL starting with "http://" or "https://"
|
||||
*/
|
||||
boolean isAbsolute();
|
||||
|
||||
boolean isEmpty();
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> if the {@link #getIdPart() ID part of this object} is valid according to the FHIR rules for valid IDs.
|
||||
* <p>
|
||||
* The FHIR specification states:
|
||||
* <code>Any combination of upper or lower case ASCII letters ('A'..'Z', and 'a'..'z', numerals ('0'..'9'), '-' and '.', with a length limit of 64 characters. (This might be an integer, an un-prefixed OID, UUID or any other identifier pattern that meets these constraints.) regex: [A-Za-z0-9\-\.]{1,64}</code>
|
||||
* </p>
|
||||
*/
|
||||
boolean isIdPartValid();
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> if the {@link #getIdPart() ID part of this object} contains
|
||||
* only numbers
|
||||
*/
|
||||
boolean isIdPartValidLong();
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> if the ID is a local reference (in other words, it begins with the '#' character)
|
||||
*/
|
||||
boolean isLocal();
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> if the {@link #getVersionIdPart() version ID part of this object} contains
|
||||
* only numbers
|
||||
*/
|
||||
boolean isVersionIdPartValidLong();
|
||||
|
||||
IIdType setValue(String theString);
|
||||
|
||||
IIdType toUnqualified();
|
||||
|
||||
IIdType toUnqualifiedVersionless();
|
||||
|
||||
IIdType toVersionless();
|
||||
|
||||
IIdType withResourceType(String theResName);
|
||||
|
||||
IIdType withServerBase(String theServerBase, String theResourceName);
|
||||
|
||||
IIdType withVersion(String theVersion);
|
||||
|
||||
}
|
Loading…
Reference in New Issue