Configurable submit ResourceModifiedMessage immediately if of type Subscription, store in DB on failure (#6395)

* Attempt to submit ResourceModifiedMessage immediately, store in DB on failure

* Configurable immediate submission of Subscription

* Formating

* Change setting from JPA to Subscription Settings

* Formating

* Add configurable interval of subscription submission

* Add unit test covering the new logic for immediate subscription submission

* Credit for #6395

---------

Co-authored-by: James Agnew <jamesagnew@gmail.com>
This commit is contained in:
Michal Sevcik 2024-10-29 18:17:59 +01:00 committed by GitHub
parent b6c27cd933
commit ba05db1097
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 170 additions and 6 deletions

View File

@ -0,0 +1,7 @@
---
type: perf
issue: 6395
title: "A new configuration option has been added to `SubsciptionSubmitterConfig` which
causes Subscription resources to be submitted to the processing queue synchronously
instead of asynchronously as all other resources are. This is useful for cases where
subscriptions need to be activated quickly. Thanks to Michal Sevcik for the contribution!"

View File

@ -34,6 +34,7 @@ public abstract class BaseSubscriptionSettings {
public static final String DEFAULT_WEBSOCKET_CONTEXT_PATH = "/websocket"; public static final String DEFAULT_WEBSOCKET_CONTEXT_PATH = "/websocket";
public static final String DEFAULT_RESTHOOK_ENDPOINTURL_VALIDATION_REGEX = public static final String DEFAULT_RESTHOOK_ENDPOINTURL_VALIDATION_REGEX =
"((((http?|https?)://))([-%()_.!~*';/?:@&=+$,A-Za-z0-9])+)"; "((((http?|https?)://))([-%()_.!~*';/?:@&=+$,A-Za-z0-9])+)";
public static final long DEFAULT_SUBMISSION_INTERVAL_IN_MS = 5000;
private final Set<Subscription.SubscriptionChannelType> mySupportedSubscriptionTypes = new HashSet<>(); private final Set<Subscription.SubscriptionChannelType> mySupportedSubscriptionTypes = new HashSet<>();
private String myEmailFromAddress = DEFAULT_EMAIL_FROM_ADDRESS; private String myEmailFromAddress = DEFAULT_EMAIL_FROM_ADDRESS;
@ -42,6 +43,7 @@ public abstract class BaseSubscriptionSettings {
private boolean myCrossPartitionSubscriptionEnabled = true; private boolean myCrossPartitionSubscriptionEnabled = true;
private boolean myEnableInMemorySubscriptionMatching = true; private boolean myEnableInMemorySubscriptionMatching = true;
private boolean myTriggerSubscriptionsForNonVersioningChanges; private boolean myTriggerSubscriptionsForNonVersioningChanges;
private long mySubmissionIntervalInMs = DEFAULT_SUBMISSION_INTERVAL_IN_MS;
/** /**
* @since 6.8.0 * @since 6.8.0
@ -50,7 +52,15 @@ public abstract class BaseSubscriptionSettings {
private boolean myAllowOnlyInMemorySubscriptions = false; private boolean myAllowOnlyInMemorySubscriptions = false;
/** /**
* @since 7.6.0 * If this is enabled (default is {@literal false}), changes to Subscription resource would be put on queue immediately.
* Reducing delay between creation of the Subscription and Activation.
*
* @since 7.8.0
*/
private boolean mySubscriptionChangeQueuedImmediately = false;
/**
* @since 7.8.0
* *
* Regex To perform validation on the endpoint URL for Subscription of type RESTHOOK. * Regex To perform validation on the endpoint URL for Subscription of type RESTHOOK.
*/ */
@ -274,4 +284,42 @@ public abstract class BaseSubscriptionSettings {
public boolean hasRestHookEndpointUrlValidationRegex() { public boolean hasRestHookEndpointUrlValidationRegex() {
return isNotBlank(myRestHookEndpointUrlValidationRegex); return isNotBlank(myRestHookEndpointUrlValidationRegex);
} }
/**
* If this is enabled (default is {@literal false}), changes to Subscription resource would be put on queue immediately.
* Reducing delay between creation of the Subscription and Activation.
*
* @since 7.8.0
*/
public boolean isSubscriptionChangeQueuedImmediately() {
return mySubscriptionChangeQueuedImmediately;
}
/**
* If this is enabled (default is {@literal false}), changes to Subscription resource would be put on queue immediately.
* Reducing delay between creation of the Subscription and Activation.
*
* @since 7.8.0
*/
public void setSubscriptionChangeQueuedImmediately(boolean theSubscriptionChangeQueuedImmediately) {
mySubscriptionChangeQueuedImmediately = theSubscriptionChangeQueuedImmediately;
}
/**
* The interval in which the Resource Changes will be polled from DB. Defaults to {@literal 5000}.
*
* @since 7.7.0
*/
public long getSubscriptionIntervalInMs() {
return mySubmissionIntervalInMs;
}
/**
* The interval in which the Resource Changes will be polled from DB. Defaults to {@literal 5000}.
*
* @since 7.7.0
*/
public void setSubscriptionIntervalInMs(long theSubscriptionIntervalInMs) {
mySubmissionIntervalInMs = theSubscriptionIntervalInMs;
}
} }

View File

@ -100,8 +100,9 @@ public class SubscriptionSubmitterConfig {
} }
@Bean @Bean
public AsyncResourceModifiedProcessingSchedulerSvc asyncResourceModifiedProcessingSchedulerSvc() { public AsyncResourceModifiedProcessingSchedulerSvc asyncResourceModifiedProcessingSchedulerSvc(
return new AsyncResourceModifiedProcessingSchedulerSvc(); SubscriptionConfig subscriptionConfig, SubscriptionSettings theSubscriptionSettings) {
return new AsyncResourceModifiedProcessingSchedulerSvc(theSubscriptionSettings.getSubscriptionIntervalInMs());
} }
@Bean @Bean

View File

@ -27,7 +27,9 @@ import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.model.config.SubscriptionSettings; import ca.uhn.fhir.jpa.model.config.SubscriptionSettings;
import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage; import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
@ -37,6 +39,7 @@ import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageDeliveryException;
import static java.util.Objects.isNull; import static java.util.Objects.isNull;
import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isBlank;
@ -65,6 +68,9 @@ public class SubscriptionMatcherInterceptor {
@Autowired @Autowired
private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc; private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
@Autowired
private IResourceModifiedConsumer myResourceModifiedConsumer;
/** /**
* Constructor * Constructor
*/ */
@ -124,8 +130,27 @@ public class SubscriptionMatcherInterceptor {
} }
protected void processResourceModifiedMessage(ResourceModifiedMessage theResourceModifiedMessage) { protected void processResourceModifiedMessage(ResourceModifiedMessage theResourceModifiedMessage) {
// persist the message for async submission to the processing pipeline. see {@link // Persist the message for async submission to the processing pipeline.
// AsyncResourceModifiedProcessingSchedulerSvc} // see {@link AsyncResourceModifiedProcessingSchedulerSvc}
// If enabled in {@link JpaStorageSettings} the subscription will be handled immediately.
if (mySubscriptionSettings.isSubscriptionChangeQueuedImmediately()
&& theResourceModifiedMessage.hasPayloadType(myFhirContext, "Subscription")) {
try {
myResourceModifiedConsumer.submitResourceModified(theResourceModifiedMessage);
return;
} catch (MessageDeliveryException exception) {
String payloadId = theResourceModifiedMessage.getPayloadId();
String subscriptionId = theResourceModifiedMessage.getSubscriptionId();
ourLog.error(
"Channel submission failed for resource with id {} matching subscription with id {}. Further attempts will be performed at later time.",
payloadId,
subscriptionId,
exception);
}
}
IPersistedResourceModifiedMessage persistedResourceModifiedMessage =
myResourceModifiedMessagePersistenceSvc.persist(theResourceModifiedMessage); myResourceModifiedMessagePersistenceSvc.persist(theResourceModifiedMessage);
} }

View File

@ -0,0 +1,79 @@
package ca.uhn.fhir.jpa.subscription.submit.interceptor;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.model.config.SubscriptionSettings;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.messaging.MessageDeliveryException;
import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
class SubscriptionMatcherInterceptorTest {
@Mock
private FhirContext myFhirContext;
@Mock
private SubscriptionSettings mySubscriptionSettings;
@Mock
private IResourceModifiedConsumer myResourceModifiedConsumer;
@Mock
private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
@Mock
private ResourceModifiedMessage theResourceModifiedMessage;
@InjectMocks
private SubscriptionMatcherInterceptor subscriptionMatcherInterceptor;
@Test
void testProcessResourceModifiedMessageQueuedImmediatelySuccess() {
// Arrange
when(mySubscriptionSettings.isSubscriptionChangeQueuedImmediately()).thenReturn(true);
when(theResourceModifiedMessage.hasPayloadType(myFhirContext, "Subscription")).thenReturn(true);
// Act
subscriptionMatcherInterceptor.processResourceModifiedMessage(theResourceModifiedMessage);
// Assert
verify(myResourceModifiedConsumer, times(1)).submitResourceModified(theResourceModifiedMessage);
verify(myResourceModifiedMessagePersistenceSvc, never()).persist(any());
}
@Test
void testProcessResourceModifiedMessageQueuedImmediatelyFailure() {
// Arrange
when(mySubscriptionSettings.isSubscriptionChangeQueuedImmediately()).thenReturn(true);
when(theResourceModifiedMessage.hasPayloadType(myFhirContext, "Subscription")).thenReturn(true);
doThrow(new MessageDeliveryException("Submission failure")).when(myResourceModifiedConsumer).submitResourceModified(theResourceModifiedMessage);
// Act
subscriptionMatcherInterceptor.processResourceModifiedMessage(theResourceModifiedMessage);
// Assert
verify(myResourceModifiedConsumer, times(1)).submitResourceModified(theResourceModifiedMessage);
verify(myResourceModifiedMessagePersistenceSvc, times(1)).persist(theResourceModifiedMessage);
}
@Test
void testProcessResourceModifiedMessageNotQueuedImmediately() {
// Arrange
when(mySubscriptionSettings.isSubscriptionChangeQueuedImmediately()).thenReturn(false);
// Act
subscriptionMatcherInterceptor.processResourceModifiedMessage(theResourceModifiedMessage);
// Assert
verify(myResourceModifiedConsumer, never()).submitResourceModified(any());
verify(myResourceModifiedMessagePersistenceSvc, times(1)).persist(theResourceModifiedMessage);
}
}

View File

@ -948,6 +948,10 @@
<id>Boereck</id> <id>Boereck</id>
<name>Max Bureck</name> <name>Max Bureck</name>
</developer> </developer>
<developer>
<id>SevcikMichal</id>
<name>Michal Sevcik</name>
</developer>
</developers> </developers>
<licenses> <licenses>