Use serialized messages for queues

This commit is contained in:
James 2017-09-19 22:01:36 -04:00
parent d65664cd9b
commit 2fa912a43f
12 changed files with 178 additions and 112 deletions

View File

@ -41,11 +41,11 @@ public abstract class BaseSubscriptionDeliverySubscriber extends BaseSubscriptio
} }
try { try {
ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload(); ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
if (!subscriptionTypeApplies(getContext(), msg.getSubscription().getBackingSubscription())) { if (!subscriptionTypeApplies(getContext(), msg.getSubscription().getBackingSubscription(getContext()))) {
return; return;
} }
CanonicalSubscription updatedSubscription = (CanonicalSubscription)getSubscriptionInterceptor().getIdToSubscription().get(msg.getSubscription().getIdElement().getIdPart()); CanonicalSubscription updatedSubscription = (CanonicalSubscription)getSubscriptionInterceptor().getIdToSubscription().get(msg.getSubscription().getIdElement(getContext()).getIdPart());
if (updatedSubscription != null) { if (updatedSubscription != null) {
msg.setSubscription(updatedSubscription); msg.setSubscription(updatedSubscription);
} }

View File

@ -115,7 +115,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
CanonicalSubscription retVal = new CanonicalSubscription(); CanonicalSubscription retVal = new CanonicalSubscription();
try { try {
retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus())); retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus()));
retVal.setBackingSubscription(theSubscription); retVal.setBackingSubscription(myCtx, theSubscription);
retVal.setChannelType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.fromCode(subscription.getChannel().getType())); retVal.setChannelType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.fromCode(subscription.getChannel().getType()));
retVal.setCriteriaString(subscription.getCriteria()); retVal.setCriteriaString(subscription.getCriteria());
retVal.setEndpointUrl(subscription.getChannel().getEndpoint()); retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
@ -134,7 +134,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
CanonicalSubscription retVal = new CanonicalSubscription(); CanonicalSubscription retVal = new CanonicalSubscription();
try { try {
retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus().toCode())); retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus().toCode()));
retVal.setBackingSubscription(theSubscription); retVal.setBackingSubscription(myCtx, theSubscription);
retVal.setChannelType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.fromCode(subscription.getChannel().getType().toCode())); retVal.setChannelType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.fromCode(subscription.getChannel().getType().toCode()));
retVal.setCriteriaString(subscription.getCriteria()); retVal.setCriteriaString(subscription.getCriteria());
retVal.setEndpointUrl(subscription.getChannel().getEndpoint()); retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
@ -169,7 +169,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
CanonicalSubscription retVal = new CanonicalSubscription(); CanonicalSubscription retVal = new CanonicalSubscription();
retVal.setStatus(subscription.getStatus()); retVal.setStatus(subscription.getStatus());
retVal.setBackingSubscription(theSubscription); retVal.setBackingSubscription(myCtx, theSubscription);
retVal.setChannelType(subscription.getChannel().getType()); retVal.setChannelType(subscription.getChannel().getType());
retVal.setCriteriaString(subscription.getCriteria()); retVal.setCriteriaString(subscription.getCriteria());
retVal.setEndpointUrl(subscription.getChannel().getEndpoint()); retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
@ -201,7 +201,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
} }
org.hl7.fhir.r4.model.EventDefinition def = myEventDefinitionDaoR4.read(ref.getReferenceElement()); org.hl7.fhir.r4.model.EventDefinition def = myEventDefinitionDaoR4.read(ref.getReferenceElement());
retVal.addTrigger(def.getTrigger()); retVal.addTrigger(new CanonicalSubscription.CanonicalEventDefinition(def));
} }
return retVal; return retVal;
@ -325,7 +325,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
ResourceModifiedMessage msg = new ResourceModifiedMessage(); ResourceModifiedMessage msg = new ResourceModifiedMessage();
msg.setId(theResource.getIdElement()); msg.setId(theResource.getIdElement());
msg.setOperationType(RestOperationTypeEnum.CREATE); msg.setOperationType(RestOperationTypeEnum.CREATE);
msg.setNewPayload(theResource); msg.setNewPayload(myCtx, theResource);
submitResourceModified(msg); submitResourceModified(msg);
} }
@ -342,7 +342,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
ResourceModifiedMessage msg = new ResourceModifiedMessage(); ResourceModifiedMessage msg = new ResourceModifiedMessage();
msg.setId(theNewResource.getIdElement()); msg.setId(theNewResource.getIdElement());
msg.setOperationType(RestOperationTypeEnum.UPDATE); msg.setOperationType(RestOperationTypeEnum.UPDATE);
msg.setNewPayload(theNewResource); msg.setNewPayload(myCtx, theNewResource);
submitResourceModified(msg); submitResourceModified(msg);
} }
@ -443,7 +443,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
protected void submitResourceModified(final ResourceModifiedMessage theMsg) { protected void submitResourceModified(final ResourceModifiedMessage theMsg) {
final GenericMessage<ResourceModifiedMessage> message = new GenericMessage<>(theMsg); final GenericMessage<ResourceModifiedMessage> message = new GenericMessage<>(theMsg);
mySubscriptionActivatingSubscriber.handleMessage(message); mySubscriptionActivatingSubscriber.handleMessage(theMsg.getOperationType(), theMsg.getId(myCtx), theMsg.getNewPayload(myCtx));
sendToProcessingChannel(message); sendToProcessingChannel(message);
} }

