From 9280cde491f2ca6292c652f89939737b61c5804d Mon Sep 17 00:00:00 2001 From: Ken Stevens Date: Fri, 1 Feb 2019 10:14:46 -0500 Subject: [PATCH] Subscription only registers active (#1189) * Fixed a bug in standalone subscription subscriber: It was adding REQUESTED subscriptions to the active subscription registry. (Only ACTIVE subscriptions should be added.) --- .../main/java/ca/uhn/fhir/cli/BaseApp.java | 8 +-- .../SubscriptionActivatingInterceptor.java | 21 +++---- .../SubscriptionMatcherInterceptor.java | 2 - hapi-fhir-jpaserver-searchparam/pom.xml | 5 +- .../registry/BaseSearchParamRegistry.java | 28 ++------- .../fhir/jpa/searchparam/retry/Retrier.java | 49 ++++++++-------- .../jpa/searchparam/retry/RetrierTest.java | 55 +++++++++--------- hapi-fhir-jpaserver-subscription/pom.xml | 5 ++ .../module/ResourceModifiedMessage.java | 12 ++++ .../module/cache/ActiveSubscriptionCache.java | 4 +- .../cache/SubscriptionCanonicalizer.java | 11 +++- .../module/cache/SubscriptionConstants.java | 6 ++ .../module/cache/SubscriptionLoader.java | 12 +--- .../StandaloneSubscriptionMessageHandler.java | 9 ++- .../ResourceDeliveryJsonMessage.java | 7 +++ .../subscriber/ResourceDeliveryMessage.java | 13 +++++ .../ResourceModifiedJsonMessage.java | 7 +++ .../module/BaseSubscriptionDstu3Test.java | 16 ++++++ .../module/BaseSubscriptionTest.java | 1 - .../module/FhirObjectPrinter.java | 17 ++++++ .../subscription/module/PointcutLatch.java | 57 ++++++++----------- .../module/SubscriptionTestHelper.java | 36 ++++++++++++ ...kingQueueSubscribableChannelDstu3Test.java | 24 +------- .../standalone/SearchParamLoaderTest.java | 15 +---- ...ndaloneSubscriptionMessageHandlerTest.java | 48 ++++++++++++++++ .../SubscriptionLoaderFhirClientTest.java | 10 ++-- .../standalone/SubscriptionLoaderTest.java | 17 +----- .../SubscriptionCheckingSubscriberTest.java | 2 +- .../pom.xml | 2 +- .../hapi-fhir-spring-boot-samples/pom.xml | 2 +- .../hapi-fhir-spring-boot-starter/pom.xml | 2 +- hapi-fhir-spring-boot/pom.xml | 2 +- pom.xml | 15 ++++- 33 files changed, 311 insertions(+), 209 deletions(-) mode change 100644 => 100755 hapi-fhir-cli/hapi-fhir-cli-api/src/main/java/ca/uhn/fhir/cli/BaseApp.java mode change 100644 => 100755 hapi-fhir-jpaserver-searchparam/pom.xml mode change 100644 => 100755 hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/registry/BaseSearchParamRegistry.java mode change 100644 => 100755 hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/retry/Retrier.java mode change 100644 => 100755 hapi-fhir-jpaserver-searchparam/src/test/java/ca/uhn/fhir/jpa/searchparam/retry/RetrierTest.java mode change 100644 => 100755 hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java create mode 100644 hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/FhirObjectPrinter.java create mode 100644 hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/SubscriptionTestHelper.java mode change 100644 => 100755 hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SearchParamLoaderTest.java create mode 100644 hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandlerTest.java mode change 100644 => 100755 hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderTest.java mode change 100644 => 100755 pom.xml diff --git a/hapi-fhir-cli/hapi-fhir-cli-api/src/main/java/ca/uhn/fhir/cli/BaseApp.java b/hapi-fhir-cli/hapi-fhir-cli-api/src/main/java/ca/uhn/fhir/cli/BaseApp.java old mode 100644 new mode 100755 index 31aea0978f6..aea44721f1b --- a/hapi-fhir-cli/hapi-fhir-cli-api/src/main/java/ca/uhn/fhir/cli/BaseApp.java +++ b/hapi-fhir-cli/hapi-fhir-cli-api/src/main/java/ca/uhn/fhir/cli/BaseApp.java @@ -42,9 +42,9 @@ import java.util.List; import static org.fusesource.jansi.Ansi.ansi; public abstract class BaseApp { - public static final String STACKFILTER_PATTERN = "%xEx{full, sun.reflect, org.junit, org.eclipse, java.lang.reflect.Method, org.springframework, org.hibernate, com.sun.proxy, org.attoparser, org.thymeleaf}"; - public static final String STACKFILTER_PATTERN_PROP = "log.stackfilter.pattern"; - public static final String LINESEP = System.getProperty("line.separator"); + private static final String STACKFILTER_PATTERN = "%xEx{full, sun.reflect, org.junit, org.eclipse, java.lang.reflect.Method, org.springframework, org.hibernate, com.sun.proxy, org.attoparser, org.thymeleaf}"; + private static final String STACKFILTER_PATTERN_PROP = "log.stackfilter.pattern"; + static final String LINESEP = System.getProperty("line.separator"); protected static final org.slf4j.Logger ourLog; private static List ourCommands; @@ -145,7 +145,7 @@ public abstract class BaseApp { protected abstract String provideCommandName(); - public List provideCommands() { + List provideCommands() { ArrayList commands = new ArrayList<>(); commands.add(new RunServerCommand()); commands.add(new ExampleDataUploader()); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingInterceptor.java index a86607801da..b4fe6344785 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingInterceptor.java @@ -29,22 +29,20 @@ import ca.uhn.fhir.jpa.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.model.interceptor.api.Hook; import ca.uhn.fhir.jpa.model.interceptor.api.Interceptor; import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut; -import ca.uhn.fhir.jpa.searchparam.MatchUrlService; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionCanonicalizer; +import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionConstants; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.module.matcher.SubscriptionMatchingStrategy; import ca.uhn.fhir.jpa.subscription.module.matcher.SubscriptionStrategyEvaluator; import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum; import ca.uhn.fhir.parser.DataFormatException; -import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; import ca.uhn.fhir.util.SubscriptionUtil; import com.google.common.annotations.VisibleForTesting; import org.hl7.fhir.instance.model.Subscription; import org.hl7.fhir.instance.model.api.IBaseResource; -import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,8 +74,6 @@ public class SubscriptionActivatingInterceptor { private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingInterceptor.class); private static boolean ourWaitForSubscriptionActivationSynchronouslyForUnitTest; - private static final String REQUESTED_STATUS = Subscription.SubscriptionStatus.REQUESTED.toCode(); - private static final String ACTIVE_STATUS = Subscription.SubscriptionStatus.ACTIVE.toCode(); @Autowired private PlatformTransactionManager myTransactionManager; @@ -93,8 +89,6 @@ public class SubscriptionActivatingInterceptor { @Autowired private SubscriptionCanonicalizer mySubscriptionCanonicalizer; @Autowired - private MatchUrlService myMatchUrlService; - @Autowired private DaoConfig myDaoConfig; @Autowired private SubscriptionStrategyEvaluator mySubscriptionStrategyEvaluator; @@ -104,7 +98,7 @@ public class SubscriptionActivatingInterceptor { // subscriber applies.. String subscriptionChannelTypeCode = myFhirContext .newTerser() - .getSingleValueOrNull(theSubscription, SubscriptionMatcherInterceptor.SUBSCRIPTION_TYPE, IPrimitiveType.class) + .getSingleValueOrNull(theSubscription, SubscriptionConstants.SUBSCRIPTION_TYPE, IPrimitiveType.class) .getValueAsString(); Subscription.SubscriptionChannelType subscriptionChannelType = Subscription.SubscriptionChannelType.fromCode(subscriptionChannelTypeCode); @@ -113,10 +107,9 @@ public class SubscriptionActivatingInterceptor { return false; } - final IPrimitiveType status = myFhirContext.newTerser().getSingleValueOrNull(theSubscription, SubscriptionMatcherInterceptor.SUBSCRIPTION_STATUS, IPrimitiveType.class); - String statusString = status.getValueAsString(); + String statusString = mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription); - if (REQUESTED_STATUS.equals(statusString)) { + if (SubscriptionConstants.REQUESTED_STATUS.equals(statusString)) { if (TransactionSynchronizationManager.isSynchronizationActive()) { /* * If we're in a transaction, we don't want to try and change the status from @@ -133,7 +126,7 @@ public class SubscriptionActivatingInterceptor { Future activationFuture = myTaskExecutor.submit(new Runnable() { @Override public void run() { - activateSubscription(ACTIVE_STATUS, theSubscription, REQUESTED_STATUS); + activateSubscription(SubscriptionConstants.ACTIVE_STATUS, theSubscription, SubscriptionConstants.REQUESTED_STATUS); } }); @@ -152,9 +145,9 @@ public class SubscriptionActivatingInterceptor { }); return true; } else { - return activateSubscription(ACTIVE_STATUS, theSubscription, REQUESTED_STATUS); + return activateSubscription(SubscriptionConstants.ACTIVE_STATUS, theSubscription, SubscriptionConstants.REQUESTED_STATUS); } - } else if (ACTIVE_STATUS.equals(statusString)) { + } else if (SubscriptionConstants.ACTIVE_STATUS.equals(statusString)) { return mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(theSubscription); } else { // Status isn't "active" or "requested" diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionMatcherInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionMatcherInterceptor.java index 94d4fc65600..d5d5c665305 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionMatcherInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionMatcherInterceptor.java @@ -48,8 +48,6 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatcherInterceptor.class); private static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching"; - static final String SUBSCRIPTION_STATUS = "Subscription.status"; - static final String SUBSCRIPTION_TYPE = "Subscription.channel.type"; private SubscribableChannel myProcessingChannel; @Autowired diff --git a/hapi-fhir-jpaserver-searchparam/pom.xml b/hapi-fhir-jpaserver-searchparam/pom.xml old mode 100644 new mode 100755 index c384c0f967e..256da95fc61 --- a/hapi-fhir-jpaserver-searchparam/pom.xml +++ b/hapi-fhir-jpaserver-searchparam/pom.xml @@ -86,7 +86,10 @@ javax.annotation javax.annotation-api - + + org.springframework.retry + spring-retry + diff --git a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/registry/BaseSearchParamRegistry.java b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/registry/BaseSearchParamRegistry.java old mode 100644 new mode 100755 index 8a2d06f59d8..519d0d6cb98 --- a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/registry/BaseSearchParamRegistry.java +++ b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/registry/BaseSearchParamRegistry.java @@ -48,8 +48,6 @@ public abstract class BaseSearchParamRegistry implemen private static final int MAX_MANAGED_PARAM_COUNT = 10000; private static final Logger ourLog = LoggerFactory.getLogger(BaseSearchParamRegistry.class); - @VisibleForTesting - public static final int INITIAL_SECONDS_BETWEEN_RETRIES = 5; private static long REFRESH_INTERVAL = 60 * DateUtils.MILLIS_PER_MINUTE; private static final int MAX_RETRIES = 60; // 5 minutes @@ -60,7 +58,6 @@ public abstract class BaseSearchParamRegistry implemen @Autowired private FhirContext myFhirContext; - private volatile int mySecondsBetweenRetries = INITIAL_SECONDS_BETWEEN_RETRIES; private Map> myBuiltInSearchParams; private volatile Map> myActiveUniqueSearchParams = Collections.emptyMap(); private volatile Map, List>> myActiveParamNamesToUniqueSearchParams = Collections.emptyMap(); @@ -85,7 +82,7 @@ public abstract class BaseSearchParamRegistry implemen return myActiveSearchParams.get(theResourceName); } - void requiresActiveSearchParams() { + private void requiresActiveSearchParams() { if (myActiveSearchParams == null) { refreshCacheWithRetry(); } @@ -120,11 +117,7 @@ public abstract class BaseSearchParamRegistry implemen } private Map getSearchParamMap(Map> searchParams, String theResourceName) { - Map retVal = searchParams.get(theResourceName); - if (retVal == null) { - retVal = new HashMap<>(); - searchParams.put(theResourceName, retVal); - } + Map retVal = searchParams.computeIfAbsent(theResourceName, k -> new HashMap<>()); return retVal; } @@ -139,11 +132,7 @@ public abstract class BaseSearchParamRegistry implemen * Loop through parameters and find JPA params */ for (Map.Entry> nextResourceNameToEntries : theActiveSearchParams.entrySet()) { - List uniqueSearchParams = activeUniqueSearchParams.get(nextResourceNameToEntries.getKey()); - if (uniqueSearchParams == null) { - uniqueSearchParams = new ArrayList<>(); - activeUniqueSearchParams.put(nextResourceNameToEntries.getKey(), uniqueSearchParams); - } + List uniqueSearchParams = activeUniqueSearchParams.computeIfAbsent(nextResourceNameToEntries.getKey(), k -> new ArrayList<>()); Collection nextSearchParamsForResourceName = nextResourceNameToEntries.getValue().values(); for (RuntimeSearchParam nextCandidate : nextSearchParamsForResourceName) { @@ -181,7 +170,7 @@ public abstract class BaseSearchParamRegistry implemen } if (next.getCompositeOf() != null) { - Collections.sort(next.getCompositeOf(), new Comparator() { + next.getCompositeOf().sort(new Comparator() { @Override public int compare(RuntimeSearchParam theO1, RuntimeSearchParam theO2) { return StringUtils.compare(theO1.getName(), theO2.getName()); @@ -192,7 +181,7 @@ public abstract class BaseSearchParamRegistry implemen activeParamNamesToUniqueSearchParams.put(nextBase, new HashMap<>()); } if (!activeParamNamesToUniqueSearchParams.get(nextBase).containsKey(paramNames)) { - activeParamNamesToUniqueSearchParams.get(nextBase).put(paramNames, new ArrayList()); + activeParamNamesToUniqueSearchParams.get(nextBase).put(paramNames, new ArrayList<>()); } activeParamNamesToUniqueSearchParams.get(nextBase).get(paramNames).add(next); } @@ -339,7 +328,7 @@ public abstract class BaseSearchParamRegistry implemen } synchronized int refreshCacheWithRetry() { - Retrier refreshCacheRetrier = new Retrier(() -> mySearchParamProvider.refreshCache(this, REFRESH_INTERVAL), MAX_RETRIES, mySecondsBetweenRetries, "refresh search parameter registry"); + Retrier refreshCacheRetrier = new Retrier(() -> mySearchParamProvider.refreshCache(this, REFRESH_INTERVAL), MAX_RETRIES); return refreshCacheRetrier.runWithRetry(); } @@ -355,11 +344,6 @@ public abstract class BaseSearchParamRegistry implemen } } - @VisibleForTesting - public void setSecondsBetweenRetriesForTesting(int theSecondsBetweenRetries) { - mySecondsBetweenRetries = theSecondsBetweenRetries; - } - @Override public Map> getActiveSearchParams() { requiresActiveSearchParams(); diff --git a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/retry/Retrier.java b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/retry/Retrier.java old mode 100644 new mode 100755 index 4cb987d3127..39b27d090f5 --- a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/retry/Retrier.java +++ b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/retry/Retrier.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.searchparam.retry; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,9 +20,13 @@ package ca.uhn.fhir.jpa.searchparam.retry; * #L% */ +import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.time.DateUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.retry.backoff.ExponentialBackOffPolicy; +import org.springframework.retry.policy.SimpleRetryPolicy; +import org.springframework.retry.support.RetryTemplate; import java.util.function.Supplier; @@ -30,34 +34,27 @@ public class Retrier { private static final Logger ourLog = LoggerFactory.getLogger(Retrier.class); private final Supplier mySupplier; - private final int myMaxRetries; - private final int mySecondsBetweenRetries; - private final String myDescription; - public Retrier(Supplier theSupplier, int theMaxRetries, int theSecondsBetweenRetries, String theDescription) { + private final RetryTemplate myRetryTemplate; + + public Retrier(Supplier theSupplier, int theMaxRetries) { + Validate.isTrue(theMaxRetries > 0, "maxRetries must be above zero."); mySupplier = theSupplier; - myMaxRetries = theMaxRetries; - mySecondsBetweenRetries = theSecondsBetweenRetries; - myDescription = theDescription; + + myRetryTemplate = new RetryTemplate(); + + ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy(); + backOff.setInitialInterval(500); + backOff.setMaxInterval(DateUtils.MILLIS_PER_MINUTE); + backOff.setMultiplier(2); + myRetryTemplate.setBackOffPolicy(backOff); + + SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); + retryPolicy.setMaxAttempts(theMaxRetries); + myRetryTemplate.setRetryPolicy(retryPolicy); } public T runWithRetry() { - RuntimeException lastException = new IllegalStateException("maxRetries must be above zero."); - for (int retryCount = 1; retryCount <= myMaxRetries; ++retryCount) { - try { - return mySupplier.get(); - } catch(RuntimeException e) { - ourLog.trace("Failure during retry: {}", e.getMessage(), e); // with stacktrace if it's ever needed - ourLog.info("Failed to {}. Attempt {} / {}: {}", myDescription, retryCount, myMaxRetries, e.getMessage()); - lastException = e; - try { - Thread.sleep(mySecondsBetweenRetries * DateUtils.MILLIS_PER_SECOND); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw lastException; - } - } - } - throw lastException; + return myRetryTemplate.execute(retryContext -> mySupplier.get()); } } diff --git a/hapi-fhir-jpaserver-searchparam/src/test/java/ca/uhn/fhir/jpa/searchparam/retry/RetrierTest.java b/hapi-fhir-jpaserver-searchparam/src/test/java/ca/uhn/fhir/jpa/searchparam/retry/RetrierTest.java old mode 100644 new mode 100755 index 60154a2d02a..edc946f09ba --- a/hapi-fhir-jpaserver-searchparam/src/test/java/ca/uhn/fhir/jpa/searchparam/retry/RetrierTest.java +++ b/hapi-fhir-jpaserver-searchparam/src/test/java/ca/uhn/fhir/jpa/searchparam/retry/RetrierTest.java @@ -1,17 +1,24 @@ package ca.uhn.fhir.jpa.searchparam.retry; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class RetrierTest { + + @Rule + public ExpectedException myExpectedException = ExpectedException.none(); + @Test public void happyPath() { Supplier supplier = () -> true; - Retrier retrier = new Retrier<>(supplier, 5, 0, "test"); + Retrier retrier = new Retrier<>(supplier, 5); assertTrue(retrier.runWithRetry()); } @@ -22,7 +29,7 @@ public class RetrierTest { if (counter.incrementAndGet() < 3) throw new RetryRuntimeException("test"); return true; }; - Retrier retrier = new Retrier<>(supplier, 5, 0, "test"); + Retrier retrier = new Retrier<>(supplier, 5); assertTrue(retrier.runWithRetry()); assertEquals(3, counter.get()); } @@ -31,16 +38,15 @@ public class RetrierTest { public void failMaxRetries() { AtomicInteger counter = new AtomicInteger(); Supplier supplier = () -> { - if (counter.incrementAndGet() < 10) throw new RetryRuntimeException("test"); + if (counter.incrementAndGet() < 3) throw new RetryRuntimeException("test failure message"); return true; }; - Retrier retrier = new Retrier<>(supplier, 5, 0, "test"); - try { - retrier.runWithRetry(); - fail(); - } catch (RetryRuntimeException e) { - assertEquals(5, counter.get()); - } + Retrier retrier = new Retrier<>(supplier, 1); + + myExpectedException.expect(RetryRuntimeException.class); + myExpectedException.expectMessage("test failure message"); + retrier.runWithRetry(); + assertEquals(5, counter.get()); } @Test @@ -50,14 +56,10 @@ public class RetrierTest { if (counter.incrementAndGet() < 10) throw new RetryRuntimeException("test"); return true; }; - Retrier retrier = new Retrier<>(supplier, 0, 0, "test"); - try { - retrier.runWithRetry(); - fail(); - } catch (IllegalStateException e) { - assertEquals(0, counter.get()); - assertEquals("maxRetries must be above zero." ,e.getMessage()); - } + myExpectedException.expect(IllegalArgumentException.class); + myExpectedException.expectMessage("maxRetries must be above zero."); + Retrier retrier = new Retrier<>(supplier, 0); + assertEquals(0, counter.get()); } @Test @@ -67,18 +69,13 @@ public class RetrierTest { if (counter.incrementAndGet() < 10) throw new RetryRuntimeException("test"); return true; }; - Retrier retrier = new Retrier<>(supplier, -1, 0, "test"); - try { - retrier.runWithRetry(); - fail(); - } catch (IllegalStateException e) { - assertEquals(0, counter.get()); - assertEquals("maxRetries must be above zero." ,e.getMessage()); - } + myExpectedException.expect(IllegalArgumentException.class); + myExpectedException.expectMessage("maxRetries must be above zero."); + + Retrier retrier = new Retrier<>(supplier, -1); + assertEquals(0, counter.get()); } - - class RetryRuntimeException extends RuntimeException { RetryRuntimeException(String message) { super(message); diff --git a/hapi-fhir-jpaserver-subscription/pom.xml b/hapi-fhir-jpaserver-subscription/pom.xml index 49c0b24cfbf..c19ed86448a 100644 --- a/hapi-fhir-jpaserver-subscription/pom.xml +++ b/hapi-fhir-jpaserver-subscription/pom.xml @@ -97,6 +97,11 @@ jetty-servlet test + + org.springframework.boot + spring-boot-starter-test + test + diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/ResourceModifiedMessage.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/ResourceModifiedMessage.java index 2758a29569c..f82cd27d3ce 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/ResourceModifiedMessage.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/ResourceModifiedMessage.java @@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.commons.lang3.builder.ToStringBuilder; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; @@ -166,4 +167,15 @@ public class ResourceModifiedMessage implements IResourceMessage { return true; } + @Override + public String toString() { + return new ToStringBuilder(this) + .append("myId", myId) + .append("myOperationType", myOperationType) + .append("mySubscriptionId", mySubscriptionId) +// .append("myPayload", myPayload) + .append("myPayloadId", myPayloadId) +// .append("myPayloadDecoded", myPayloadDecoded) + .toString(); + } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java index 6afcd485850..cbc1029c1f9 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java @@ -29,7 +29,7 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -public class ActiveSubscriptionCache { +class ActiveSubscriptionCache { private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(ActiveSubscriptionCache.class); private final Map myCache = new ConcurrentHashMap<>(); @@ -72,7 +72,7 @@ public class ActiveSubscriptionCache { } @VisibleForTesting - public void clearForUnitTests() { + void clearForUnitTests() { myCache.clear(); } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionCanonicalizer.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionCanonicalizer.java index cd0802bf645..c5572024a7c 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionCanonicalizer.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionCanonicalizer.java @@ -29,11 +29,11 @@ import ca.uhn.fhir.model.api.ExtensionDt; import ca.uhn.fhir.model.api.IPrimitiveDatatype; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException; -import org.hl7.fhir.dstu3.model.Subscription; import org.hl7.fhir.exceptions.FHIRException; import org.hl7.fhir.instance.model.api.IBaseMetaType; import org.hl7.fhir.instance.model.api.IBaseReference; import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.r4.model.Extension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -292,4 +292,13 @@ public class SubscriptionCanonicalizer { } meta.addTag().setSystem(SubscriptionConstants.EXT_SUBSCRIPTION_MATCHING_STRATEGY).setCode(value).setDisplay(display); } + + public String getSubscriptionStatus(IBaseResource theSubscription) { + final IPrimitiveType status = myFhirContext.newTerser().getSingleValueOrNull(theSubscription, SubscriptionConstants.SUBSCRIPTION_STATUS, IPrimitiveType.class); + if (status == null) { + return null; + } + return status.getValueAsString(); + } + } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionConstants.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionConstants.java index 66f4cd58d18..2aa9f2be379 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionConstants.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionConstants.java @@ -20,6 +20,8 @@ package ca.uhn.fhir.jpa.subscription.module.cache; * #L% */ +import org.hl7.fhir.instance.model.Subscription; + public class SubscriptionConstants { /** @@ -90,4 +92,8 @@ public class SubscriptionConstants { */ public static final int DELIVERY_EXECUTOR_QUEUE_SIZE = 1000; + public static final String SUBSCRIPTION_STATUS = "Subscription.status"; + public static final String SUBSCRIPTION_TYPE = "Subscription.channel.type"; + public static final String REQUESTED_STATUS = Subscription.SubscriptionStatus.REQUESTED.toCode(); + public static final String ACTIVE_STATUS = Subscription.SubscriptionStatus.ACTIVE.toCode(); } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java old mode 100644 new mode 100755 index b82b9254137..bb67f790cca --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java @@ -46,8 +46,6 @@ import java.util.concurrent.Semaphore; @Lazy public class SubscriptionLoader { private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionLoader.class); - @VisibleForTesting - public static final int INITIAL_SECONDS_BETWEEN_RETRIES = 5; private static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes @Autowired @@ -58,8 +56,6 @@ public class SubscriptionLoader { private final Object mySyncSubscriptionsLock = new Object(); private Semaphore mySyncSubscriptionsSemaphore = new Semaphore(1); - private volatile int mySecondsBetweenRetries = INITIAL_SECONDS_BETWEEN_RETRIES; - /** * Read the existing subscriptions from the database */ @@ -82,7 +78,7 @@ public class SubscriptionLoader { } synchronized int doSyncSubscriptionsWithRetry() { - Retrier syncSubscriptionRetrier = new Retrier(() -> doSyncSubscriptions(), MAX_RETRIES, mySecondsBetweenRetries, "sync subscriptions"); + Retrier syncSubscriptionRetrier = new Retrier<>(this::doSyncSubscriptions, MAX_RETRIES); return syncSubscriptionRetrier.runWithRetry(); } @@ -91,7 +87,6 @@ public class SubscriptionLoader { ourLog.debug("Starting sync subscriptions"); SearchParameterMap map = new SearchParameterMap(); map.add(Subscription.SP_STATUS, new TokenOrListParam() - // TODO KHS perhaps we should only be requesting ACTIVE subscriptions here?... .addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode())) .addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode()))); map.setLoadSynchronousUpTo(SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS); @@ -126,10 +121,5 @@ public class SubscriptionLoader { public void setSubscriptionProviderForUnitTest(ISubscriptionProvider theSubscriptionProvider) { mySubscriptionProvidor = theSubscriptionProvider; } - - @VisibleForTesting - public void setSecondsBetweenRetriesForTesting(int theSecondsBetweenRetries) { - mySecondsBetweenRetries = theSecondsBetweenRetries; - } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java index 1ec1447ff51..a57bb0e3cf4 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java @@ -23,6 +23,8 @@ package ca.uhn.fhir.jpa.subscription.module.standalone; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; +import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionCanonicalizer; +import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionConstants; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber; @@ -46,6 +48,8 @@ public class StandaloneSubscriptionMessageHandler implements MessageHandler { SubscriptionMatchingSubscriber mySubscriptionMatchingSubscriber; @Autowired SubscriptionRegistry mySubscriptionRegistry; + @Autowired + SubscriptionCanonicalizer mySubscriptionCanonicalizer; @Override public void handleMessage(Message theMessage) throws MessagingException { @@ -61,7 +65,10 @@ public class StandaloneSubscriptionMessageHandler implements MessageHandler { RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(resource); if (resourceDef.getName().equals(ResourceTypeEnum.SUBSCRIPTION.getCode())) { - mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource); + String status = mySubscriptionCanonicalizer.getSubscriptionStatus(resource); + if (SubscriptionConstants.ACTIVE_STATUS.equals(status)) { + mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource); + } } mySubscriptionMatchingSubscriber.matchActiveSubscriptionsAndDeliver(theResourceModifiedMessage); } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/ResourceDeliveryJsonMessage.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/ResourceDeliveryJsonMessage.java index 413d639fa02..3a9884f5f23 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/ResourceDeliveryJsonMessage.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/ResourceDeliveryJsonMessage.java @@ -23,6 +23,7 @@ package ca.uhn.fhir.jpa.subscription.module.subscriber; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.commons.lang3.builder.ToStringBuilder; @JsonInclude(JsonInclude.Include.NON_NULL) @JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) @@ -54,4 +55,10 @@ public class ResourceDeliveryJsonMessage extends BaseJsonMessage theList) { StopWatch sw = new StopWatch(); while (theList.size() != theTarget && sw.getMillis() <= 16000) { @@ -37,4 +41,16 @@ public abstract class BaseSubscriptionDstu3Test extends BaseSubscriptionTest { fail("Size " + theList.size() + " is != target " + theTarget + " - Got: " + describeResults); } } + + protected long nextId() { + return mySubscriptionTestHelper.nextId(); + } + + protected Subscription makeActiveSubscription(String theCriteria, String thePayload, String theEndpoint) { + return mySubscriptionTestHelper.makeActiveSubscription(theCriteria, thePayload, theEndpoint); + } + + protected Subscription makeSubscriptionWithStatus(String theCriteria, String thePayload, String theEndpoint, Subscription.SubscriptionStatus status) { + return mySubscriptionTestHelper.makeSubscriptionWithStatus(theCriteria, thePayload, theEndpoint, status); + } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java index 9a1996a127e..06cef734fbe 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java @@ -44,5 +44,4 @@ public abstract class BaseSubscriptionTest { myMockFhirClientSubscriptionProvider.setBundleProvider(theBundleProvider); mySubscriptionLoader.doSyncSubscriptionsForUnitTest(); } - } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/FhirObjectPrinter.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/FhirObjectPrinter.java new file mode 100644 index 00000000000..874157a9546 --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/FhirObjectPrinter.java @@ -0,0 +1,17 @@ +package ca.uhn.fhir.jpa.subscription.module; + +import org.hl7.fhir.instance.model.api.IBaseResource; + +import java.util.function.Function; + +public class FhirObjectPrinter implements Function { + @Override + public String apply(Object object) { + if (object instanceof IBaseResource) { + IBaseResource resource = (IBaseResource) object; + return resource.getClass().getSimpleName() + " { " + resource.getIdElement().getValue() + " }"; + } else { + return object.toString(); + } + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/PointcutLatch.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/PointcutLatch.java index 6c0a5108203..0bcd02c33b2 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/PointcutLatch.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/PointcutLatch.java @@ -1,10 +1,8 @@ package ca.uhn.fhir.jpa.subscription.module; -import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.model.interceptor.api.HookParams; import ca.uhn.fhir.jpa.model.interceptor.api.IAnonymousLambdaHook; import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut; -import org.hl7.fhir.instance.model.api.IBaseResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -13,15 +11,15 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; import java.util.stream.Collectors; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class PointcutLatch implements IAnonymousLambdaHook { private static final Logger ourLog = LoggerFactory.getLogger(PointcutLatch.class); private static final int DEFAULT_TIMEOUT_SECONDS = 10; + private static final FhirObjectPrinter ourFhirObjectToStringMapper = new FhirObjectPrinter(); + private final String name; private CountDownLatch myCountdownLatch; @@ -36,11 +34,12 @@ public class PointcutLatch implements IAnonymousLambdaHook { this.name = theName; } - public void setExpectedCount(int count) throws InterruptedException { + public void setExpectedCount(int count) { if (myCountdownLatch != null) { throw new PointcutLatchException("setExpectedCount() called before previous awaitExpected() completed."); } createLatch(count); + ourLog.info("Expecting {} calls to {} latch", count, name); } private void createLatch(int count) { @@ -61,11 +60,12 @@ public class PointcutLatch implements IAnonymousLambdaHook { return name + " " + this.getClass().getSimpleName(); } - public void awaitExpected() throws InterruptedException { - awaitExpectedWithTimeout(DEFAULT_TIMEOUT_SECONDS); + public List awaitExpected() throws InterruptedException { + return awaitExpectedWithTimeout(DEFAULT_TIMEOUT_SECONDS); } - public void awaitExpectedWithTimeout(int timeoutSecond) throws InterruptedException { + public List awaitExpectedWithTimeout(int timeoutSecond) throws InterruptedException { + List retval = myCalledWith.get(); try { assertNotNull(getName() + " awaitExpected() called before setExpected() called.", myCountdownLatch); assertTrue(getName() + " timed out waiting " + timeoutSecond + " seconds for latch to be triggered.", myCountdownLatch.await(timeoutSecond, TimeUnit.SECONDS)); @@ -76,15 +76,17 @@ public class PointcutLatch implements IAnonymousLambdaHook { throw new AssertionError(error); } } finally { - destroyLatch(); + clear(); } + assertEquals("Concurrency error: Latch switched while waiting.", retval, myCalledWith.get()); + return retval; } public void expectNothing() { - destroyLatch(); + clear(); } - private void destroyLatch() { + public void clear() { myCountdownLatch = null; } @@ -97,42 +99,29 @@ public class PointcutLatch implements IAnonymousLambdaHook { return "[]"; } String retVal = "[ "; - retVal += calledWith.stream().flatMap(hookParams -> hookParams.values().stream()).map(itemToString()).collect(Collectors.joining(", ")); + retVal += calledWith.stream().flatMap(hookParams -> hookParams.values().stream()).map(ourFhirObjectToStringMapper).collect(Collectors.joining(", ")); return retVal + " ]"; } - private static Function itemToString() { - return object -> { - if (object instanceof IBaseResource) { - IBaseResource resource = (IBaseResource) object; - return "Resource " + resource.getIdElement().getValue(); - } else if (object instanceof ResourceModifiedMessage) { - ResourceModifiedMessage resourceModifiedMessage = (ResourceModifiedMessage)object; - // FIXME KHS can we get the context from the payload? - return "ResourceModified Message { " + resourceModifiedMessage.getOperationType() + ", " + resourceModifiedMessage.getNewPayload(FhirContext.forDstu3()).getIdElement().getValue() + "}"; - } else { - return object.toString(); - } - }; - } @Override public void invoke(HookParams theArgs) { if (myCountdownLatch == null) { - throw new PointcutLatchException("countdown() called before setExpectedCount() called.", theArgs); + throw new PointcutLatchException("invoke() called before setExpectedCount() called.", theArgs); } else if (myCountdownLatch.getCount() <= 0) { - setFailure("countdown() called " + (1 - myCountdownLatch.getCount()) + " more times than expected."); + setFailure("invoke() called " + (1 - myCountdownLatch.getCount()) + " more times than expected."); } - this.countdown(); if (myCalledWith.get() != null) { myCalledWith.get().add(theArgs); } + ourLog.info("Called {} {} with {}", name, myCountdownLatch, hookParamsToString(theArgs)); + + myCountdownLatch.countDown(); } - private void countdown() { - ourLog.info("{} counting down {}", name, myCountdownLatch); - myCountdownLatch.countDown(); + public void call(Object arg) { + this.invoke(new HookParams(arg)); } private class PointcutLatchException extends IllegalStateException { @@ -146,6 +135,6 @@ public class PointcutLatch implements IAnonymousLambdaHook { } private static String hookParamsToString(HookParams hookParams) { - return hookParams.values().stream().map(itemToString()).collect(Collectors.joining(", ")); + return hookParams.values().stream().map(ourFhirObjectToStringMapper).collect(Collectors.joining(", ")); } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/SubscriptionTestHelper.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/SubscriptionTestHelper.java new file mode 100644 index 00000000000..f9819c16ff6 --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/SubscriptionTestHelper.java @@ -0,0 +1,36 @@ +package ca.uhn.fhir.jpa.subscription.module; + +import org.hl7.fhir.dstu3.model.IdType; +import org.hl7.fhir.dstu3.model.Subscription; + +import java.util.concurrent.atomic.AtomicLong; + +public class SubscriptionTestHelper { + + protected static AtomicLong idCounter = new AtomicLong(); + + + public Subscription makeActiveSubscription(String theCriteria, String thePayload, String theEndpoint) { + return makeSubscriptionWithStatus(theCriteria, thePayload, theEndpoint, Subscription.SubscriptionStatus.ACTIVE); + } + + public Subscription makeSubscriptionWithStatus(String theCriteria, String thePayload, String theEndpoint, Subscription.SubscriptionStatus status) { + Subscription subscription = new Subscription(); + subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)"); + subscription.setStatus(status); + subscription.setCriteria(theCriteria); + IdType id = new IdType("Subscription", nextId()); + subscription.setId(id); + + Subscription.SubscriptionChannelComponent channel = new Subscription.SubscriptionChannelComponent(); + channel.setType(Subscription.SubscriptionChannelType.RESTHOOK); + channel.setPayload(thePayload); + channel.setEndpoint(theEndpoint); + subscription.setChannel(channel); + return subscription; + } + + public long nextId() { + return idCounter.incrementAndGet(); + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java index 995b03f1fe3..efbdbad6e0c 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java @@ -25,7 +25,6 @@ import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.hl7.fhir.dstu3.model.*; import org.hl7.fhir.instance.model.api.IBaseResource; -import org.hl7.fhir.instance.model.api.IIdType; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -39,7 +38,6 @@ import javax.servlet.http.HttpServletRequest; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.atomic.AtomicLong; public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends BaseSubscriptionDstu3Test { private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchingSubscriberTest.class); @@ -67,8 +65,6 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base protected static List ourUpdatedObservations = Collections.synchronizedList(Lists.newArrayList()); protected static List ourContentTypes = Collections.synchronizedList(new ArrayList<>()); private static SubscribableChannel ourSubscribableChannel; - private List mySubscriptionIds = Collections.synchronizedList(new ArrayList<>()); - protected static AtomicLong idCounter = new AtomicLong(); protected PointcutLatch mySubscriptionMatchingPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED); protected PointcutLatch mySubscriptionActivatedPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED); @@ -101,32 +97,16 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base } protected Subscription sendSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException { - Subscription subscription = returnedActiveSubscription(theCriteria, thePayload, theEndpoint); + Subscription subscription = makeActiveSubscription(theCriteria, thePayload, theEndpoint); mySubscriptionActivatedPost.setExpectedCount(1); Subscription retval = sendResource(subscription); mySubscriptionActivatedPost.awaitExpected(); return retval; } - protected Subscription returnedActiveSubscription(String theCriteria, String thePayload, String theEndpoint) { - Subscription subscription = new Subscription(); - subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)"); - subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE); - subscription.setCriteria(theCriteria); - IdType id = new IdType("Subscription", idCounter.incrementAndGet()); - subscription.setId(id); - - Subscription.SubscriptionChannelComponent channel = new Subscription.SubscriptionChannelComponent(); - channel.setType(Subscription.SubscriptionChannelType.RESTHOOK); - channel.setPayload(thePayload); - channel.setEndpoint(theEndpoint); - subscription.setChannel(channel); - return subscription; - } - protected Observation sendObservation(String code, String system) throws InterruptedException { Observation observation = new Observation(); - IdType id = new IdType("Observation", idCounter.incrementAndGet()); + IdType id = new IdType("Observation", nextId()); observation.setId(id); CodeableConcept codeableConcept = new CodeableConcept(); diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SearchParamLoaderTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SearchParamLoaderTest.java old mode 100644 new mode 100755 index 753fb291302..62f54735eda --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SearchParamLoaderTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SearchParamLoaderTest.java @@ -12,6 +12,7 @@ import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import java.util.Arrays; +import java.util.Collections; import static org.junit.Assert.assertEquals; @@ -32,18 +33,8 @@ public class SearchParamLoaderTest extends BaseBlockingQueueSubscribableChannelD myMockFhirClientSearchParamProvider.setFailCount(0); } - @Before - public void zeroRetryDelay() { - mySearchParamRegistry.setSecondsBetweenRetriesForTesting(0); - } - - @After - public void restoreRetryDelay() { - mySearchParamRegistry.setSecondsBetweenRetriesForTesting(mySearchParamRegistry.INITIAL_SECONDS_BETWEEN_RETRIES); - } - @Test - public void testSubscriptionLoaderFhirClientDown() throws Exception { + public void testSubscriptionLoaderFhirClientDown() { String criteria = "BodySite?accessType=Catheter,PD%20Catheter"; SearchParameter sp = new SearchParameter(); @@ -54,7 +45,7 @@ public class SearchParamLoaderTest extends BaseBlockingQueueSubscribableChannelD sp.setXpathUsage(SearchParameter.XPathUsageType.NORMAL); sp.setStatus(Enumerations.PublicationStatus.ACTIVE); - IBundleProvider bundle = new SimpleBundleProvider(Arrays.asList(sp), "uuid"); + IBundleProvider bundle = new SimpleBundleProvider(Collections.singletonList(sp), "uuid"); initSearchParamRegistry(bundle); assertEquals(0, myMockFhirClientSearchParamProvider.getFailCount()); } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandlerTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandlerTest.java new file mode 100644 index 00000000000..27a3571d3be --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandlerTest.java @@ -0,0 +1,48 @@ +package ca.uhn.fhir.jpa.subscription.module.standalone; + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test; +import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; +import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; +import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage; +import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber; +import org.hl7.fhir.dstu3.model.Subscription; +import org.junit.Test; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.mock.mockito.MockBean; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; + +public class StandaloneSubscriptionMessageHandlerTest extends BaseSubscriptionDstu3Test { + + @Autowired + StandaloneSubscriptionMessageHandler myStandaloneSubscriptionMessageHandler; + @Autowired + FhirContext myFhirContext; + @MockBean + SubscriptionMatchingSubscriber mySubscriptionMatchingSubscriber; + @MockBean + SubscriptionRegistry mySubscriptionRegistry; + + @Test + public void activeSubscriptionIsRegistered() { + Subscription subscription = makeActiveSubscription("testCriteria", "testPayload", "testEndpoint"); + ResourceModifiedMessage message = new ResourceModifiedMessage(myFhirContext, subscription, ResourceModifiedMessage.OperationTypeEnum.CREATE); + ResourceModifiedJsonMessage jsonMessage = new ResourceModifiedJsonMessage(message); + myStandaloneSubscriptionMessageHandler.handleMessage(jsonMessage); + Mockito.verify(mySubscriptionRegistry).registerSubscriptionUnlessAlreadyRegistered(any()); + Mockito.verify(mySubscriptionMatchingSubscriber).matchActiveSubscriptionsAndDeliver(any()); + } + + @Test + public void requestedSubscriptionNotRegistered() { + Subscription subscription = makeSubscriptionWithStatus("testCriteria", "testPayload", "testEndpoint", Subscription.SubscriptionStatus.REQUESTED); + ResourceModifiedMessage message = new ResourceModifiedMessage(myFhirContext, subscription, ResourceModifiedMessage.OperationTypeEnum.CREATE); + ResourceModifiedJsonMessage jsonMessage = new ResourceModifiedJsonMessage(message); + myStandaloneSubscriptionMessageHandler.handleMessage(jsonMessage); + Mockito.verify(mySubscriptionRegistry, never()).registerSubscriptionUnlessAlreadyRegistered(any()); + Mockito.verify(mySubscriptionMatchingSubscriber).matchActiveSubscriptionsAndDeliver(any()); + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java index 3d57b5e9011..578e2da944f 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java @@ -12,9 +12,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static org.hamcrest.Matchers.lessThan; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscribableChannelDstu3Test { @Test @@ -28,8 +26,8 @@ public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscriba String criteria2 = "Observation?code=SNOMED-CT|" + myCode + "111&_format=xml"; List subs = new ArrayList<>(); - subs.add(returnedActiveSubscription(criteria1, payload, ourListenerServerBase)); - subs.add(returnedActiveSubscription(criteria2, payload, ourListenerServerBase)); + subs.add(makeActiveSubscription(criteria1, payload, ourListenerServerBase)); + subs.add(makeActiveSubscription(criteria2, payload, ourListenerServerBase)); IBundleProvider bundle = new SimpleBundleProvider(new ArrayList<>(subs), "uuid"); initSubscriptionLoader(bundle); @@ -53,8 +51,8 @@ public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscriba String criteria2 = "Observation?code=SNOMED-CT|" + myCode + "111&_format=xml"; List subs = new ArrayList<>(); - subs.add(returnedActiveSubscription(criteria1, payload, ourListenerServerBase).setStatus(Subscription.SubscriptionStatus.REQUESTED)); - subs.add(returnedActiveSubscription(criteria2, payload, ourListenerServerBase).setStatus(Subscription.SubscriptionStatus.REQUESTED)); + subs.add(makeActiveSubscription(criteria1, payload, ourListenerServerBase).setStatus(Subscription.SubscriptionStatus.REQUESTED)); + subs.add(makeActiveSubscription(criteria2, payload, ourListenerServerBase).setStatus(Subscription.SubscriptionStatus.REQUESTED)); IBundleProvider bundle = new SimpleBundleProvider(new ArrayList<>(subs), "uuid"); initSubscriptionLoader(bundle); diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderTest.java old mode 100644 new mode 100755 index ff675029fe0..0def5f74047 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderTest.java @@ -1,6 +1,5 @@ package ca.uhn.fhir.jpa.subscription.module.standalone; -import ca.uhn.fhir.jpa.searchparam.registry.BaseSearchParamRegistry; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader; import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSubscriptionProvider; import ca.uhn.fhir.rest.api.server.IBundleProvider; @@ -33,26 +32,16 @@ public class SubscriptionLoaderTest extends BaseBlockingQueueSubscribableChannel myMockFhirClientSubscriptionProvider.setFailCount(0); } - @Before - public void zeroRetryDelay() { - mySubscriptionLoader.setSecondsBetweenRetriesForTesting(0); - } - - @After - public void restoreRetryDelay() { - mySubscriptionLoader.setSecondsBetweenRetriesForTesting(BaseSearchParamRegistry.INITIAL_SECONDS_BETWEEN_RETRIES); - } - @Test - public void testSubscriptionLoaderFhirClientDown() throws Exception { + public void testSubscriptionLoaderFhirClientDown() { String payload = "application/fhir+json"; String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml"; String criteria2 = "Observation?code=SNOMED-CT|" + myCode + "111&_format=xml"; List subs = new ArrayList<>(); - subs.add(returnedActiveSubscription(criteria1, payload, ourListenerServerBase)); - subs.add(returnedActiveSubscription(criteria2, payload, ourListenerServerBase)); + subs.add(makeActiveSubscription(criteria1, payload, ourListenerServerBase)); + subs.add(makeActiveSubscription(criteria2, payload, ourListenerServerBase)); IBundleProvider bundle = new SimpleBundleProvider(new ArrayList<>(subs), "uuid"); initSubscriptionLoader(bundle); diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionCheckingSubscriberTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionCheckingSubscriberTest.java index 2ff76686ceb..b946afddd06 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionCheckingSubscriberTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionCheckingSubscriberTest.java @@ -95,7 +95,7 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri ourObservationListener.setExpectedCount(1); Observation observation = new Observation(); - IdType id = new IdType("Observation", idCounter.incrementAndGet()); + IdType id = new IdType("Observation", nextId()); observation.setId(id); // Reference has display only! diff --git a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-autoconfigure/pom.xml b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-autoconfigure/pom.xml index 0a89d730295..5649cbba2b1 100644 --- a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-autoconfigure/pom.xml +++ b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-autoconfigure/pom.xml @@ -121,7 +121,7 @@ org.springframework.boot spring-boot-dependencies - ${spring-boot.version} + ${spring_boot_version} pom import diff --git a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/pom.xml b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/pom.xml index 153f1fa3d7a..27317a44d4d 100644 --- a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/pom.xml +++ b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/pom.xml @@ -23,7 +23,7 @@ org.springframework.boot spring-boot-dependencies - ${spring-boot.version} + ${spring_boot_version} pom import diff --git a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-starter/pom.xml b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-starter/pom.xml index 86ecbc36d5f..ab9fb0c48a6 100644 --- a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-starter/pom.xml +++ b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-starter/pom.xml @@ -30,7 +30,7 @@ org.springframework.boot spring-boot-dependencies - ${spring-boot.version} + ${spring_boot_version} pom import diff --git a/hapi-fhir-spring-boot/pom.xml b/hapi-fhir-spring-boot/pom.xml index 1e3fc36f83a..e7fb691f6b1 100644 --- a/hapi-fhir-spring-boot/pom.xml +++ b/hapi-fhir-spring-boot/pom.xml @@ -23,7 +23,7 @@ org.springframework.boot spring-boot-dependencies - ${spring-boot.version} + ${spring_boot_version} pom import diff --git a/pom.xml b/pom.xml old mode 100644 new mode 100755 index d44cf9ca083..384e27cf290 --- a/pom.xml +++ b/pom.xml @@ -563,7 +563,8 @@ 1.7.25 5.1.3.RELEASE 2.1.3.RELEASE - 2.1.1.RELEASE + 2.1.1.RELEASE + 1.2.2.RELEASE 3.1.4 3.0.11.RELEASE @@ -1255,6 +1256,11 @@ spring-test ${spring_version} + + org.springframework.boot + spring-boot-starter-test + ${spring_boot_version} + org.springframework spring-tx @@ -1275,6 +1281,11 @@ spring-websocket ${spring_version} + + org.springframework.retry + spring-retry + ${spring_retry_version} + org.thymeleaf thymeleaf @@ -1328,7 +1339,7 @@ org.springframework.boot spring-boot-maven-plugin - ${spring-boot.version} + ${spring_boot_version} org.sonatype.plugins