Rework to keep several things in subscription

This commit is contained in:
Tadgh 2020-09-10 16:05:56 -04:00
parent 2229233749
commit 28a9a53917
37 changed files with 210 additions and 533 deletions

View File

@ -1620,7 +1620,7 @@ public enum Pointcut {
* <p> * <p>
* Hooks may accept the following parameters: * Hooks may accept the following parameters:
* <ul> * <ul>
* <li>ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage - This parameter should not be modified as processing is complete when this hook is invoked.</li> * <li>ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage- This parameter should not be modified as processing is complete when this hook is invoked.</li>
* <li>ca.uhn.fhir.empi.model.TransactionLogMessages - This parameter is for informational messages provided by the EMPI module during EMPI procesing. .</li> * <li>ca.uhn.fhir.empi.model.TransactionLogMessages - This parameter is for informational messages provided by the EMPI module during EMPI procesing. .</li>
* </ul> * </ul>
* </p> * </p>
@ -1628,7 +1628,7 @@ public enum Pointcut {
* Hooks should return <code>void</code>. * Hooks should return <code>void</code>.
* </p> * </p>
*/ */
EMPI_AFTER_PERSISTED_RESOURCE_CHECKED(void.class, "ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage", "ca.uhn.fhir.rest.server.TransactionLogMessages"), EMPI_AFTER_PERSISTED_RESOURCE_CHECKED(void.class, "ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage", "ca.uhn.fhir.rest.server.TransactionLogMessages"),
/** /**
* <b>Performance Tracing Hook:</b> * <b>Performance Tracing Hook:</b>

View File

@ -11,6 +11,7 @@ import ca.uhn.fhir.jpa.subscription.match.matcher.matching.InMemorySubscriptionM
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy; import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator; import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.util.CoordCalculatorTest; import ca.uhn.fhir.jpa.util.CoordCalculatorTest;
import ca.uhn.fhir.model.api.TemporalPrecisionEnum; import ca.uhn.fhir.model.api.TemporalPrecisionEnum;
import ca.uhn.fhir.rest.param.CompositeParam; import ca.uhn.fhir.rest.param.CompositeParam;
@ -29,8 +30,6 @@ import ca.uhn.fhir.rest.param.TokenOrListParam;
import ca.uhn.fhir.rest.param.TokenParam; import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.param.TokenParamModifier; import ca.uhn.fhir.rest.param.TokenParamModifier;
import ca.uhn.fhir.rest.param.UriParam; import ca.uhn.fhir.rest.param.UriParam;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedSubscriptionMessage;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IAnyResource; import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
@ -466,7 +465,7 @@ public class InMemorySubscriptionMatcherR4Test {
CanonicalSubscription subscription = new CanonicalSubscription(); CanonicalSubscription subscription = new CanonicalSubscription();
subscription.setCriteriaString(criteria); subscription.setCriteriaString(criteria);
subscription.setIdElement(new IdType("Subscription", 123L)); subscription.setIdElement(new IdType("Subscription", 123L));
ResourceModifiedSubscriptionMessage msg = new ResourceModifiedSubscriptionMessage(myFhirContext, patient, ResourceModifiedMessage.OperationTypeEnum.CREATE); ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, patient, ResourceModifiedMessage.OperationTypeEnum.CREATE);
msg.setSubscriptionId("123"); msg.setSubscriptionId("123");
msg.setId(new IdType("Patient/ABC")); msg.setId(new IdType("Patient/ABC"));
InMemoryMatchResult result = myInMemorySubscriptionMatcher.match(subscription, msg); InMemoryMatchResult result = myInMemorySubscriptionMatcher.match(subscription, msg);

View File

@ -10,10 +10,10 @@ import ca.uhn.fhir.jpa.config.StoppableSubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test; import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.util.SubscriptionDebugLogInterceptor; import ca.uhn.fhir.jpa.subscription.util.SubscriptionDebugLogInterceptor;
import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.MethodOutcome; import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.hl7.fhir.r4.model.IdType; import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Observation; import org.hl7.fhir.r4.model.Observation;

View File

@ -29,10 +29,11 @@ import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.empi.svc.EmpiMatchLinkSvc; import ca.uhn.fhir.jpa.empi.svc.EmpiMatchLinkSvc;
import ca.uhn.fhir.jpa.empi.svc.EmpiResourceFilteringSvc; import ca.uhn.fhir.jpa.empi.svc.EmpiResourceFilteringSvc;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.server.TransactionLogMessages; import ca.uhn.fhir.rest.server.TransactionLogMessages;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.messaging.json.ResourceModifiedJsonMessage; import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage;
import org.hl7.fhir.instance.model.api.IAnyResource; import org.hl7.fhir.instance.model.api.IAnyResource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -58,6 +59,7 @@ public class EmpiMessageHandler implements MessageHandler {
public void handleMessage(Message<?> theMessage) throws MessagingException { public void handleMessage(Message<?> theMessage) throws MessagingException {
ourLog.info("Handling resource modified message: {}", theMessage); ourLog.info("Handling resource modified message: {}", theMessage);
//TODO GGG TEST THAT THE MESSAGE HEADERS COME IN HERE
if (!(theMessage instanceof ResourceModifiedJsonMessage)) { if (!(theMessage instanceof ResourceModifiedJsonMessage)) {
ourLog.warn("Unexpected message payload type: {}", theMessage); ourLog.warn("Unexpected message payload type: {}", theMessage);
return; return;
@ -95,7 +97,8 @@ public class EmpiMessageHandler implements MessageHandler {
} finally { } finally {
// Interceptor call: EMPI_AFTER_PERSISTED_RESOURCE_CHECKED // Interceptor call: EMPI_AFTER_PERSISTED_RESOURCE_CHECKED
HookParams params = new HookParams() HookParams params = new HookParams()
.add(ResourceModifiedMessage.class, theMsg) //Janky upcast.
.add(BaseResourceModifiedMessage.class, (BaseResourceModifiedMessage) theMsg)
.add(TransactionLogMessages.class, empiContext.getTransactionLogMessages()); .add(TransactionLogMessages.class, empiContext.getTransactionLogMessages());
myInterceptorBroadcaster.callHooks(Pointcut.EMPI_AFTER_PERSISTED_RESOURCE_CHECKED, params); myInterceptorBroadcaster.callHooks(Pointcut.EMPI_AFTER_PERSISTED_RESOURCE_CHECKED, params);
} }

View File

@ -5,7 +5,7 @@ import ca.uhn.fhir.empi.log.Logs;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings; import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.rest.server.messaging.json.ResourceModifiedJsonMessage; import ca.uhn.fhir.rest.server.messaging.json.BaseResourceModifiedJsonMessage;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -52,7 +52,7 @@ public class EmpiQueueConsumerLoader {
if (myEmpiChannel == null) { if (myEmpiChannel == null) {
ChannelConsumerSettings config = new ChannelConsumerSettings(); ChannelConsumerSettings config = new ChannelConsumerSettings();
config.setConcurrentConsumers(myEmpiSettings.getConcurrentConsumers()); config.setConcurrentConsumers(myEmpiSettings.getConcurrentConsumers());
myEmpiChannel = myChannelFactory.getOrCreateReceiver(IEmpiSettings.EMPI_CHANNEL_NAME, ResourceModifiedJsonMessage.class, config); myEmpiChannel = myChannelFactory.getOrCreateReceiver(IEmpiSettings.EMPI_CHANNEL_NAME, BaseResourceModifiedJsonMessage.class, config);
} }
if (myEmpiChannel != null) { if (myEmpiChannel != null) {

View File

@ -24,8 +24,8 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.empi.api.IEmpiChannelSubmitterSvc; import ca.uhn.fhir.empi.api.IEmpiChannelSubmitterSvc;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings; import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.rest.server.messaging.json.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import org.hl7.fhir.instance.model.api.IAnyResource; import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;

View File

@ -27,7 +27,7 @@ import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.rest.server.messaging.json.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
public class SubscriptionChannelFactory { public class SubscriptionChannelFactory {

View File

@ -28,9 +28,9 @@ import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.match.deliver.BaseSubscriptionDeliverySubscriber; import ca.uhn.fhir.jpa.subscription.match.deliver.BaseSubscriptionDeliverySubscriber;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.EncodingEnum; import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.server.messaging.json.ResourceModifiedJsonMessage;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -63,6 +63,7 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel
} }
protected void doDelivery(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer, IBaseResource thePayloadResource) { protected void doDelivery(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer, IBaseResource thePayloadResource) {
//TODO GGG is this the point at which we can use a BaseResourceModifiedMessage, since technically we no longer have need of a subscriptionId?
ResourceModifiedMessage payload = new ResourceModifiedMessage(myFhirContext, thePayloadResource, theMsg.getOperationType()); ResourceModifiedMessage payload = new ResourceModifiedMessage(myFhirContext, thePayloadResource, theMsg.getOperationType());
payload.setParentTransactionGuid(theMsg.getParentTransactionGuid()); payload.setParentTransactionGuid(theMsg.getParentTransactionGuid());
ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(payload); ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(payload);

View File

@ -23,7 +23,7 @@ package ca.uhn.fhir.jpa.subscription.match.matcher.matching;
import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;

View File

@ -28,8 +28,8 @@ import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger; import org.slf4j.Logger;

View File

@ -20,8 +20,8 @@ package ca.uhn.fhir.jpa.subscription.match.matcher.matching;
* #L% * #L%
*/ */
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
public interface IResourceModifiedConsumer { public interface IResourceModifiedConsumer {

View File

@ -22,9 +22,8 @@ package ca.uhn.fhir.jpa.subscription.match.matcher.matching;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
public interface ISubscriptionMatcher { public interface ISubscriptionMatcher {
//TODO GGG convert this to a ResourceModifiedSubscriptionMessage
InMemoryMatchResult match(CanonicalSubscription subscription, ResourceModifiedMessage msg); InMemoryMatchResult match(CanonicalSubscription subscription, ResourceModifiedMessage msg);
} }

View File

@ -24,8 +24,8 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher; import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;

View File

@ -21,8 +21,8 @@ package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
*/ */
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum; import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;

View File

@ -28,9 +28,9 @@ import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.rest.server.messaging.json.ResourceModifiedJsonMessage;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage;
import ca.uhn.fhir.util.SubscriptionUtil; import ca.uhn.fhir.util.SubscriptionUtil;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger; import org.slf4j.Logger;

View File

@ -12,10 +12,9 @@ import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.EncodingEnum; import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage;
import ca.uhn.fhir.rest.server.messaging.json.ResourceModifiedSubscriptionJsonMessage;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedSubscriptionMessage;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
@ -81,16 +80,16 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
ourLog.trace("Handling resource modified message: {}", theMessage); ourLog.trace("Handling resource modified message: {}", theMessage);
//TODO ADD BACKPORT FOR HANDLING OLD LEGACY SUBSCRIPTIONS HERE //TODO ADD BACKPORT FOR HANDLING OLD LEGACY SUBSCRIPTIONS HERE
if (!(theMessage instanceof ResourceModifiedSubscriptionJsonMessage)) { if (!(theMessage instanceof ResourceModifiedJsonMessage)) {
ourLog.warn("Unexpected message payload type: {}", theMessage); ourLog.warn("Unexpected message payload type: {}", theMessage);
return; return;
} }
ResourceModifiedSubscriptionMessage msg = ((ResourceModifiedSubscriptionJsonMessage) theMessage).getPayload(); ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload();
matchActiveSubscriptionsAndDeliver(msg); matchActiveSubscriptionsAndDeliver(msg);
} }
public void matchActiveSubscriptionsAndDeliver(ResourceModifiedSubscriptionMessage theMsg) { public void matchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg) {
switch (theMsg.getOperationType()) { switch (theMsg.getOperationType()) {
case CREATE: case CREATE:
case UPDATE: case UPDATE:
@ -118,7 +117,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
} }
} }
private void doMatchActiveSubscriptionsAndDeliver(ResourceModifiedSubscriptionMessage theMsg) { private void doMatchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg) {
IIdType resourceId = theMsg.getId(myFhirContext); IIdType resourceId = theMsg.getId(myFhirContext);
Collection<ActiveSubscription> subscriptions = mySubscriptionRegistry.getAll(); Collection<ActiveSubscription> subscriptions = mySubscriptionRegistry.getAll();

View File

@ -23,8 +23,8 @@ package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.rest.server.messaging.json.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;

View File

@ -1,49 +0,0 @@
package ca.uhn.fhir.jpa.subscription.model;
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
public abstract class BaseJsonMessage<T> implements Message<T>, IModelJson {
private static final long serialVersionUID = 1L;
@JsonProperty("headers")
private MessageHeaders myHeaders;
/**
* Constructor
*/
public BaseJsonMessage() {
super();
}
@Override
public MessageHeaders getHeaders() {
return myHeaders;
}
public void setHeaders(MessageHeaders theHeaders) {
myHeaders = theHeaders;
}
}

View File

@ -1,25 +0,0 @@
package ca.uhn.fhir.jpa.subscription.model;
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
public interface IResourceMessage {
String getPayloadId();
}

View File

@ -1,97 +0,0 @@
package ca.uhn.fhir.jpa.subscription.model;
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.Validate;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@SuppressWarnings("WeakerAccess")
public abstract class LegacyBaseResourceMessage implements IResourceMessage, IModelJson {
@JsonProperty("attributes")
private Map<String, String> myAttributes;
/**
* Returns an attribute stored in this message.
* <p>
* Attributes are just a spot for user data of any kind to be
* added to the message for pasing along the subscription processing
* pipeline (typically by interceptors). Values will be carried from the beginning to the end.
* </p>
* <p>
* Note that messages are designed to be passed into queueing systems
* and serialized as JSON. As a result, only strings are currently allowed
* as values.
* </p>
*/
public Optional<String> getAttribute(String theKey) {
Validate.notBlank(theKey);
if (myAttributes == null) {
return Optional.empty();
}
return Optional.ofNullable(myAttributes.get(theKey));
}
/**
* Sets an attribute stored in this message.
* <p>
* Attributes are just a spot for user data of any kind to be
* added to the message for passing along the subscription processing
* pipeline (typically by interceptors). Values will be carried from the beginning to the end.
* </p>
* <p>
* Note that messages are designed to be passed into queueing systems
* and serialized as JSON. As a result, only strings are currently allowed
* as values.
* </p>
*
* @param theKey The key (must not be null or blank)
* @param theValue The value (must not be null)
*/
public void setAttribute(String theKey, String theValue) {
Validate.notBlank(theKey);
Validate.notNull(theValue);
if (myAttributes == null) {
myAttributes = new HashMap<>();
}
myAttributes.put(theKey, theValue);
}
/**
* Copies any attributes from the given message into this messsage.
*
* @see #setAttribute(String, String)
* @see #getAttribute(String)
*/
public void copyAdditionalPropertiesFrom(LegacyBaseResourceMessage theMsg) {
if (theMsg.myAttributes != null) {
if (myAttributes == null) {
myAttributes = new HashMap<>();
}
myAttributes.putAll(theMsg.myAttributes);
}
}
}

View File

@ -1,60 +0,0 @@
package ca.uhn.fhir.jpa.subscription.model;
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.ToStringBuilder;
public class LegacyResourceModifiedJsonMessage extends BaseJsonMessage<LegacyResourceModifiedMessage> {
@JsonProperty("payload")
private LegacyResourceModifiedMessage myPayload;
/**
* Constructor
*/
public LegacyResourceModifiedJsonMessage() {
super();
}
/**
* Constructor
*/
public LegacyResourceModifiedJsonMessage(LegacyResourceModifiedMessage thePayload) {
myPayload = thePayload;
}
@Override
public LegacyResourceModifiedMessage getPayload() {
return myPayload;
}
public void setPayload(LegacyResourceModifiedMessage thePayload) {
myPayload = thePayload;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("myPayload", myPayload)
.toString();
}
}

View File

@ -1,198 +0,0 @@
package ca.uhn.fhir.jpa.subscription.model;
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.util.ResourceReferenceInfo;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import java.util.List;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class LegacyResourceModifiedMessage extends LegacyBaseResourceMessage implements IResourceMessage, IModelJson {
@JsonProperty("resourceId")
private String myId;
@JsonProperty("operationType")
private OperationTypeEnum myOperationType;
/**
* This will only be set if the resource is being triggered for a specific
* subscription
*/
@JsonProperty(value = "subscriptionId", required = false)
private String mySubscriptionId;
@JsonProperty("payload")
private String myPayload;
@JsonProperty("payloadId")
private String myPayloadId;
@JsonProperty("parentTransactionGuid")
private String myParentTransactionGuid;
@JsonIgnore
private transient IBaseResource myPayloadDecoded;
/**
* Constructor
*/
public LegacyResourceModifiedMessage() {
super();
}
public LegacyResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theResource, OperationTypeEnum theOperationType) {
this();
setId(theResource.getIdElement());
setOperationType(theOperationType);
if (theOperationType != OperationTypeEnum.DELETE) {
setNewPayload(theFhirContext, theResource);
}
}
public LegacyResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theNewResource, OperationTypeEnum theOperationType, RequestDetails theRequest) {
this(theFhirContext, theNewResource, theOperationType);
if (theRequest != null) {
setParentTransactionGuid(theRequest.getTransactionGuid());
}
}
@Override
public String getPayloadId() {
return myPayloadId;
}
public String getSubscriptionId() {
return mySubscriptionId;
}
public void setSubscriptionId(String theSubscriptionId) {
mySubscriptionId = theSubscriptionId;
}
public String getId() {
return myId;
}
public IIdType getId(FhirContext theCtx) {
IIdType retVal = null;
if (myId != null) {
retVal = theCtx.getVersion().newIdType().setValue(myId);
}
return retVal;
}
public IBaseResource getNewPayload(FhirContext theCtx) {
if (myPayloadDecoded == null && isNotBlank(myPayload)) {
myPayloadDecoded = theCtx.newJsonParser().parseResource(myPayload);
}
return myPayloadDecoded;
}
public OperationTypeEnum getOperationType() {
return myOperationType;
}
public void setOperationType(OperationTypeEnum theOperationType) {
myOperationType = theOperationType;
}
public void setId(IIdType theId) {
myId = null;
if (theId != null) {
myId = theId.getValue();
}
}
public String getParentTransactionGuid() {
return myParentTransactionGuid;
}
public void setParentTransactionGuid(String theParentTransactionGuid) {
myParentTransactionGuid = theParentTransactionGuid;
}
private void setNewPayload(FhirContext theCtx, IBaseResource theNewPayload) {
/*
* References with placeholders would be invalid by the time we get here, and
* would be caught before we even get here. This check is basically a last-ditch
* effort to make sure nothing has broken in the various safeguards that
* should prevent this from happening (hence it only being an assert as
* opposed to something executed all the time).
*/
assert payloadContainsNoPlaceholderReferences(theCtx, theNewPayload);
/*
* Note: Don't set myPayloadDecoded in here- This is a false optimization since
* it doesn't actually get used if anyone is doing subscriptions at any
* scale using a queue engine, and not going through the serialize/deserialize
* as we would in a queue engine can mask bugs.
* -JA
*/
myPayload = theCtx.newJsonParser().encodeResourceToString(theNewPayload);
myPayloadId = theNewPayload.getIdElement().toUnqualified().getValue();
}
public enum OperationTypeEnum {
CREATE,
UPDATE,
DELETE,
MANUALLY_TRIGGERED
}
private static boolean payloadContainsNoPlaceholderReferences(FhirContext theCtx, IBaseResource theNewPayload) {
List<ResourceReferenceInfo> refs = theCtx.newTerser().getAllResourceReferences(theNewPayload);
for (ResourceReferenceInfo next : refs) {
String ref = next.getResourceReference().getReferenceElement().getValue();
if (isBlank(ref)) {
IBaseResource resource = next.getResourceReference().getResource();
if (resource != null) {
ref = resource.getIdElement().getValue();
}
}
if (isNotBlank(ref)) {
if (ref.startsWith("#")) {
continue;
}
if (ref.startsWith("urn:uuid:")) {
throw new AssertionError("Reference at " + next.getName() + " is invalid: " + ref);
}
}
}
return true;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("myId", myId)
.append("myOperationType", myOperationType)
.append("mySubscriptionId", mySubscriptionId)
// .append("myPayload", myPayload)
.append("myPayloadId", myPayloadId)
// .append("myPayloadDecoded", myPayloadDecoded)
.toString();
}
}

