diff --git a/hapi-fhir-jpaserver-base/pom.xml b/hapi-fhir-jpaserver-base/pom.xml index b7a45574dc4..eafbaffd193 100644 --- a/hapi-fhir-jpaserver-base/pom.xml +++ b/hapi-fhir-jpaserver-base/pom.xml @@ -264,6 +264,10 @@ org.springframework spring-messaging + + org.springframework.integration + spring-integration-core + org.springframework spring-tx diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/interceptor/r4/RestHookSubscriptionR4Interceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/interceptor/r4/RestHookSubscriptionR4Interceptor.java index ba8745fb8ca..b0d7040b210 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/interceptor/r4/RestHookSubscriptionR4Interceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/interceptor/r4/RestHookSubscriptionR4Interceptor.java @@ -58,10 +58,10 @@ public class RestHookSubscriptionR4Interceptor extends BaseRestHookSubscriptionI private final static int MAX_THREADS = 1; private static final Logger ourLog = LoggerFactory.getLogger(RestHookSubscriptionR4Interceptor.class); + private final List myRestHookSubscriptions = new ArrayList<>(); @Autowired private FhirContext myFhirContext; - private final List myRestHookSubscriptions = new ArrayList(); @Autowired @Qualifier("mySubscriptionDaoR4") diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java new file mode 100644 index 00000000000..ef62c52a8c9 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java @@ -0,0 +1,152 @@ +package ca.uhn.fhir.jpa.subscription; + +import ca.uhn.fhir.jpa.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.dao.SearchParameterMap; +import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails; +import ca.uhn.fhir.rest.api.RestOperationTypeEnum; +import ca.uhn.fhir.rest.api.server.IBundleProvider; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import ca.uhn.fhir.rest.param.TokenParam; +import ca.uhn.fhir.rest.server.interceptor.ServerOperationInterceptorAdapter; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.r4.model.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.support.ExecutorSubscribableChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.scheduling.annotation.Scheduled; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.*; + +public abstract class BaseSubscriptionInterceptor extends ServerOperationInterceptorAdapter { + + private static final Integer MAX_SUBSCRIPTION_RESULTS = 1000; + private SubscribableChannel myProcessingChannel; + private ExecutorService myExecutor; + private boolean myAutoActivateSubscriptions = true; + private int myExecutorThreadCount = 1; + private MessageHandler mySubscriptionActivatingSubscriber; + private MessageHandler mySubscriptionCheckingSubscriber; + private ConcurrentHashMap myIdToSubscription = new ConcurrentHashMap<>(); + private Subscription.SubscriptionChannelType myChannelType = Subscription.SubscriptionChannelType.RESTHOOK; + private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class); + + protected abstract IFhirResourceDao getSubscriptionDao(); + + /** + * Read the existing subscriptions from the database + */ + @SuppressWarnings("unused") + @Scheduled(fixedDelay = 10000) + public void initSubscriptions() { + SearchParameterMap map = new SearchParameterMap(); + map.add(Subscription.SP_TYPE, new TokenParam(null, myChannelType.toCode())); + map.add(Subscription.SP_STATUS, new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode())); + map.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS); + + RequestDetails req = new ServletSubRequestDetails(); + req.setSubRequest(true); + + IBundleProvider subscriptionBundleList = getSubscriptionDao().search(map, req); + if (subscriptionBundleList.size() >= MAX_SUBSCRIPTION_RESULTS) { + ourLog.error("Currently over " + MAX_SUBSCRIPTION_RESULTS + " subscriptions. Some subscriptions have not been loaded."); + } + + List resourceList = subscriptionBundleList.getResources(0, subscriptionBundleList.size()); + + Set allIds = new HashSet<>(); + for (IBaseResource resource : resourceList) { + String nextId = resource.getIdElement().getIdPart(); + allIds.add(nextId); + myIdToSubscription.put(nextId, resource); + } + + for (String next : myIdToSubscription.keySet()) { + if (!allIds.contains(next)) { + myIdToSubscription.remove(next); + } + } + } + + @PostConstruct + public void postConstruct() { + RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); + ThreadFactory threadFactory = new BasicThreadFactory.Builder() + .namingPattern("subscription-%d") + .daemon(false) + .priority(Thread.NORM_PRIORITY) + .build(); + myExecutor = new ThreadPoolExecutor( + myExecutorThreadCount, + myExecutorThreadCount, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(1000), + threadFactory, + rejectedExecutionHandler); + + + if (myProcessingChannel == null) { + myProcessingChannel = new ExecutorSubscribableChannel(myExecutor); + } + + if (myAutoActivateSubscriptions) { + if (mySubscriptionActivatingSubscriber == null) { + mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), myIdToSubscription, myChannelType, myProcessingChannel); + } + myProcessingChannel.subscribe(mySubscriptionActivatingSubscriber); + } + + if (mySubscriptionCheckingSubscriber == null) { + mySubscriptionCheckingSubscriber = new SubscriptionCheckingSubscriber(getSubscriptionDao(), myIdToSubscription, myChannelType, myProcessingChannel); + } + myProcessingChannel.subscribe(mySubscriptionCheckingSubscriber); + } + + @SuppressWarnings("unused") + @PreDestroy + public void preDestroy() { + if (myAutoActivateSubscriptions) { + myProcessingChannel.unsubscribe(mySubscriptionActivatingSubscriber); + } + myProcessingChannel.unsubscribe(mySubscriptionCheckingSubscriber); + } + + @Override + public void resourceCreated(RequestDetails theRequest, IBaseResource theResource) { + ResourceModifiedMessage msg = new ResourceModifiedMessage(); + msg.setId(theResource.getIdElement()); + msg.setOperationType(RestOperationTypeEnum.CREATE); + msg.setNewPayload(theResource); + submitResourceModified(msg); + } + + @Override + public void resourceDeleted(RequestDetails theRequest, IBaseResource theResource) { + ResourceModifiedMessage msg = new ResourceModifiedMessage(); + msg.setId(theResource.getIdElement()); + msg.setOperationType(RestOperationTypeEnum.DELETE); + submitResourceModified(msg); + } + + @Override + public void resourceUpdated(RequestDetails theRequest, IBaseResource theOldResource, IBaseResource theNewResource) { + ResourceModifiedMessage msg = new ResourceModifiedMessage(); + msg.setId(theNewResource.getIdElement()); + msg.setOperationType(RestOperationTypeEnum.UPDATE); + msg.setNewPayload(theNewResource); + submitResourceModified(msg); + } + + private void submitResourceModified(ResourceModifiedMessage theMsg) { + myProcessingChannel.send(new GenericMessage<>(theMsg)); + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionSubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionSubscriber.java new file mode 100644 index 00000000000..972af9b5ad6 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionSubscriber.java @@ -0,0 +1,72 @@ +package ca.uhn.fhir.jpa.subscription; + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.dao.IFhirResourceDao; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IPrimitiveType; +import org.hl7.fhir.r4.model.Subscription; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.SubscribableChannel; + +import java.util.concurrent.ConcurrentHashMap; + +public abstract class BaseSubscriptionSubscriber implements MessageHandler { + static final String SUBSCRIPTION_STATUS = "Subscription.status"; + private static final String SUBSCRIPTION_TYPE = "Subscription.channel.type"; + private final IFhirResourceDao mySubscriptionDao; + private final ConcurrentHashMap myIdToSubscription; + private final Subscription.SubscriptionChannelType myChannelType; + private final SubscribableChannel myProcessingChannel; + + /** + * Constructor + */ + public BaseSubscriptionSubscriber(IFhirResourceDao theSubscriptionDao, ConcurrentHashMap theIdToSubscription, Subscription.SubscriptionChannelType theChannelType, SubscribableChannel theProcessingChannel) { + mySubscriptionDao = theSubscriptionDao; + myIdToSubscription = theIdToSubscription; + myChannelType = theChannelType; + myProcessingChannel = theProcessingChannel; + } + + public Subscription.SubscriptionChannelType getChannelType() { + return myChannelType; + } + + public FhirContext getContext() { + return getSubscriptionDao().getContext(); + } + + public ConcurrentHashMap getIdToSubscription() { + return myIdToSubscription; + } + + public SubscribableChannel getProcessingChannel() { + return myProcessingChannel; + } + + public IFhirResourceDao getSubscriptionDao() { + return mySubscriptionDao; + } + + /** + * Does this subscription type (e.g. rest hook, websocket, etc) apply to this interceptor? + */ + protected boolean subscriptionTypeApplies(ResourceModifiedMessage theMsg) { + FhirContext ctx = mySubscriptionDao.getContext(); + IBaseResource subscription = theMsg.getNewPayload(); + return subscriptionTypeApplies(ctx, subscription); + } + + /** + * Does this subscription type (e.g. rest hook, websocket, etc) apply to this interceptor? + */ + protected boolean subscriptionTypeApplies(FhirContext theCtx, IBaseResource theSubscription) { + IPrimitiveType status = theCtx.newTerser().getSingleValueOrNull(theSubscription, SUBSCRIPTION_TYPE, IPrimitiveType.class); + boolean subscriptionTypeApplies = false; + if (getChannelType().toCode().equals(status.getValueAsString())) { + subscriptionTypeApplies = true; + } + return subscriptionTypeApplies; + } + +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryMessage.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryMessage.java new file mode 100644 index 00000000000..316534d3c71 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryMessage.java @@ -0,0 +1,40 @@ +package ca.uhn.fhir.jpa.subscription; + +import ca.uhn.fhir.rest.api.RestOperationTypeEnum; +import org.hl7.fhir.instance.model.api.IBaseResource; + +import java.io.Serializable; + +public class ResourceDeliveryMessage implements Serializable { + + private static final long serialVersionUID = 0L; + + private IBaseResource mySubscription; + private IBaseResource myPayoad; + private RestOperationTypeEnum myOperationType; + + public RestOperationTypeEnum getOperationType() { + return myOperationType; + } + + public void setOperationType(RestOperationTypeEnum theOperationType) { + myOperationType = theOperationType; + } + + public IBaseResource getPayoad() { + return myPayoad; + } + + public void setPayoad(IBaseResource thePayoad) { + myPayoad = thePayoad; + } + + public IBaseResource getSubscription() { + return mySubscription; + } + + public void setSubscription(IBaseResource theSubscription) { + mySubscription = theSubscription; + } + +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessage.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessage.java new file mode 100644 index 00000000000..ca871f04b64 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessage.java @@ -0,0 +1,41 @@ +package ca.uhn.fhir.jpa.subscription; + +import ca.uhn.fhir.rest.api.RestOperationTypeEnum; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IIdType; + +import java.io.Serializable; + +public class ResourceModifiedMessage implements Serializable { + + private static final long serialVersionUID = 0L; + + private IIdType myId; + private RestOperationTypeEnum myOperationType; + private IBaseResource myNewPayload; + + public IIdType getId() { + return myId; + } + + public void setId(IIdType theId) { + myId = theId; + } + + + public RestOperationTypeEnum getOperationType() { + return myOperationType; + } + + public void setOperationType(RestOperationTypeEnum theOperationType) { + myOperationType = theOperationType; + } + + public IBaseResource getNewPayload() { + return myNewPayload; + } + + public void setNewPayload(IBaseResource theNewPayload) { + myNewPayload = theNewPayload; + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java new file mode 100644 index 00000000000..e76853e34b9 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java @@ -0,0 +1,96 @@ +package ca.uhn.fhir.jpa.subscription; + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.dao.IFhirResourceDao; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IIdType; +import org.hl7.fhir.instance.model.api.IPrimitiveType; +import org.hl7.fhir.r4.model.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.SubscribableChannel; + +import java.util.concurrent.ConcurrentHashMap; + +@SuppressWarnings("unchecked") +public class SubscriptionActivatingSubscriber extends BaseSubscriptionSubscriber { + private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class); + + /** + * Constructor + */ + public SubscriptionActivatingSubscriber(IFhirResourceDao theSubscriptionDao, ConcurrentHashMap theIdToSubscription, Subscription.SubscriptionChannelType theChannelType, SubscribableChannel theProcessingChannel) { + super(theSubscriptionDao, theIdToSubscription, theChannelType, theProcessingChannel); + } + + private void handleCreate(ResourceModifiedMessage theMsg) { + if (!theMsg.getId().getResourceType().equals("Subscription")) { + return; + } + + boolean subscriptionTypeApplies = subscriptionTypeApplies(theMsg); + if (subscriptionTypeApplies == false) { + return; + } + + FhirContext ctx = getSubscriptionDao().getContext(); + IBaseResource subscription = theMsg.getNewPayload(); + IPrimitiveType status = ctx.newTerser().getSingleValueOrNull(subscription, SUBSCRIPTION_STATUS, IPrimitiveType.class); + String statusString = status.getValueAsString(); + + String oldStatus = Subscription.SubscriptionStatus.REQUESTED.toCode(); + if (oldStatus.equals(statusString)) { + String newStatus = Subscription.SubscriptionStatus.ACTIVE.toCode(); + status.setValueAsString(newStatus); + ourLog.info("Activating subscription {} from status {} to {}", subscription.getIdElement().toUnqualifiedVersionless().getValue(), oldStatus, newStatus); + getSubscriptionDao().update(subscription); + } + } + + @Override + public void handleMessage(Message theMessage) throws MessagingException { + + if (!(theMessage.getPayload() instanceof ResourceModifiedMessage)) { + return; + } + + ResourceModifiedMessage msg = (ResourceModifiedMessage) theMessage.getPayload(); + IIdType id = msg.getId(); + + switch (msg.getOperationType()) { + case DELETE: + getIdToSubscription().remove(id.getIdPart()); + return; + case CREATE: + handleCreate(msg); + break; + case UPDATE: + handleUpdate(msg); + break; + } + + } + + private void handleUpdate(ResourceModifiedMessage theMsg) { + if (!theMsg.getId().getResourceType().equals("Subscription")) { + return; + } + + boolean subscriptionTypeApplies = subscriptionTypeApplies(theMsg); + if (subscriptionTypeApplies == false) { + return; + } + + FhirContext ctx = getSubscriptionDao().getContext(); + IBaseResource subscription = theMsg.getNewPayload(); + IPrimitiveType status = ctx.newTerser().getSingleValueOrNull(subscription, SUBSCRIPTION_STATUS, IPrimitiveType.class); + String statusString = status.getValueAsString(); + + if (Subscription.SubscriptionStatus.ACTIVE.toCode().equals(statusString)) { + getIdToSubscription().put(theMsg.getId().getIdPart(), theMsg.getNewPayload()); + } + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionCheckingSubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionCheckingSubscriber.java new file mode 100644 index 00000000000..5000ead6e3e --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionCheckingSubscriber.java @@ -0,0 +1,125 @@ +package ca.uhn.fhir.jpa.subscription; + +import ca.uhn.fhir.context.RuntimeResourceDefinition; +import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao; +import ca.uhn.fhir.jpa.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.dao.SearchParameterMap; +import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails; +import ca.uhn.fhir.rest.api.server.IBundleProvider; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import org.apache.commons.lang3.StringUtils; +import org.hl7.fhir.instance.model.api.IAnyResource; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IPrimitiveType; +import org.hl7.fhir.r4.model.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.support.GenericMessage; + +import java.util.concurrent.ConcurrentHashMap; + +public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber { + private Logger ourLog = LoggerFactory.getLogger(SubscriptionCheckingSubscriber.class); + + public SubscriptionCheckingSubscriber(IFhirResourceDao theSubscriptionDao, ConcurrentHashMap theIdToSubscription, Subscription.SubscriptionChannelType theChannelType, SubscribableChannel theProcessingChannel) { + super(theSubscriptionDao, theIdToSubscription, theChannelType, theProcessingChannel); + } + + @Override + public void handleMessage(Message theMessage) throws MessagingException { + if (!(theMessage.getPayload() instanceof ResourceModifiedMessage)) { + return; + } + + ResourceModifiedMessage msg = (ResourceModifiedMessage) theMessage.getPayload(); + switch (msg.getOperationType()) { + case CREATE: + case UPDATE: + break; + default: + // ignore anything else + return; + } + + String resourceType = msg.getId().getResourceType(); + String resourceId = msg.getId().getIdPart(); + + for (IBaseResource nextSubscription : getIdToSubscription().values()) { + + String nextSubscriptionId = nextSubscription.getIdElement().toUnqualifiedVersionless().getValue(); + IPrimitiveType nextCriteria = getContext().newTerser().getSingleValueOrNull(nextSubscription, "Subscription.criteria", IPrimitiveType.class); + String nextCriteriaString = nextCriteria != null ? nextCriteria.getValueAsString() : null; + + if (StringUtils.isBlank(nextCriteriaString)) { + continue; + } + + // see if the criteria matches the created object + ourLog.info("Checking subscription {} for {} with criteria {}", nextSubscriptionId, resourceType, nextCriteriaString); + + String criteriaResource = nextCriteriaString; + int index = criteriaResource.indexOf("?"); + if (index != -1) { + criteriaResource = criteriaResource.substring(0, criteriaResource.indexOf("?")); + } + + if (resourceType != null && nextCriteriaString != null && !criteriaResource.equals(resourceType)) { + ourLog.info("Skipping subscription search for {} because it does not match the criteria {}", resourceType, nextCriteriaString); + continue; + } + + // run the subscriptions query and look for matches, add the id as part of the criteria to avoid getting matches of previous resources rather than the recent resource + String criteria = nextCriteriaString; + criteria += "&_id=" + resourceType + "/" + resourceId; + criteria = massageCriteria(criteria); + + IBundleProvider results = performSearch(criteria); + if (results.size() == 0) { + continue; + } + + // should just be one resource as it was filtered by the id + for (IBaseResource nextBase : results.getResources(0, results.size())) { + IAnyResource next = (IAnyResource) nextBase; + ourLog.info("Found match: queueing rest-hook notification for resource: {}", next.getIdElement()); + + ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage(); + deliveryMsg.setPayoad(next); + deliveryMsg.setSubscription(nextSubscription); + deliveryMsg.setOperationType(msg.getOperationType()); + + getProcessingChannel().send(new GenericMessage<>(deliveryMsg)); + } + } + + + } + + /** + * Subclasses may override + */ + protected String massageCriteria(String theCriteria) { + return theCriteria; + } + + /** + * Search based on a query criteria + */ + protected IBundleProvider performSearch(String theCriteria) { + RuntimeResourceDefinition responseResourceDef = getSubscriptionDao().validateCriteriaAndReturnResourceDefinition(theCriteria); + SearchParameterMap responseCriteriaUrl = BaseHapiFhirDao.translateMatchUrl(getSubscriptionDao(), getSubscriptionDao().getContext(), theCriteria, responseResourceDef); + + RequestDetails req = new ServletSubRequestDetails(); + req.setSubRequest(true); + + IFhirResourceDao responseDao = getSubscriptionDao().getDao(responseResourceDef.getImplementingClass()); + responseCriteriaUrl.setLoadSynchronousUpTo(1); + + IBundleProvider responseResults = responseDao.search(responseCriteriaUrl, req); + return responseResults; + } + +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionDeliveringRestHookSubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionDeliveringRestHookSubscriber.java new file mode 100644 index 00000000000..07ca77393a7 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionDeliveringRestHookSubscriber.java @@ -0,0 +1,33 @@ +package ca.uhn.fhir.jpa.subscription; + +import ca.uhn.fhir.jpa.dao.IFhirResourceDao; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.r4.model.Subscription; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.SubscribableChannel; + +import java.util.concurrent.ConcurrentHashMap; + +public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionSubscriber { + + public SubscriptionDeliveringRestHookSubscriber(IFhirResourceDao theSubscriptionDao, ConcurrentHashMap theIdToSubscription, Subscription.SubscriptionChannelType theChannelType, SubscribableChannel theProcessingChannel) { + super(theSubscriptionDao, theIdToSubscription, theChannelType, theProcessingChannel); + } + + @Override + public void handleMessage(Message theMessage) throws MessagingException { + if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) { + return; + } + + ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload(); + + if (!subscriptionTypeApplies(getContext(), msg.getSubscription())) { + return; + } + + + + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/r4/SubscriptionWebsocketHandlerR4.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/r4/SubscriptionWebsocketHandlerR4.java index d33fb54a66c..2f0287f25b2 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/r4/SubscriptionWebsocketHandlerR4.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/r4/SubscriptionWebsocketHandlerR4.java @@ -57,11 +57,8 @@ public class SubscriptionWebsocketHandlerR4 extends TextWebSocketHandler impleme private static IFhirResourceDaoSubscription ourSubscriptionDao; private ScheduledFuture myScheduleFuture; - private IState myState = new InitialState(); - private IIdType mySubscriptionId; - private Long mySubscriptionPid; @Autowired diff --git a/hapi-fhir-structures-r4/src/main/java/org/hl7/fhir/r4/model/api/IIdType.java b/hapi-fhir-structures-r4/src/main/java/org/hl7/fhir/r4/model/api/IIdType.java deleted file mode 100644 index 00239159a9d..00000000000 --- a/hapi-fhir-structures-r4/src/main/java/org/hl7/fhir/r4/model/api/IIdType.java +++ /dev/null @@ -1,138 +0,0 @@ -package org.hl7.fhir.r4.model.api; - - - -/* - * #%L - * HAPI FHIR - Core Library - * %% - * Copyright (C) 2014 - 2015 University Health Network - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -/** - * Base interface for ID datatype. - * - *

- * Concrete Implementations: This interface is often returned and/or accepted by methods in HAPI's API - * where either {@link ca.uhn.fhir.model.primitive.IdDt} (the HAPI structure ID type) or - * org.hl7.fhir.instance.model.IdType (the RI structure ID type) will be used, depending on - * which version of the strctures your application is using. - *

- */ -public interface IIdType { - - void applyTo(IBaseResource theResource); - - /** - * Returns the server base URL if this ID contains one. For example, the base URL is - * the 'http://example.com/fhir' in the following ID: http://example.com/fhir/Patient/123/_history/55 - */ - String getBaseUrl(); - - /** - * Returns only the logical ID part of this ID. For example, given the ID - * "http://example,.com/fhir/Patient/123/_history/456", this method would - * return "123". - */ - String getIdPart(); - - /** - * Returns the ID part of this ID (e.g. in the ID http://example.com/Patient/123/_history/456 this would be the - * part "123") parsed as a {@link Long}. - * - * @throws NumberFormatException If the value can't be parsed as a long - */ - Long getIdPartAsLong(); - - String getResourceType(); - - /** - * Returns the value of this ID. Note that this value may be a fully qualified URL, a relative/partial URL, or a simple ID. Use {@link #getIdPart()} to get just the ID portion. - * - * @see #getIdPart() - */ - String getValue(); - - String getVersionIdPart(); - - /** - * Returns the version ID part of this ID (e.g. in the ID http://example.com/Patient/123/_history/456 this would be the - * part "456") parsed as a {@link Long}. - * - * @throws NumberFormatException If the value can't be parsed as a long - */ - Long getVersionIdPartAsLong(); - - boolean hasBaseUrl(); - - /** - * Returns true if this ID contains an actual ID part. For example, the ID part is - * the '123' in the following ID: http://example.com/fhir/Patient/123/_history/55 - */ - boolean hasIdPart(); - - boolean hasResourceType(); - - boolean hasVersionIdPart(); - - /** - * Returns true if this ID contains an absolute URL (in other words, a URL starting with "http://" or "https://" - */ - boolean isAbsolute(); - - boolean isEmpty(); - - /** - * Returns true if the {@link #getIdPart() ID part of this object} is valid according to the FHIR rules for valid IDs. - *

- * The FHIR specification states: - * Any combination of upper or lower case ASCII letters ('A'..'Z', and 'a'..'z', numerals ('0'..'9'), '-' and '.', with a length limit of 64 characters. (This might be an integer, an un-prefixed OID, UUID or any other identifier pattern that meets these constraints.) regex: [A-Za-z0-9\-\.]{1,64} - *

- */ - boolean isIdPartValid(); - - /** - * Returns true if the {@link #getIdPart() ID part of this object} contains - * only numbers - */ - boolean isIdPartValidLong(); - - /** - * Returns true if the ID is a local reference (in other words, it begins with the '#' character) - */ - boolean isLocal(); - - /** - * Returns true if the {@link #getVersionIdPart() version ID part of this object} contains - * only numbers - */ - boolean isVersionIdPartValidLong(); - - IIdType setValue(String theString); - - IIdType toUnqualified(); - - IIdType toUnqualifiedVersionless(); - - IIdType toVersionless(); - - IIdType withResourceType(String theResName); - - IIdType withServerBase(String theServerBase, String theResourceName); - - IIdType withVersion(String theVersion); - -}