Work on subscriptions
This commit is contained in:
parent
12f89a423a
commit
b9dbd64101
|
@ -264,6 +264,10 @@
|
||||||
<groupId>org.springframework</groupId>
|
<groupId>org.springframework</groupId>
|
||||||
<artifactId>spring-messaging</artifactId>
|
<artifactId>spring-messaging</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.integration</groupId>
|
||||||
|
<artifactId>spring-integration-core</artifactId>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.springframework</groupId>
|
<groupId>org.springframework</groupId>
|
||||||
<artifactId>spring-tx</artifactId>
|
<artifactId>spring-tx</artifactId>
|
||||||
|
|
|
@ -58,10 +58,10 @@ public class RestHookSubscriptionR4Interceptor extends BaseRestHookSubscriptionI
|
||||||
private final static int MAX_THREADS = 1;
|
private final static int MAX_THREADS = 1;
|
||||||
private static final Logger ourLog = LoggerFactory.getLogger(RestHookSubscriptionR4Interceptor.class);
|
private static final Logger ourLog = LoggerFactory.getLogger(RestHookSubscriptionR4Interceptor.class);
|
||||||
|
|
||||||
|
private final List<Subscription> myRestHookSubscriptions = new ArrayList<>();
|
||||||
@Autowired
|
@Autowired
|
||||||
private FhirContext myFhirContext;
|
private FhirContext myFhirContext;
|
||||||
|
|
||||||
private final List<Subscription> myRestHookSubscriptions = new ArrayList<Subscription>();
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
@Qualifier("mySubscriptionDaoR4")
|
@Qualifier("mySubscriptionDaoR4")
|
||||||
|
|
|
@ -0,0 +1,152 @@
|
||||||
|
package ca.uhn.fhir.jpa.subscription;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
|
||||||
|
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
|
||||||
|
import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails;
|
||||||
|
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
|
||||||
|
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||||
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
|
import ca.uhn.fhir.rest.param.TokenParam;
|
||||||
|
import ca.uhn.fhir.rest.server.interceptor.ServerOperationInterceptorAdapter;
|
||||||
|
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||||
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
|
import org.hl7.fhir.r4.model.Subscription;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.messaging.MessageHandler;
|
||||||
|
import org.springframework.messaging.SubscribableChannel;
|
||||||
|
import org.springframework.messaging.support.ExecutorSubscribableChannel;
|
||||||
|
import org.springframework.messaging.support.GenericMessage;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
import javax.annotation.PreDestroy;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.*;
|
||||||
|
|
||||||
|
public abstract class BaseSubscriptionInterceptor extends ServerOperationInterceptorAdapter {
|
||||||
|
|
||||||
|
private static final Integer MAX_SUBSCRIPTION_RESULTS = 1000;
|
||||||
|
private SubscribableChannel myProcessingChannel;
|
||||||
|
private ExecutorService myExecutor;
|
||||||
|
private boolean myAutoActivateSubscriptions = true;
|
||||||
|
private int myExecutorThreadCount = 1;
|
||||||
|
private MessageHandler mySubscriptionActivatingSubscriber;
|
||||||
|
private MessageHandler mySubscriptionCheckingSubscriber;
|
||||||
|
private ConcurrentHashMap<String, IBaseResource> myIdToSubscription = new ConcurrentHashMap<>();
|
||||||
|
private Subscription.SubscriptionChannelType myChannelType = Subscription.SubscriptionChannelType.RESTHOOK;
|
||||||
|
private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class);
|
||||||
|
|
||||||
|
protected abstract IFhirResourceDao<?> getSubscriptionDao();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read the existing subscriptions from the database
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
@Scheduled(fixedDelay = 10000)
|
||||||
|
public void initSubscriptions() {
|
||||||
|
SearchParameterMap map = new SearchParameterMap();
|
||||||
|
map.add(Subscription.SP_TYPE, new TokenParam(null, myChannelType.toCode()));
|
||||||
|
map.add(Subscription.SP_STATUS, new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode()));
|
||||||
|
map.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS);
|
||||||
|
|
||||||
|
RequestDetails req = new ServletSubRequestDetails();
|
||||||
|
req.setSubRequest(true);
|
||||||
|
|
||||||
|
IBundleProvider subscriptionBundleList = getSubscriptionDao().search(map, req);
|
||||||
|
if (subscriptionBundleList.size() >= MAX_SUBSCRIPTION_RESULTS) {
|
||||||
|
ourLog.error("Currently over " + MAX_SUBSCRIPTION_RESULTS + " subscriptions. Some subscriptions have not been loaded.");
|
||||||
|
}
|
||||||
|
|
||||||
|
List<IBaseResource> resourceList = subscriptionBundleList.getResources(0, subscriptionBundleList.size());
|
||||||
|
|
||||||
|
Set<String> allIds = new HashSet<>();
|
||||||
|
for (IBaseResource resource : resourceList) {
|
||||||
|
String nextId = resource.getIdElement().getIdPart();
|
||||||
|
allIds.add(nextId);
|
||||||
|
myIdToSubscription.put(nextId, resource);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (String next : myIdToSubscription.keySet()) {
|
||||||
|
if (!allIds.contains(next)) {
|
||||||
|
myIdToSubscription.remove(next);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void postConstruct() {
|
||||||
|
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
|
||||||
|
ThreadFactory threadFactory = new BasicThreadFactory.Builder()
|
||||||
|
.namingPattern("subscription-%d")
|
||||||
|
.daemon(false)
|
||||||
|
.priority(Thread.NORM_PRIORITY)
|
||||||
|
.build();
|
||||||
|
myExecutor = new ThreadPoolExecutor(
|
||||||
|
myExecutorThreadCount,
|
||||||
|
myExecutorThreadCount,
|
||||||
|
0L,
|
||||||
|
TimeUnit.MILLISECONDS,
|
||||||
|
new LinkedBlockingQueue<Runnable>(1000),
|
||||||
|
threadFactory,
|
||||||
|
rejectedExecutionHandler);
|
||||||
|
|
||||||
|
|
||||||
|
if (myProcessingChannel == null) {
|
||||||
|
myProcessingChannel = new ExecutorSubscribableChannel(myExecutor);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (myAutoActivateSubscriptions) {
|
||||||
|
if (mySubscriptionActivatingSubscriber == null) {
|
||||||
|
mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), myIdToSubscription, myChannelType, myProcessingChannel);
|
||||||
|
}
|
||||||
|
myProcessingChannel.subscribe(mySubscriptionActivatingSubscriber);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mySubscriptionCheckingSubscriber == null) {
|
||||||
|
mySubscriptionCheckingSubscriber = new SubscriptionCheckingSubscriber(getSubscriptionDao(), myIdToSubscription, myChannelType, myProcessingChannel);
|
||||||
|
}
|
||||||
|
myProcessingChannel.subscribe(mySubscriptionCheckingSubscriber);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
@PreDestroy
|
||||||
|
public void preDestroy() {
|
||||||
|
if (myAutoActivateSubscriptions) {
|
||||||
|
myProcessingChannel.unsubscribe(mySubscriptionActivatingSubscriber);
|
||||||
|
}
|
||||||
|
myProcessingChannel.unsubscribe(mySubscriptionCheckingSubscriber);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void resourceCreated(RequestDetails theRequest, IBaseResource theResource) {
|
||||||
|
ResourceModifiedMessage msg = new ResourceModifiedMessage();
|
||||||
|
msg.setId(theResource.getIdElement());
|
||||||
|
msg.setOperationType(RestOperationTypeEnum.CREATE);
|
||||||
|
msg.setNewPayload(theResource);
|
||||||
|
submitResourceModified(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void resourceDeleted(RequestDetails theRequest, IBaseResource theResource) {
|
||||||
|
ResourceModifiedMessage msg = new ResourceModifiedMessage();
|
||||||
|
msg.setId(theResource.getIdElement());
|
||||||
|
msg.setOperationType(RestOperationTypeEnum.DELETE);
|
||||||
|
submitResourceModified(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void resourceUpdated(RequestDetails theRequest, IBaseResource theOldResource, IBaseResource theNewResource) {
|
||||||
|
ResourceModifiedMessage msg = new ResourceModifiedMessage();
|
||||||
|
msg.setId(theNewResource.getIdElement());
|
||||||
|
msg.setOperationType(RestOperationTypeEnum.UPDATE);
|
||||||
|
msg.setNewPayload(theNewResource);
|
||||||
|
submitResourceModified(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void submitResourceModified(ResourceModifiedMessage theMsg) {
|
||||||
|
myProcessingChannel.send(new GenericMessage<>(theMsg));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,72 @@
|
||||||
|
package ca.uhn.fhir.jpa.subscription;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
|
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
|
||||||
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
|
import org.hl7.fhir.instance.model.api.IPrimitiveType;
|
||||||
|
import org.hl7.fhir.r4.model.Subscription;
|
||||||
|
import org.springframework.messaging.MessageHandler;
|
||||||
|
import org.springframework.messaging.SubscribableChannel;
|
||||||
|
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
public abstract class BaseSubscriptionSubscriber implements MessageHandler {
|
||||||
|
static final String SUBSCRIPTION_STATUS = "Subscription.status";
|
||||||
|
private static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
|
||||||
|
private final IFhirResourceDao mySubscriptionDao;
|
||||||
|
private final ConcurrentHashMap<String, IBaseResource> myIdToSubscription;
|
||||||
|
private final Subscription.SubscriptionChannelType myChannelType;
|
||||||
|
private final SubscribableChannel myProcessingChannel;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
*/
|
||||||
|
public BaseSubscriptionSubscriber(IFhirResourceDao<? extends IBaseResource> theSubscriptionDao, ConcurrentHashMap<String, IBaseResource> theIdToSubscription, Subscription.SubscriptionChannelType theChannelType, SubscribableChannel theProcessingChannel) {
|
||||||
|
mySubscriptionDao = theSubscriptionDao;
|
||||||
|
myIdToSubscription = theIdToSubscription;
|
||||||
|
myChannelType = theChannelType;
|
||||||
|
myProcessingChannel = theProcessingChannel;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Subscription.SubscriptionChannelType getChannelType() {
|
||||||
|
return myChannelType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FhirContext getContext() {
|
||||||
|
return getSubscriptionDao().getContext();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ConcurrentHashMap<String, IBaseResource> getIdToSubscription() {
|
||||||
|
return myIdToSubscription;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SubscribableChannel getProcessingChannel() {
|
||||||
|
return myProcessingChannel;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IFhirResourceDao getSubscriptionDao() {
|
||||||
|
return mySubscriptionDao;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Does this subscription type (e.g. rest hook, websocket, etc) apply to this interceptor?
|
||||||
|
*/
|
||||||
|
protected boolean subscriptionTypeApplies(ResourceModifiedMessage theMsg) {
|
||||||
|
FhirContext ctx = mySubscriptionDao.getContext();
|
||||||
|
IBaseResource subscription = theMsg.getNewPayload();
|
||||||
|
return subscriptionTypeApplies(ctx, subscription);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Does this subscription type (e.g. rest hook, websocket, etc) apply to this interceptor?
|
||||||
|
*/
|
||||||
|
protected boolean subscriptionTypeApplies(FhirContext theCtx, IBaseResource theSubscription) {
|
||||||
|
IPrimitiveType<?> status = theCtx.newTerser().getSingleValueOrNull(theSubscription, SUBSCRIPTION_TYPE, IPrimitiveType.class);
|
||||||
|
boolean subscriptionTypeApplies = false;
|
||||||
|
if (getChannelType().toCode().equals(status.getValueAsString())) {
|
||||||
|
subscriptionTypeApplies = true;
|
||||||
|
}
|
||||||
|
return subscriptionTypeApplies;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,40 @@
|
||||||
|
package ca.uhn.fhir.jpa.subscription;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
|
||||||
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public class ResourceDeliveryMessage implements Serializable {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 0L;
|
||||||
|
|
||||||
|
private IBaseResource mySubscription;
|
||||||
|
private IBaseResource myPayoad;
|
||||||
|
private RestOperationTypeEnum myOperationType;
|
||||||
|
|
||||||
|
public RestOperationTypeEnum getOperationType() {
|
||||||
|
return myOperationType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOperationType(RestOperationTypeEnum theOperationType) {
|
||||||
|
myOperationType = theOperationType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IBaseResource getPayoad() {
|
||||||
|
return myPayoad;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPayoad(IBaseResource thePayoad) {
|
||||||
|
myPayoad = thePayoad;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IBaseResource getSubscription() {
|
||||||
|
return mySubscription;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSubscription(IBaseResource theSubscription) {
|
||||||
|
mySubscription = theSubscription;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,41 @@
|
||||||
|
package ca.uhn.fhir.jpa.subscription;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
|
||||||
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
|
import org.hl7.fhir.instance.model.api.IIdType;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public class ResourceModifiedMessage implements Serializable {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 0L;
|
||||||
|
|
||||||
|
private IIdType myId;
|
||||||
|
private RestOperationTypeEnum myOperationType;
|
||||||
|
private IBaseResource myNewPayload;
|
||||||
|
|
||||||
|
public IIdType getId() {
|
||||||
|
return myId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setId(IIdType theId) {
|
||||||
|
myId = theId;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public RestOperationTypeEnum getOperationType() {
|
||||||
|
return myOperationType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOperationType(RestOperationTypeEnum theOperationType) {
|
||||||
|
myOperationType = theOperationType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IBaseResource getNewPayload() {
|
||||||
|
return myNewPayload;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setNewPayload(IBaseResource theNewPayload) {
|
||||||
|
myNewPayload = theNewPayload;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,96 @@
|
||||||
|
package ca.uhn.fhir.jpa.subscription;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
|
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
|
||||||
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
|
import org.hl7.fhir.instance.model.api.IIdType;
|
||||||
|
import org.hl7.fhir.instance.model.api.IPrimitiveType;
|
||||||
|
import org.hl7.fhir.r4.model.Subscription;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.messaging.Message;
|
||||||
|
import org.springframework.messaging.MessageHandler;
|
||||||
|
import org.springframework.messaging.MessagingException;
|
||||||
|
import org.springframework.messaging.SubscribableChannel;
|
||||||
|
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public class SubscriptionActivatingSubscriber extends BaseSubscriptionSubscriber {
|
||||||
|
private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
*/
|
||||||
|
public SubscriptionActivatingSubscriber(IFhirResourceDao<? extends IBaseResource> theSubscriptionDao, ConcurrentHashMap<String, IBaseResource> theIdToSubscription, Subscription.SubscriptionChannelType theChannelType, SubscribableChannel theProcessingChannel) {
|
||||||
|
super(theSubscriptionDao, theIdToSubscription, theChannelType, theProcessingChannel);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleCreate(ResourceModifiedMessage theMsg) {
|
||||||
|
if (!theMsg.getId().getResourceType().equals("Subscription")) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean subscriptionTypeApplies = subscriptionTypeApplies(theMsg);
|
||||||
|
if (subscriptionTypeApplies == false) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
FhirContext ctx = getSubscriptionDao().getContext();
|
||||||
|
IBaseResource subscription = theMsg.getNewPayload();
|
||||||
|
IPrimitiveType<?> status = ctx.newTerser().getSingleValueOrNull(subscription, SUBSCRIPTION_STATUS, IPrimitiveType.class);
|
||||||
|
String statusString = status.getValueAsString();
|
||||||
|
|
||||||
|
String oldStatus = Subscription.SubscriptionStatus.REQUESTED.toCode();
|
||||||
|
if (oldStatus.equals(statusString)) {
|
||||||
|
String newStatus = Subscription.SubscriptionStatus.ACTIVE.toCode();
|
||||||
|
status.setValueAsString(newStatus);
|
||||||
|
ourLog.info("Activating subscription {} from status {} to {}", subscription.getIdElement().toUnqualifiedVersionless().getValue(), oldStatus, newStatus);
|
||||||
|
getSubscriptionDao().update(subscription);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleMessage(Message<?> theMessage) throws MessagingException {
|
||||||
|
|
||||||
|
if (!(theMessage.getPayload() instanceof ResourceModifiedMessage)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ResourceModifiedMessage msg = (ResourceModifiedMessage) theMessage.getPayload();
|
||||||
|
IIdType id = msg.getId();
|
||||||
|
|
||||||
|
switch (msg.getOperationType()) {
|
||||||
|
case DELETE:
|
||||||
|
getIdToSubscription().remove(id.getIdPart());
|
||||||
|
return;
|
||||||
|
case CREATE:
|
||||||
|
handleCreate(msg);
|
||||||
|
break;
|
||||||
|
case UPDATE:
|
||||||
|
handleUpdate(msg);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleUpdate(ResourceModifiedMessage theMsg) {
|
||||||
|
if (!theMsg.getId().getResourceType().equals("Subscription")) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean subscriptionTypeApplies = subscriptionTypeApplies(theMsg);
|
||||||
|
if (subscriptionTypeApplies == false) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
FhirContext ctx = getSubscriptionDao().getContext();
|
||||||
|
IBaseResource subscription = theMsg.getNewPayload();
|
||||||
|
IPrimitiveType<?> status = ctx.newTerser().getSingleValueOrNull(subscription, SUBSCRIPTION_STATUS, IPrimitiveType.class);
|
||||||
|
String statusString = status.getValueAsString();
|
||||||
|
|
||||||
|
if (Subscription.SubscriptionStatus.ACTIVE.toCode().equals(statusString)) {
|
||||||
|
getIdToSubscription().put(theMsg.getId().getIdPart(), theMsg.getNewPayload());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,125 @@
|
||||||
|
package ca.uhn.fhir.jpa.subscription;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.context.RuntimeResourceDefinition;
|
||||||
|
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
|
||||||
|
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
|
||||||
|
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
|
||||||
|
import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails;
|
||||||
|
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||||
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.hl7.fhir.instance.model.api.IAnyResource;
|
||||||
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
|
import org.hl7.fhir.instance.model.api.IPrimitiveType;
|
||||||
|
import org.hl7.fhir.r4.model.Subscription;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.messaging.Message;
|
||||||
|
import org.springframework.messaging.MessagingException;
|
||||||
|
import org.springframework.messaging.SubscribableChannel;
|
||||||
|
import org.springframework.messaging.support.GenericMessage;
|
||||||
|
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
|
||||||
|
private Logger ourLog = LoggerFactory.getLogger(SubscriptionCheckingSubscriber.class);
|
||||||
|
|
||||||
|
public SubscriptionCheckingSubscriber(IFhirResourceDao theSubscriptionDao, ConcurrentHashMap<String, IBaseResource> theIdToSubscription, Subscription.SubscriptionChannelType theChannelType, SubscribableChannel theProcessingChannel) {
|
||||||
|
super(theSubscriptionDao, theIdToSubscription, theChannelType, theProcessingChannel);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleMessage(Message<?> theMessage) throws MessagingException {
|
||||||
|
if (!(theMessage.getPayload() instanceof ResourceModifiedMessage)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ResourceModifiedMessage msg = (ResourceModifiedMessage) theMessage.getPayload();
|
||||||
|
switch (msg.getOperationType()) {
|
||||||
|
case CREATE:
|
||||||
|
case UPDATE:
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
// ignore anything else
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
String resourceType = msg.getId().getResourceType();
|
||||||
|
String resourceId = msg.getId().getIdPart();
|
||||||
|
|
||||||
|
for (IBaseResource nextSubscription : getIdToSubscription().values()) {
|
||||||
|
|
||||||
|
String nextSubscriptionId = nextSubscription.getIdElement().toUnqualifiedVersionless().getValue();
|
||||||
|
IPrimitiveType<?> nextCriteria = getContext().newTerser().getSingleValueOrNull(nextSubscription, "Subscription.criteria", IPrimitiveType.class);
|
||||||
|
String nextCriteriaString = nextCriteria != null ? nextCriteria.getValueAsString() : null;
|
||||||
|
|
||||||
|
if (StringUtils.isBlank(nextCriteriaString)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// see if the criteria matches the created object
|
||||||
|
ourLog.info("Checking subscription {} for {} with criteria {}", nextSubscriptionId, resourceType, nextCriteriaString);
|
||||||
|
|
||||||
|
String criteriaResource = nextCriteriaString;
|
||||||
|
int index = criteriaResource.indexOf("?");
|
||||||
|
if (index != -1) {
|
||||||
|
criteriaResource = criteriaResource.substring(0, criteriaResource.indexOf("?"));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (resourceType != null && nextCriteriaString != null && !criteriaResource.equals(resourceType)) {
|
||||||
|
ourLog.info("Skipping subscription search for {} because it does not match the criteria {}", resourceType, nextCriteriaString);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// run the subscriptions query and look for matches, add the id as part of the criteria to avoid getting matches of previous resources rather than the recent resource
|
||||||
|
String criteria = nextCriteriaString;
|
||||||
|
criteria += "&_id=" + resourceType + "/" + resourceId;
|
||||||
|
criteria = massageCriteria(criteria);
|
||||||
|
|
||||||
|
IBundleProvider results = performSearch(criteria);
|
||||||
|
if (results.size() == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// should just be one resource as it was filtered by the id
|
||||||
|
for (IBaseResource nextBase : results.getResources(0, results.size())) {
|
||||||
|
IAnyResource next = (IAnyResource) nextBase;
|
||||||
|
ourLog.info("Found match: queueing rest-hook notification for resource: {}", next.getIdElement());
|
||||||
|
|
||||||
|
ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
|
||||||
|
deliveryMsg.setPayoad(next);
|
||||||
|
deliveryMsg.setSubscription(nextSubscription);
|
||||||
|
deliveryMsg.setOperationType(msg.getOperationType());
|
||||||
|
|
||||||
|
getProcessingChannel().send(new GenericMessage<>(deliveryMsg));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Subclasses may override
|
||||||
|
*/
|
||||||
|
protected String massageCriteria(String theCriteria) {
|
||||||
|
return theCriteria;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Search based on a query criteria
|
||||||
|
*/
|
||||||
|
protected IBundleProvider performSearch(String theCriteria) {
|
||||||
|
RuntimeResourceDefinition responseResourceDef = getSubscriptionDao().validateCriteriaAndReturnResourceDefinition(theCriteria);
|
||||||
|
SearchParameterMap responseCriteriaUrl = BaseHapiFhirDao.translateMatchUrl(getSubscriptionDao(), getSubscriptionDao().getContext(), theCriteria, responseResourceDef);
|
||||||
|
|
||||||
|
RequestDetails req = new ServletSubRequestDetails();
|
||||||
|
req.setSubRequest(true);
|
||||||
|
|
||||||
|
IFhirResourceDao<? extends IBaseResource> responseDao = getSubscriptionDao().getDao(responseResourceDef.getImplementingClass());
|
||||||
|
responseCriteriaUrl.setLoadSynchronousUpTo(1);
|
||||||
|
|
||||||
|
IBundleProvider responseResults = responseDao.search(responseCriteriaUrl, req);
|
||||||
|
return responseResults;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,33 @@
|
||||||
|
package ca.uhn.fhir.jpa.subscription;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
|
||||||
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
|
import org.hl7.fhir.r4.model.Subscription;
|
||||||
|
import org.springframework.messaging.Message;
|
||||||
|
import org.springframework.messaging.MessagingException;
|
||||||
|
import org.springframework.messaging.SubscribableChannel;
|
||||||
|
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionSubscriber {
|
||||||
|
|
||||||
|
public SubscriptionDeliveringRestHookSubscriber(IFhirResourceDao theSubscriptionDao, ConcurrentHashMap<String, IBaseResource> theIdToSubscription, Subscription.SubscriptionChannelType theChannelType, SubscribableChannel theProcessingChannel) {
|
||||||
|
super(theSubscriptionDao, theIdToSubscription, theChannelType, theProcessingChannel);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleMessage(Message<?> theMessage) throws MessagingException {
|
||||||
|
if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
|
||||||
|
|
||||||
|
if (!subscriptionTypeApplies(getContext(), msg.getSubscription())) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -57,11 +57,8 @@ public class SubscriptionWebsocketHandlerR4 extends TextWebSocketHandler impleme
|
||||||
private static IFhirResourceDaoSubscription<Subscription> ourSubscriptionDao;
|
private static IFhirResourceDaoSubscription<Subscription> ourSubscriptionDao;
|
||||||
|
|
||||||
private ScheduledFuture<?> myScheduleFuture;
|
private ScheduledFuture<?> myScheduleFuture;
|
||||||
|
|
||||||
private IState myState = new InitialState();
|
private IState myState = new InitialState();
|
||||||
|
|
||||||
private IIdType mySubscriptionId;
|
private IIdType mySubscriptionId;
|
||||||
|
|
||||||
private Long mySubscriptionPid;
|
private Long mySubscriptionPid;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
|
|
|
@ -1,138 +0,0 @@
|
||||||
package org.hl7.fhir.r4.model.api;
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* #%L
|
|
||||||
* HAPI FHIR - Core Library
|
|
||||||
* %%
|
|
||||||
* Copyright (C) 2014 - 2015 University Health Network
|
|
||||||
* %%
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
* #L%
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Base interface for ID datatype.
|
|
||||||
*
|
|
||||||
* <p>
|
|
||||||
* <b>Concrete Implementations:</b> This interface is often returned and/or accepted by methods in HAPI's API
|
|
||||||
* where either {@link ca.uhn.fhir.model.primitive.IdDt} (the HAPI structure ID type) or
|
|
||||||
* <code>org.hl7.fhir.instance.model.IdType</code> (the RI structure ID type) will be used, depending on
|
|
||||||
* which version of the strctures your application is using.
|
|
||||||
* </p>
|
|
||||||
*/
|
|
||||||
public interface IIdType {
|
|
||||||
|
|
||||||
void applyTo(IBaseResource theResource);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the server base URL if this ID contains one. For example, the base URL is
|
|
||||||
* the 'http://example.com/fhir' in the following ID: <code>http://example.com/fhir/Patient/123/_history/55</code>
|
|
||||||
*/
|
|
||||||
String getBaseUrl();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns only the logical ID part of this ID. For example, given the ID
|
|
||||||
* "http://example,.com/fhir/Patient/123/_history/456", this method would
|
|
||||||
* return "123".
|
|
||||||
*/
|
|
||||||
String getIdPart();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the ID part of this ID (e.g. in the ID http://example.com/Patient/123/_history/456 this would be the
|
|
||||||
* part "123") parsed as a {@link Long}.
|
|
||||||
*
|
|
||||||
* @throws NumberFormatException If the value can't be parsed as a long
|
|
||||||
*/
|
|
||||||
Long getIdPartAsLong();
|
|
||||||
|
|
||||||
String getResourceType();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the value of this ID. Note that this value may be a fully qualified URL, a relative/partial URL, or a simple ID. Use {@link #getIdPart()} to get just the ID portion.
|
|
||||||
*
|
|
||||||
* @see #getIdPart()
|
|
||||||
*/
|
|
||||||
String getValue();
|
|
||||||
|
|
||||||
String getVersionIdPart();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the version ID part of this ID (e.g. in the ID http://example.com/Patient/123/_history/456 this would be the
|
|
||||||
* part "456") parsed as a {@link Long}.
|
|
||||||
*
|
|
||||||
* @throws NumberFormatException If the value can't be parsed as a long
|
|
||||||
*/
|
|
||||||
Long getVersionIdPartAsLong();
|
|
||||||
|
|
||||||
boolean hasBaseUrl();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns <code>true</code> if this ID contains an actual ID part. For example, the ID part is
|
|
||||||
* the '123' in the following ID: <code>http://example.com/fhir/Patient/123/_history/55</code>
|
|
||||||
*/
|
|
||||||
boolean hasIdPart();
|
|
||||||
|
|
||||||
boolean hasResourceType();
|
|
||||||
|
|
||||||
boolean hasVersionIdPart();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns <code>true</code> if this ID contains an absolute URL (in other words, a URL starting with "http://" or "https://"
|
|
||||||
*/
|
|
||||||
boolean isAbsolute();
|
|
||||||
|
|
||||||
boolean isEmpty();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns <code>true</code> if the {@link #getIdPart() ID part of this object} is valid according to the FHIR rules for valid IDs.
|
|
||||||
* <p>
|
|
||||||
* The FHIR specification states:
|
|
||||||
* <code>Any combination of upper or lower case ASCII letters ('A'..'Z', and 'a'..'z', numerals ('0'..'9'), '-' and '.', with a length limit of 64 characters. (This might be an integer, an un-prefixed OID, UUID or any other identifier pattern that meets these constraints.) regex: [A-Za-z0-9\-\.]{1,64}</code>
|
|
||||||
* </p>
|
|
||||||
*/
|
|
||||||
boolean isIdPartValid();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns <code>true</code> if the {@link #getIdPart() ID part of this object} contains
|
|
||||||
* only numbers
|
|
||||||
*/
|
|
||||||
boolean isIdPartValidLong();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns <code>true</code> if the ID is a local reference (in other words, it begins with the '#' character)
|
|
||||||
*/
|
|
||||||
boolean isLocal();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns <code>true</code> if the {@link #getVersionIdPart() version ID part of this object} contains
|
|
||||||
* only numbers
|
|
||||||
*/
|
|
||||||
boolean isVersionIdPartValidLong();
|
|
||||||
|
|
||||||
IIdType setValue(String theString);
|
|
||||||
|
|
||||||
IIdType toUnqualified();
|
|
||||||
|
|
||||||
IIdType toUnqualifiedVersionless();
|
|
||||||
|
|
||||||
IIdType toVersionless();
|
|
||||||
|
|
||||||
IIdType withResourceType(String theResName);
|
|
||||||
|
|
||||||
IIdType withServerBase(String theServerBase, String theResourceName);
|
|
||||||
|
|
||||||
IIdType withVersion(String theVersion);
|
|
||||||
|
|
||||||
}
|
|
Loading…
Reference in New Issue