Merge pull request #2076 from jamesagnew/2069-message-retry-attributes
2069 message retry attributes
This commit is contained in:
commit
631556041c
|
@ -1620,7 +1620,7 @@ public enum Pointcut {
|
||||||
* <p>
|
* <p>
|
||||||
* Hooks may accept the following parameters:
|
* Hooks may accept the following parameters:
|
||||||
* <ul>
|
* <ul>
|
||||||
* <li>ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage - This parameter should not be modified as processing is complete when this hook is invoked.</li>
|
* <li>ca.uhn.fhir.rest.server.messaging.ResourceOperationMessage - This parameter should not be modified as processing is complete when this hook is invoked.</li>
|
||||||
* <li>ca.uhn.fhir.empi.model.TransactionLogMessages - This parameter is for informational messages provided by the EMPI module during EMPI procesing. .</li>
|
* <li>ca.uhn.fhir.empi.model.TransactionLogMessages - This parameter is for informational messages provided by the EMPI module during EMPI procesing. .</li>
|
||||||
* </ul>
|
* </ul>
|
||||||
* </p>
|
* </p>
|
||||||
|
@ -1628,7 +1628,7 @@ public enum Pointcut {
|
||||||
* Hooks should return <code>void</code>.
|
* Hooks should return <code>void</code>.
|
||||||
* </p>
|
* </p>
|
||||||
*/
|
*/
|
||||||
EMPI_AFTER_PERSISTED_RESOURCE_CHECKED(void.class, "ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage", "ca.uhn.fhir.rest.server.TransactionLogMessages"),
|
EMPI_AFTER_PERSISTED_RESOURCE_CHECKED(void.class, "ca.uhn.fhir.rest.server.messaging.ResourceOperationMessage", "ca.uhn.fhir.rest.server.TransactionLogMessages"),
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <b>Performance Tracing Hook:</b>
|
* <b>Performance Tracing Hook:</b>
|
||||||
|
|
|
@ -33,6 +33,7 @@ import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
|
||||||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
||||||
import ca.uhn.fhir.rest.server.TransactionLogMessages;
|
import ca.uhn.fhir.rest.server.TransactionLogMessages;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
||||||
|
import ca.uhn.fhir.rest.server.messaging.ResourceOperationMessage;
|
||||||
import org.hl7.fhir.instance.model.api.IAnyResource;
|
import org.hl7.fhir.instance.model.api.IAnyResource;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
@ -93,16 +94,20 @@ public class EmpiMessageHandler implements MessageHandler {
|
||||||
}catch (Exception e) {
|
}catch (Exception e) {
|
||||||
log(empiContext, "Failure during EMPI processing: " + e.getMessage());
|
log(empiContext, "Failure during EMPI processing: " + e.getMessage());
|
||||||
} finally {
|
} finally {
|
||||||
|
|
||||||
// Interceptor call: EMPI_AFTER_PERSISTED_RESOURCE_CHECKED
|
// Interceptor call: EMPI_AFTER_PERSISTED_RESOURCE_CHECKED
|
||||||
|
ResourceOperationMessage outgoingMsg = new ResourceOperationMessage(myFhirContext, theMsg.getPayload(myFhirContext), theMsg.getOperationType());
|
||||||
|
outgoingMsg.setTransactionId(theMsg.getTransactionId());
|
||||||
|
|
||||||
HookParams params = new HookParams()
|
HookParams params = new HookParams()
|
||||||
.add(ResourceModifiedMessage.class, theMsg)
|
.add(ResourceOperationMessage.class, outgoingMsg)
|
||||||
.add(TransactionLogMessages.class, empiContext.getTransactionLogMessages());
|
.add(TransactionLogMessages.class, empiContext.getTransactionLogMessages());
|
||||||
myInterceptorBroadcaster.callHooks(Pointcut.EMPI_AFTER_PERSISTED_RESOURCE_CHECKED, params);
|
myInterceptorBroadcaster.callHooks(Pointcut.EMPI_AFTER_PERSISTED_RESOURCE_CHECKED, params);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private EmpiTransactionContext createEmpiContext(ResourceModifiedMessage theMsg) {
|
private EmpiTransactionContext createEmpiContext(ResourceModifiedMessage theMsg) {
|
||||||
TransactionLogMessages transactionLogMessages = TransactionLogMessages.createFromTransactionGuid(theMsg.getParentTransactionGuid());
|
TransactionLogMessages transactionLogMessages = TransactionLogMessages.createFromTransactionGuid(theMsg.getTransactionId());
|
||||||
EmpiTransactionContext.OperationType empiOperation;
|
EmpiTransactionContext.OperationType empiOperation;
|
||||||
switch (theMsg.getOperationType()) {
|
switch (theMsg.getOperationType()) {
|
||||||
case CREATE:
|
case CREATE:
|
||||||
|
|
|
@ -64,7 +64,7 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel
|
||||||
|
|
||||||
protected void doDelivery(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer, IBaseResource thePayloadResource) {
|
protected void doDelivery(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer, IBaseResource thePayloadResource) {
|
||||||
ResourceModifiedMessage payload = new ResourceModifiedMessage(myFhirContext, thePayloadResource, theMsg.getOperationType());
|
ResourceModifiedMessage payload = new ResourceModifiedMessage(myFhirContext, thePayloadResource, theMsg.getOperationType());
|
||||||
payload.setParentTransactionGuid(theMsg.getParentTransactionGuid());
|
payload.setTransactionId(theMsg.getTransactionId());
|
||||||
ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(payload);
|
ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(payload);
|
||||||
theChannelProducer.send(message);
|
theChannelProducer.send(message);
|
||||||
ourLog.debug("Delivering {} message payload {} for {}", theMsg.getOperationType(), theMsg.getPayloadId(), theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue());
|
ourLog.debug("Delivering {} message payload {} for {}", theMsg.getOperationType(), theMsg.getPayloadId(), theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue());
|
||||||
|
|
|
@ -37,11 +37,14 @@ import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
public class DaoSubscriptionMatcher implements ISubscriptionMatcher {
|
public class DaoSubscriptionMatcher implements ISubscriptionMatcher {
|
||||||
|
private Logger ourLog = LoggerFactory.getLogger(DaoSubscriptionMatcher.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
DaoRegistry myDaoRegistry;
|
DaoRegistry myDaoRegistry;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
MatchUrlService myMatchUrlService;
|
MatchUrlService myMatchUrlService;
|
||||||
private Logger ourLog = LoggerFactory.getLogger(DaoSubscriptionMatcher.class);
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private FhirContext myCtx;
|
private FhirContext myCtx;
|
||||||
|
|
||||||
|
|
|
@ -23,13 +23,13 @@ package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
|
||||||
import ca.uhn.fhir.jpa.api.config.DaoConfig;
|
import ca.uhn.fhir.jpa.api.config.DaoConfig;
|
||||||
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
||||||
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
|
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
|
||||||
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
|
|
||||||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
|
|
||||||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
|
||||||
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
|
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
|
||||||
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
|
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
|
||||||
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
|
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
|
||||||
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
|
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
|
||||||
|
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
|
||||||
|
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
|
||||||
|
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
|
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
|
||||||
import ca.uhn.fhir.util.SubscriptionUtil;
|
import ca.uhn.fhir.util.SubscriptionUtil;
|
||||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
|
|
|
@ -86,7 +86,6 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
|
||||||
|
|
||||||
ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload();
|
ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload();
|
||||||
matchActiveSubscriptionsAndDeliver(msg);
|
matchActiveSubscriptionsAndDeliver(msg);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void matchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg) {
|
public void matchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg) {
|
||||||
|
@ -164,7 +163,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
|
||||||
deliveryMsg.setPayload(myFhirContext, payload, encoding);
|
deliveryMsg.setPayload(myFhirContext, payload, encoding);
|
||||||
deliveryMsg.setSubscription(subscription);
|
deliveryMsg.setSubscription(subscription);
|
||||||
deliveryMsg.setOperationType(theMsg.getOperationType());
|
deliveryMsg.setOperationType(theMsg.getOperationType());
|
||||||
deliveryMsg.setParentTransactionGuid(theMsg.getParentTransactionGuid());
|
deliveryMsg.setTransactionId(theMsg.getTransactionId());
|
||||||
deliveryMsg.copyAdditionalPropertiesFrom(theMsg);
|
deliveryMsg.copyAdditionalPropertiesFrom(theMsg);
|
||||||
|
|
||||||
// Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
|
// Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
|
||||||
|
|
|
@ -21,10 +21,10 @@ package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import ca.uhn.fhir.context.FhirContext;
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
|
||||||
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
|
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
|
||||||
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
|
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
|
||||||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
|
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
|
||||||
|
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
||||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
|
@ -1,49 +0,0 @@
|
||||||
package ca.uhn.fhir.jpa.subscription.model;
|
|
||||||
|
|
||||||
/*-
|
|
||||||
* #%L
|
|
||||||
* HAPI FHIR Subscription Server
|
|
||||||
* %%
|
|
||||||
* Copyright (C) 2014 - 2020 University Health Network
|
|
||||||
* %%
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
* #L%
|
|
||||||
*/
|
|
||||||
|
|
||||||
import ca.uhn.fhir.model.api.IModelJson;
|
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
|
||||||
import org.springframework.messaging.Message;
|
|
||||||
import org.springframework.messaging.MessageHeaders;
|
|
||||||
|
|
||||||
public abstract class BaseJsonMessage<T> implements Message<T>, IModelJson {
|
|
||||||
|
|
||||||
private static final long serialVersionUID = 1L;
|
|
||||||
@JsonProperty("headers")
|
|
||||||
private MessageHeaders myHeaders;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructor
|
|
||||||
*/
|
|
||||||
public BaseJsonMessage() {
|
|
||||||
super();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public MessageHeaders getHeaders() {
|
|
||||||
return myHeaders;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setHeaders(MessageHeaders theHeaders) {
|
|
||||||
myHeaders = theHeaders;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,25 +0,0 @@
|
||||||
package ca.uhn.fhir.jpa.subscription.model;
|
|
||||||
|
|
||||||
/*-
|
|
||||||
* #%L
|
|
||||||
* HAPI FHIR Subscription Server
|
|
||||||
* %%
|
|
||||||
* Copyright (C) 2014 - 2020 University Health Network
|
|
||||||
* %%
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
* #L%
|
|
||||||
*/
|
|
||||||
|
|
||||||
public interface IResourceMessage {
|
|
||||||
String getPayloadId();
|
|
||||||
}
|
|
|
@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.subscription.model;
|
||||||
* #L%
|
* #L%
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
import ca.uhn.fhir.rest.server.messaging.json.BaseJsonMessage;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,8 @@ package ca.uhn.fhir.jpa.subscription.model;
|
||||||
import ca.uhn.fhir.context.FhirContext;
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
import ca.uhn.fhir.parser.IParser;
|
import ca.uhn.fhir.parser.IParser;
|
||||||
import ca.uhn.fhir.rest.api.EncodingEnum;
|
import ca.uhn.fhir.rest.api.EncodingEnum;
|
||||||
|
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
|
||||||
|
import ca.uhn.fhir.rest.server.messaging.IResourceMessage;
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||||
|
@ -38,22 +40,10 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes
|
||||||
private CanonicalSubscription mySubscription;
|
private CanonicalSubscription mySubscription;
|
||||||
@JsonProperty("payload")
|
@JsonProperty("payload")
|
||||||
private String myPayloadString;
|
private String myPayloadString;
|
||||||
@JsonIgnore
|
|
||||||
private transient IBaseResource myPayload;
|
|
||||||
@JsonProperty("payloadId")
|
@JsonProperty("payloadId")
|
||||||
private String myPayloadId;
|
private String myPayloadId;
|
||||||
@JsonProperty("parentTransactionGuid")
|
@JsonIgnore
|
||||||
private String myParentTransactionGuid;
|
private transient IBaseResource myPayloadDecoded;
|
||||||
@JsonProperty("operationType")
|
|
||||||
private ResourceModifiedMessage.OperationTypeEnum myOperationType;
|
|
||||||
|
|
||||||
public String getParentTransactionGuid() {
|
|
||||||
return myParentTransactionGuid;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setParentTransactionGuid(String theParentTransactionGuid) {
|
|
||||||
myParentTransactionGuid = theParentTransactionGuid;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
|
@ -62,20 +52,12 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ResourceModifiedMessage.OperationTypeEnum getOperationType() {
|
|
||||||
return myOperationType;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setOperationType(ResourceModifiedMessage.OperationTypeEnum theOperationType) {
|
|
||||||
myOperationType = theOperationType;
|
|
||||||
}
|
|
||||||
|
|
||||||
public IBaseResource getPayload(FhirContext theCtx) {
|
public IBaseResource getPayload(FhirContext theCtx) {
|
||||||
IBaseResource retVal = myPayload;
|
IBaseResource retVal = myPayloadDecoded;
|
||||||
if (retVal == null && isNotBlank(myPayloadString)) {
|
if (retVal == null && isNotBlank(myPayloadString)) {
|
||||||
IParser parser = EncodingEnum.detectEncoding(myPayloadString).newParser(theCtx);
|
IParser parser = EncodingEnum.detectEncoding(myPayloadString).newParser(theCtx);
|
||||||
retVal = parser.parseResource(myPayloadString);
|
retVal = parser.parseResource(myPayloadString);
|
||||||
myPayload = retVal;
|
myPayloadDecoded = retVal;
|
||||||
}
|
}
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
@ -133,9 +115,9 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes
|
||||||
return new ToStringBuilder(this)
|
return new ToStringBuilder(this)
|
||||||
.append("mySubscription", mySubscription)
|
.append("mySubscription", mySubscription)
|
||||||
.append("myPayloadString", myPayloadString)
|
.append("myPayloadString", myPayloadString)
|
||||||
.append("myPayload", myPayload)
|
.append("myPayload", myPayloadDecoded)
|
||||||
.append("myPayloadId", myPayloadId)
|
.append("myPayloadId", myPayloadId)
|
||||||
.append("myOperationType", myOperationType)
|
.append("myOperationType", getOperationType())
|
||||||
.toString();
|
.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.subscription.model;
|
||||||
* #L%
|
* #L%
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
import ca.uhn.fhir.rest.server.messaging.json.BaseJsonMessage;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||||
|
|
||||||
|
|
|
@ -21,40 +21,24 @@ package ca.uhn.fhir.jpa.subscription.model;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import ca.uhn.fhir.context.FhirContext;
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
import ca.uhn.fhir.model.api.IModelJson;
|
|
||||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
import ca.uhn.fhir.util.ResourceReferenceInfo;
|
import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage;
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
import org.hl7.fhir.instance.model.api.IIdType;
|
|
||||||
|
|
||||||
import java.util.List;
|
/**
|
||||||
|
* Most of this class has been moved to ResourceModifiedMessage in the hapi-fhir-server project, for a reusable channel ResourceModifiedMessage
|
||||||
|
* that doesn't require knowledge of subscriptions.
|
||||||
|
*/
|
||||||
|
public class ResourceModifiedMessage extends BaseResourceModifiedMessage {
|
||||||
|
|
||||||
import static org.apache.commons.lang3.StringUtils.isBlank;
|
|
||||||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
|
||||||
|
|
||||||
public class ResourceModifiedMessage extends BaseResourceMessage implements IResourceMessage, IModelJson {
|
|
||||||
|
|
||||||
@JsonProperty("resourceId")
|
|
||||||
private String myId;
|
|
||||||
@JsonProperty("operationType")
|
|
||||||
private OperationTypeEnum myOperationType;
|
|
||||||
/**
|
/**
|
||||||
* This will only be set if the resource is being triggered for a specific
|
* This will only be set if the resource is being triggered for a specific
|
||||||
* subscription
|
* subscription
|
||||||
*/
|
*/
|
||||||
@JsonProperty(value = "subscriptionId", required = false)
|
@JsonProperty(value = "subscriptionId", required = false)
|
||||||
private String mySubscriptionId;
|
private String mySubscriptionId;
|
||||||
@JsonProperty("payload")
|
|
||||||
private String myPayload;
|
|
||||||
@JsonProperty("payloadId")
|
|
||||||
private String myPayloadId;
|
|
||||||
@JsonProperty("parentTransactionGuid")
|
|
||||||
private String myParentTransactionGuid;
|
|
||||||
@JsonIgnore
|
|
||||||
private transient IBaseResource myPayloadDecoded;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
|
@ -64,25 +48,13 @@ public class ResourceModifiedMessage extends BaseResourceMessage implements IRes
|
||||||
}
|
}
|
||||||
|
|
||||||
public ResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theResource, OperationTypeEnum theOperationType) {
|
public ResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theResource, OperationTypeEnum theOperationType) {
|
||||||
this();
|
super(theFhirContext, theResource, theOperationType);
|
||||||
setId(theResource.getIdElement());
|
|
||||||
setOperationType(theOperationType);
|
|
||||||
if (theOperationType != OperationTypeEnum.DELETE) {
|
|
||||||
setNewPayload(theFhirContext, theResource);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public ResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theNewResource, OperationTypeEnum theOperationType, RequestDetails theRequest) {
|
public ResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theNewResource, OperationTypeEnum theOperationType, RequestDetails theRequest) {
|
||||||
this(theFhirContext, theNewResource, theOperationType);
|
super(theFhirContext, theNewResource, theOperationType, theRequest);
|
||||||
if (theRequest != null) {
|
|
||||||
setParentTransactionGuid(theRequest.getTransactionGuid());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getPayloadId() {
|
|
||||||
return myPayloadId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getSubscriptionId() {
|
public String getSubscriptionId() {
|
||||||
return mySubscriptionId;
|
return mySubscriptionId;
|
||||||
|
@ -92,98 +64,6 @@ public class ResourceModifiedMessage extends BaseResourceMessage implements IRes
|
||||||
mySubscriptionId = theSubscriptionId;
|
mySubscriptionId = theSubscriptionId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getId() {
|
|
||||||
return myId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public IIdType getId(FhirContext theCtx) {
|
|
||||||
IIdType retVal = null;
|
|
||||||
if (myId != null) {
|
|
||||||
retVal = theCtx.getVersion().newIdType().setValue(myId);
|
|
||||||
}
|
|
||||||
return retVal;
|
|
||||||
}
|
|
||||||
|
|
||||||
public IBaseResource getNewPayload(FhirContext theCtx) {
|
|
||||||
if (myPayloadDecoded == null && isNotBlank(myPayload)) {
|
|
||||||
myPayloadDecoded = theCtx.newJsonParser().parseResource(myPayload);
|
|
||||||
}
|
|
||||||
return myPayloadDecoded;
|
|
||||||
}
|
|
||||||
|
|
||||||
public OperationTypeEnum getOperationType() {
|
|
||||||
return myOperationType;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setOperationType(OperationTypeEnum theOperationType) {
|
|
||||||
myOperationType = theOperationType;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setId(IIdType theId) {
|
|
||||||
myId = null;
|
|
||||||
if (theId != null) {
|
|
||||||
myId = theId.getValue();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getParentTransactionGuid() {
|
|
||||||
return myParentTransactionGuid;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setParentTransactionGuid(String theParentTransactionGuid) {
|
|
||||||
myParentTransactionGuid = theParentTransactionGuid;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setNewPayload(FhirContext theCtx, IBaseResource theNewPayload) {
|
|
||||||
/*
|
|
||||||
* References with placeholders would be invalid by the time we get here, and
|
|
||||||
* would be caught before we even get here. This check is basically a last-ditch
|
|
||||||
* effort to make sure nothing has broken in the various safeguards that
|
|
||||||
* should prevent this from happening (hence it only being an assert as
|
|
||||||
* opposed to something executed all the time).
|
|
||||||
*/
|
|
||||||
assert payloadContainsNoPlaceholderReferences(theCtx, theNewPayload);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Note: Don't set myPayloadDecoded in here- This is a false optimization since
|
|
||||||
* it doesn't actually get used if anyone is doing subscriptions at any
|
|
||||||
* scale using a queue engine, and not going through the serialize/deserialize
|
|
||||||
* as we would in a queue engine can mask bugs.
|
|
||||||
* -JA
|
|
||||||
*/
|
|
||||||
myPayload = theCtx.newJsonParser().encodeResourceToString(theNewPayload);
|
|
||||||
myPayloadId = theNewPayload.getIdElement().toUnqualified().getValue();
|
|
||||||
}
|
|
||||||
|
|
||||||
public enum OperationTypeEnum {
|
|
||||||
CREATE,
|
|
||||||
UPDATE,
|
|
||||||
DELETE,
|
|
||||||
MANUALLY_TRIGGERED
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean payloadContainsNoPlaceholderReferences(FhirContext theCtx, IBaseResource theNewPayload) {
|
|
||||||
List<ResourceReferenceInfo> refs = theCtx.newTerser().getAllResourceReferences(theNewPayload);
|
|
||||||
for (ResourceReferenceInfo next : refs) {
|
|
||||||
String ref = next.getResourceReference().getReferenceElement().getValue();
|
|
||||||
if (isBlank(ref)) {
|
|
||||||
IBaseResource resource = next.getResourceReference().getResource();
|
|
||||||
if (resource != null) {
|
|
||||||
ref = resource.getIdElement().getValue();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (isNotBlank(ref)) {
|
|
||||||
if (ref.startsWith("#")) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (ref.startsWith("urn:uuid:")) {
|
|
||||||
throw new AssertionError("Reference at " + next.getName() + " is invalid: " + ref);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return new ToStringBuilder(this)
|
return new ToStringBuilder(this)
|
||||||
|
|
|
@ -26,7 +26,6 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
|
||||||
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
||||||
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
|
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
|
||||||
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
|
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
|
||||||
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
|
|
||||||
import ca.uhn.fhir.jpa.model.sched.HapiJob;
|
import ca.uhn.fhir.jpa.model.sched.HapiJob;
|
||||||
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
|
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
|
||||||
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
|
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
|
||||||
|
@ -38,6 +37,7 @@ import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
|
||||||
import ca.uhn.fhir.rest.annotation.IdParam;
|
import ca.uhn.fhir.rest.annotation.IdParam;
|
||||||
import ca.uhn.fhir.rest.api.CacheControlDirective;
|
import ca.uhn.fhir.rest.api.CacheControlDirective;
|
||||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||||
|
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
|
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
|
||||||
|
|
|
@ -25,8 +25,8 @@ import ca.uhn.fhir.interceptor.api.Interceptor;
|
||||||
import ca.uhn.fhir.interceptor.api.Pointcut;
|
import ca.uhn.fhir.interceptor.api.Pointcut;
|
||||||
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
|
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
|
||||||
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
|
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
|
||||||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
|
||||||
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
|
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
|
||||||
|
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
||||||
import ca.uhn.fhir.util.StopWatch;
|
import ca.uhn.fhir.util.StopWatch;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
|
@ -3,12 +3,12 @@ package ca.uhn.fhir.jpa.subscription.match.deliver;
|
||||||
import ca.uhn.fhir.context.FhirContext;
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
|
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
|
||||||
import ca.uhn.fhir.interceptor.api.Pointcut;
|
import ca.uhn.fhir.interceptor.api.Pointcut;
|
||||||
|
import ca.uhn.fhir.jpa.subscription.match.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
|
||||||
|
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
|
||||||
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
|
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
|
||||||
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
|
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
|
||||||
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
|
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
|
||||||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
||||||
import ca.uhn.fhir.jpa.subscription.match.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
|
|
||||||
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
|
|
||||||
import ca.uhn.fhir.rest.api.EncodingEnum;
|
import ca.uhn.fhir.rest.api.EncodingEnum;
|
||||||
import ca.uhn.fhir.rest.client.api.IGenericClient;
|
import ca.uhn.fhir.rest.client.api.IGenericClient;
|
||||||
import ca.uhn.fhir.rest.client.api.IRestfulClientFactory;
|
import ca.uhn.fhir.rest.client.api.IRestfulClientFactory;
|
||||||
|
@ -18,18 +18,20 @@ import org.hl7.fhir.r4.model.Patient;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
|
||||||
import org.mockito.Answers;
|
import org.mockito.Answers;
|
||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
import org.mockito.junit.jupiter.MockitoExtension;
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
import org.mockito.junit.jupiter.MockitoExtension;
|
|
||||||
import org.springframework.messaging.Message;
|
import org.springframework.messaging.Message;
|
||||||
import org.springframework.messaging.MessagingException;
|
import org.springframework.messaging.MessagingException;
|
||||||
import org.springframework.messaging.support.GenericMessage;
|
import org.springframework.messaging.support.GenericMessage;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.*;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.fail;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.eq;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
@ExtendWith(MockitoExtension.class)
|
@ExtendWith(MockitoExtension.class)
|
||||||
public class BaseSubscriptionDeliverySubscriberTest {
|
public class BaseSubscriptionDeliverySubscriberTest {
|
||||||
|
|
|
@ -33,7 +33,6 @@ public class SubscriptionRegistrySharedTest extends BaseSubscriptionRegistryTest
|
||||||
return "shared";
|
return "shared";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -10,7 +10,26 @@ import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
|
||||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||||
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
|
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
|
||||||
import ca.uhn.fhir.util.UrlUtil;
|
import ca.uhn.fhir.util.UrlUtil;
|
||||||
import org.hl7.fhir.dstu3.model.*;
|
import org.hl7.fhir.dstu3.model.BodySite;
|
||||||
|
import org.hl7.fhir.dstu3.model.CodeableConcept;
|
||||||
|
import org.hl7.fhir.dstu3.model.Coding;
|
||||||
|
import org.hl7.fhir.dstu3.model.CommunicationRequest;
|
||||||
|
import org.hl7.fhir.dstu3.model.DateTimeType;
|
||||||
|
import org.hl7.fhir.dstu3.model.Dosage;
|
||||||
|
import org.hl7.fhir.dstu3.model.Enumerations;
|
||||||
|
import org.hl7.fhir.dstu3.model.EpisodeOfCare;
|
||||||
|
import org.hl7.fhir.dstu3.model.IdType;
|
||||||
|
import org.hl7.fhir.dstu3.model.Location;
|
||||||
|
import org.hl7.fhir.dstu3.model.MedicationRequest;
|
||||||
|
import org.hl7.fhir.dstu3.model.Observation;
|
||||||
|
import org.hl7.fhir.dstu3.model.Patient;
|
||||||
|
import org.hl7.fhir.dstu3.model.Procedure;
|
||||||
|
import org.hl7.fhir.dstu3.model.ProcedureRequest;
|
||||||
|
import org.hl7.fhir.dstu3.model.Provenance;
|
||||||
|
import org.hl7.fhir.dstu3.model.QuestionnaireResponse;
|
||||||
|
import org.hl7.fhir.dstu3.model.Reference;
|
||||||
|
import org.hl7.fhir.dstu3.model.SearchParameter;
|
||||||
|
import org.hl7.fhir.dstu3.model.Timing;
|
||||||
import org.hl7.fhir.dstu3.model.codesystems.MedicationRequestCategory;
|
import org.hl7.fhir.dstu3.model.codesystems.MedicationRequestCategory;
|
||||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
import org.hl7.fhir.instance.model.api.IIdType;
|
import org.hl7.fhir.instance.model.api.IIdType;
|
||||||
|
@ -89,10 +108,8 @@ public class InMemorySubscriptionMatcherR3Test extends BaseSubscriptionDstu3Test
|
||||||
pr.setSubject(new Reference("Patient/"));
|
pr.setSubject(new Reference("Patient/"));
|
||||||
assertMatched(pr, "ProcedureRequest?intent=original-order");
|
assertMatched(pr, "ProcedureRequest?intent=original-order");
|
||||||
assertNotMatched(pr, "ProcedureRequest?subject=Patient/123");
|
assertNotMatched(pr, "ProcedureRequest?subject=Patient/123");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testResourceById() {
|
public void testResourceById() {
|
||||||
|
|
||||||
|
|
|
@ -73,6 +73,10 @@
|
||||||
<groupId>org.apache.commons</groupId>
|
<groupId>org.apache.commons</groupId>
|
||||||
<artifactId>commons-collections4</artifactId>
|
<artifactId>commons-collections4</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework</groupId>
|
||||||
|
<artifactId>spring-messaging</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
|
|
@ -1,29 +1,12 @@
|
||||||
package ca.uhn.fhir.jpa.subscription.model;
|
package ca.uhn.fhir.rest.server.messaging;
|
||||||
|
|
||||||
|
|
||||||
/*-
|
|
||||||
* #%L
|
|
||||||
* HAPI FHIR Subscription Server
|
|
||||||
* %%
|
|
||||||
* Copyright (C) 2014 - 2020 University Health Network
|
|
||||||
* %%
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
* #L%
|
|
||||||
*/
|
|
||||||
|
|
||||||
import ca.uhn.fhir.model.api.IModelJson;
|
import ca.uhn.fhir.model.api.IModelJson;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import org.apache.commons.lang3.Validate;
|
import org.apache.commons.lang3.Validate;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
@ -31,9 +14,15 @@ import java.util.Optional;
|
||||||
@SuppressWarnings("WeakerAccess")
|
@SuppressWarnings("WeakerAccess")
|
||||||
public abstract class BaseResourceMessage implements IResourceMessage, IModelJson {
|
public abstract class BaseResourceMessage implements IResourceMessage, IModelJson {
|
||||||
|
|
||||||
|
@JsonProperty("operationType")
|
||||||
|
protected BaseResourceModifiedMessage.OperationTypeEnum myOperationType;
|
||||||
|
|
||||||
@JsonProperty("attributes")
|
@JsonProperty("attributes")
|
||||||
private Map<String, String> myAttributes;
|
private Map<String, String> myAttributes;
|
||||||
|
|
||||||
|
@JsonProperty("transactionId")
|
||||||
|
private String myTransactionId;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns an attribute stored in this message.
|
* Returns an attribute stored in this message.
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -94,4 +83,52 @@ public abstract class BaseResourceMessage implements IResourceMessage, IModelJso
|
||||||
myAttributes.putAll(theMsg.myAttributes);
|
myAttributes.putAll(theMsg.myAttributes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the {@link OperationTypeEnum} that is occurring to the Resource of the message
|
||||||
|
*
|
||||||
|
* @return the operation type.
|
||||||
|
*/
|
||||||
|
public BaseResourceModifiedMessage.OperationTypeEnum getOperationType() {
|
||||||
|
return myOperationType;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the {@link OperationTypeEnum} occuring to the resource of the message.
|
||||||
|
*
|
||||||
|
* @param theOperationType The operation type to set.
|
||||||
|
*/
|
||||||
|
public void setOperationType(BaseResourceModifiedMessage.OperationTypeEnum theOperationType) {
|
||||||
|
myOperationType = theOperationType;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieve the transaction ID related to this message.
|
||||||
|
*
|
||||||
|
* @return the transaction ID, or null.
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
public String getTransactionId() {
|
||||||
|
return myTransactionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds a transaction ID to this message. This ID can be used for many purposes. For example, performing tracing
|
||||||
|
* across asynchronous hooks, tying data together, or downstream logging purposes.
|
||||||
|
*
|
||||||
|
* One current internal implementation uses this field to tie back EMPI processing results (which are asynchronous)
|
||||||
|
* to the original transaction log that caused the EMPI processing to occur.
|
||||||
|
*
|
||||||
|
* @param theTransactionId An ID representing a transaction of relevance to this message.
|
||||||
|
*/
|
||||||
|
public void setTransactionId(String theTransactionId) {
|
||||||
|
myTransactionId = theTransactionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public enum OperationTypeEnum {
|
||||||
|
CREATE,
|
||||||
|
UPDATE,
|
||||||
|
DELETE,
|
||||||
|
MANUALLY_TRIGGERED
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,157 @@
|
||||||
|
package ca.uhn.fhir.rest.server.messaging;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
|
import ca.uhn.fhir.model.api.IModelJson;
|
||||||
|
import ca.uhn.fhir.parser.IParser;
|
||||||
|
import ca.uhn.fhir.rest.api.EncodingEnum;
|
||||||
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
|
import ca.uhn.fhir.util.ResourceReferenceInfo;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||||
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
|
import org.hl7.fhir.instance.model.api.IIdType;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.apache.commons.lang3.StringUtils.isBlank;
|
||||||
|
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||||
|
|
||||||
|
public abstract class BaseResourceModifiedMessage extends BaseResourceMessage implements IResourceMessage, IModelJson {
|
||||||
|
|
||||||
|
@JsonProperty("resourceId")
|
||||||
|
protected String myId;
|
||||||
|
@JsonProperty("payload")
|
||||||
|
protected String myPayload;
|
||||||
|
@JsonProperty("payloadId")
|
||||||
|
protected String myPayloadId;
|
||||||
|
@JsonIgnore
|
||||||
|
protected transient IBaseResource myPayloadDecoded;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
*/
|
||||||
|
public BaseResourceModifiedMessage() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
public BaseResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theResource, OperationTypeEnum theOperationType) {
|
||||||
|
this();
|
||||||
|
setId(theResource.getIdElement());
|
||||||
|
setOperationType(theOperationType);
|
||||||
|
if (theOperationType != OperationTypeEnum.DELETE) {
|
||||||
|
setNewPayload(theFhirContext, theResource);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public BaseResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theNewResource, OperationTypeEnum theOperationType, RequestDetails theRequest) {
|
||||||
|
this(theFhirContext, theNewResource, theOperationType);
|
||||||
|
if (theRequest != null) {
|
||||||
|
setTransactionId(theRequest.getTransactionGuid());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getPayloadId() {
|
||||||
|
return myPayloadId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getId() {
|
||||||
|
return myId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IIdType getId(FhirContext theCtx) {
|
||||||
|
IIdType retVal = null;
|
||||||
|
if (myId != null) {
|
||||||
|
retVal = theCtx.getVersion().newIdType().setValue(myId);
|
||||||
|
}
|
||||||
|
return retVal;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IBaseResource getNewPayload(FhirContext theCtx) {
|
||||||
|
if (myPayloadDecoded == null && isNotBlank(myPayload)) {
|
||||||
|
myPayloadDecoded = theCtx.newJsonParser().parseResource(myPayload);
|
||||||
|
}
|
||||||
|
return myPayloadDecoded;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IBaseResource getPayload(FhirContext theCtx) {
|
||||||
|
IBaseResource retVal = myPayloadDecoded;
|
||||||
|
if (retVal == null && isNotBlank(myPayload)) {
|
||||||
|
IParser parser = EncodingEnum.detectEncoding(myPayload).newParser(theCtx);
|
||||||
|
retVal = parser.parseResource(myPayload);
|
||||||
|
myPayloadDecoded = retVal;
|
||||||
|
}
|
||||||
|
return retVal;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getPayloadString() {
|
||||||
|
if (this.myPayload != null) {
|
||||||
|
return this.myPayload;
|
||||||
|
}
|
||||||
|
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setId(IIdType theId) {
|
||||||
|
myId = null;
|
||||||
|
if (theId != null) {
|
||||||
|
myId = theId.getValue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setNewPayload(FhirContext theCtx, IBaseResource theNewPayload) {
|
||||||
|
/*
|
||||||
|
* References with placeholders would be invalid by the time we get here, and
|
||||||
|
* would be caught before we even get here. This check is basically a last-ditch
|
||||||
|
* effort to make sure nothing has broken in the various safeguards that
|
||||||
|
* should prevent this from happening (hence it only being an assert as
|
||||||
|
* opposed to something executed all the time).
|
||||||
|
*/
|
||||||
|
assert payloadContainsNoPlaceholderReferences(theCtx, theNewPayload);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Note: Don't set myPayloadDecoded in here- This is a false optimization since
|
||||||
|
* it doesn't actually get used if anyone is doing subscriptions at any
|
||||||
|
* scale using a queue engine, and not going through the serialize/deserialize
|
||||||
|
* as we would in a queue engine can mask bugs.
|
||||||
|
* -JA
|
||||||
|
*/
|
||||||
|
myPayload = theCtx.newJsonParser().encodeResourceToString(theNewPayload);
|
||||||
|
myPayloadId = theNewPayload.getIdElement().toUnqualified().getValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static boolean payloadContainsNoPlaceholderReferences(FhirContext theCtx, IBaseResource theNewPayload) {
|
||||||
|
List<ResourceReferenceInfo> refs = theCtx.newTerser().getAllResourceReferences(theNewPayload);
|
||||||
|
for (ResourceReferenceInfo next : refs) {
|
||||||
|
String ref = next.getResourceReference().getReferenceElement().getValue();
|
||||||
|
if (isBlank(ref)) {
|
||||||
|
IBaseResource resource = next.getResourceReference().getResource();
|
||||||
|
if (resource != null) {
|
||||||
|
ref = resource.getIdElement().getValue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (isNotBlank(ref)) {
|
||||||
|
if (ref.startsWith("#")) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (ref.startsWith("urn:uuid:")) {
|
||||||
|
throw new AssertionError("Reference at " + next.getName() + " is invalid: " + ref);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return new ToStringBuilder(this)
|
||||||
|
.append("myId", myId)
|
||||||
|
.append("myOperationType", myOperationType)
|
||||||
|
// .append("myPayload", myPayload)
|
||||||
|
.append("myPayloadId", myPayloadId)
|
||||||
|
// .append("myPayloadDecoded", myPayloadDecoded)
|
||||||
|
.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,7 @@
|
||||||
|
package ca.uhn.fhir.rest.server.messaging;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public interface IResourceMessage {
|
||||||
|
String getPayloadId();
|
||||||
|
}
|
|
@ -0,0 +1,18 @@
|
||||||
|
package ca.uhn.fhir.rest.server.messaging;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
|
|
||||||
|
public class ResourceOperationMessage extends BaseResourceModifiedMessage {
|
||||||
|
public ResourceOperationMessage() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public ResourceOperationMessage(FhirContext theFhirContext, IBaseResource theResource, OperationTypeEnum theOperationType) {
|
||||||
|
super(theFhirContext, theResource, theOperationType);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ResourceOperationMessage(FhirContext theFhirContext, IBaseResource theNewResource, OperationTypeEnum theOperationType, RequestDetails theRequest) {
|
||||||
|
super(theFhirContext, theNewResource, theOperationType, theRequest);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,41 @@
|
||||||
|
package ca.uhn.fhir.rest.server.messaging.json;
|
||||||
|
|
||||||
|
|
||||||
|
import ca.uhn.fhir.model.api.IModelJson;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import org.springframework.messaging.Message;
|
||||||
|
import org.springframework.messaging.MessageHeaders;
|
||||||
|
|
||||||
|
public abstract class BaseJsonMessage<T> implements Message<T>, IModelJson {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
@JsonProperty("headers")
|
||||||
|
private HapiMessageHeaders myHeaders;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
*/
|
||||||
|
public BaseJsonMessage() {
|
||||||
|
super();
|
||||||
|
setDefaultRetryHeaders();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setDefaultRetryHeaders() {
|
||||||
|
HapiMessageHeaders messageHeaders = new HapiMessageHeaders();
|
||||||
|
setHeaders(messageHeaders);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageHeaders getHeaders() {
|
||||||
|
return myHeaders.toMessageHeaders();
|
||||||
|
}
|
||||||
|
|
||||||
|
public HapiMessageHeaders getHapiHeaders() {
|
||||||
|
return myHeaders;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void setHeaders(HapiMessageHeaders theHeaders) {
|
||||||
|
myHeaders = theHeaders;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,75 @@
|
||||||
|
package ca.uhn.fhir.rest.server.messaging.json;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.model.api.IModelJson;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import org.springframework.messaging.MessageHeaders;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is for holding headers for BaseJsonMessages. Any serializable data can be thrown into
|
||||||
|
* the header map. There are also three special headers, defined by the constants in this class, which are for use
|
||||||
|
* in message handling retrying. There are also matching helper functions for fetching those special variables; however
|
||||||
|
* they can also be accessed in standard map fashion with a `get` on the map.
|
||||||
|
*/
|
||||||
|
public class HapiMessageHeaders implements IModelJson {
|
||||||
|
public static final String RETRY_COUNT_KEY = "retryCount";
|
||||||
|
public static final String FIRST_FAILURE_KEY = "firstFailureTimestamp";
|
||||||
|
public static final String LAST_FAILURE_KEY = "lastFailureTimestamp";
|
||||||
|
|
||||||
|
@JsonProperty(RETRY_COUNT_KEY)
|
||||||
|
private Integer myRetryCount = 0;
|
||||||
|
@JsonProperty(FIRST_FAILURE_KEY)
|
||||||
|
private Long myFirstFailureTimestamp;
|
||||||
|
@JsonProperty(LAST_FAILURE_KEY)
|
||||||
|
private Long myLastFailureTimestamp;
|
||||||
|
|
||||||
|
@JsonProperty("customHeaders")
|
||||||
|
private final Map<String, Object> headers;
|
||||||
|
|
||||||
|
public HapiMessageHeaders(Map<String, Object> theHeaders) {
|
||||||
|
headers = theHeaders;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HapiMessageHeaders() {
|
||||||
|
headers = new HashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getRetryCount() {
|
||||||
|
return this.myRetryCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getFirstFailureDate() {
|
||||||
|
return this.myFirstFailureTimestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getLastFailureDate() {
|
||||||
|
return this.myLastFailureTimestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRetryCount(Integer theRetryCount) {
|
||||||
|
this.myRetryCount = theRetryCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLastFailureDate(Long theLastFailureDate) {
|
||||||
|
this.myLastFailureTimestamp = theLastFailureDate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setFirstFailureDate(Long theFirstFailureDate) {
|
||||||
|
this.myFirstFailureTimestamp = theFirstFailureDate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Object> getCustomHeaders() {
|
||||||
|
return this.headers;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageHeaders toMessageHeaders() {
|
||||||
|
Map<String, Object> returnedHeaders = new HashMap<>(this.headers);
|
||||||
|
returnedHeaders.put(RETRY_COUNT_KEY, myRetryCount);
|
||||||
|
returnedHeaders.put(FIRST_FAILURE_KEY, myFirstFailureTimestamp);
|
||||||
|
returnedHeaders.put(LAST_FAILURE_KEY, myLastFailureTimestamp);
|
||||||
|
return new MessageHeaders(returnedHeaders);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,49 @@
|
||||||
|
package ca.uhn.fhir.rest.server.messaging.json;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.rest.server.messaging.ResourceOperationMessage;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||||
|
|
||||||
|
public class ResourceOperationJsonMessage extends BaseJsonMessage<ResourceOperationMessage> {
|
||||||
|
|
||||||
|
|
||||||
|
@JsonProperty("payload")
|
||||||
|
private ResourceOperationMessage myPayload;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
*/
|
||||||
|
public ResourceOperationJsonMessage() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
*/
|
||||||
|
public ResourceOperationJsonMessage(ResourceOperationMessage thePayload) {
|
||||||
|
myPayload = thePayload;
|
||||||
|
setDefaultRetryHeaders();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ResourceOperationJsonMessage(HapiMessageHeaders theRetryMessageHeaders, ResourceOperationMessage thePayload) {
|
||||||
|
myPayload = thePayload;
|
||||||
|
setHeaders(theRetryMessageHeaders);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResourceOperationMessage getPayload() {
|
||||||
|
return myPayload;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPayload(ResourceOperationMessage thePayload) {
|
||||||
|
myPayload = thePayload;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return new ToStringBuilder(this)
|
||||||
|
.append("myPayload", myPayload)
|
||||||
|
.toString();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,33 @@
|
||||||
|
package ca.uhn.fhir.rest.server.messaging;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.rest.server.messaging.json.ResourceOperationJsonMessage;
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import static ca.uhn.fhir.rest.server.messaging.json.HapiMessageHeaders.FIRST_FAILURE_KEY;
|
||||||
|
import static ca.uhn.fhir.rest.server.messaging.json.HapiMessageHeaders.LAST_FAILURE_KEY;
|
||||||
|
import static ca.uhn.fhir.rest.server.messaging.json.HapiMessageHeaders.RETRY_COUNT_KEY;
|
||||||
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
|
||||||
|
class ResourceOperationMessageTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSerializationAndDeserializationOfResourceModifiedMessage() throws JsonProcessingException {
|
||||||
|
ResourceOperationJsonMessage jsonMessage = new ResourceOperationJsonMessage();
|
||||||
|
jsonMessage.setPayload(new ResourceOperationMessage());
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
String serialized = mapper.writeValueAsString(jsonMessage);
|
||||||
|
jsonMessage = mapper.readValue(serialized, ResourceOperationJsonMessage.class);
|
||||||
|
|
||||||
|
assertThat(jsonMessage.getHapiHeaders().getRetryCount(), is(equalTo(0)));
|
||||||
|
assertThat(jsonMessage.getHapiHeaders().getFirstFailureDate(), is(equalTo(null)));
|
||||||
|
assertThat(jsonMessage.getHapiHeaders().getLastFailureDate(), is(equalTo(null)));
|
||||||
|
|
||||||
|
assertThat(jsonMessage.getHeaders().get(RETRY_COUNT_KEY), is(equalTo(0)));
|
||||||
|
assertThat(jsonMessage.getHeaders().get(FIRST_FAILURE_KEY), is(equalTo(null)));
|
||||||
|
assertThat(jsonMessage.getHeaders().get(LAST_FAILURE_KEY), is(equalTo(null)));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue