Retrier now uses RetryTemplate

This commit is contained in:
Ken Stevens 2019-01-30 22:34:26 -05:00
parent adb037e805
commit 4b298fe40e
9 changed files with 74 additions and 116 deletions

View File

@ -42,9 +42,9 @@ import java.util.List;
import static org.fusesource.jansi.Ansi.ansi;
public abstract class BaseApp {
public static final String STACKFILTER_PATTERN = "%xEx{full, sun.reflect, org.junit, org.eclipse, java.lang.reflect.Method, org.springframework, org.hibernate, com.sun.proxy, org.attoparser, org.thymeleaf}";
public static final String STACKFILTER_PATTERN_PROP = "log.stackfilter.pattern";
public static final String LINESEP = System.getProperty("line.separator");
private static final String STACKFILTER_PATTERN = "%xEx{full, sun.reflect, org.junit, org.eclipse, java.lang.reflect.Method, org.springframework, org.hibernate, com.sun.proxy, org.attoparser, org.thymeleaf}";
private static final String STACKFILTER_PATTERN_PROP = "log.stackfilter.pattern";
static final String LINESEP = System.getProperty("line.separator");
protected static final org.slf4j.Logger ourLog;
private static List<BaseCommand> ourCommands;
@ -145,7 +145,7 @@ public abstract class BaseApp {
protected abstract String provideCommandName();
public List<BaseCommand> provideCommands() {
List<BaseCommand> provideCommands() {
ArrayList<BaseCommand> commands = new ArrayList<>();
commands.add(new RunServerCommand());
commands.add(new ExampleDataUploader());

5
hapi-fhir-jpaserver-searchparam/pom.xml Normal file → Executable file
View File

@ -86,7 +86,10 @@
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
<!-- Testing -->
<dependency>

View File

@ -48,8 +48,6 @@ public abstract class BaseSearchParamRegistry<SP extends IBaseResource> implemen
private static final int MAX_MANAGED_PARAM_COUNT = 10000;
private static final Logger ourLog = LoggerFactory.getLogger(BaseSearchParamRegistry.class);
@VisibleForTesting
public static final int INITIAL_SECONDS_BETWEEN_RETRIES = 5;
private static long REFRESH_INTERVAL = 60 * DateUtils.MILLIS_PER_MINUTE;
private static final int MAX_RETRIES = 60; // 5 minutes
@ -60,7 +58,6 @@ public abstract class BaseSearchParamRegistry<SP extends IBaseResource> implemen
@Autowired
private FhirContext myFhirContext;
private volatile int mySecondsBetweenRetries = INITIAL_SECONDS_BETWEEN_RETRIES;
private Map<String, Map<String, RuntimeSearchParam>> myBuiltInSearchParams;
private volatile Map<String, List<JpaRuntimeSearchParam>> myActiveUniqueSearchParams = Collections.emptyMap();
private volatile Map<String, Map<Set<String>, List<JpaRuntimeSearchParam>>> myActiveParamNamesToUniqueSearchParams = Collections.emptyMap();
@ -85,7 +82,7 @@ public abstract class BaseSearchParamRegistry<SP extends IBaseResource> implemen
return myActiveSearchParams.get(theResourceName);
}
void requiresActiveSearchParams() {
private void requiresActiveSearchParams() {
if (myActiveSearchParams == null) {
refreshCacheWithRetry();
}
@ -120,11 +117,7 @@ public abstract class BaseSearchParamRegistry<SP extends IBaseResource> implemen
}
private Map<String, RuntimeSearchParam> getSearchParamMap(Map<String, Map<String, RuntimeSearchParam>> searchParams, String theResourceName) {
Map<String, RuntimeSearchParam> retVal = searchParams.get(theResourceName);
if (retVal == null) {
retVal = new HashMap<>();
searchParams.put(theResourceName, retVal);
}
Map<String, RuntimeSearchParam> retVal = searchParams.computeIfAbsent(theResourceName, k -> new HashMap<>());
return retVal;
}
@ -139,11 +132,7 @@ public abstract class BaseSearchParamRegistry<SP extends IBaseResource> implemen
* Loop through parameters and find JPA params
*/
for (Map.Entry<String, Map<String, RuntimeSearchParam>> nextResourceNameToEntries : theActiveSearchParams.entrySet()) {
List<JpaRuntimeSearchParam> uniqueSearchParams = activeUniqueSearchParams.get(nextResourceNameToEntries.getKey());
if (uniqueSearchParams == null) {
uniqueSearchParams = new ArrayList<>();
activeUniqueSearchParams.put(nextResourceNameToEntries.getKey(), uniqueSearchParams);
}
List<JpaRuntimeSearchParam> uniqueSearchParams = activeUniqueSearchParams.computeIfAbsent(nextResourceNameToEntries.getKey(), k -> new ArrayList<>());
Collection<RuntimeSearchParam> nextSearchParamsForResourceName = nextResourceNameToEntries.getValue().values();
for (RuntimeSearchParam nextCandidate : nextSearchParamsForResourceName) {
@ -181,7 +170,7 @@ public abstract class BaseSearchParamRegistry<SP extends IBaseResource> implemen
}
if (next.getCompositeOf() != null) {
Collections.sort(next.getCompositeOf(), new Comparator<RuntimeSearchParam>() {
next.getCompositeOf().sort(new Comparator<RuntimeSearchParam>() {
@Override
public int compare(RuntimeSearchParam theO1, RuntimeSearchParam theO2) {
return StringUtils.compare(theO1.getName(), theO2.getName());
@ -192,7 +181,7 @@ public abstract class BaseSearchParamRegistry<SP extends IBaseResource> implemen
activeParamNamesToUniqueSearchParams.put(nextBase, new HashMap<>());
}
if (!activeParamNamesToUniqueSearchParams.get(nextBase).containsKey(paramNames)) {
activeParamNamesToUniqueSearchParams.get(nextBase).put(paramNames, new ArrayList<JpaRuntimeSearchParam>());
activeParamNamesToUniqueSearchParams.get(nextBase).put(paramNames, new ArrayList<>());
}
activeParamNamesToUniqueSearchParams.get(nextBase).get(paramNames).add(next);
}
@ -339,7 +328,7 @@ public abstract class BaseSearchParamRegistry<SP extends IBaseResource> implemen
}
synchronized int refreshCacheWithRetry() {
Retrier<Integer> refreshCacheRetrier = new Retrier(() -> mySearchParamProvider.refreshCache(this, REFRESH_INTERVAL), MAX_RETRIES, mySecondsBetweenRetries, "refresh search parameter registry");
Retrier<Integer> refreshCacheRetrier = new Retrier(() -> mySearchParamProvider.refreshCache(this, REFRESH_INTERVAL), MAX_RETRIES);
return refreshCacheRetrier.runWithRetry();
}
@ -355,11 +344,6 @@ public abstract class BaseSearchParamRegistry<SP extends IBaseResource> implemen
}
}
@VisibleForTesting
public void setSecondsBetweenRetriesForTesting(int theSecondsBetweenRetries) {
mySecondsBetweenRetries = theSecondsBetweenRetries;
}
@Override
public Map<String, Map<String, RuntimeSearchParam>> getActiveSearchParams() {
requiresActiveSearchParams();

View File

@ -20,9 +20,13 @@ package ca.uhn.fhir.jpa.searchparam.retry;
* #L%
*/
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import java.util.function.Supplier;
@ -30,34 +34,27 @@ public class Retrier<T> {
private static final Logger ourLog = LoggerFactory.getLogger(Retrier.class);
private final Supplier<T> mySupplier;
private final int myMaxRetries;
private final int mySecondsBetweenRetries;
private final String myDescription;
public Retrier(Supplier<T> theSupplier, int theMaxRetries, int theSecondsBetweenRetries, String theDescription) {
private final RetryTemplate myRetryTemplate;
public Retrier(Supplier<T> theSupplier, int theMaxRetries) {
Validate.isTrue(theMaxRetries > 0, "maxRetries must be above zero.");
mySupplier = theSupplier;
myMaxRetries = theMaxRetries;
mySecondsBetweenRetries = theSecondsBetweenRetries;
myDescription = theDescription;
myRetryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
backOff.setInitialInterval(500);
backOff.setMaxInterval(DateUtils.MILLIS_PER_MINUTE);
backOff.setMultiplier(2);
myRetryTemplate.setBackOffPolicy(backOff);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(theMaxRetries);
myRetryTemplate.setRetryPolicy(retryPolicy);
}
public T runWithRetry() {
RuntimeException lastException = new IllegalStateException("maxRetries must be above zero.");
for (int retryCount = 1; retryCount <= myMaxRetries; ++retryCount) {
try {
return mySupplier.get();
} catch(RuntimeException e) {
ourLog.trace("Failure during retry: {}", e.getMessage(), e); // with stacktrace if it's ever needed
ourLog.info("Failed to {}. Attempt {} / {}: {}", myDescription, retryCount, myMaxRetries, e.getMessage());
lastException = e;
try {
Thread.sleep(mySecondsBetweenRetries * DateUtils.MILLIS_PER_SECOND);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw lastException;
}
}
}
throw lastException;
return myRetryTemplate.execute(retryContext -> mySupplier.get());
}
}

View File

@ -1,17 +1,24 @@
package ca.uhn.fhir.jpa.searchparam.retry;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class RetrierTest {
@Rule
public ExpectedException myExpectedException = ExpectedException.none();
@Test
public void happyPath() {
Supplier<Boolean> supplier = () -> true;
Retrier<Boolean> retrier = new Retrier<>(supplier, 5, 0, "test");
Retrier<Boolean> retrier = new Retrier<>(supplier, 5);
assertTrue(retrier.runWithRetry());
}
@ -22,7 +29,7 @@ public class RetrierTest {
if (counter.incrementAndGet() < 3) throw new RetryRuntimeException("test");
return true;
};
Retrier<Boolean> retrier = new Retrier<>(supplier, 5, 0, "test");
Retrier<Boolean> retrier = new Retrier<>(supplier, 5);
assertTrue(retrier.runWithRetry());
assertEquals(3, counter.get());
}
@ -31,17 +38,16 @@ public class RetrierTest {
public void failMaxRetries() {
AtomicInteger counter = new AtomicInteger();
Supplier<Boolean> supplier = () -> {
if (counter.incrementAndGet() < 10) throw new RetryRuntimeException("test");
if (counter.incrementAndGet() < 3) throw new RetryRuntimeException("test failure message");
return true;
};
Retrier<Boolean> retrier = new Retrier<>(supplier, 5, 0, "test");
try {
Retrier<Boolean> retrier = new Retrier<>(supplier, 1);
myExpectedException.expect(RetryRuntimeException.class);
myExpectedException.expectMessage("test failure message");
retrier.runWithRetry();
fail();
} catch (RetryRuntimeException e) {
assertEquals(5, counter.get());
}
}
@Test
public void failMaxRetriesZero() {
@ -50,14 +56,10 @@ public class RetrierTest {
if (counter.incrementAndGet() < 10) throw new RetryRuntimeException("test");
return true;
};
Retrier<Boolean> retrier = new Retrier<>(supplier, 0, 0, "test");
try {
retrier.runWithRetry();
fail();
} catch (IllegalStateException e) {
myExpectedException.expect(IllegalArgumentException.class);
myExpectedException.expectMessage("maxRetries must be above zero.");
Retrier<Boolean> retrier = new Retrier<>(supplier, 0);
assertEquals(0, counter.get());
assertEquals("maxRetries must be above zero." ,e.getMessage());
}
}
@Test
@ -67,17 +69,12 @@ public class RetrierTest {
if (counter.incrementAndGet() < 10) throw new RetryRuntimeException("test");
return true;
};
Retrier<Boolean> retrier = new Retrier<>(supplier, -1, 0, "test");
try {
retrier.runWithRetry();
fail();
} catch (IllegalStateException e) {
myExpectedException.expect(IllegalArgumentException.class);
myExpectedException.expectMessage("maxRetries must be above zero.");
Retrier<Boolean> retrier = new Retrier<>(supplier, -1);
assertEquals(0, counter.get());
assertEquals("maxRetries must be above zero." ,e.getMessage());
}
}
class RetryRuntimeException extends RuntimeException {
RetryRuntimeException(String message) {

View File

@ -46,8 +46,6 @@ import java.util.concurrent.Semaphore;
@Lazy
public class SubscriptionLoader {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionLoader.class);
@VisibleForTesting
public static final int INITIAL_SECONDS_BETWEEN_RETRIES = 5;
private static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes
@Autowired
@ -58,8 +56,6 @@ public class SubscriptionLoader {
private final Object mySyncSubscriptionsLock = new Object();
private Semaphore mySyncSubscriptionsSemaphore = new Semaphore(1);
private volatile int mySecondsBetweenRetries = INITIAL_SECONDS_BETWEEN_RETRIES;
/**
* Read the existing subscriptions from the database
*/
@ -82,7 +78,7 @@ public class SubscriptionLoader {
}
synchronized int doSyncSubscriptionsWithRetry() {
Retrier<Integer> syncSubscriptionRetrier = new Retrier(() -> doSyncSubscriptions(), MAX_RETRIES, mySecondsBetweenRetries, "sync subscriptions");
Retrier<Integer> syncSubscriptionRetrier = new Retrier<>(this::doSyncSubscriptions, MAX_RETRIES);
return syncSubscriptionRetrier.runWithRetry();
}
@ -126,10 +122,5 @@ public class SubscriptionLoader {
public void setSubscriptionProviderForUnitTest(ISubscriptionProvider theSubscriptionProvider) {
mySubscriptionProvidor = theSubscriptionProvider;
}
@VisibleForTesting
public void setSecondsBetweenRetriesForTesting(int theSecondsBetweenRetries) {
mySecondsBetweenRetries = theSecondsBetweenRetries;
}
}

View File

@ -12,6 +12,7 @@ import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Arrays;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
@ -32,18 +33,8 @@ public class SearchParamLoaderTest extends BaseBlockingQueueSubscribableChannelD
myMockFhirClientSearchParamProvider.setFailCount(0);
}
@Before
public void zeroRetryDelay() {
mySearchParamRegistry.setSecondsBetweenRetriesForTesting(0);
}
@After
public void restoreRetryDelay() {
mySearchParamRegistry.setSecondsBetweenRetriesForTesting(mySearchParamRegistry.INITIAL_SECONDS_BETWEEN_RETRIES);
}
@Test
public void testSubscriptionLoaderFhirClientDown() throws Exception {
public void testSubscriptionLoaderFhirClientDown() {
String criteria = "BodySite?accessType=Catheter,PD%20Catheter";
SearchParameter sp = new SearchParameter();
@ -54,7 +45,7 @@ public class SearchParamLoaderTest extends BaseBlockingQueueSubscribableChannelD
sp.setXpathUsage(SearchParameter.XPathUsageType.NORMAL);
sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
IBundleProvider bundle = new SimpleBundleProvider(Arrays.asList(sp), "uuid");
IBundleProvider bundle = new SimpleBundleProvider(Collections.singletonList(sp), "uuid");
initSearchParamRegistry(bundle);
assertEquals(0, myMockFhirClientSearchParamProvider.getFailCount());
}

View File

@ -1,6 +1,5 @@
package ca.uhn.fhir.jpa.subscription.module.standalone;
import ca.uhn.fhir.jpa.searchparam.registry.BaseSearchParamRegistry;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSubscriptionProvider;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
@ -33,18 +32,8 @@ public class SubscriptionLoaderTest extends BaseBlockingQueueSubscribableChannel
myMockFhirClientSubscriptionProvider.setFailCount(0);
}
@Before
public void zeroRetryDelay() {
mySubscriptionLoader.setSecondsBetweenRetriesForTesting(0);
}
@After
public void restoreRetryDelay() {
mySubscriptionLoader.setSecondsBetweenRetriesForTesting(BaseSearchParamRegistry.INITIAL_SECONDS_BETWEEN_RETRIES);
}
@Test
public void testSubscriptionLoaderFhirClientDown() throws Exception {
public void testSubscriptionLoaderFhirClientDown() {
String payload = "application/fhir+json";
String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml";

6
pom.xml Normal file → Executable file
View File

@ -554,6 +554,7 @@
<spring_version>5.1.3.RELEASE</spring_version>
<spring_data_version>2.1.3.RELEASE</spring_data_version>
<spring-boot.version>2.1.1.RELEASE</spring-boot.version>
<spring_retry_version>1.2.2.RELEASE</spring_retry_version>
<stax2_api_version>3.1.4</stax2_api_version>
<thymeleaf-version>3.0.11.RELEASE</thymeleaf-version>
@ -1265,6 +1266,11 @@
<artifactId>spring-websocket</artifactId>
<version>${spring_version}</version>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>${spring_retry_version}</version>
</dependency>
<dependency>
<groupId>org.thymeleaf</groupId>
<artifactId>thymeleaf</artifactId>