View File

@ -72,9 +72,17 @@ public abstract class BaseSubscriptionSubscriber implements MessageHandler {
* Does this subscription type (e.g. rest hook, websocket, etc) apply to this interceptor? * Does this subscription type (e.g. rest hook, websocket, etc) apply to this interceptor?
*/ */
protected boolean subscriptionTypeApplies(FhirContext theCtx, IBaseResource theSubscription) { protected boolean subscriptionTypeApplies(FhirContext theCtx, IBaseResource theSubscription) {
Subscription.SubscriptionChannelType channelType = getChannelType();
return subscriptionTypeApplies(theCtx, theSubscription, channelType);
}
/**
* Does this subscription type (e.g. rest hook, websocket, etc) apply to this interceptor?
*/
static boolean subscriptionTypeApplies(FhirContext theCtx, IBaseResource theSubscription, Subscription.SubscriptionChannelType theChannelType) {
IPrimitiveType<?> status = theCtx.newTerser().getSingleValueOrNull(theSubscription, BaseSubscriptionInterceptor.SUBSCRIPTION_TYPE, IPrimitiveType.class); IPrimitiveType<?> status = theCtx.newTerser().getSingleValueOrNull(theSubscription, BaseSubscriptionInterceptor.SUBSCRIPTION_TYPE, IPrimitiveType.class);
boolean subscriptionTypeApplies = false; boolean subscriptionTypeApplies = false;
if (getChannelType().toCode().equals(status.getValueAsString())) { if (theChannelType.toCode().equals(status.getValueAsString())) {
subscriptionTypeApplies = true; subscriptionTypeApplies = true;
} }
return subscriptionTypeApplies; return subscriptionTypeApplies;

View File

@ -20,13 +20,16 @@ package ca.uhn.fhir.jpa.subscription;
* #L% * #L%
*/ */
import ca.uhn.fhir.context.FhirContext;
import com.google.gson.annotations.Expose;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.EventDefinition;
import org.hl7.fhir.r4.model.Subscription; import org.hl7.fhir.r4.model.Subscription;
import org.hl7.fhir.r4.model.TriggerDefinition;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
@ -36,24 +39,35 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class CanonicalSubscription implements Serializable { public class CanonicalSubscription implements Serializable {
private static final long serialVersionUID = 364269017L; private static final long serialVersionUID = 1L;
private IIdType myIdElement; @SerializedName("id")
private String myIdElement;
@SerializedName("criteria")
private String myCriteriaString; private String myCriteriaString;
@SerializedName("endpointUrl")
private String myEndpointUrl; private String myEndpointUrl;
@SerializedName("payload")
private String myPayloadString; private String myPayloadString;
@SerializedName("headers")
private List<String> myHeaders; private List<String> myHeaders;
@SerializedName("channelType")
private Subscription.SubscriptionChannelType myChannelType; private Subscription.SubscriptionChannelType myChannelType;
@SerializedName("status")
private Subscription.SubscriptionStatus myStatus; private Subscription.SubscriptionStatus myStatus;
private IBaseResource myBackingSubscription; private transient IBaseResource myBackingSubscription;
private TriggerDefinition myTrigger; @SerializedName("backingSubscription")
private String myBackingSubscriptionString;
@SerializedName("triggerDefinition")
private CanonicalEventDefinition myTrigger;
@SerializedName("emailDetails")
private EmailDetails myEmailDetails; private EmailDetails myEmailDetails;
/** /**
* For now we're using the R4 TriggerDefinition, but this * For now we're using the R4 TriggerDefinition, but this
* may change in the future when things stabilize * may change in the future when things stabilize
*/ */
public void addTrigger(TriggerDefinition theTrigger) { public void addTrigger(CanonicalEventDefinition theTrigger) {
myTrigger = theTrigger; myTrigger = theTrigger;
} }
@ -66,16 +80,27 @@ public class CanonicalSubscription implements Serializable {
CanonicalSubscription that = (CanonicalSubscription) theO; CanonicalSubscription that = (CanonicalSubscription) theO;
return new EqualsBuilder() return new EqualsBuilder()
.append(getIdElement().getIdPart(), that.getIdElement().getIdPart()) .append(getIdElementString(), that.getIdElementString())
.isEquals(); .isEquals();
} }
public IBaseResource getBackingSubscription() { public IBaseResource getBackingSubscription(FhirContext theCtx) {
if (myBackingSubscription == null && myBackingSubscriptionString != null) {
myBackingSubscription = theCtx.newJsonParser().parseResource(myBackingSubscriptionString);
}
return myBackingSubscription; return myBackingSubscription;
} }
public void setBackingSubscription(IBaseResource theBackingSubscription) { String getIdElementString() {
return myIdElement;
}
public void setBackingSubscription(FhirContext theCtx, IBaseResource theBackingSubscription) {
myBackingSubscription = theBackingSubscription; myBackingSubscription = theBackingSubscription;
myBackingSubscriptionString = null;
if (myBackingSubscription != null) {
myBackingSubscriptionString = theCtx.newJsonParser().encodeResourceToString(myBackingSubscription);
}
} }
public Subscription.SubscriptionChannelType getChannelType() { public Subscription.SubscriptionChannelType getChannelType() {
@ -122,12 +147,12 @@ public class CanonicalSubscription implements Serializable {
} }
} }
public IIdType getIdElement() { public IIdType getIdElement(FhirContext theContext) {
return myIdElement; IIdType retVal = null;
} if (isNotBlank(myIdElement)) {
retVal = theContext.getVersion().newIdType().setValue(myIdElement);
public void setIdElement(IIdType theIdElement) { }
myIdElement = theIdElement; return retVal;
} }
public String getPayloadString() { public String getPayloadString() {
@ -150,14 +175,14 @@ public class CanonicalSubscription implements Serializable {
* For now we're using the R4 triggerdefinition, but this * For now we're using the R4 triggerdefinition, but this
* may change in the future when things stabilize * may change in the future when things stabilize
*/ */
public TriggerDefinition getTrigger() { public CanonicalEventDefinition getTrigger() {
return myTrigger; return myTrigger;
} }
@Override @Override
public int hashCode() { public int hashCode() {
return new HashCodeBuilder(17, 37) return new HashCodeBuilder(17, 37)
.append(getIdElement().getIdPart()) .append(getIdElementString())
.toHashCode(); .toHashCode();
} }
@ -168,9 +193,19 @@ public class CanonicalSubscription implements Serializable {
} }
} }
public void setIdElement(IIdType theIdElement) {
myIdElement = null;
if (theIdElement != null) {
myIdElement = theIdElement.toUnqualifiedVersionless().getValue();
}
}
public static class EmailDetails { public static class EmailDetails {
@SerializedName("from")
private String myFrom; private String myFrom;
@SerializedName("subjectTemplate")
private String mySubjectTemplate; private String mySubjectTemplate;
@SerializedName("bodyTemplate")
private String myBodyTemplate; private String myBodyTemplate;
public String getBodyTemplate() { public String getBodyTemplate() {
@ -198,4 +233,11 @@ public class CanonicalSubscription implements Serializable {
} }
} }
public static class CanonicalEventDefinition {
public CanonicalEventDefinition(EventDefinition theDef) {
// nothing yet
}
}
} }

View File

@ -20,7 +20,9 @@ package ca.uhn.fhir.jpa.subscription;
* #L% * #L%
*/ */
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import com.google.gson.Gson;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
@ -28,11 +30,13 @@ import java.io.Serializable;
public class ResourceDeliveryMessage implements Serializable { public class ResourceDeliveryMessage implements Serializable {
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 1L;
private CanonicalSubscription mySubscription; private transient CanonicalSubscription mySubscription;
private IBaseResource myPayoad; private String mySubscriptionString;
private IIdType myPayloadId; private transient IBaseResource myPayload;
private String myPayoadString;
private String myPayloadId;
private RestOperationTypeEnum myOperationType; private RestOperationTypeEnum myOperationType;
public RestOperationTypeEnum getOperationType() { public RestOperationTypeEnum getOperationType() {
@ -43,28 +47,45 @@ public class ResourceDeliveryMessage implements Serializable {
myOperationType = theOperationType; myOperationType = theOperationType;
} }
public IIdType getPayloadId() { public IBaseResource getPayload(FhirContext theCtx) {
return myPayloadId; if (myPayload == null && myPayoadString != null) {
myPayload = theCtx.newJsonParser().parseResource(myPayoadString);
}
return myPayload;
} }
public void setPayloadId(IIdType thePayloadId) { public IIdType getPayloadId(FhirContext theCtx) {
myPayloadId = thePayloadId; IIdType retVal = null;
} if (myPayloadId != null) {
retVal = theCtx.getVersion().newIdType().setValue(myPayloadId);
public IBaseResource getPayload() { }
return myPayoad; return retVal;
}
public void setPayload(IBaseResource thePayload) {
myPayoad = thePayload;
} }
public CanonicalSubscription getSubscription() { public CanonicalSubscription getSubscription() {
if (mySubscription == null && mySubscriptionString != null) {
mySubscription = new Gson().fromJson(mySubscriptionString, CanonicalSubscription.class);
}
return mySubscription; return mySubscription;
} }
public void setSubscription(CanonicalSubscription theSubscription) { public void setSubscription(CanonicalSubscription theSubscription) {
mySubscription = theSubscription; mySubscription = theSubscription;
if (mySubscription != null) {
mySubscriptionString = new Gson().toJson(mySubscription);
}
}
public void setPayload(FhirContext theCtx, IBaseResource thePayload) {
myPayload = thePayload;
myPayoadString = theCtx.newJsonParser().encodeResourceToString(thePayload);
}
public void setPayloadId(IIdType thePayloadId) {
myPayloadId = null;
if (thePayloadId != null) {
myPayloadId = thePayloadId.getValue();
}
} }
} }

View File

@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.subscription;
* #L% * #L%
*/ */
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
@ -28,21 +29,28 @@ import java.io.Serializable;
public class ResourceModifiedMessage implements Serializable { public class ResourceModifiedMessage implements Serializable {
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 1L;
private IIdType myId; private String myId;
private RestOperationTypeEnum myOperationType; private RestOperationTypeEnum myOperationType;
private IBaseResource myNewPayload; private String myNewPayloadEncoded;
private transient IBaseResource myNewPayload;
public IIdType getId() { public IIdType getId(FhirContext theCtx) {
return myId; IIdType retVal = null;
if (myId != null) {
retVal = theCtx.getVersion().newIdType().setValue(myId);
}
return retVal;
} }
public void setId(IIdType theId) { public IBaseResource getNewPayload(FhirContext theCtx) {
myId = theId; if (myNewPayload == null && myNewPayloadEncoded != null) {
myNewPayload = theCtx.newJsonParser().parseResource(myNewPayloadEncoded);
}
return myNewPayload;
} }
public RestOperationTypeEnum getOperationType() { public RestOperationTypeEnum getOperationType() {
return myOperationType; return myOperationType;
} }
@ -51,11 +59,15 @@ public class ResourceModifiedMessage implements Serializable {
myOperationType = theOperationType; myOperationType = theOperationType;
} }
public IBaseResource getNewPayload() { public void setId(IIdType theId) {
return myNewPayload; myId = null;
if (theId != null) {
myId = theId.getValue();
}
} }
public void setNewPayload(IBaseResource theNewPayload) { public void setNewPayload(FhirContext theCtx, IBaseResource theNewPayload) {
myNewPayload = theNewPayload; myNewPayload = theNewPayload;
myNewPayloadEncoded = theCtx.newJsonParser().encodeResourceToString(theNewPayload);
} }
} }

View File

@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.instance.model.api.IPrimitiveType;
@ -35,29 +36,30 @@ import org.springframework.messaging.MessagingException;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public class SubscriptionActivatingSubscriber extends BaseSubscriptionSubscriber { public class SubscriptionActivatingSubscriber {
private final IFhirResourceDao mySubscriptionDao;
private final BaseSubscriptionInterceptor mySubscriptionInterceptor;
private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class); private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class);
private FhirContext myCtx;
private Subscription.SubscriptionChannelType myChannelType;
/** /**
* Constructor * Constructor
*/ */
public SubscriptionActivatingSubscriber(IFhirResourceDao<? extends IBaseResource> theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor) { public SubscriptionActivatingSubscriber(IFhirResourceDao<? extends IBaseResource> theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor) {
super(theSubscriptionDao, theChannelType, theSubscriptionInterceptor); mySubscriptionDao = theSubscriptionDao;
} mySubscriptionInterceptor = theSubscriptionInterceptor;
myChannelType = theChannelType;
private void activateAndRegisterSubscriptionIfRequired(ResourceModifiedMessage theMsg) { myCtx = theSubscriptionDao.getContext();
IBaseResource subscription = theMsg.getNewPayload();
activateAndRegisterSubscriptionIfRequired(subscription);
} }
public void activateAndRegisterSubscriptionIfRequired(IBaseResource theSubscription) { public void activateAndRegisterSubscriptionIfRequired(IBaseResource theSubscription) {
boolean subscriptionTypeApplies = subscriptionTypeApplies(theSubscription); boolean subscriptionTypeApplies = BaseSubscriptionSubscriber.subscriptionTypeApplies(myCtx, theSubscription, myChannelType);
if (subscriptionTypeApplies == false) { if (subscriptionTypeApplies == false) {
return; return;
} }
FhirContext ctx = getSubscriptionDao().getContext(); IPrimitiveType<?> status = myCtx.newTerser().getSingleValueOrNull(theSubscription, BaseSubscriptionInterceptor.SUBSCRIPTION_STATUS, IPrimitiveType.class);
IPrimitiveType<?> status = ctx.newTerser().getSingleValueOrNull(theSubscription, BaseSubscriptionInterceptor.SUBSCRIPTION_STATUS, IPrimitiveType.class);
String statusString = status.getValueAsString(); String statusString = status.getValueAsString();
String requestedStatus = Subscription.SubscriptionStatus.REQUESTED.toCode(); String requestedStatus = Subscription.SubscriptionStatus.REQUESTED.toCode();
@ -65,59 +67,37 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriptionSubscriber
if (requestedStatus.equals(statusString)) { if (requestedStatus.equals(statusString)) {
status.setValueAsString(activeStatus); status.setValueAsString(activeStatus);
ourLog.info("Activating and registering subscription {} from status {} to {}", theSubscription.getIdElement().toUnqualified().getValue(), requestedStatus, activeStatus); ourLog.info("Activating and registering subscription {} from status {} to {}", theSubscription.getIdElement().toUnqualified().getValue(), requestedStatus, activeStatus);
getSubscriptionDao().update(theSubscription); mySubscriptionDao.update(theSubscription);
getSubscriptionInterceptor().registerSubscription(theSubscription.getIdElement(), theSubscription); mySubscriptionInterceptor.registerSubscription(theSubscription.getIdElement(), theSubscription);
} else if (activeStatus.equals(statusString)) { } else if (activeStatus.equals(statusString)) {
if (!getSubscriptionInterceptor().hasSubscription(theSubscription.getIdElement())) { if (!mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement())) {
ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue()); ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
} }
getSubscriptionInterceptor().registerSubscription(theSubscription.getIdElement(), theSubscription); mySubscriptionInterceptor.registerSubscription(theSubscription.getIdElement(), theSubscription);
} else { } else {
if (getSubscriptionInterceptor().hasSubscription(theSubscription.getIdElement())) { if (mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement())) {
ourLog.info("Removing {} subscription {}", statusString, theSubscription.getIdElement().toUnqualified().getValue()); ourLog.info("Removing {} subscription {}", statusString, theSubscription.getIdElement().toUnqualified().getValue());
} }
getSubscriptionInterceptor().unregisterSubscription(theSubscription.getIdElement()); mySubscriptionInterceptor.unregisterSubscription(theSubscription.getIdElement());
} }
} }
private void handleCreate(ResourceModifiedMessage theMsg) { public void handleMessage(RestOperationTypeEnum theOperationType, IIdType theId, IBaseResource theSubscription) throws MessagingException {
if (!theMsg.getId().getResourceType().equals("Subscription")) {
return;
}
activateAndRegisterSubscriptionIfRequired(theMsg); switch (theOperationType) {
}
@Override
public void handleMessage(Message<?> theMessage) throws MessagingException {
if (!(theMessage.getPayload() instanceof ResourceModifiedMessage)) {
return;
}
ResourceModifiedMessage msg = (ResourceModifiedMessage) theMessage.getPayload();
IIdType id = msg.getId();
switch (msg.getOperationType()) {
case DELETE: case DELETE:
getSubscriptionInterceptor().unregisterSubscription(id); mySubscriptionInterceptor.unregisterSubscription(theId);
return; return;
case CREATE: case CREATE:
handleCreate(msg);
break;
case UPDATE: case UPDATE:
handleUpdate(msg); if (!theId.getResourceType().equals("Subscription")) {
return;
}
activateAndRegisterSubscriptionIfRequired(theSubscription);
break; break;
} }
} }
private void handleUpdate(ResourceModifiedMessage theMsg) {
if (!theMsg.getId().getResourceType().equals("Subscription")) {
return;
}
activateAndRegisterSubscriptionIfRequired(theMsg);
}
} }

