HADOOP-13537. Support external calls in the RPC call queue. Contributed by Daryn Sharp.

This commit is contained in:
Kihwal Lee 2016-09-29 13:27:30 -05:00
parent ee0c722dc8
commit 236ac773c9
4 changed files with 221 additions and 23 deletions

View File

@ -405,4 +405,9 @@
<Bug pattern="NP_NULL_PARAM_DEREF"/>
</Match>
<Match>
<Class name="org.apache.hadoop.ipc.ExternalCall"/>
<Filed name="done"/>
<Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/>
</Match>
</FindBugsFilter>

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.ipc.Server.Call;
import org.apache.hadoop.security.UserGroupInformation;
public abstract class ExternalCall<T> extends Call {
private final PrivilegedExceptionAction<T> action;
private final AtomicBoolean done = new AtomicBoolean();
private T result;
private Throwable error;
public ExternalCall(PrivilegedExceptionAction<T> action) {
this.action = action;
}
public abstract UserGroupInformation getRemoteUser();
public final T get() throws IOException, InterruptedException {
waitForCompletion();
if (error != null) {
if (error instanceof IOException) {
throw (IOException)error;
} else {
throw new IOException(error);
}
}
return result;
}
// wait for response to be triggered to support postponed calls
private void waitForCompletion() throws InterruptedException {
synchronized(done) {
while (!done.get()) {
try {
done.wait();
} catch (InterruptedException ie) {
if (Thread.interrupted()) {
throw ie;
}
}
}
}
}
boolean isDone() {
return done.get();
}
// invoked by ipc handler
@Override
public final Void run() throws IOException {
try {
result = action.run();
sendResponse();
} catch (Throwable t) {
abortResponse(t);
}
return null;
}
@Override
final void doResponse(Throwable t) {
synchronized(done) {
error = t;
done.set(true);
done.notify();
}
}
}

View File

