HDFS-14146. [SBN read] Handle exceptions from and prevent handler threads from blocking within internalQueueCall. Contributed by Chao Sun.

This commit is contained in:
Erik Krogen 2018-12-14 14:02:20 -08:00 committed by Chen Liang
parent 2e7610a029
commit 8b8ec65e65
3 changed files with 97 additions and 8 deletions

View File

@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.ipc.Server.Call; import org.apache.hadoop.ipc.Server.Call;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
public abstract class ExternalCall<T> extends Call { public abstract class ExternalCall<T> extends Call {
@ -78,7 +79,7 @@ public abstract class ExternalCall<T> extends Call {
} }
@Override @Override
final void doResponse(Throwable t) { final void doResponse(Throwable t, RpcStatusProto status) {
synchronized(done) { synchronized(done) {
error = t; error = t;
done.set(true); done.set(true);

View File

@ -800,7 +800,11 @@ public abstract class Server {
} }
} }
void doResponse(Throwable t) throws IOException {} void doResponse(Throwable t) throws IOException {
doResponse(t, RpcStatusProto.FATAL);
}
void doResponse(Throwable t, RpcStatusProto proto) throws IOException {}
// For Schedulable // For Schedulable
@Override @Override
@ -967,15 +971,17 @@ public abstract class Server {
} }
@Override @Override
void doResponse(Throwable t) throws IOException { void doResponse(Throwable t, RpcStatusProto status) throws IOException {
RpcCall call = this; RpcCall call = this;
if (t != null) { if (t != null) {
if (status == null) {
status = RpcStatusProto.FATAL;
}
// clone the call to prevent a race with another thread stomping // clone the call to prevent a race with another thread stomping
// on the response while being sent. the original call is // on the response while being sent. the original call is
// effectively discarded since the wait count won't hit zero // effectively discarded since the wait count won't hit zero
call = new RpcCall(this); call = new RpcCall(this);
setupResponse(call, setupResponse(call, status, RpcErrorCodeProto.ERROR_RPC_SERVER,
RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
null, t.getClass().getName(), StringUtils.stringifyException(t)); null, t.getClass().getName(), StringUtils.stringifyException(t));
} else { } else {
setupResponse(call, call.responseParams.returnStatus, setupResponse(call, call.responseParams.returnStatus,
@ -2707,8 +2713,18 @@ public abstract class Server {
private void internalQueueCall(Call call) private void internalQueueCall(Call call)
throws IOException, InterruptedException { throws IOException, InterruptedException {
internalQueueCall(call, true);
}
private void internalQueueCall(Call call, boolean blocking)
throws IOException, InterruptedException {
try { try {
callQueue.put(call); // queue the call; maybe blocked here // queue the call, may be blocked if blocking is true.
if (blocking) {
callQueue.put(call);
} else {
callQueue.add(call);
}
} catch (CallQueueOverflowException cqe) { } catch (CallQueueOverflowException cqe) {
// If rpc scheduler indicates back off based on performance degradation // If rpc scheduler indicates back off based on performance degradation
// such as response time or rpc queue is full, we will ask the client // such as response time or rpc queue is full, we will ask the client
@ -2751,8 +2767,8 @@ public abstract class Server {
* In case of Observer, it handles only reads, which are * In case of Observer, it handles only reads, which are
* commutative. * commutative.
*/ */
//Re-queue the call and continue // Re-queue the call and continue
internalQueueCall(call); requeueCall(call);
continue; continue;
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -2794,6 +2810,15 @@ public abstract class Server {
LOG.debug(Thread.currentThread().getName() + ": exiting"); LOG.debug(Thread.currentThread().getName() + ": exiting");
} }
/**
 * Puts a call back on the call queue without blocking the calling thread.
 *
 * <p>The call is submitted through the non-blocking mode of
 * {@code internalQueueCall}. If that raises an {@code RpcServerException},
 * the failure is reported on the call itself via {@code doResponse}
 * instead of being propagated to the caller.
 *
 * @param call the call to place back on the queue
 * @throws IOException if queueing fails with an exception that is not an
 *     {@code RpcServerException}, or if reporting the error response fails
 * @throws InterruptedException as declared by {@code internalQueueCall}
 */
private void requeueCall(Call call)
    throws IOException, InterruptedException {
  try {
    internalQueueCall(call, false);
  } catch (RpcServerException rse) {
    // Prefer the wrapped cause, but never pass a null Throwable on:
    // RpcCall.doResponse takes its normal-response branch when the
    // Throwable is null, which would hide the queueing failure.
    Throwable cause = rse.getCause();
    call.doResponse(cause != null ? cause : rse, rse.getRpcStatusProto());
  }
}
} }
@VisibleForTesting @VisibleForTesting

View File

@ -25,12 +25,16 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RpcScheduler;
import org.apache.hadoop.ipc.Schedulable;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.junit.After; import org.junit.After;
@ -84,6 +88,36 @@ public class TestConsistentReadsObserver {
} }
} }
/**
 * Checks that a read sent to a lagging observer ends up being served by
 * NameNode 0 once the observer starts backing off re-queued calls.
 */
@Test
public void testRequeueCall() throws Exception {
  setObserverRead(true);

  // Reconfigure the observer only: turn on IPC backoff for its RPC port and
  // plug in the test scheduler, which begins backing off once a fixed
  // number of calls has gone through.
  final int observerIdx = 2;
  final int observerRpcPort =
      dfsCluster.getNameNode(observerIdx).getNameNodeAddress().getPort();
  final Configuration observerConf = dfsCluster.getConfiguration(observerIdx);
  final String ipcKeyPrefix =
      CommonConfigurationKeys.IPC_NAMESPACE + "." + observerRpcPort + ".";
  observerConf.set(
      ipcKeyPrefix + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
      TestRpcScheduler.class.getName());
  observerConf.setBoolean(
      ipcKeyPrefix + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);

  // Restart so the new settings are picked up, then put the node back
  // into observer state.
  dfsCluster.restartNameNode(observerIdx);
  dfsCluster.transitionToObserver(observerIdx);

  // The write is expected to be handled by NameNode 0.
  dfs.create(testPath, (short)1).close();
  assertSentTo(0);

  // The observer has not tailed the edit log, so it lags behind and keeps
  // re-queueing this read. RPC backoff should eventually kick in, after
  // which the client is expected to retry against the active NN.
  dfs.getFileStatus(testPath);
  assertSentTo(0);
}
@Test @Test
public void testMsyncSimple() throws Exception { public void testMsyncSimple() throws Exception {
// 0 == not completed, 1 == succeeded, -1 == failed // 0 == not completed, 1 == succeeded, -1 == failed
@ -169,4 +203,33 @@ public class TestConsistentReadsObserver {
dfs = HATestUtil.configureObserverReadFs( dfs = HATestUtil.configureObserverReadFs(
dfsCluster, conf, ObserverReadProxyProvider.class, flag); dfsCluster, conf, ObserverReadProxyProvider.class, flag);
} }
/**
 * A dummy test scheduler that starts backoff after a fixed number
 * of requests.
 *
 * <p>Every call gets priority level 0 and reported response times are
 * ignored. The remaining budget is kept in an {@link AtomicInteger} so
 * that concurrent {@link #shouldBackOff(Schedulable)} invocations each
 * consume exactly one slot.
 */
public static class TestRpcScheduler implements RpcScheduler {
  // Allow a number of RPCs to pass in order for the NN restart to succeed.
  private final AtomicInteger allowed = new AtomicInteger(10);

  public TestRpcScheduler() {}

  /** Returns priority level 0 for every call. */
  @Override
  public int getPriorityLevel(Schedulable obj) {
    return 0;
  }

  /**
   * Consumes one slot of the budget.
   *
   * @return {@code true} once the budget of allowed calls is used up
   */
  @Override
  public boolean shouldBackOff(Schedulable obj) {
    // Atomic decrement: a plain --int could lose updates across threads.
    return allowed.decrementAndGet() < 0;
  }

  @Override
  public void addResponseTime(String name, int priorityLevel, int queueTime,
      int processingTime) {
    // Intentionally a no-op: this scheduler ignores response times.
  }

  @Override
  public void stop() {
    // Nothing to release.
  }
}
} }