HDFS-15099. [SBN Read] checkOperation(WRITE) should throw ObserverRetryOnActiveException for ObserverNode. Contributed by Chen Liang.
(cherry picked from commit 26a969ec73
)
This commit is contained in:
parent
82fa8d88ef
commit
5490061885
|
@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import javax.security.sasl.SaslException;
|
import javax.security.sasl.SaslException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.ipc.RetriableException;
|
import org.apache.hadoop.ipc.RetriableException;
|
||||||
import org.apache.hadoop.ipc.StandbyException;
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
|
@ -678,7 +679,7 @@ public class RetryPolicies {
|
||||||
e instanceof UnknownHostException ||
|
e instanceof UnknownHostException ||
|
||||||
e instanceof StandbyException ||
|
e instanceof StandbyException ||
|
||||||
e instanceof ConnectTimeoutException ||
|
e instanceof ConnectTimeoutException ||
|
||||||
isWrappedStandbyException(e)) {
|
shouldFailoverOnException(e)) {
|
||||||
return new RetryAction(RetryAction.RetryDecision.FAILOVER_AND_RETRY,
|
return new RetryAction(RetryAction.RetryDecision.FAILOVER_AND_RETRY,
|
||||||
getFailoverOrRetrySleepTime(failovers));
|
getFailoverOrRetrySleepTime(failovers));
|
||||||
} else if (e instanceof RetriableException
|
} else if (e instanceof RetriableException
|
||||||
|
@ -729,12 +730,13 @@ public class RetryPolicies {
|
||||||
return calculateExponentialTime(time, retries, Long.MAX_VALUE);
|
return calculateExponentialTime(time, retries, Long.MAX_VALUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean isWrappedStandbyException(Exception e) {
|
private static boolean shouldFailoverOnException(Exception e) {
|
||||||
if (!(e instanceof RemoteException)) {
|
if (!(e instanceof RemoteException)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
Exception unwrapped = ((RemoteException)e).unwrapRemoteException(
|
Exception unwrapped = ((RemoteException)e).unwrapRemoteException(
|
||||||
StandbyException.class);
|
StandbyException.class,
|
||||||
|
ObserverRetryOnActiveException.class);
|
||||||
return unwrapped instanceof StandbyException;
|
return unwrapped instanceof StandbyException;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,8 +20,6 @@ package org.apache.hadoop.ipc;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Thrown by a remote ObserverNode indicating the operation has failed and the
|
* Thrown by a remote ObserverNode indicating the operation has failed and the
|
||||||
* client should retry active namenode directly (instead of retry other
|
* client should retry active namenode directly (instead of retry other
|
||||||
|
@ -29,7 +27,7 @@ import java.io.IOException;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class ObserverRetryOnActiveException extends IOException {
|
public class ObserverRetryOnActiveException extends StandbyException {
|
||||||
static final long serialVersionUID = 1L;
|
static final long serialVersionUID = 1L;
|
||||||
public ObserverRetryOnActiveException(String msg) {
|
public ObserverRetryOnActiveException(String msg) {
|
||||||
super(msg);
|
super(msg);
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.ha.ServiceFailedException;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
||||||
|
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
|
||||||
import org.apache.hadoop.ipc.StandbyException;
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -95,8 +96,18 @@ public class StandbyState extends HAState {
|
||||||
String faq = ". Visit https://s.apache.org/sbnn-error";
|
String faq = ". Visit https://s.apache.org/sbnn-error";
|
||||||
String msg = "Operation category " + op + " is not supported in state "
|
String msg = "Operation category " + op + " is not supported in state "
|
||||||
+ context.getState() + faq;
|
+ context.getState() + faq;
|
||||||
|
if (op == OperationCategory.WRITE && isObserver) {
|
||||||
|
// If observer receives a write call, return active retry
|
||||||
|
// exception to inform client to retry on active.
|
||||||
|
// A write should never happen on Observer. Except that,
|
||||||
|
// if access time is enabled. A open call can transition
|
||||||
|
// to a write operation. In this case, Observer
|
||||||
|
// should inform the client to retry this open on Active.
|
||||||
|
throw new ObserverRetryOnActiveException(msg);
|
||||||
|
} else {
|
||||||
throw new StandbyException(msg);
|
throw new StandbyException(msg);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean shouldPopulateReplQueues() {
|
public boolean shouldPopulateReplQueues() {
|
||||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.TestFsck;
|
import org.apache.hadoop.hdfs.server.namenode.TestFsck;
|
||||||
import org.apache.hadoop.hdfs.tools.GetGroups;
|
import org.apache.hadoop.hdfs.tools.GetGroups;
|
||||||
|
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -366,6 +367,52 @@ public class TestObserverNode {
|
||||||
assertTrue(result.contains("Status: HEALTHY"));
|
assertTrue(result.contains("Status: HEALTHY"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that, if a write happens happens to go to Observer,
|
||||||
|
* Observer would throw {@link ObserverRetryOnActiveException},
|
||||||
|
* to inform client to retry on Active
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testObserverRetryActiveException() throws Exception {
|
||||||
|
boolean thrownRetryException = false;
|
||||||
|
try {
|
||||||
|
// Force a write on Observer, which should throw
|
||||||
|
// retry on active exception.
|
||||||
|
dfsCluster.getNameNode(2)
|
||||||
|
.getRpcServer()
|
||||||
|
.mkdirs("/testActiveRetryException",
|
||||||
|
FsPermission.createImmutable((short) 0755), true);
|
||||||
|
} catch (ObserverRetryOnActiveException orae) {
|
||||||
|
thrownRetryException = true;
|
||||||
|
}
|
||||||
|
assertTrue(thrownRetryException);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that for open call, if access time update is required,
|
||||||
|
* the open call should go to active, instead of observer.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testAccessTimeUpdateRedirectToActive() throws Exception {
|
||||||
|
// Create a new pat to not mess up other tests
|
||||||
|
Path tmpTestPath = new Path("/TestObserverNodeAccessTime");
|
||||||
|
dfs.create(tmpTestPath, (short)1).close();
|
||||||
|
assertSentTo(0);
|
||||||
|
dfs.open(tmpTestPath).close();
|
||||||
|
assertSentTo(2);
|
||||||
|
// Set access time to a time in the far past.
|
||||||
|
// So that next read call is guaranteed to
|
||||||
|
// have passed access time period.
|
||||||
|
dfs.setTimes(tmpTestPath, 0, 0);
|
||||||
|
// Verify that aTime update redirects on Active
|
||||||
|
dfs.open(tmpTestPath).close();
|
||||||
|
assertSentTo(0);
|
||||||
|
}
|
||||||
|
|
||||||
private void assertSentTo(int nnIdx) throws IOException {
|
private void assertSentTo(int nnIdx) throws IOException {
|
||||||
assertTrue("Request was not sent to the expected namenode " + nnIdx,
|
assertTrue("Request was not sent to the expected namenode " + nnIdx,
|
||||||
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
|
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
|
||||||
|
|
Loading…
Reference in New Issue