View File

@ -29,6 +29,7 @@ import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.RequestDetails;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Subscription; import org.hl7.fhir.r4.model.Subscription;
import org.hl7.fhir.utilities.ucum.Canonical; import org.hl7.fhir.utilities.ucum.Canonical;
@ -68,8 +69,9 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
return; return;
} }
String resourceType = msg.getId().getResourceType(); IIdType id = msg.getId(getContext());
String resourceId = msg.getId().getIdPart(); String resourceType = id.getResourceType();
String resourceId = id.getIdPart();
List<CanonicalSubscription> subscriptions = getSubscriptionInterceptor().getSubscriptions(); List<CanonicalSubscription> subscriptions = getSubscriptionInterceptor().getSubscriptions();
@ -77,7 +79,7 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
for (CanonicalSubscription nextSubscription : subscriptions) { for (CanonicalSubscription nextSubscription : subscriptions) {
String nextSubscriptionId = nextSubscription.getIdElement().toUnqualifiedVersionless().getValue(); String nextSubscriptionId = nextSubscription.getIdElement(getContext()).toUnqualifiedVersionless().getValue();
String nextCriteriaString = nextSubscription.getCriteriaString(); String nextCriteriaString = nextSubscription.getCriteriaString();
if (StringUtils.isBlank(nextCriteriaString)) { if (StringUtils.isBlank(nextCriteriaString)) {
@ -115,10 +117,10 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
ourLog.info("Found match: queueing rest-hook notification for resource: {}", nextBase.getIdElement()); ourLog.info("Found match: queueing rest-hook notification for resource: {}", nextBase.getIdElement());
ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage(); ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
deliveryMsg.setPayload(nextBase); deliveryMsg.setPayload(getContext(), nextBase);
deliveryMsg.setSubscription(nextSubscription); deliveryMsg.setSubscription(nextSubscription);
deliveryMsg.setOperationType(msg.getOperationType()); deliveryMsg.setOperationType(msg.getOperationType());
deliveryMsg.setPayloadId(msg.getId()); deliveryMsg.setPayloadId(msg.getId(getContext()));
getSubscriptionInterceptor().getDeliveryChannel().send(new GenericMessage<>(deliveryMsg)); getSubscriptionInterceptor().getDeliveryChannel().send(new GenericMessage<>(deliveryMsg));
} }

View File

@ -49,7 +49,7 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
} }
protected void deliverPayload(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, EncodingEnum thePayloadType, IGenericClient theClient) { protected void deliverPayload(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, EncodingEnum thePayloadType, IGenericClient theClient) {
IBaseResource payloadResource = theMsg.getPayload(); IBaseResource payloadResource = theMsg.getPayload(getContext());
IClientExecutable<?, ?> operation; IClientExecutable<?, ?> operation;
switch (theMsg.getOperationType()) { switch (theMsg.getOperationType()) {
@ -60,7 +60,7 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
operation = theClient.update().resource(payloadResource); operation = theClient.update().resource(payloadResource);
break; break;
case DELETE: case DELETE:
operation = theClient.delete().resourceById(theMsg.getPayloadId()); operation = theClient.delete().resourceById(theMsg.getPayloadId(getContext()));
break; break;
default: default:
ourLog.warn("Ignoring delivery message of type: {}", theMsg.getOperationType()); ourLog.warn("Ignoring delivery message of type: {}", theMsg.getOperationType());
@ -69,7 +69,7 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
operation.encoded(thePayloadType); operation.encoded(thePayloadType);
ourLog.info("Delivering {} rest-hook payload {} for {}", theMsg.getOperationType(), payloadResource.getIdElement().toUnqualified().getValue(), theSubscription.getIdElement().toUnqualifiedVersionless().getValue()); ourLog.info("Delivering {} rest-hook payload {} for {}", theMsg.getOperationType(), payloadResource.getIdElement().toUnqualified().getValue(), theSubscription.getIdElement(getContext()).toUnqualifiedVersionless().getValue());
operation.execute(); operation.execute();
} }

View File

@ -42,9 +42,10 @@ import java.util.Map;
public class SubscriptionWebsocketHandler extends TextWebSocketHandler implements ISubscriptionWebsocketHandler { public class SubscriptionWebsocketHandler extends TextWebSocketHandler implements ISubscriptionWebsocketHandler {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionWebsocketHandler.class); private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionWebsocketHandler.class);
private static FhirContext ourCtx;
@Autowired @Autowired
private SubscriptionWebsocketInterceptor mySubscriptionWebsocketInterceptor; private SubscriptionWebsocketInterceptor mySubscriptionWebsocketInterceptor;
@Autowired
private FhirContext myCtx;
private IState myState = new InitialState(); private IState myState = new InitialState();
@ -118,7 +119,7 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
private void deliver() { private void deliver() {
try { try {
String payload = "ping " + mySubscription.getIdElement().getIdPart(); String payload = "ping " + mySubscription.getIdElement(myCtx).getIdPart();
ourLog.info("Sending WebSocket message: {}", payload); ourLog.info("Sending WebSocket message: {}", payload);
mySession.sendMessage(new TextMessage(payload)); mySession.sendMessage(new TextMessage(payload));
} catch (IOException e) { } catch (IOException e) {
@ -176,7 +177,7 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
try { try {
Map<String, CanonicalSubscription> idToSubscription = mySubscriptionWebsocketInterceptor.getIdToSubscription(); Map<String, CanonicalSubscription> idToSubscription = mySubscriptionWebsocketInterceptor.getIdToSubscription();
CanonicalSubscription subscription = idToSubscription.get(id.getIdPart()); CanonicalSubscription subscription = idToSubscription.get(id.getIdPart());
myState = new BoundStaticSubscipriptionState(theSession, subscription); myState = new BoundStaticSubscipriptionState( theSession, subscription);
} catch (ResourceNotFoundException e) { } catch (ResourceNotFoundException e) {
try { try {
String message = "Invalid bind request - Unknown subscription: " + id.getValue(); String message = "Invalid bind request - Unknown subscription: " + id.getValue();

View File

@ -171,7 +171,7 @@ public class ResourceProviderInterceptorDstu2Test extends BaseResourceProviderDs
ardCaptor = ArgumentCaptor.forClass(ActionRequestDetails.class); ardCaptor = ArgumentCaptor.forClass(ActionRequestDetails.class);
opTypeCaptor = ArgumentCaptor.forClass(RestOperationTypeEnum.class); opTypeCaptor = ArgumentCaptor.forClass(RestOperationTypeEnum.class);
verify(myDaoInterceptor, atLeast(3)).incomingRequestPreHandled(opTypeCaptor.capture(), ardCaptor.capture()); verify(myDaoInterceptor, atLeast(2)).incomingRequestPreHandled(opTypeCaptor.capture(), ardCaptor.capture());
assertEquals(RestOperationTypeEnum.TRANSACTION, opTypeCaptor.getAllValues().get(0)); assertEquals(RestOperationTypeEnum.TRANSACTION, opTypeCaptor.getAllValues().get(0));
assertEquals("Bundle", ardCaptor.getAllValues().get(0).getResourceType()); assertEquals("Bundle", ardCaptor.getAllValues().get(0).getResourceType());
assertNotNull(ardCaptor.getAllValues().get(0).getResource()); assertNotNull(ardCaptor.getAllValues().get(0).getResource());

View File

@ -111,7 +111,7 @@ public class ReferenceClientTest {
assertEquals(HttpGet.class, capt.getValue().getClass()); assertEquals(HttpGet.class, capt.getValue().getClass());
HttpGet get = (HttpGet) capt.getValue(); HttpGet get = (HttpGet) capt.getValue();
assertEquals("http://foo/Patient?general-practitioner%3AOrganization=123", get.getURI().toString()); assertEquals("http://foo/Patient?general-practitioner=Organization%2F123", get.getURI().toString());
} }
private String createBundle() { private String createBundle() {