View File

@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.subscription.model;
* #L% * #L%
*/ */
import ca.uhn.fhir.rest.server.messaging.json.BaseJsonMessage;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringBuilder;

View File

@ -24,7 +24,7 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.parser.IParser; import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.EncodingEnum; import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage; import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage; import ca.uhn.fhir.rest.server.messaging.IResourceMessage;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringBuilder;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.rest.server.messaging.json; package ca.uhn.fhir.jpa.subscription.model;
/*- /*-
* #%L * #%L
@ -20,7 +20,7 @@ package ca.uhn.fhir.rest.server.messaging.json;
* #L% * #L%
*/ */
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage; import ca.uhn.fhir.rest.server.messaging.json.BaseJsonMessage;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringBuilder;

View File

@ -0,0 +1,78 @@
package ca.uhn.fhir.jpa.subscription.model;
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.hl7.fhir.instance.model.api.IBaseResource;
/**
* Most of this class has been moved to ResourceModifiedMessage in the hapi-fhir-server project, for a reusable channel ResourceModifiedMessage
* that doesn't require knowledge of subscriptions.
*/
public class ResourceModifiedMessage extends BaseResourceModifiedMessage {
/**
* This will only be set if the resource is being triggered for a specific
* subscription
*/
@JsonProperty(value = "subscriptionId", required = false)
private String mySubscriptionId;
/**
* Constructor
*/
public ResourceModifiedMessage() {
super();
}
public ResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theResource, OperationTypeEnum theOperationType) {
super(theFhirContext, theResource, theOperationType);
}
public ResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theNewResource, OperationTypeEnum theOperationType, RequestDetails theRequest) {
super(theFhirContext, theNewResource, theOperationType, theRequest);
}
public String getSubscriptionId() {
return mySubscriptionId;
}
public void setSubscriptionId(String theSubscriptionId) {
mySubscriptionId = theSubscriptionId;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("myId", myId)
.append("myOperationType", myOperationType)
.append("mySubscriptionId", mySubscriptionId)
// .append("myPayload", myPayload)
.append("myPayloadId", myPayloadId)
// .append("myPayloadDecoded", myPayloadDecoded)
.toString();
}
}

