diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
index 556613639bf..5cc366561f0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.ipc.Server.Call;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 import org.apache.hadoop.security.UserGroupInformation;
 
 public abstract class ExternalCall<T> extends Call {
@@ -78,7 +79,7 @@ public abstract class ExternalCall<T> extends Call {
   }
 
   @Override
-  final void doResponse(Throwable t) {
+  final void doResponse(Throwable t, RpcStatusProto status) {
     synchronized(done) {
       error = t;
       done.set(true);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index b76390c3d0b..84bb2f10ab1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -800,7 +800,11 @@ public abstract class Server {
       }
     }
 
-    void doResponse(Throwable t) throws IOException {}
+    void doResponse(Throwable t) throws IOException {
+      doResponse(t, RpcStatusProto.FATAL);
+    }
+
+    void doResponse(Throwable t, RpcStatusProto proto) throws IOException {}
 
     // For Schedulable
     @Override
@@ -967,15 +971,17 @@ public abstract class Server {
     }
 
     @Override
-    void doResponse(Throwable t) throws IOException {
+    void doResponse(Throwable t, RpcStatusProto status) throws IOException {
       RpcCall call = this;
       if (t != null) {
+        if (status == null) {
+          status = RpcStatusProto.FATAL;
+        }
         // clone the call to prevent a race with another thread stomping
         // on the response while being sent. the original call is
         // effectively discarded since the wait count won't hit zero
         call = new RpcCall(this);
-        setupResponse(call,
-            RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
+        setupResponse(call, status, RpcErrorCodeProto.ERROR_RPC_SERVER,
             null, t.getClass().getName(), StringUtils.stringifyException(t));
       } else {
         setupResponse(call, call.responseParams.returnStatus,
@@ -2713,8 +2719,18 @@ public abstract class Server {
 
     private void internalQueueCall(Call call)
         throws IOException, InterruptedException {
+      internalQueueCall(call, true);
+    }
+
+    private void internalQueueCall(Call call, boolean blocking)
+        throws IOException, InterruptedException {
       try {
-        callQueue.put(call); // queue the call; maybe blocked here
+        // queue the call, may be blocked if blocking is true.
+        if (blocking) {
+          callQueue.put(call);
+        } else {
+          callQueue.add(call);
+        }
       } catch (CallQueueOverflowException cqe) {
         // If rpc scheduler indicates back off based on performance degradation
         // such as response time or rpc queue is full, we will ask the client
@@ -2757,8 +2773,8 @@ public abstract class Server {
            * In case of Observer, it handles only reads, which are
            * commutative.
            */
-          //Re-queue the call and continue
-          internalQueueCall(call);
+          // Re-queue the call and continue
+          requeueCall(call);
           continue;
         }
         if (LOG.isDebugEnabled()) {
@@ -2800,6 +2816,15 @@ public abstract class Server {
       LOG.debug(Thread.currentThread().getName() + ": exiting");
     }
 
+    private void requeueCall(Call call)
+        throws IOException, InterruptedException {
+      try {
+        internalQueueCall(call, false);
+      } catch (RpcServerException rse) {
+        call.doResponse(rse.getCause(), rse.getRpcStatusProto());
+      }
+    }
+
   }
 
   @VisibleForTesting
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
index e1fadafdee7..fe5345daff7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
@@ -25,12 +25,16 @@ import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.RpcScheduler;
+import org.apache.hadoop.ipc.Schedulable;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
@@ -84,6 +88,36 @@ public class TestConsistentReadsObserver {
     }
   }
 
+  @Test
+  public void testRequeueCall() throws Exception {
+    setObserverRead(true);
+
+    // Update the configuration just for the observer, by enabling
+    // IPC backoff and using the test scheduler class, which starts to back
+    // off after a certain number of calls.
+    final int observerIdx = 2;
+    NameNode nn = dfsCluster.getNameNode(observerIdx);
+    int port = nn.getNameNodeAddress().getPort();
+    Configuration configuration = dfsCluster.getConfiguration(observerIdx);
+    String prefix = CommonConfigurationKeys.IPC_NAMESPACE + "." + port + ".";
+    configuration.set(prefix + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
+        TestRpcScheduler.class.getName());
+    configuration.setBoolean(prefix
+        + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
+
+    dfsCluster.restartNameNode(observerIdx);
+    dfsCluster.transitionToObserver(observerIdx);
+
+    dfs.create(testPath, (short)1).close();
+    assertSentTo(0);
+
+    // Since we haven't tailed edit logs on the observer, it will fall behind
+    // and keep re-queueing the incoming request. Eventually, RPC backoff will
+    // be triggered and the client should retry the active NN.
+    dfs.getFileStatus(testPath);
+    assertSentTo(0);
+  }
+
   @Test
   public void testMsyncSimple() throws Exception {
     // 0 == not completed, 1 == succeeded, -1 == failed
@@ -169,4 +203,33 @@ public class TestConsistentReadsObserver {
     dfs = HATestUtil.configureObserverReadFs(
         dfsCluster, conf, ObserverReadProxyProvider.class, flag);
   }
+
+  /**
+   * A dummy test scheduler that starts backoff after a fixed number
+   * of requests.
+   */
+  public static class TestRpcScheduler implements RpcScheduler {
+    // Allow a number of RPCs to pass in order for the NN restart to succeed.
+    private int allowed = 10;
+    public TestRpcScheduler() {}
+
+    @Override
+    public int getPriorityLevel(Schedulable obj) {
+      return 0;
+    }
+
+    @Override
+    public boolean shouldBackOff(Schedulable obj) {
+      return --allowed < 0;
+    }
+
+    @Override
+    public void addResponseTime(String name, int priorityLevel, int queueTime,
+        int processingTime) {
+    }
+
+    @Override
+    public void stop() {
+    }
+  }
 }
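
Note on the non-blocking re-queue path (editorial, not part of the patch):
requeueCall() deliberately calls internalQueueCall(call, false) so that a
Handler thread never blocks in callQueue.put(). A Handler is a consumer of
that same queue, and the non-blocking callQueue.add() lets the scheduler's
backoff decision surface immediately as a CallQueueOverflowException, which
requeueCall() converts into a response carrying the exception's
RpcStatusProto. That is also why doResponse() gained a status parameter: the
single-argument overload keeps the old FATAL behavior, while a re-queue
overflow can be reported with whatever status the scheduler's exception
carries (e.g. a retriable, non-fatal one), so the client's retry policy can
fail over to the active NameNode instead of tearing down the connection.

The sketch below illustrates only the standard java.util.concurrent
BlockingQueue semantics the patch relies on. QueueSemanticsSketch and its
sample strings are hypothetical names; Hadoop's CallQueueManager throws its
own CallQueueOverflowException rather than the plain IllegalStateException
shown here.

  import java.util.concurrent.ArrayBlockingQueue;
  import java.util.concurrent.BlockingQueue;

  public class QueueSemanticsSketch {
    public static void main(String[] args) throws InterruptedException {
      // A bounded queue of capacity 1 stands in for the server's callQueue.
      BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
      queue.put("first call"); // succeeds; the queue is now full

      try {
        // Non-blocking add(), as used by internalQueueCall(call, false):
        // it fails fast instead of waiting for free space.
        queue.add("requeued call");
      } catch (IllegalStateException e) {
        // This fast-failure path is what lets requeueCall() answer the
        // client with a backoff response instead of stalling a Handler
        // thread on the very queue it is supposed to drain.
        System.out.println("add() failed fast: " + e);
      }

      // By contrast, queue.put("requeued call") would block here until a
      // consumer removed an element.
    }
  }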