@ -384,6 +384,11 @@ public abstract class Server {
return (call != null) ? call.getRemoteUser() : null;
}
public static String getProtocol() {
Call call = CurCall.get();
return (call != null) ? call.getProtocol() : null;
}
/** Return true if the invocation was through an RPC.
*/
public static boolean isRpcInvocation() {
@ -672,6 +677,11 @@ public abstract class Server {
private int priorityLevel;
// the priority level assigned by scheduler, 0 by default
Call() {
this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
RPC.RpcKind.RPC_BUILTIN, RpcConstants.DUMMY_CLIENT_ID);
}
Call(Call call) {
this(call.callId, call.retryCount, call.rpcKind, call.clientId,
call.traceScope, call.callerContext);
@ -703,6 +713,7 @@ public abstract class Server {
return "Call#" + callId + " Retry#" + retryCount;
}
@Override
public Void run() throws Exception {
return null;
}
@ -718,6 +729,10 @@ public abstract class Server {
return (addr != null) ? addr.getHostAddress() : null;
}
public String getProtocol() {
return null;
}
/**
* Allow a IPC response to be postponed instead of sent immediately
* after the handler returns from the proxy method. The intended use
@ -799,6 +814,11 @@ public abstract class Server {
this.rpcRequest = param;
}
@Override
public String getProtocol() {
return "rpc";
}
@Override
public UserGroupInformation getRemoteUser() {
return connection.user;
@ -2333,33 +2353,15 @@ public abstract class Server {
// Save the priority level assignment by the scheduler
call.setPriorityLevel(callQueue.getPriorityLevel(call));
if (callQueue.isClientBackoffEnabled()) {
// if RPC queue is full, we will ask the RPC client to back off by
// throwing RetriableException. Whether RPC client will honor
// RetriableException and retry depends on client ipc retry policy.
// For example, FailoverOnNetworkExceptionRetry handles
// RetriableException.
queueRequestOrAskClientToBackOff(call);
} else {
callQueue.put(call); // queue the call; maybe blocked here
try {
queueCall(call);
} catch (IOException ioe) {
throw new WrappedRpcServerException(
RpcErrorCodeProto.ERROR_RPC_SERVER, ioe);
}
incRpcCount(); // Increment the rpc count
}
private void queueRequestOrAskClientToBackOff(Call call)
throws WrappedRpcServerException, InterruptedException {
// If rpc scheduler indicates back off based on performance
// degradation such as response time or rpc queue is full,
// we will ask the client to back off.
if (callQueue.shouldBackOff(call) || !callQueue.offer(call)) {
rpcMetrics.incrClientBackoff();
RetriableException retriableException =
new RetriableException("Server is too busy.");
throw new WrappedRpcServerExceptionSuppressed(
RpcErrorCodeProto.ERROR_RPC_SERVER, retriableException);
}
}
/**
* Establish RPC connection setup by negotiating SASL if required, then
* reading and authorizing the connection header
@ -2487,6 +2489,21 @@ public abstract class Server {
}
}
public void queueCall(Call call) throws IOException, InterruptedException {
if (!callQueue.isClientBackoffEnabled()) {
callQueue.put(call); // queue the call; maybe blocked here
} else if (callQueue.shouldBackOff(call) || !callQueue.offer(call)) {
// If rpc scheduler indicates back off based on performance degradation
// such as response time or rpc queue is full, we will ask the client
// to back off by throwing RetriableException. Whether the client will
// honor RetriableException and retry depends the client and its policy.
// For example, IPC clients using FailoverOnNetworkExceptionRetry handle
// RetriableException.
rpcMetrics.incrClientBackoff();
throw new RetriableException("Server is too busy.");
}
}
/** Handles queued calls . */
private class Handler extends Thread {
public Handler(int instanceNumber) {

View File

@ -64,6 +64,7 @@ import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -926,6 +927,90 @@ public class TestRPC extends TestRpcBase {
}
}
@Test(timeout=30000)
public void testExternalCall() throws Exception {
final UserGroupInformation ugi = UserGroupInformation
.createUserForTesting("user123", new String[0]);
final IOException expectedIOE = new IOException("boom");
// use 1 handler so the callq can be plugged
final Server server = setupTestServer(conf, 1);
try {
final AtomicBoolean result = new AtomicBoolean();
ExternalCall<String> remoteUserCall = newExtCall(ugi,
new PrivilegedExceptionAction<String>() {
@Override
public String run() throws Exception {
return UserGroupInformation.getCurrentUser().getUserName();
}
});
ExternalCall<String> exceptionCall = newExtCall(ugi,
new PrivilegedExceptionAction<String>() {
@Override
public String run() throws Exception {
throw expectedIOE;
}
});
final CountDownLatch latch = new CountDownLatch(1);
final CyclicBarrier barrier = new CyclicBarrier(2);
ExternalCall<Void> barrierCall = newExtCall(ugi,
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
// notify we are in a handler and then wait to keep the callq
// plugged up
latch.countDown();
barrier.await();
return null;
}
});
server.queueCall(barrierCall);
server.queueCall(exceptionCall);
server.queueCall(remoteUserCall);
// wait for barrier call to enter the handler, check that the other 2
// calls are actually queued
latch.await();
assertEquals(2, server.getCallQueueLen());
// unplug the callq
barrier.await();
barrierCall.get();
// verify correct ugi is used
String answer = remoteUserCall.get();
assertEquals(ugi.getUserName(), answer);
try {
exceptionCall.get();
fail("didn't throw");
} catch (IOException ioe) {
assertEquals(expectedIOE.getMessage(), ioe.getMessage());
}
} finally {
server.stop();
}
}
private <T> ExternalCall<T> newExtCall(UserGroupInformation ugi,
PrivilegedExceptionAction<T> callable) {
return new ExternalCall<T>(callable) {
@Override
public String getProtocol() {
return "test";
}
@Override
public UserGroupInformation getRemoteUser() {
return ugi;
}
};
}
@Test
public void testRpcMetrics() throws Exception {
Server server;