HDFS-14146. [SBN read] Handle exceptions from and prevent handler threads from blocking within internalQueueCall. Contributed by Chao Sun.
This commit is contained in:
parent
790831824f
commit
e917ac2854
|
@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.hadoop.ipc.Server.Call;
|
import org.apache.hadoop.ipc.Server.Call;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
|
||||||
public abstract class ExternalCall<T> extends Call {
|
public abstract class ExternalCall<T> extends Call {
|
||||||
|
@ -78,7 +79,7 @@ public abstract class ExternalCall<T> extends Call {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
final void doResponse(Throwable t) {
|
final void doResponse(Throwable t, RpcStatusProto status) {
|
||||||
synchronized(done) {
|
synchronized(done) {
|
||||||
error = t;
|
error = t;
|
||||||
done.set(true);
|
done.set(true);
|
||||||
|
|
|
@ -799,7 +799,11 @@ public abstract class Server {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void doResponse(Throwable t) throws IOException {}
|
void doResponse(Throwable t) throws IOException {
|
||||||
|
doResponse(t, RpcStatusProto.FATAL);
|
||||||
|
}
|
||||||
|
|
||||||
|
void doResponse(Throwable t, RpcStatusProto proto) throws IOException {}
|
||||||
|
|
||||||
// For Schedulable
|
// For Schedulable
|
||||||
@Override
|
@Override
|
||||||
|
@ -966,15 +970,17 @@ public abstract class Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void doResponse(Throwable t) throws IOException {
|
void doResponse(Throwable t, RpcStatusProto status) throws IOException {
|
||||||
RpcCall call = this;
|
RpcCall call = this;
|
||||||
if (t != null) {
|
if (t != null) {
|
||||||
|
if (status == null) {
|
||||||
|
status = RpcStatusProto.FATAL;
|
||||||
|
}
|
||||||
// clone the call to prevent a race with another thread stomping
|
// clone the call to prevent a race with another thread stomping
|
||||||
// on the response while being sent. the original call is
|
// on the response while being sent. the original call is
|
||||||
// effectively discarded since the wait count won't hit zero
|
// effectively discarded since the wait count won't hit zero
|
||||||
call = new RpcCall(this);
|
call = new RpcCall(this);
|
||||||
setupResponse(call,
|
setupResponse(call, status, RpcErrorCodeProto.ERROR_RPC_SERVER,
|
||||||
RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
|
|
||||||
null, t.getClass().getName(), StringUtils.stringifyException(t));
|
null, t.getClass().getName(), StringUtils.stringifyException(t));
|
||||||
} else {
|
} else {
|
||||||
setupResponse(call, call.responseParams.returnStatus,
|
setupResponse(call, call.responseParams.returnStatus,
|
||||||
|
@ -2712,8 +2718,18 @@ public abstract class Server {
|
||||||
|
|
||||||
private void internalQueueCall(Call call)
|
private void internalQueueCall(Call call)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
|
internalQueueCall(call, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void internalQueueCall(Call call, boolean blocking)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
try {
|
try {
|
||||||
callQueue.put(call); // queue the call; maybe blocked here
|
// queue the call, may be blocked if blocking is true.
|
||||||
|
if (blocking) {
|
||||||
|
callQueue.put(call);
|
||||||
|
} else {
|
||||||
|
callQueue.add(call);
|
||||||
|
}
|
||||||
} catch (CallQueueOverflowException cqe) {
|
} catch (CallQueueOverflowException cqe) {
|
||||||
// If rpc scheduler indicates back off based on performance degradation
|
// If rpc scheduler indicates back off based on performance degradation
|
||||||
// such as response time or rpc queue is full, we will ask the client
|
// such as response time or rpc queue is full, we will ask the client
|
||||||
|
@ -2756,8 +2772,8 @@ public abstract class Server {
|
||||||
* In case of Observer, it handles only reads, which are
|
* In case of Observer, it handles only reads, which are
|
||||||
* commutative.
|
* commutative.
|
||||||
*/
|
*/
|
||||||
//Re-queue the call and continue
|
// Re-queue the call and continue
|
||||||
internalQueueCall(call);
|
requeueCall(call);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -2799,6 +2815,15 @@ public abstract class Server {
|
||||||
LOG.debug(Thread.currentThread().getName() + ": exiting");
|
LOG.debug(Thread.currentThread().getName() + ": exiting");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void requeueCall(Call call)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
try {
|
||||||
|
internalQueueCall(call, false);
|
||||||
|
} catch (RpcServerException rse) {
|
||||||
|
call.doResponse(rse.getCause(), rse.getRpcStatusProto());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|
|
@ -25,12 +25,16 @@ import java.io.IOException;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
|
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
|
import org.apache.hadoop.ipc.RpcScheduler;
|
||||||
|
import org.apache.hadoop.ipc.Schedulable;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -84,6 +88,36 @@ public class TestConsistentReadsObserver {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRequeueCall() throws Exception {
|
||||||
|
setObserverRead(true);
|
||||||
|
|
||||||
|
// Update the configuration just for the observer, by enabling
|
||||||
|
// IPC backoff and using the test scheduler class, which starts to backoff
|
||||||
|
// after certain number of calls.
|
||||||
|
final int observerIdx = 2;
|
||||||
|
NameNode nn = dfsCluster.getNameNode(observerIdx);
|
||||||
|
int port = nn.getNameNodeAddress().getPort();
|
||||||
|
Configuration configuration = dfsCluster.getConfiguration(observerIdx);
|
||||||
|
String prefix = CommonConfigurationKeys.IPC_NAMESPACE + "." + port + ".";
|
||||||
|
configuration.set(prefix + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
|
||||||
|
TestRpcScheduler.class.getName());
|
||||||
|
configuration.setBoolean(prefix
|
||||||
|
+ CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
|
||||||
|
|
||||||
|
dfsCluster.restartNameNode(observerIdx);
|
||||||
|
dfsCluster.transitionToObserver(observerIdx);
|
||||||
|
|
||||||
|
dfs.create(testPath, (short)1).close();
|
||||||
|
assertSentTo(0);
|
||||||
|
|
||||||
|
// Since we haven't tailed edit logs on the observer, it will fall behind
|
||||||
|
// and keep re-queueing the incoming request. Eventually, RPC backoff will
|
||||||
|
// be triggered and client should retry active NN.
|
||||||
|
dfs.getFileStatus(testPath);
|
||||||
|
assertSentTo(0);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMsyncSimple() throws Exception {
|
public void testMsyncSimple() throws Exception {
|
||||||
// 0 == not completed, 1 == succeeded, -1 == failed
|
// 0 == not completed, 1 == succeeded, -1 == failed
|
||||||
|
@ -169,4 +203,33 @@ public class TestConsistentReadsObserver {
|
||||||
dfs = HATestUtil.configureObserverReadFs(
|
dfs = HATestUtil.configureObserverReadFs(
|
||||||
dfsCluster, conf, ObserverReadProxyProvider.class, flag);
|
dfsCluster, conf, ObserverReadProxyProvider.class, flag);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A dummy test scheduler that starts backoff after a fixed number
|
||||||
|
* of requests.
|
||||||
|
*/
|
||||||
|
public static class TestRpcScheduler implements RpcScheduler {
|
||||||
|
// Allow a number of RPCs to pass in order for the NN restart to succeed.
|
||||||
|
private int allowed = 10;
|
||||||
|
public TestRpcScheduler() {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getPriorityLevel(Schedulable obj) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean shouldBackOff(Schedulable obj) {
|
||||||
|
return --allowed < 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addResponseTime(String name, int priorityLevel, int queueTime,
|
||||||
|
int processingTime) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue