diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java index ba07db4c2ae..a89c3a7ab0e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java @@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit; import javax.security.sasl.SaslException; +import org.apache.hadoop.ipc.ObserverRetryOnActiveException; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.StandbyException; @@ -678,7 +679,7 @@ public class RetryPolicies { e instanceof UnknownHostException || e instanceof StandbyException || e instanceof ConnectTimeoutException || - isWrappedStandbyException(e)) { + shouldFailoverOnException(e)) { return new RetryAction(RetryAction.RetryDecision.FAILOVER_AND_RETRY, getFailoverOrRetrySleepTime(failovers)); } else if (e instanceof RetriableException @@ -729,12 +730,13 @@ public class RetryPolicies { return calculateExponentialTime(time, retries, Long.MAX_VALUE); } - private static boolean isWrappedStandbyException(Exception e) { + private static boolean shouldFailoverOnException(Exception e) { if (!(e instanceof RemoteException)) { return false; } Exception unwrapped = ((RemoteException)e).unwrapRemoteException( - StandbyException.class); + StandbyException.class, + ObserverRetryOnActiveException.class); return unwrapped instanceof StandbyException; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ObserverRetryOnActiveException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ObserverRetryOnActiveException.java index 336b304f2d0..80e2898155a 100644 --- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ObserverRetryOnActiveException.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ObserverRetryOnActiveException.java @@ -20,8 +20,6 @@ package org.apache.hadoop.ipc; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import java.io.IOException; - /** * Thrown by a remote ObserverNode indicating the operation has failed and the * client should retry active namenode directly (instead of retry other @@ -29,7 +27,7 @@ import java.io.IOException; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class ObserverRetryOnActiveException extends IOException { +public class ObserverRetryOnActiveException extends StandbyException { static final long serialVersionUID = 1L; public ObserverRetryOnActiveException(String msg) { super(msg); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java index ac3e7f703c5..d159a7139a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java @@ -24,6 +24,7 @@ import org.apache.hadoop.ha.ServiceFailedException; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; +import org.apache.hadoop.ipc.ObserverRetryOnActiveException; import org.apache.hadoop.ipc.StandbyException; /** @@ -95,7 +96,17 @@ public class StandbyState extends HAState { String faq = ". 
Visit https://s.apache.org/sbnn-error"; String msg = "Operation category " + op + " is not supported in state " + context.getState() + faq; - throw new StandbyException(msg); + if (op == OperationCategory.WRITE && isObserver) { + // If observer receives a write call, return active retry + // exception to inform the client to retry on active. + // A write should never happen on Observer. Except that, + // if access time is enabled, an open call can transition + // to a write operation. In this case, Observer + // should inform the client to retry this open on Active. + throw new ObserverRetryOnActiveException(msg); + } else { + throw new StandbyException(msg); + } } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index f1eb5a8d07a..2cf6dacfc5a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.TestFsck; import org.apache.hadoop.hdfs.tools.GetGroups; +import org.apache.hadoop.ipc.ObserverRetryOnActiveException; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -366,6 +367,52 @@ public class TestObserverNode { assertTrue(result.contains("Status: HEALTHY")); } + /** + * Test that, if a write happens to go to Observer, + * Observer would throw {@link ObserverRetryOnActiveException}, + * to inform the client to retry on Active. + * + * @throws Exception + */ + @Test + public void testObserverRetryActiveException() throws Exception { + boolean thrownRetryException = false; + 
try { + // Force a write on Observer, which should throw + // retry on active exception. + dfsCluster.getNameNode(2) + .getRpcServer() + .mkdirs("/testActiveRetryException", + FsPermission.createImmutable((short) 0755), true); + } catch (ObserverRetryOnActiveException orae) { + thrownRetryException = true; + } + assertTrue(thrownRetryException); + } + + /** + * Test that for an open call, if access time update is required, + * the open call should go to active, instead of observer. + * + * @throws Exception + */ + @Test + public void testAccessTimeUpdateRedirectToActive() throws Exception { + // Create a new path to not mess up other tests + Path tmpTestPath = new Path("/TestObserverNodeAccessTime"); + dfs.create(tmpTestPath, (short)1).close(); + assertSentTo(0); + dfs.open(tmpTestPath).close(); + assertSentTo(2); + // Set access time to a time in the far past, + // so that the next read call is guaranteed to + // have passed the access time period. + dfs.setTimes(tmpTestPath, 0, 0); + // Verify that the aTime update redirects to Active + dfs.open(tmpTestPath).close(); + assertSentTo(0); + } + private void assertSentTo(int nnIdx) throws IOException { assertTrue("Request was not sent to the expected namenode " + nnIdx, HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));