View File

@ -10,10 +10,10 @@ import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer; import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber; import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster; import ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster;
import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.messaging.json.ResourceModifiedJsonMessage;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;

View File

@ -32,6 +32,7 @@ import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService; import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer; import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum; import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
import ca.uhn.fhir.rest.annotation.IdParam; import ca.uhn.fhir.rest.annotation.IdParam;
import ca.uhn.fhir.rest.api.CacheControlDirective; import ca.uhn.fhir.rest.api.CacheControlDirective;
@ -40,8 +41,6 @@ import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException; import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedSubscriptionMessage;
import ca.uhn.fhir.util.ParametersUtil; import ca.uhn.fhir.util.ParametersUtil;
import ca.uhn.fhir.util.StopWatch; import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.UrlUtil; import ca.uhn.fhir.util.UrlUtil;
@ -307,7 +306,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId); ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId);
ResourceModifiedSubscriptionMessage msg = new ResourceModifiedSubscriptionMessage(myFhirContext, theResourceToTrigger, ResourceModifiedMessage.OperationTypeEnum.UPDATE); ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theResourceToTrigger, ResourceModifiedMessage.OperationTypeEnum.UPDATE);
msg.setSubscriptionId(theSubscriptionId); msg.setSubscriptionId(theSubscriptionId);
return myExecutorService.submit(() -> { return myExecutorService.submit(() -> {

View File

@ -26,7 +26,7 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.util.StopWatch; import ca.uhn.fhir.util.StopWatch;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;

View File

@ -1,8 +1,8 @@
package ca.uhn.fhir.jpa.subscription.channel.subscription; package ca.uhn.fhir.jpa.subscription.channel.subscription;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.rest.server.messaging.json.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock; import org.mockito.Mock;

View File

@ -8,11 +8,11 @@ import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.EncodingEnum; import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.client.api.IGenericClient; import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.api.IRestfulClientFactory; import ca.uhn.fhir.rest.client.api.IRestfulClientFactory;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage;
import org.hl7.fhir.r4.model.IdType; import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Patient; import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;

View File

@ -1,7 +1,7 @@
package ca.uhn.fhir.jpa.subscription.module; package ca.uhn.fhir.jpa.subscription.module;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import org.hl7.fhir.r4.model.Organization; import org.hl7.fhir.r4.model.Organization;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;

View File

@ -11,6 +11,8 @@ import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test; import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriberTest; import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriberTest;
import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.model.primitive.IdDt;
@ -21,8 +23,6 @@ import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.MethodOutcome; import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.server.IResourceProvider; import ca.uhn.fhir.rest.server.IResourceProvider;
import ca.uhn.fhir.rest.server.RestfulServer; import ca.uhn.fhir.rest.server.RestfulServer;
import ca.uhn.fhir.rest.server.messaging.json.ResourceModifiedJsonMessage;
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedMessage;
import ca.uhn.fhir.test.utilities.JettyUtil; import ca.uhn.fhir.test.utilities.JettyUtil;
import ca.uhn.test.concurrency.IPointcutLatch; import ca.uhn.test.concurrency.IPointcutLatch;
import ca.uhn.test.concurrency.PointcutLatch; import ca.uhn.test.concurrency.PointcutLatch;

View File

@ -2,6 +2,8 @@ package ca.uhn.fhir.rest.server.messaging;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.util.ResourceReferenceInfo; import ca.uhn.fhir.util.ResourceReferenceInfo;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
@ -15,29 +17,29 @@ import java.util.List;
import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class ResourceModifiedMessage extends BaseResourceMessage implements IResourceMessage, IModelJson { public class BaseResourceModifiedMessage extends BaseResourceMessage implements IResourceMessage, IModelJson {
@JsonProperty("resourceId") @JsonProperty("resourceId")
private String myId; protected String myId;
@JsonProperty("operationType") @JsonProperty("operationType")
private OperationTypeEnum myOperationType; protected OperationTypeEnum myOperationType;
@JsonProperty("payload") @JsonProperty("payload")
private String myPayload; protected String myPayload;
@JsonProperty("payloadId") @JsonProperty("payloadId")
private String myPayloadId; protected String myPayloadId;
@JsonProperty("parentTransactionGuid") @JsonProperty("parentTransactionGuid")
private String myParentTransactionGuid; protected String myParentTransactionGuid;
@JsonIgnore @JsonIgnore
private transient IBaseResource myPayloadDecoded; protected transient IBaseResource myPayloadDecoded;
/** /**
* Constructor * Constructor
*/ */
public ResourceModifiedMessage() { public BaseResourceModifiedMessage() {
super(); super();
} }
public ResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theResource, OperationTypeEnum theOperationType) { public BaseResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theResource, OperationTypeEnum theOperationType) {
this(); this();
setId(theResource.getIdElement()); setId(theResource.getIdElement());
setOperationType(theOperationType); setOperationType(theOperationType);
@ -46,7 +48,7 @@ public class ResourceModifiedMessage extends BaseResourceMessage implements IRes
} }
} }
public ResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theNewResource, OperationTypeEnum theOperationType, RequestDetails theRequest) { public BaseResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theNewResource, OperationTypeEnum theOperationType, RequestDetails theRequest) {
this(theFhirContext, theNewResource, theOperationType); this(theFhirContext, theNewResource, theOperationType);
if (theRequest != null) { if (theRequest != null) {
setParentTransactionGuid(theRequest.getTransactionGuid()); setParentTransactionGuid(theRequest.getTransactionGuid());
@ -77,6 +79,24 @@ public class ResourceModifiedMessage extends BaseResourceMessage implements IRes
return myPayloadDecoded; return myPayloadDecoded;
} }
public IBaseResource getPayload(FhirContext theCtx) {
IBaseResource retVal = myPayloadDecoded;
if (retVal == null && isNotBlank(myPayload)) {
IParser parser = EncodingEnum.detectEncoding(myPayload).newParser(theCtx);
retVal = parser.parseResource(myPayload);
myPayloadDecoded = retVal;
}
return retVal;
}
public String getPayloadString() {
if (this.myPayload != null) {
return this.myPayload;
}
return "";
}
public OperationTypeEnum getOperationType() { public OperationTypeEnum getOperationType() {
return myOperationType; return myOperationType;
} }
@ -100,7 +120,7 @@ public class ResourceModifiedMessage extends BaseResourceMessage implements IRes
myParentTransactionGuid = theParentTransactionGuid; myParentTransactionGuid = theParentTransactionGuid;
} }
private void setNewPayload(FhirContext theCtx, IBaseResource theNewPayload) { protected void setNewPayload(FhirContext theCtx, IBaseResource theNewPayload) {
/* /*
* References with placeholders would be invalid by the time we get here, and * References with placeholders would be invalid by the time we get here, and
* would be caught before we even get here. This check is basically a last-ditch * would be caught before we even get here. This check is basically a last-ditch
@ -128,7 +148,7 @@ public class ResourceModifiedMessage extends BaseResourceMessage implements IRes
MANUALLY_TRIGGERED MANUALLY_TRIGGERED
} }
private static boolean payloadContainsNoPlaceholderReferences(FhirContext theCtx, IBaseResource theNewPayload) { protected static boolean payloadContainsNoPlaceholderReferences(FhirContext theCtx, IBaseResource theNewPayload) {
List<ResourceReferenceInfo> refs = theCtx.newTerser().getAllResourceReferences(theNewPayload); List<ResourceReferenceInfo> refs = theCtx.newTerser().getAllResourceReferences(theNewPayload);
for (ResourceReferenceInfo next : refs) { for (ResourceReferenceInfo next : refs) {
String ref = next.getResourceReference().getReferenceElement().getValue(); String ref = next.getResourceReference().getReferenceElement().getValue();

View File

@ -1,36 +0,0 @@
package ca.uhn.fhir.rest.server.messaging;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.hl7.fhir.instance.model.api.IBaseResource;
public class ResourceModifiedSubscriptionMessage extends ResourceModifiedMessage {
/**
* This will only be set if the resource is being triggered for a specific
* subscription
*/
@JsonProperty(value = "subscriptionId", required = false)
private String mySubscriptionId;
public ResourceModifiedSubscriptionMessage() {
}
public ResourceModifiedSubscriptionMessage(FhirContext theFhirContext, IBaseResource theResource, OperationTypeEnum theOperationType) {
super(theFhirContext, theResource, theOperationType);
}
public ResourceModifiedSubscriptionMessage(FhirContext theFhirContext, IBaseResource theNewResource, OperationTypeEnum theOperationType, RequestDetails theRequest) {
super(theFhirContext, theNewResource, theOperationType, theRequest);
}
public String getSubscriptionId() {
return mySubscriptionId;
}
public void setSubscriptionId(String theSubscriptionId) {
mySubscriptionId = theSubscriptionId;
}
}

View File

@ -25,23 +25,57 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessageHeaders;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public abstract class BaseJsonMessage<T> implements Message<T>, IModelJson { public abstract class BaseJsonMessage<T> implements Message<T>, IModelJson {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@JsonProperty("headers") @JsonProperty("headers")
private MessageHeaders myHeaders; private MessageHeaders myHeaders;
private String RETRY_COUNT_HEADER = "retryCount";
private String FIRST_FAILURE_HEADER = "firstFailure";
private String LAST_FAILURE_HEADER = "lastFailure";
/** /**
* Constructor * Constructor
*/ */
public BaseJsonMessage() { public BaseJsonMessage() {
super(); super();
setDefaultRetryHeaders();
}
protected void setDefaultRetryHeaders() {
Map<String, Object> headers = new HashMap<>();
headers.put(RETRY_COUNT_HEADER, 0);
headers.put(FIRST_FAILURE_HEADER, null);
headers.put(LAST_FAILURE_HEADER, null);
MessageHeaders messageHeaders = new MessageHeaders(headers);
setHeaders(messageHeaders);
} }
@Override @Override
public MessageHeaders getHeaders() { public MessageHeaders getHeaders() {
return myHeaders; return myHeaders;
} }
public final Integer getRetryCount() {
//TODO GGG this is not NPE-safe
return (Integer)this.getHeaders().get(RETRY_COUNT_HEADER);
}
public final Date getFirstFailureDate() {
//TODO GGG this is not NPE-safe
return (Date)this.getHeaders().get(FIRST_FAILURE_HEADER);
}
public final Date getLastFailureDate() {
//TODO GGG this is not NPE-safe
return (Date)this.getHeaders().get(LAST_FAILURE_HEADER);
}
public void setHeaders(MessageHeaders theHeaders) { public void setHeaders(MessageHeaders theHeaders) {
myHeaders = theHeaders; myHeaders = theHeaders;

View File

@ -20,35 +20,44 @@ package ca.uhn.fhir.rest.server.messaging.json;
* #L% * #L%
*/ */
import ca.uhn.fhir.rest.server.messaging.ResourceModifiedSubscriptionMessage; import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringBuilder;
import org.springframework.messaging.MessageHeaders;
public class BaseResourceModifiedJsonMessage extends BaseJsonMessage<BaseResourceModifiedMessage> {
public class ResourceModifiedSubscriptionJsonMessage extends BaseJsonMessage<ResourceModifiedSubscriptionMessage> {
@JsonProperty("payload") @JsonProperty("payload")
private ResourceModifiedSubscriptionMessage myPayload; private BaseResourceModifiedMessage myPayload;
/** /**
* Constructor * Constructor
*/ */
public ResourceModifiedSubscriptionJsonMessage() { public BaseResourceModifiedJsonMessage() {
super(); super();
} }
/** /**
* Constructor * Constructor
*/ */
public ResourceModifiedSubscriptionJsonMessage(ResourceModifiedSubscriptionMessage thePayload) { public BaseResourceModifiedJsonMessage(BaseResourceModifiedMessage thePayload) {
myPayload = thePayload; myPayload = thePayload;
setDefaultRetryHeaders();
} }
public BaseResourceModifiedJsonMessage(MessageHeaders theRetryMessageHeaders, BaseResourceModifiedMessage thePayload) {
myPayload = thePayload;
setHeaders(theRetryMessageHeaders);
}
@Override @Override
public ResourceModifiedSubscriptionMessage getPayload() { public BaseResourceModifiedMessage getPayload() {
return myPayload; return myPayload;
} }
public void setPayload(ResourceModifiedSubscriptionMessage thePayload) { public void setPayload(BaseResourceModifiedMessage thePayload) {
myPayload = thePayload; myPayload = thePayload;
} }