diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelWithHandlers.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelWithHandlers.java index 2ae53903e4a..25b9fddda00 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelWithHandlers.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelWithHandlers.java @@ -54,6 +54,13 @@ public class SubscriptionChannelWithHandlers implements Closeable { if (mySubscribableChannel != null) { mySubscribableChannel.unsubscribe(theMessageHandler); } + if (theMessageHandler instanceof DisposableBean) { + try { + ((DisposableBean) theMessageHandler).destroy(); + } catch (Exception e) { + ourLog.warn("Could not destroy {} handler for {}", theMessageHandler.getClass().getSimpleName(), myChannelName, e); + } + } } @VisibleForTesting diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoader.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoader.java index 80060a5f849..4d7ffed2cef 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoader.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoader.java @@ -74,6 +74,8 @@ public class SubscriptionLoader implements IResourceChangeListener { private ISearchParamRegistry mySearchParamRegistry; @Autowired private IResourceChangeListenerRegistry myResourceChangeListenerRegistry; + @Autowired + private SubscriptionCanonicalizer mySubscriptionCanonicalizer; private SearchParameterMap mySearchParameterMap; private SystemRequestDetails mySystemRequestDetails; @@ -148,7 +150,7 @@ public class SubscriptionLoader implements IResourceChangeListener { synchronized (mySyncSubscriptionsLock) { ourLog.debug("Starting sync subscriptions"); - IBundleProvider subscriptionBundleList = getSubscriptionDao().search(mySearchParameterMap, mySystemRequestDetails); + IBundleProvider subscriptionBundleList = getSubscriptionDao().search(mySearchParameterMap, mySystemRequestDetails); Integer subscriptionCount = subscriptionBundleList.size(); assert subscriptionCount != null; @@ -188,14 +190,9 @@ public class SubscriptionLoader implements IResourceChangeListener { String nextId = resource.getIdElement().getIdPart(); allIds.add(nextId); - // internally, subscriptions that cannot activate - // will be set to error - boolean activated = mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(resource); + boolean activated = activateSubscriptionIfRequested(resource); if (activated) { - activatedCount++; - } - else { - logSubscriptionNotActivatedPlusErrorIfPossible(resource); + ++activatedCount; } boolean registered = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource); @@ -209,22 +206,35 @@ public class SubscriptionLoader implements IResourceChangeListener { return activatedCount; } + /** + * @param theSubscription + * @return true if activated + */ + private boolean activateSubscriptionIfRequested(IBaseResource theSubscription) { + if (SubscriptionConstants.REQUESTED_STATUS.equals(mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription))) { + // internally, subscriptions that cannot activate will be set to error + if (mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(theSubscription)) { + return true; + } + logSubscriptionNotActivatedPlusErrorIfPossible(theSubscription); + } + return false; + } + /** * Logs + * * @param theSubscription */ private void logSubscriptionNotActivatedPlusErrorIfPossible(IBaseResource theSubscription) { String error; if (theSubscription instanceof Subscription) { error = ((Subscription) theSubscription).getError(); - } - else if (theSubscription instanceof org.hl7.fhir.dstu3.model.Subscription) { + } else if (theSubscription instanceof org.hl7.fhir.dstu3.model.Subscription) { error = ((org.hl7.fhir.dstu3.model.Subscription) theSubscription).getError(); - } - else if (theSubscription instanceof org.hl7.fhir.dstu2.model.Subscription) { + } else if (theSubscription instanceof org.hl7.fhir.dstu2.model.Subscription) { error = ((org.hl7.fhir.dstu2.model.Subscription) theSubscription).getError(); - } - else { + } else { error = ""; } ourLog.error("Subscription " diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionValidatingInterceptor.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionValidatingInterceptor.java index 4198f557e62..85cd02fc5bb 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionValidatingInterceptor.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionValidatingInterceptor.java @@ -49,7 +49,6 @@ import org.springframework.beans.factory.annotation.Autowired; import java.net.URI; import java.net.URISyntaxException; -import java.util.Objects; import static org.apache.commons.lang3.StringUtils.isBlank; @Interceptor @@ -110,16 +109,7 @@ public class SubscriptionValidatingInterceptor { break; } - // If the subscription has the cross partition tag && - if (SubscriptionUtil.isCrossPartition(theSubscription) && !(theRequestDetails instanceof SystemRequestDetails)) { - if (!myDaoConfig.isCrossPartitionSubscription()){ - throw new UnprocessableEntityException(Msg.code(2009) + "Cross partition subscription is not enabled on this server"); - } - - if (!determinePartition(theRequestDetails, theSubscription).isDefaultPartition()) { - throw new UnprocessableEntityException(Msg.code(2010) + "Cross partition subscription must be created on the default partition"); - } - } + validatePermissions(theSubscription, subscription, theRequestDetails); mySubscriptionCanonicalizer.setMatchingStrategyTag(theSubscription, null); @@ -150,6 +140,19 @@ public class SubscriptionValidatingInterceptor { } } + protected void validatePermissions(IBaseResource theSubscription, CanonicalSubscription theCanonicalSubscription, RequestDetails theRequestDetails) { + // If the subscription has the cross partition tag + if (SubscriptionUtil.isCrossPartition(theSubscription) && !(theRequestDetails instanceof SystemRequestDetails)) { + if (!myDaoConfig.isCrossPartitionSubscription()){ + throw new UnprocessableEntityException(Msg.code(2009) + "Cross partition subscription is not enabled on this server"); + } + + if (!determinePartition(theRequestDetails, theSubscription).isDefaultPartition()) { + throw new UnprocessableEntityException(Msg.code(2010) + "Cross partition subscription must be created on the default partition"); + } + } + } + private RequestPartitionId determinePartition(RequestDetails theRequestDetails, IBaseResource theResource) { switch (theRequestDetails.getRestOperationType()) { case CREATE: diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistryTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistryTest.java index eea1b817077..1362b8f948a 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistryTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistryTest.java @@ -15,13 +15,18 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.InjectMocks; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import java.util.Optional; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + @ExtendWith(MockitoExtension.class) public class SubscriptionChannelRegistryTest { @@ -51,20 +56,20 @@ public class SubscriptionChannelRegistryTest { ActiveSubscription activeSubscription = createActiveSubscription(channelName, retryCount); // mocks - MessageHandler messageHandler = Mockito.mock(MessageHandler.class); - IChannelReceiver receiver = Mockito.mock(IChannelReceiver.class); - IChannelProducer producer = Mockito.mock(IChannelProducer.class); + MessageHandler messageHandler = mock(MessageHandler.class); + IChannelReceiver receiver = mock(IChannelReceiver.class); + IChannelProducer producer = mock(IChannelProducer.class); // when - Mockito.when(mySubscriptionChannelFactory.newDeliveryReceivingChannel( - Mockito.anyString(), - Mockito.any(ChannelConsumerSettings.class) + when(mySubscriptionChannelFactory.newDeliveryReceivingChannel( + anyString(), + any(ChannelConsumerSettings.class) )).thenReturn(receiver); - Mockito.when(mySubscriptionChannelFactory.newDeliverySendingChannel( - Mockito.anyString(), - Mockito.any(ChannelProducerSettings.class) + when(mySubscriptionChannelFactory.newDeliverySendingChannel( + anyString(), + any(ChannelProducerSettings.class) )).thenReturn(producer); - Mockito.when(mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(Mockito.any(CanonicalSubscriptionChannelType.class))) + when(mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(any(CanonicalSubscriptionChannelType.class))) .thenReturn(Optional.of(messageHandler)); // test @@ -81,15 +86,15 @@ public class SubscriptionChannelRegistryTest { // verify the creation of the sender/receiver // both have retry values provided ArgumentCaptor consumerCaptor = ArgumentCaptor.forClass(ChannelConsumerSettings.class); - Mockito.verify(mySubscriptionChannelFactory) - .newDeliveryReceivingChannel(Mockito.anyString(), + verify(mySubscriptionChannelFactory) + .newDeliveryReceivingChannel(anyString(), consumerCaptor.capture()); ChannelConsumerSettings consumerSettings = consumerCaptor.getValue(); verifySettingsHaveRetryConfig(consumerSettings, retryCount); ArgumentCaptor producerCaptor = ArgumentCaptor.forClass(ChannelProducerSettings.class); - Mockito.verify(mySubscriptionChannelFactory) - .newDeliverySendingChannel(Mockito.anyString(), + verify(mySubscriptionChannelFactory) + .newDeliverySendingChannel(anyString(), producerCaptor.capture()); verifySettingsHaveRetryConfig(producerCaptor.getValue(), retryCount); } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoaderTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoaderTest.java index 63a8cdbd01b..bffe7455054 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoaderTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoaderTest.java @@ -9,7 +9,6 @@ import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.partition.SystemRequestDetails; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber; - import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.server.SimpleBundleProvider; import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; @@ -27,7 +26,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.InjectMocks; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; import org.slf4j.LoggerFactory; @@ -35,6 +33,13 @@ import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + @ExtendWith(MockitoExtension.class) public class SubscriptionLoaderTest { @@ -68,6 +73,9 @@ public class SubscriptionLoaderTest { @Mock private IResourceChangeListenerCache mySubscriptionCache; + @Mock + private SubscriptionCanonicalizer mySubscriptionCanonicalizer; + @InjectMocks private SubscriptionLoader mySubscriptionLoader; @@ -80,11 +88,11 @@ public class SubscriptionLoaderTest { myStoredLogLevel = ourLogger.getLevel(); ourLogger.addAppender(myAppender); - Mockito.when(myResourceChangeListenerRegistry.registerResourceResourceChangeListener( - Mockito.anyString(), - Mockito.any(SearchParameterMap.class), - Mockito.any(SubscriptionLoader.class), - Mockito.anyLong() + when(myResourceChangeListenerRegistry.registerResourceResourceChangeListener( + anyString(), + any(SearchParameterMap.class), + any(SubscriptionLoader.class), + anyLong() )).thenReturn(mySubscriptionCache); mySubscriptionLoader.registerListener(); @@ -109,34 +117,36 @@ public class SubscriptionLoaderTest { Subscription subscription = new Subscription(); subscription.setId("Subscription/123"); subscription.setError("THIS IS AN ERROR"); - IFhirResourceDao subscriptionDao = Mockito.mock(IFhirResourceDao.class); + IFhirResourceDao subscriptionDao = mock(IFhirResourceDao.class); ourLogger.setLevel(Level.ERROR); // when - Mockito.when(myDaoRegistery.getSubscriptionDao()) + when(myDaoRegistery.getSubscriptionDao()) .thenReturn(subscriptionDao); - Mockito.when(myDaoRegistery.isResourceTypeSupported(Mockito.anyString())) + when(myDaoRegistery.isResourceTypeSupported(anyString())) .thenReturn(true); - Mockito.when(subscriptionDao.search(Mockito.any(SearchParameterMap.class), Mockito.any(SystemRequestDetails.class))) + when(subscriptionDao.search(any(SearchParameterMap.class), any(SystemRequestDetails.class))) .thenReturn(getSubscriptionList( Collections.singletonList(subscription) )); - Mockito.when(mySchedulerSvc.isStopping()) + when(mySchedulerSvc.isStopping()) .thenReturn(false); - Mockito.when(mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(Mockito.any(IBaseResource.class))) + when(mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(any(IBaseResource.class))) .thenReturn(false); + when(mySubscriptionCanonicalizer.getSubscriptionStatus(any())).thenReturn(SubscriptionConstants.REQUESTED_STATUS); + // test mySubscriptionLoader.syncSubscriptions(); // verify - Mockito.verify(subscriptionDao) - .search(Mockito.any(SearchParameterMap.class), Mockito.any(SystemRequestDetails.class)); + verify(subscriptionDao) + .search(any(SearchParameterMap.class), any(SystemRequestDetails.class)); ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(ILoggingEvent.class); - Mockito.verify(myAppender).doAppend(eventCaptor.capture()); + verify(myAppender).doAppend(eventCaptor.capture()); String actual = "Subscription " + subscription.getIdElement().getIdPart() + " could not be activated."; diff --git a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/BaseResourceMessage.java b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/BaseResourceMessage.java index d8cb907a185..5d43ed64c94 100644 --- a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/BaseResourceMessage.java +++ b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/BaseResourceMessage.java @@ -23,6 +23,7 @@ package ca.uhn.fhir.rest.server.messaging; import ca.uhn.fhir.model.api.IModelJson; +import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.commons.lang3.Validate; @@ -172,10 +173,20 @@ public abstract class BaseResourceMessage implements IResourceMessage, IModelJso } public enum OperationTypeEnum { - CREATE, - UPDATE, - DELETE, - MANUALLY_TRIGGERED, - TRANSACTION + CREATE(RestOperationTypeEnum.CREATE), + UPDATE(RestOperationTypeEnum.UPDATE), + DELETE(RestOperationTypeEnum.DELETE), + MANUALLY_TRIGGERED(RestOperationTypeEnum.UPDATE), + TRANSACTION(RestOperationTypeEnum.UPDATE); + + private final RestOperationTypeEnum myRestOperationTypeEnum; + + OperationTypeEnum(RestOperationTypeEnum theRestOperationTypeEnum) { + myRestOperationTypeEnum = theRestOperationTypeEnum; + } + + public RestOperationTypeEnum asRestOperationType() { + return myRestOperationTypeEnum; + } } }