mdm message key (#4111)

* begin with failing test

* fixed 2 tests

* fix tests

* fix tests

* change log

Co-authored-by: Ken Stevens <ken@smilecdr.com>
This commit is contained in:
Ken Stevens 2022-10-04 20:37:40 -04:00 committed by GitHub
parent 40163f73d2
commit 44ed3ee354
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 34 additions and 6 deletions

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 4111
title: "MDM messages were using the resource id as a message key when it should be using the EID as a partition hash key.
This could lead to duplicate golden resources on systems using Kafka as a message broker."

View File

@ -55,9 +55,9 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel
myChannelFactory = theChannelFactory; myChannelFactory = theChannelFactory;
} }
protected void doDelivery(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer, ResourceModifiedJsonMessage theMessage) { protected void doDelivery(ResourceDeliveryMessage theSourceMessage, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer, ResourceModifiedJsonMessage theWrappedMessageToSend) {
theChannelProducer.send(theMessage); theChannelProducer.send(theWrappedMessageToSend);
ourLog.debug("Delivering {} message payload {} for {}", theMsg.getOperationType(), theMsg.getPayloadId(), theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue()); ourLog.debug("Delivering {} message payload {} for {}", theSourceMessage.getOperationType(), theSourceMessage.getPayloadId(), theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue());
} }
private ResourceModifiedJsonMessage convertDeliveryMessageToResourceModifiedMessage(ResourceDeliveryMessage theMsg) { private ResourceModifiedJsonMessage convertDeliveryMessageToResourceModifiedMessage(ResourceDeliveryMessage theMsg) {

View File

@ -79,7 +79,7 @@ public abstract class BaseResourceModifiedMessage extends BaseResourceMessage im
public void setPayloadId(IIdType thePayloadId) { public void setPayloadId(IIdType thePayloadId) {
myPayloadId = null; myPayloadId = null;
if (thePayloadId != null) { if (thePayloadId != null) {
myPayloadId = thePayloadId.getValue(); myPayloadId = thePayloadId.toUnqualifiedVersionless().getValue();
} }
} }
@ -207,6 +207,9 @@ public abstract class BaseResourceModifiedMessage extends BaseResourceMessage im
@Nullable @Nullable
@Override @Override
public String getMessageKeyOrNull() { public String getMessageKeyOrNull() {
if (super.getMessageKeyOrNull() != null) {
return super.getMessageKeyOrNull();
}
return myPayloadId; return myPayloadId;
} }

View File

@ -5,6 +5,7 @@ import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.EncodingEnum; import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage; import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
import ca.uhn.fhir.rest.server.messaging.ResourceOperationMessage; import ca.uhn.fhir.rest.server.messaging.ResourceOperationMessage;
@ -19,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
class BaseJsonMessageTest { class BaseJsonMessageTest {
FhirContext ourFhirContext = FhirContext.forR4Cached(); FhirContext ourFhirContext = FhirContext.forR4Cached();
static final String RESOURCE_ID = "Patient/123"; static final String RESOURCE_ID = "Patient/123";
static final String MESSAGE_KEY = "MY_TEST_KEY";
@Test @Test
void test_messageKeyIsResourceId_ResourceOperationJsonMessage() { void test_messageKeyIsResourceId_ResourceOperationJsonMessage() {
@ -32,7 +34,7 @@ class BaseJsonMessageTest {
@Nonnull @Nonnull
private static IBaseResource buildPatient() { private static IBaseResource buildPatient() {
IBaseResource patient = new Patient(); IBaseResource patient = new Patient();
patient.setId(RESOURCE_ID); patient.setId(new IdDt("Patient", RESOURCE_ID, "1"));
return patient; return patient;
} }
@ -46,6 +48,17 @@ class BaseJsonMessageTest {
assertEquals(RESOURCE_ID, message.getMessageKeyOrNull()); assertEquals(RESOURCE_ID, message.getMessageKeyOrNull());
} }
@Test
void test_messageKeyIsResourceId_MdmResourceDeliveryJsonMessage() {
ResourceDeliveryJsonMessage message = new ResourceDeliveryJsonMessage();
IBaseResource patient = buildPatient();
ResourceDeliveryMessage payload = new ResourceDeliveryMessage();
payload.setPayload(ourFhirContext, patient, EncodingEnum.JSON);
payload.setMessageKey(MESSAGE_KEY);
message.setPayload(payload);
assertEquals(MESSAGE_KEY, message.getMessageKeyOrNull());
}
@Test @Test
void test_messageKeyIsResourceId_ResourceModifiedJsonMessage() { void test_messageKeyIsResourceId_ResourceModifiedJsonMessage() {
ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(); ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage();

View File

@ -154,6 +154,9 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso
} }
public Map<String, String> getTags() { public Map<String, String> getTags() {
if (myTags == null) {
myTags = new HashMap<>();
}
return myTags; return myTags;
} }

View File

@ -101,7 +101,7 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes
* in tests) * in tests)
*/ */
myPayloadString = theEncoding.newParser(theCtx).encodeResourceToString(thePayload); myPayloadString = theEncoding.newParser(theCtx).encodeResourceToString(thePayload);
myPayloadId = thePayload.getIdElement().toUnqualified().getValue(); myPayloadId = thePayload.getIdElement().toUnqualifiedVersionless().getValue();
} }
@Override @Override
@ -153,6 +153,10 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes
@Nullable @Nullable
@Override @Override
public String getMessageKeyOrNull() { public String getMessageKeyOrNull() {
if (super.getMessageKeyOrNull() != null) {
return super.getMessageKeyOrNull();
}
return myPayloadId; return myPayloadId;
} }
} }