Subscription cleanup (#3422)

* fix log message

* improve unit test coverage

* make js subscription delivery handler a separate kind of handler so executor is automatically destroyed when handler is destroyed

* Significant Overhaul.  Return everything to the handler.

* test passes
cleaned up rest hook subscriber

* ensure deleting subscription destroys JavaScript engine

* add permission

* pre-review cleanup.  remove unneccesary api-breaking changes.

* pre-review cleanup.  remove unneccesary api-breaking changes.

* pre-review cleanup.

* review feedback

* review feedback
This commit is contained in:
Ken Stevens 2022-02-24 13:55:16 -05:00 committed by GitHub
parent ca6381a52a
commit 6968cdfcfc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 107 additions and 61 deletions

View File

@ -54,6 +54,13 @@ public class SubscriptionChannelWithHandlers implements Closeable {
if (mySubscribableChannel != null) {
mySubscribableChannel.unsubscribe(theMessageHandler);
}
if (theMessageHandler instanceof DisposableBean) {
try {
((DisposableBean) theMessageHandler).destroy();
} catch (Exception e) {
ourLog.warn("Could not destroy {} handler for {}", theMessageHandler.getClass().getSimpleName(), myChannelName, e);
}
}
}
@VisibleForTesting

View File

@ -74,6 +74,8 @@ public class SubscriptionLoader implements IResourceChangeListener {
private ISearchParamRegistry mySearchParamRegistry;
@Autowired
private IResourceChangeListenerRegistry myResourceChangeListenerRegistry;
@Autowired
private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
private SearchParameterMap mySearchParameterMap;
private SystemRequestDetails mySystemRequestDetails;
@ -148,7 +150,7 @@ public class SubscriptionLoader implements IResourceChangeListener {
synchronized (mySyncSubscriptionsLock) {
ourLog.debug("Starting sync subscriptions");
IBundleProvider subscriptionBundleList = getSubscriptionDao().search(mySearchParameterMap, mySystemRequestDetails);
IBundleProvider subscriptionBundleList = getSubscriptionDao().search(mySearchParameterMap, mySystemRequestDetails);
Integer subscriptionCount = subscriptionBundleList.size();
assert subscriptionCount != null;
@ -188,14 +190,9 @@ public class SubscriptionLoader implements IResourceChangeListener {
String nextId = resource.getIdElement().getIdPart();
allIds.add(nextId);
// internally, subscriptions that cannot activate
// will be set to error
boolean activated = mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(resource);
boolean activated = activateSubscriptionIfRequested(resource);
if (activated) {
activatedCount++;
}
else {
logSubscriptionNotActivatedPlusErrorIfPossible(resource);
++activatedCount;
}
boolean registered = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource);
@ -209,22 +206,35 @@ public class SubscriptionLoader implements IResourceChangeListener {
return activatedCount;
}
/**
* @param theSubscription
* @return true if activated
*/
private boolean activateSubscriptionIfRequested(IBaseResource theSubscription) {
if (SubscriptionConstants.REQUESTED_STATUS.equals(mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription))) {
// internally, subscriptions that cannot activate will be set to error
if (mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(theSubscription)) {
return true;
}
logSubscriptionNotActivatedPlusErrorIfPossible(theSubscription);
}
return false;
}
/**
* Logs
*
* @param theSubscription
*/
private void logSubscriptionNotActivatedPlusErrorIfPossible(IBaseResource theSubscription) {
String error;
if (theSubscription instanceof Subscription) {
error = ((Subscription) theSubscription).getError();
}
else if (theSubscription instanceof org.hl7.fhir.dstu3.model.Subscription) {
} else if (theSubscription instanceof org.hl7.fhir.dstu3.model.Subscription) {
error = ((org.hl7.fhir.dstu3.model.Subscription) theSubscription).getError();
}
else if (theSubscription instanceof org.hl7.fhir.dstu2.model.Subscription) {
} else if (theSubscription instanceof org.hl7.fhir.dstu2.model.Subscription) {
error = ((org.hl7.fhir.dstu2.model.Subscription) theSubscription).getError();
}
else {
} else {
error = "";
}
ourLog.error("Subscription "

View File

@ -49,7 +49,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Objects;
import static org.apache.commons.lang3.StringUtils.isBlank;
@Interceptor
@ -110,16 +109,7 @@ public class SubscriptionValidatingInterceptor {
break;
}
// If the subscription has the cross partition tag &&
if (SubscriptionUtil.isCrossPartition(theSubscription) && !(theRequestDetails instanceof SystemRequestDetails)) {
if (!myDaoConfig.isCrossPartitionSubscription()){
throw new UnprocessableEntityException(Msg.code(2009) + "Cross partition subscription is not enabled on this server");
}
if (!determinePartition(theRequestDetails, theSubscription).isDefaultPartition()) {
throw new UnprocessableEntityException(Msg.code(2010) + "Cross partition subscription must be created on the default partition");
}
}
validatePermissions(theSubscription, subscription, theRequestDetails);
mySubscriptionCanonicalizer.setMatchingStrategyTag(theSubscription, null);
@ -150,6 +140,19 @@ public class SubscriptionValidatingInterceptor {
}
}
protected void validatePermissions(IBaseResource theSubscription, CanonicalSubscription theCanonicalSubscription, RequestDetails theRequestDetails) {
// If the subscription has the cross partition tag
if (SubscriptionUtil.isCrossPartition(theSubscription) && !(theRequestDetails instanceof SystemRequestDetails)) {
if (!myDaoConfig.isCrossPartitionSubscription()){
throw new UnprocessableEntityException(Msg.code(2009) + "Cross partition subscription is not enabled on this server");
}
if (!determinePartition(theRequestDetails, theSubscription).isDefaultPartition()) {
throw new UnprocessableEntityException(Msg.code(2010) + "Cross partition subscription must be created on the default partition");
}
}
}
private RequestPartitionId determinePartition(RequestDetails theRequestDetails, IBaseResource theResource) {
switch (theRequestDetails.getRestOperationType()) {
case CREATE:

View File

@ -15,13 +15,18 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import java.util.Optional;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class SubscriptionChannelRegistryTest {
@ -51,20 +56,20 @@ public class SubscriptionChannelRegistryTest {
ActiveSubscription activeSubscription = createActiveSubscription(channelName, retryCount);
// mocks
MessageHandler messageHandler = Mockito.mock(MessageHandler.class);
IChannelReceiver receiver = Mockito.mock(IChannelReceiver.class);
IChannelProducer producer = Mockito.mock(IChannelProducer.class);
MessageHandler messageHandler = mock(MessageHandler.class);
IChannelReceiver receiver = mock(IChannelReceiver.class);
IChannelProducer producer = mock(IChannelProducer.class);
// when
Mockito.when(mySubscriptionChannelFactory.newDeliveryReceivingChannel(
Mockito.anyString(),
Mockito.any(ChannelConsumerSettings.class)
when(mySubscriptionChannelFactory.newDeliveryReceivingChannel(
anyString(),
any(ChannelConsumerSettings.class)
)).thenReturn(receiver);
Mockito.when(mySubscriptionChannelFactory.newDeliverySendingChannel(
Mockito.anyString(),
Mockito.any(ChannelProducerSettings.class)
when(mySubscriptionChannelFactory.newDeliverySendingChannel(
anyString(),
any(ChannelProducerSettings.class)
)).thenReturn(producer);
Mockito.when(mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(Mockito.any(CanonicalSubscriptionChannelType.class)))
when(mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(any(CanonicalSubscriptionChannelType.class)))
.thenReturn(Optional.of(messageHandler));
// test
@ -81,15 +86,15 @@ public class SubscriptionChannelRegistryTest {
// verify the creation of the sender/receiver
// both have retry values provided
ArgumentCaptor<ChannelConsumerSettings> consumerCaptor = ArgumentCaptor.forClass(ChannelConsumerSettings.class);
Mockito.verify(mySubscriptionChannelFactory)
.newDeliveryReceivingChannel(Mockito.anyString(),
verify(mySubscriptionChannelFactory)
.newDeliveryReceivingChannel(anyString(),
consumerCaptor.capture());
ChannelConsumerSettings consumerSettings = consumerCaptor.getValue();
verifySettingsHaveRetryConfig(consumerSettings, retryCount);
ArgumentCaptor<ChannelProducerSettings> producerCaptor = ArgumentCaptor.forClass(ChannelProducerSettings.class);
Mockito.verify(mySubscriptionChannelFactory)
.newDeliverySendingChannel(Mockito.anyString(),
verify(mySubscriptionChannelFactory)
.newDeliverySendingChannel(anyString(),
producerCaptor.capture());
verifySettingsHaveRetryConfig(producerCaptor.getValue(), retryCount);
}

View File

@ -9,7 +9,6 @@ import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
@ -27,7 +26,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.LoggerFactory;
@ -35,6 +33,13 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class SubscriptionLoaderTest {
@ -68,6 +73,9 @@ public class SubscriptionLoaderTest {
@Mock
private IResourceChangeListenerCache mySubscriptionCache;
@Mock
private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
@InjectMocks
private SubscriptionLoader mySubscriptionLoader;
@ -80,11 +88,11 @@ public class SubscriptionLoaderTest {
myStoredLogLevel = ourLogger.getLevel();
ourLogger.addAppender(myAppender);
Mockito.when(myResourceChangeListenerRegistry.registerResourceResourceChangeListener(
Mockito.anyString(),
Mockito.any(SearchParameterMap.class),
Mockito.any(SubscriptionLoader.class),
Mockito.anyLong()
when(myResourceChangeListenerRegistry.registerResourceResourceChangeListener(
anyString(),
any(SearchParameterMap.class),
any(SubscriptionLoader.class),
anyLong()
)).thenReturn(mySubscriptionCache);
mySubscriptionLoader.registerListener();
@ -109,34 +117,36 @@ public class SubscriptionLoaderTest {
Subscription subscription = new Subscription();
subscription.setId("Subscription/123");
subscription.setError("THIS IS AN ERROR");
IFhirResourceDao<Subscription> subscriptionDao = Mockito.mock(IFhirResourceDao.class);
IFhirResourceDao<Subscription> subscriptionDao = mock(IFhirResourceDao.class);
ourLogger.setLevel(Level.ERROR);
// when
Mockito.when(myDaoRegistery.getSubscriptionDao())
when(myDaoRegistery.getSubscriptionDao())
.thenReturn(subscriptionDao);
Mockito.when(myDaoRegistery.isResourceTypeSupported(Mockito.anyString()))
when(myDaoRegistery.isResourceTypeSupported(anyString()))
.thenReturn(true);
Mockito.when(subscriptionDao.search(Mockito.any(SearchParameterMap.class), Mockito.any(SystemRequestDetails.class)))
when(subscriptionDao.search(any(SearchParameterMap.class), any(SystemRequestDetails.class)))
.thenReturn(getSubscriptionList(
Collections.singletonList(subscription)
));
Mockito.when(mySchedulerSvc.isStopping())
when(mySchedulerSvc.isStopping())
.thenReturn(false);
Mockito.when(mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(Mockito.any(IBaseResource.class)))
when(mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(any(IBaseResource.class)))
.thenReturn(false);
when(mySubscriptionCanonicalizer.getSubscriptionStatus(any())).thenReturn(SubscriptionConstants.REQUESTED_STATUS);
// test
mySubscriptionLoader.syncSubscriptions();
// verify
Mockito.verify(subscriptionDao)
.search(Mockito.any(SearchParameterMap.class), Mockito.any(SystemRequestDetails.class));
verify(subscriptionDao)
.search(any(SearchParameterMap.class), any(SystemRequestDetails.class));
ArgumentCaptor<ILoggingEvent> eventCaptor = ArgumentCaptor.forClass(ILoggingEvent.class);
Mockito.verify(myAppender).doAppend(eventCaptor.capture());
verify(myAppender).doAppend(eventCaptor.capture());
String actual = "Subscription "
+ subscription.getIdElement().getIdPart()
+ " could not be activated.";

View File

@ -23,6 +23,7 @@ package ca.uhn.fhir.rest.server.messaging;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.Validate;
@ -172,10 +173,20 @@ public abstract class BaseResourceMessage implements IResourceMessage, IModelJso
}
public enum OperationTypeEnum {
CREATE,
UPDATE,
DELETE,
MANUALLY_TRIGGERED,
TRANSACTION
CREATE(RestOperationTypeEnum.CREATE),
UPDATE(RestOperationTypeEnum.UPDATE),
DELETE(RestOperationTypeEnum.DELETE),
MANUALLY_TRIGGERED(RestOperationTypeEnum.UPDATE),
TRANSACTION(RestOperationTypeEnum.UPDATE);
private final RestOperationTypeEnum myRestOperationTypeEnum;
OperationTypeEnum(RestOperationTypeEnum theRestOperationTypeEnum) {
myRestOperationTypeEnum = theRestOperationTypeEnum;
}
public RestOperationTypeEnum asRestOperationType() {
return myRestOperationTypeEnum;
}
}
}