diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 8ef0723a340..601a9ce5990 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -440,6 +440,9 @@ Release 2.8.0 - UNRELEASED HADOOP-12825. Log slow name resolutions. (Sidharta Seethana via stevel) + HADOOP-12622. Improve the loggings in RetryPolicies and RetryInvocationHandler. + (Junping Du via jianhe) + OPTIMIZATIONS HADOOP-11785. Reduce the number of listStatus operation in distcp diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java index 5d94c3b13d2..d57dc84ffc9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java @@ -121,6 +121,7 @@ public class RetryInvocationHandler implements RpcInvocationHandler { invocationFailoverCount, isIdempotentOrAtMostOnce); RetryAction failAction = getFailAction(actions); if (failAction != null) { + // fail. if (failAction.reason != null) { LOG.warn("Exception while invoking " + currentProxy.proxy.getClass() + "." + method.getName() + " over " + currentProxy.proxyInfo @@ -136,7 +137,8 @@ public class RetryInvocationHandler implements RpcInvocationHandler { worthLogging |= LOG.isDebugEnabled(); RetryAction failOverAction = getFailOverAction(actions); long delay = getDelayMillis(actions); - if (failOverAction != null && worthLogging) { + + if (worthLogging) { String msg = "Exception while invoking " + method.getName() + " of class " + currentProxy.proxy.getClass().getSimpleName() + " over " + currentProxy.proxyInfo; @@ -144,21 +146,21 @@ public class RetryInvocationHandler implements RpcInvocationHandler { if (invocationFailoverCount > 0) { msg += " after " + invocationFailoverCount + " fail over attempts"; } - msg += ". Trying to fail over " + formatSleepMessage(delay); - LOG.info(msg, ex); - } else { - if(LOG.isDebugEnabled()) { - LOG.debug("Exception while invoking " + method.getName() - + " of class " + currentProxy.proxy.getClass().getSimpleName() - + " over " + currentProxy.proxyInfo + ". Retrying " - + formatSleepMessage(delay), ex); + + if (failOverAction != null) { + // failover + msg += ". Trying to fail over " + formatSleepMessage(delay); + } else { + // retry + msg += ". Retrying " + formatSleepMessage(delay); } + LOG.info(msg, ex); } if (delay > 0) { Thread.sleep(delay); } - + if (failOverAction != null) { // Make sure that concurrent failed method invocations only cause a // single actual fail over. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java index 171d52af7ca..131aa8feb80 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -39,6 +39,8 @@ import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import com.google.common.annotations.VisibleForTesting; + /** *

* A collection of useful implementations of {@link RetryPolicy}. @@ -177,10 +179,11 @@ public class RetryPolicies { @Override public RetryAction shouldRetry(Exception e, int retries, int failovers, boolean isIdempotentOrAtMostOnce) throws Exception { - return RetryAction.FAIL; + return new RetryAction(RetryAction.RetryDecision.FAIL, 0, "try once " + + "and fail."); } } - + static class RetryForever implements RetryPolicy { @Override public RetryAction shouldRetry(Exception e, int retries, int failovers, @@ -221,14 +224,24 @@ public class RetryPolicies { public RetryAction shouldRetry(Exception e, int retries, int failovers, boolean isIdempotentOrAtMostOnce) throws Exception { if (retries >= maxRetries) { - return RetryAction.FAIL; + return new RetryAction(RetryAction.RetryDecision.FAIL, 0 , getReason()); } return new RetryAction(RetryAction.RetryDecision.RETRY, - timeUnit.toMillis(calculateSleepTime(retries))); + timeUnit.toMillis(calculateSleepTime(retries)), getReason()); } - + + protected String getReason() { + return constructReasonString(maxRetries); + } + + @VisibleForTesting + public static String constructReasonString(int retries) { + return "retries get failed due to exceeded maximum allowed retries " + + "number: " + retries; + } + protected abstract long calculateSleepTime(int retries); - + @Override public int hashCode() { return toString().hashCode(); @@ -264,18 +277,37 @@ public class RetryPolicies { return sleepTime; } } - - static class RetryUpToMaximumTimeWithFixedSleep extends RetryUpToMaximumCountWithFixedSleep { - public RetryUpToMaximumTimeWithFixedSleep(long maxTime, long sleepTime, TimeUnit timeUnit) { + + static class RetryUpToMaximumTimeWithFixedSleep extends + RetryUpToMaximumCountWithFixedSleep { + private long maxTime = 0; + private TimeUnit timeUnit; + + public RetryUpToMaximumTimeWithFixedSleep(long maxTime, long sleepTime, + TimeUnit timeUnit) { super((int) (maxTime / sleepTime), sleepTime, timeUnit); + this.maxTime = maxTime; + this.timeUnit = timeUnit; + } + + @Override + protected String getReason() { + return constructReasonString(this.maxTime, this.timeUnit); + } + + @VisibleForTesting + public static String constructReasonString(long maxTime, + TimeUnit timeUnit) { + return "retries get failed due to exceeded maximum allowed time (" + + "in " + timeUnit.toString() + "): " + maxTime; } } - + static class RetryUpToMaximumCountWithProportionalSleep extends RetryLimited { public RetryUpToMaximumCountWithProportionalSleep(int maxRetries, long sleepTime, TimeUnit timeUnit) { super(maxRetries, sleepTime, timeUnit); } - + @Override protected long calculateSleepTime(int retries) { return sleepTime * (retries + 1); @@ -332,7 +364,8 @@ public class RetryPolicies { final Pair p = searchPair(curRetry); if (p == null) { //no more retries. - return RetryAction.FAIL; + return new RetryAction(RetryAction.RetryDecision.FAIL, 0 , "Retry " + + "all pairs in MultipleLinearRandomRetry: " + pairs); } //calculate sleep time and return. @@ -549,6 +582,7 @@ public class RetryPolicies { protected long calculateSleepTime(int retries) { return calculateExponentialTime(sleepTime, retries + 1); } + } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestRetryProxy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestRetryProxy.java index 35a45b4f731..4137daec54c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestRetryProxy.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestRetryProxy.java @@ -28,6 +28,15 @@ import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWith import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumTimeWithFixedSleep; import static org.apache.hadoop.io.retry.RetryPolicies.retryForeverWithFixedSleep; import static org.apache.hadoop.io.retry.RetryPolicies.exponentialBackoffRetry; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import static org.junit.Assert.*; import java.io.IOException; @@ -41,10 +50,19 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; +import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision; +import org.apache.hadoop.io.retry.RetryPolicies.RetryUpToMaximumCountWithFixedSleep; +import org.apache.hadoop.io.retry.RetryPolicies.RetryUpToMaximumTimeWithFixedSleep; +import org.apache.hadoop.io.retry.RetryPolicies.TryOnceThenFail; import org.apache.hadoop.io.retry.UnreliableInterface.FatalException; import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException; import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.RemoteException; + +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + import org.junit.Before; import org.junit.Test; @@ -53,25 +71,57 @@ import java.lang.reflect.UndeclaredThrowableException; public class TestRetryProxy { private UnreliableImplementation unreliableImpl; + private RetryAction caughtRetryAction = null; @Before public void setUp() throws Exception { unreliableImpl = new UnreliableImplementation(); } + // answer mockPolicy's method with realPolicy, caught method's return value + private void setupMockPolicy(RetryPolicy mockPolicy, + final RetryPolicy realPolicy) throws Exception { + when(mockPolicy.shouldRetry(any(Exception.class), anyInt(), anyInt(), + anyBoolean())).thenAnswer(new Answer() { + @SuppressWarnings("rawtypes") + @Override + public RetryAction answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + Exception e = (Exception) args[0]; + int retries = (int) args[1]; + int failovers = (int) args[2]; + boolean isIdempotentOrAtMostOnce = (boolean) args[3]; + caughtRetryAction = realPolicy.shouldRetry(e, retries, failovers, + isIdempotentOrAtMostOnce); + return caughtRetryAction; + } + }); + } + @Test - public void testTryOnceThenFail() throws UnreliableException { + public void testTryOnceThenFail() throws Exception { + RetryPolicy policy = mock(TryOnceThenFail.class); + RetryPolicy realPolicy = TRY_ONCE_THEN_FAIL; + setupMockPolicy(policy, realPolicy); + UnreliableInterface unreliable = (UnreliableInterface) - RetryProxy.create(UnreliableInterface.class, unreliableImpl, TRY_ONCE_THEN_FAIL); + RetryProxy.create(UnreliableInterface.class, unreliableImpl, policy); unreliable.alwaysSucceeds(); try { unreliable.failsOnceThenSucceeds(); fail("Should fail"); } catch (UnreliableException e) { // expected + verify(policy, times(1)).shouldRetry(any(Exception.class), anyInt(), + anyInt(), anyBoolean()); + assertEquals(RetryDecision.FAIL, caughtRetryAction.action); + assertEquals("try once and fail.", caughtRetryAction.reason); + } catch (Exception e) { + fail("Other exception other than UnreliableException should also get " + + "failed."); } } - + /** * Test for {@link RetryInvocationHandler#isRpcInvocation(Object)} */ @@ -125,25 +175,48 @@ public class TestRetryProxy { } @Test - public void testRetryUpToMaximumCountWithFixedSleep() throws UnreliableException { + public void testRetryUpToMaximumCountWithFixedSleep() throws + Exception { + + RetryPolicy policy = mock(RetryUpToMaximumCountWithFixedSleep.class); + int maxRetries = 8; + RetryPolicy realPolicy = retryUpToMaximumCountWithFixedSleep(maxRetries, 1, + TimeUnit.NANOSECONDS); + setupMockPolicy(policy, realPolicy); + UnreliableInterface unreliable = (UnreliableInterface) - RetryProxy.create(UnreliableInterface.class, unreliableImpl, - retryUpToMaximumCountWithFixedSleep(8, 1, TimeUnit.NANOSECONDS)); + RetryProxy.create(UnreliableInterface.class, unreliableImpl, policy); + // shouldRetry += 1 unreliable.alwaysSucceeds(); + // shouldRetry += 2 unreliable.failsOnceThenSucceeds(); try { + // shouldRetry += (maxRetries -1) (just failed once above) unreliable.failsTenTimesThenSucceeds(); fail("Should fail"); } catch (UnreliableException e) { // expected + verify(policy, times(maxRetries + 2)).shouldRetry(any(Exception.class), + anyInt(), anyInt(), anyBoolean()); + assertEquals(RetryDecision.FAIL, caughtRetryAction.action); + assertEquals(RetryUpToMaximumCountWithFixedSleep.constructReasonString( + maxRetries), caughtRetryAction.reason); + } catch (Exception e) { + fail("Other exception other than UnreliableException should also get " + + "failed."); } } @Test - public void testRetryUpToMaximumTimeWithFixedSleep() throws UnreliableException { + public void testRetryUpToMaximumTimeWithFixedSleep() throws Exception { + RetryPolicy policy = mock(RetryUpToMaximumTimeWithFixedSleep.class); + long maxTime = 80L; + RetryPolicy realPolicy = retryUpToMaximumTimeWithFixedSleep(maxTime, 10, + TimeUnit.NANOSECONDS); + setupMockPolicy(policy, realPolicy); + UnreliableInterface unreliable = (UnreliableInterface) - RetryProxy.create(UnreliableInterface.class, unreliableImpl, - retryUpToMaximumTimeWithFixedSleep(80, 10, TimeUnit.NANOSECONDS)); + RetryProxy.create(UnreliableInterface.class, unreliableImpl, policy); unreliable.alwaysSucceeds(); unreliable.failsOnceThenSucceeds(); try { @@ -151,9 +224,17 @@ public class TestRetryProxy { fail("Should fail"); } catch (UnreliableException e) { // expected + verify(policy, times((int)(maxTime/10) + 2)).shouldRetry(any(Exception.class), + anyInt(), anyInt(), anyBoolean()); + assertEquals(RetryDecision.FAIL, caughtRetryAction.action); + assertEquals(RetryUpToMaximumTimeWithFixedSleep.constructReasonString( + maxTime, TimeUnit.NANOSECONDS), caughtRetryAction.reason); + } catch (Exception e) { + fail("Other exception other than UnreliableException should also get " + + "failed."); } } - + @Test public void testRetryUpToMaximumCountWithProportionalSleep() throws UnreliableException { UnreliableInterface unreliable = (UnreliableInterface)