From 44ed3ee354576cf47ea9dcdbd20352504bb21476 Mon Sep 17 00:00:00 2001 From: Ken Stevens Date: Tue, 4 Oct 2022 20:37:40 -0400 Subject: [PATCH] mdm message key (#4111) * begin with failing test * fixed 2 tests * fix tests * fix tests * change log Co-authored-by: Ken Stevens --- .../changelog/6_2_0/4111-mdm-message-key.yaml | 5 +++++ .../SubscriptionDeliveringMessageSubscriber.java | 6 +++--- .../messaging/BaseResourceModifiedMessage.java | 5 ++++- .../messaging/json/BaseJsonMessageTest.java | 15 ++++++++++++++- .../subscription/model/CanonicalSubscription.java | 3 +++ .../model/ResourceDeliveryMessage.java | 6 +++++- 6 files changed, 34 insertions(+), 6 deletions(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_2_0/4111-mdm-message-key.yaml diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_2_0/4111-mdm-message-key.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_2_0/4111-mdm-message-key.yaml new file mode 100644 index 00000000000..9d23f25b8a9 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_2_0/4111-mdm-message-key.yaml @@ -0,0 +1,5 @@ +--- +type: fix +issue: 4111 +title: "MDM messages were using the resource id as a message key when it should be using the EID as a partition hash key. +This could lead to duplicate golden resources on systems using Kafka as a message broker." diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/message/SubscriptionDeliveringMessageSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/message/SubscriptionDeliveringMessageSubscriber.java index b5a8285a798..74893545f1d 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/message/SubscriptionDeliveringMessageSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/message/SubscriptionDeliveringMessageSubscriber.java @@ -55,9 +55,9 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel myChannelFactory = theChannelFactory; } - protected void doDelivery(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer, ResourceModifiedJsonMessage theMessage) { - theChannelProducer.send(theMessage); - ourLog.debug("Delivering {} message payload {} for {}", theMsg.getOperationType(), theMsg.getPayloadId(), theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue()); + protected void doDelivery(ResourceDeliveryMessage theSourceMessage, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer, ResourceModifiedJsonMessage theWrappedMessageToSend) { + theChannelProducer.send(theWrappedMessageToSend); + ourLog.debug("Delivering {} message payload {} for {}", theSourceMessage.getOperationType(), theSourceMessage.getPayloadId(), theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue()); } private ResourceModifiedJsonMessage convertDeliveryMessageToResourceModifiedMessage(ResourceDeliveryMessage theMsg) { diff --git a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/BaseResourceModifiedMessage.java b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/BaseResourceModifiedMessage.java index 74a749e8850..977484d3ab2 100644 --- a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/BaseResourceModifiedMessage.java +++ b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/BaseResourceModifiedMessage.java @@ -79,7 +79,7 @@ public abstract class BaseResourceModifiedMessage extends BaseResourceMessage im public void setPayloadId(IIdType thePayloadId) { myPayloadId = null; if (thePayloadId != null) { - myPayloadId = thePayloadId.getValue(); + myPayloadId = thePayloadId.toUnqualifiedVersionless().getValue(); } } @@ -207,6 +207,9 @@ public abstract class BaseResourceModifiedMessage extends BaseResourceMessage im @Nullable @Override public String getMessageKeyOrNull() { + if (super.getMessageKeyOrNull() != null) { + return super.getMessageKeyOrNull(); + } return myPayloadId; } diff --git a/hapi-fhir-storage-test-utilities/src/test/java/ca/uhn/fhir/rest/server/messaging/json/BaseJsonMessageTest.java b/hapi-fhir-storage-test-utilities/src/test/java/ca/uhn/fhir/rest/server/messaging/json/BaseJsonMessageTest.java index 7316950df9d..6912522eaef 100644 --- a/hapi-fhir-storage-test-utilities/src/test/java/ca/uhn/fhir/rest/server/messaging/json/BaseJsonMessageTest.java +++ b/hapi-fhir-storage-test-utilities/src/test/java/ca/uhn/fhir/rest/server/messaging/json/BaseJsonMessageTest.java @@ -5,6 +5,7 @@ import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; +import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.rest.api.EncodingEnum; import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage; import ca.uhn.fhir.rest.server.messaging.ResourceOperationMessage; @@ -19,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; class BaseJsonMessageTest { FhirContext ourFhirContext = FhirContext.forR4Cached(); static final String RESOURCE_ID = "Patient/123"; + static final String MESSAGE_KEY = "MY_TEST_KEY"; @Test void test_messageKeyIsResourceId_ResourceOperationJsonMessage() { @@ -32,7 +34,7 @@ class BaseJsonMessageTest { @Nonnull private static IBaseResource buildPatient() { IBaseResource patient = new Patient(); - patient.setId(RESOURCE_ID); + patient.setId(new IdDt("Patient", RESOURCE_ID, "1")); return patient; } @@ -46,6 +48,17 @@ class BaseJsonMessageTest { assertEquals(RESOURCE_ID, message.getMessageKeyOrNull()); } + @Test + void test_messageKeyIsResourceId_MdmResourceDeliveryJsonMessage() { + ResourceDeliveryJsonMessage message = new ResourceDeliveryJsonMessage(); + IBaseResource patient = buildPatient(); + ResourceDeliveryMessage payload = new ResourceDeliveryMessage(); + payload.setPayload(ourFhirContext, patient, EncodingEnum.JSON); + payload.setMessageKey(MESSAGE_KEY); + message.setPayload(payload); + assertEquals(MESSAGE_KEY, message.getMessageKeyOrNull()); + } + @Test void test_messageKeyIsResourceId_ResourceModifiedJsonMessage() { ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(); diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/CanonicalSubscription.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/CanonicalSubscription.java index b27a939e234..f9c3f4661cd 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/CanonicalSubscription.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/CanonicalSubscription.java @@ -154,6 +154,9 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso } public Map getTags() { + if (myTags == null) { + myTags = new HashMap<>(); + } return myTags; } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/ResourceDeliveryMessage.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/ResourceDeliveryMessage.java index 3a5a4bbb01b..c1bdde06709 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/ResourceDeliveryMessage.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/ResourceDeliveryMessage.java @@ -101,7 +101,7 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes * in tests) */ myPayloadString = theEncoding.newParser(theCtx).encodeResourceToString(thePayload); - myPayloadId = thePayload.getIdElement().toUnqualified().getValue(); + myPayloadId = thePayload.getIdElement().toUnqualifiedVersionless().getValue(); } @Override @@ -153,6 +153,10 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes @Nullable @Override public String getMessageKeyOrNull() { + if (super.getMessageKeyOrNull() != null) { + return super.getMessageKeyOrNull(); + } + return myPayloadId; } }