HADOOP-9683. [RPC v9] Wrap IpcConnectionContext in RPC headers (daryn)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1503830 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Daryn Sharp 2013-07-16 18:59:29 +00:00
parent 62f5c14df2
commit 192f6b86cc
4 changed files with 272 additions and 138 deletions

View File

@ -78,6 +78,8 @@ Release 2.1.0-beta - 2013-07-02
HADOOP-9421. [RPC v9] Convert SASL to use ProtoBuf and provide
negotiation capabilities (daryn)
HADOOP-9683. [RPC v9] Wrap IpcConnectionContext in RPC headers (daryn)
NEW FEATURES
HADOOP-9283. Add support for running the Hadoop client on AIX. (atm)

View File

@ -62,7 +62,10 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.Server.AuthProtocol;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@ -114,6 +117,8 @@ public class Client {
final static int PING_CALL_ID = -1;
final static int CONNECTION_CONTEXT_CALL_ID = -3;
/**
* Executor on which IPC calls' parameters are sent.
* Deferring the sending of parameters to a separate
@ -832,17 +837,19 @@ public class Client {
AuthMethod authMethod)
throws IOException {
// Write out the ConnectionHeader
DataOutputBuffer buf = new DataOutputBuffer();
ProtoUtil.makeIpcConnectionContext(
IpcConnectionContextProto message = ProtoUtil.makeIpcConnectionContext(
RPC.getProtocolName(remoteId.getProtocol()),
remoteId.getTicket(),
authMethod).writeTo(buf);
authMethod);
RpcRequestHeaderProto connectionContextHeader =
ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID);
RpcRequestMessageWrapper request =
new RpcRequestMessageWrapper(connectionContextHeader, message);
// Write out the packet length
int bufLen = buf.getLength();
out.writeInt(bufLen);
out.write(buf.getData(), 0, bufLen);
out.writeInt(request.getLength());
request.write(out);
}
/* wait till someone signals us to start reading RPC response or

View File

@ -71,7 +71,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
@ -108,6 +107,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
import com.google.protobuf.Message.Builder;
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@ -761,9 +761,10 @@ public abstract class Server {
LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
throw ieo;
} catch (Exception e) {
LOG.info(getName() + ": readAndProcess threw exception " + e +
" from client " + c.getHostAddress() +
". Count of bytes read: " + count, e);
// log stack trace for "interesting" exceptions not sent to client
LOG.info(getName() + ": readAndProcess from client " +
c.getHostAddress() + " threw exception [" + e + "]",
(e instanceof WrappedRpcServerException) ? null : e);
count = -1; //so that the (count < 0) block is executed
}
if (count < 0) {
@ -1083,6 +1084,32 @@ public abstract class Server {
}
};
/**
* Wrapper for RPC IOExceptions to be returned to the client. Used to
* let exceptions bubble up to top of processOneRpc where the correct
* callId can be associated with the response. Also used to prevent
* unnecessary stack trace logging if it's not an internal server error.
*/
@SuppressWarnings("serial")
private static class WrappedRpcServerException extends RpcServerException {
private final RpcErrorCodeProto errCode;
public WrappedRpcServerException(RpcErrorCodeProto errCode, IOException ioe) {
super(ioe.toString(), ioe);
this.errCode = errCode;
}
public WrappedRpcServerException(RpcErrorCodeProto errCode, String message) {
this(errCode, new RpcServerException(message));
}
@Override
public RpcErrorCodeProto getRpcErrorCodeProto() {
return errCode;
}
@Override
public String toString() {
return getCause().toString();
}
}
/** Reads calls from a connection and queues them for handling. */
public class Connection {
private boolean connectionHeaderRead = false; // connection header is read?
@ -1120,6 +1147,8 @@ public abstract class Server {
// Fake 'call' for failed authorization response
private static final int AUTHORIZATION_FAILED_CALLID = -1;
private static final int CONNECTION_CONTEXT_CALL_ID = -3;
private final Call authFailedCall =
new Call(AUTHORIZATION_FAILED_CALLID, null, this);
private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
@ -1200,7 +1229,7 @@ public abstract class Server {
}
private UserGroupInformation getAuthorizedUgi(String authorizedId)
throws IOException {
throws InvalidToken, AccessControlException {
if (authMethod == AuthMethod.TOKEN) {
TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authorizedId,
secretManager);
@ -1216,12 +1245,17 @@ public abstract class Server {
}
}
private void saslReadAndProcess(byte[] saslToken) throws IOException,
InterruptedException {
if (!saslContextEstablished) {
RpcSaslProto saslResponse;
private RpcSaslProto saslReadAndProcess(DataInputStream dis) throws
WrappedRpcServerException, InterruptedException {
if (saslContextEstablished) {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
new SaslException("Negotiation is already complete"));
}
RpcSaslProto saslResponse = null;
try {
saslResponse = processSaslMessage(saslToken);
try {
saslResponse = processSaslMessage(dis);
} catch (IOException e) {
IOException sendToClient = e;
Throwable cause = e;
@ -1237,9 +1271,7 @@ public abstract class Server {
// attempting user could be null
AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser +
" (" + e.getLocalizedMessage() + ")");
// wait to send response until failure is logged
doSaslReply(sendToClient);
throw e;
throw sendToClient;
}
if (saslServer != null && saslServer.isComplete()) {
@ -1257,37 +1289,19 @@ public abstract class Server {
AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
saslContextEstablished = true;
}
// send reply here to avoid a successful auth being logged as a
// failure if response can't be sent
doSaslReply(saslResponse);
} else {
if (LOG.isDebugEnabled())
LOG.debug("Have read input token of size " + saslToken.length
+ " for processing by saslServer.unwrap()");
if (!useWrap) {
processOneRpc(saslToken);
} else {
byte[] plaintextData = saslServer.unwrap(saslToken, 0,
saslToken.length);
processUnwrappedData(plaintextData);
}
} catch (WrappedRpcServerException wrse) { // don't re-wrap
throw wrse;
} catch (IOException ioe) {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_UNAUTHORIZED, ioe);
}
return saslResponse;
}
private RpcSaslProto processSaslMessage(byte[] buf)
private RpcSaslProto processSaslMessage(DataInputStream dis)
throws IOException, InterruptedException {
final DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
RpcRequestMessageWrapper requestWrapper = new RpcRequestMessageWrapper();
requestWrapper.readFields(dis);
final RpcRequestHeaderProto rpcHeader = requestWrapper.requestHeader;
if (rpcHeader.getCallId() != AuthProtocol.SASL.callId) {
throw new SaslException("Client sent non-SASL request");
}
final RpcSaslProto saslMessage =
RpcSaslProto.parseFrom(requestWrapper.theRequestRead);
decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
RpcSaslProto saslResponse = null;
final SaslState state = saslMessage.getState(); // required
switch (state) {
@ -1337,8 +1351,7 @@ public abstract class Server {
return saslResponse;
}
private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken)
throws IOException {
private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken) {
if (LOG.isDebugEnabled()) {
LOG.debug("Will send " + state + " token of size "
+ ((replyToken != null) ? replyToken.length : null)
@ -1352,8 +1365,7 @@ public abstract class Server {
return response.build();
}
private void doSaslReply(Message message)
throws IOException {
private void doSaslReply(Message message) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending sasl message "+message);
}
@ -1465,16 +1477,7 @@ public abstract class Server {
dataLengthBuffer.clear();
data.flip();
boolean isHeaderRead = connectionContextRead;
if (authProtocol == AuthProtocol.SASL) {
// switch to simple must ignore next negotiate or initiate
if (skipInitialSaslHandshake) {
authProtocol = AuthProtocol.NONE;
} else {
saslReadAndProcess(data.array());
}
} else {
processOneRpc(data.array());
}
processRpcRequestPacket(data.array());
data = null;
if (!isHeaderRead) {
continue;
@ -1509,6 +1512,7 @@ public abstract class Server {
// switch to simple hack, but don't switch if other auths are
// supported, ex. tokens
if (isSimpleEnabled && enabledAuthMethods.size() == 1) {
authProtocol = AuthProtocol.NONE;
skipInitialSaslHandshake = true;
doSaslReply(buildSaslResponse(SaslState.SUCCESS, null));
}
@ -1608,11 +1612,21 @@ public abstract class Server {
responder.doRespond(fakeCall);
}
/** Reads the connection context following the connection header */
private void processConnectionContext(byte[] buf) throws IOException {
DataInputStream in =
new DataInputStream(new ByteArrayInputStream(buf));
connectionContext = IpcConnectionContextProto.parseFrom(in);
/** Reads the connection context following the connection header
* @param dis - DataInputStream from which to read the header
* @throws WrappedRpcServerException - if the header cannot be
* deserialized, or the user is not authorized
*/
private void processConnectionContext(DataInputStream dis)
throws WrappedRpcServerException {
// allow only one connection context during a session
if (connectionContextRead) {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"Connection context already processed");
}
connectionContext = decodeProtobufFromStream(
IpcConnectionContextProto.newBuilder(), dis);
protocolName = connectionContext.hasProtocol() ? connectionContext
.getProtocol() : null;
@ -1629,9 +1643,11 @@ public abstract class Server {
&& (!protocolUser.getUserName().equals(user.getUserName()))) {
if (authMethod == AuthMethod.TOKEN) {
// Not allowed to doAs if token authentication is used
throw new AccessControlException("Authenticated user (" + user
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_UNAUTHORIZED,
new AccessControlException("Authenticated user (" + user
+ ") doesn't match what the client claims to be ("
+ protocolUser + ")");
+ protocolUser + ")"));
} else {
// Effective user can be different from authenticated user
// for simple auth or kerberos auth
@ -1642,9 +1658,34 @@ public abstract class Server {
}
}
}
authorizeConnection();
// don't set until after authz because connection isn't established
connectionContextRead = true;
}
private void processUnwrappedData(byte[] inBuf) throws IOException,
/**
* Process a RPC Request - if SASL wrapping is enabled, unwrap the
* requests and process each one, else directly process the request
* @param buf - single request or SASL wrapped requests
* @throws IOException - connection failed to authenticate or authorize,
* or the request could not be decoded into a Call
* @throws InterruptedException
*/
private void processRpcRequestPacket(byte[] buf) throws IOException,
InterruptedException {
if (saslContextEstablished && useWrap) {
if (LOG.isDebugEnabled())
LOG.debug("Have read input token of size " + buf.length
+ " for processing by saslServer.unwrap()");
final byte[] plaintextData = saslServer.unwrap(buf, 0, buf.length);
// loops over decoded data and calls processOneRpc
unwrapPacketAndProcessRpcs(plaintextData);
} else {
processOneRpc(buf);
}
}
private void unwrapPacketAndProcessRpcs(byte[] inBuf) throws IOException,
InterruptedException {
ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
inBuf));
@ -1683,61 +1724,93 @@ public abstract class Server {
}
}
private void processOneRpc(byte[] buf) throws IOException,
InterruptedException {
if (connectionContextRead) {
processRpcRequest(buf);
} else {
processConnectionContext(buf);
connectionContextRead = true;
if (!authorizeConnection()) {
throw new AccessControlException("Connection from " + this
+ " for protocol " + connectionContext.getProtocol()
+ " is unauthorized for user " + user);
/**
* Process an RPC Request - handle connection setup and decoding of
* request into a Call
* @param buf - contains the RPC request header and the rpc request
* @throws IOException - internal error that should not be returned to
* client, typically failure to respond to client
* @throws WrappedRpcServerException - an exception to be sent back to
* the client that does not require verbose logging by the
* Listener thread
* @throws InterruptedException
*/
private void processOneRpc(byte[] buf)
throws IOException, WrappedRpcServerException, InterruptedException {
int callId = -1;
try {
final DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
final RpcRequestHeaderProto header =
decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);
callId = header.getCallId();
if (LOG.isDebugEnabled()) {
LOG.debug(" got #" + callId);
}
checkRpcHeaders(header);
if (callId < 0) { // callIds typically used during connection setup
processRpcOutOfBandRequest(header, dis);
} else if (!connectionContextRead) {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"Connection context not established");
} else {
processRpcRequest(header, dis);
}
} catch (WrappedRpcServerException wrse) { // inform client of error
Throwable ioe = wrse.getCause();
final Call call = new Call(callId, null, this);
setupResponse(authFailedResponse, call,
RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
ioe.getClass().getName(), ioe.getMessage());
responder.doRespond(call);
throw wrse;
}
}
/**
* Process an RPC Request - the connection headers and context have been
* read
* @param buf - contains the RPC request header and the rpc request
* @throws RpcServerException due to fatal rpc layer issues such as
* invalid header. In this case a RPC fatal status response is sent back
* to client.
* Verify RPC header is valid
* @param header - RPC request header
* @throws WrappedRpcServerException - header contains invalid values
*/
private void processRpcRequest(byte[] buf)
throws RpcServerException, IOException, InterruptedException {
DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
RpcRequestHeaderProto header = RpcRequestHeaderProto.parseDelimitedFrom(dis);
if (LOG.isDebugEnabled())
LOG.debug(" got #" + header.getCallId());
private void checkRpcHeaders(RpcRequestHeaderProto header)
throws WrappedRpcServerException {
if (!header.hasRpcOp()) {
String err = " IPC Server: No rpc op in rpcRequestHeader";
respondBadRpcHeader(new Call(header.getCallId(), null, this),
RpcServerException.class.getName(), err);
throw new RpcServerException(err);
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
}
if (header.getRpcOp() !=
RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET) {
String err = "IPC Server does not implement rpc header operation" +
header.getRpcOp();
respondBadRpcHeader(new Call(header.getCallId(), null, this),
RpcServerException.class.getName(), err);
throw new RpcServerException(err);
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
}
// If we know the rpc kind, get its class so that we can deserialize
// (Note it would make more sense to have the handler deserialize but
// we continue with this original design.
if (!header.hasRpcKind()) {
String err = " IPC Server: No rpc kind in rpcRequestHeader";
respondBadRpcHeader(new Call(header.getCallId(), null, this),
RpcServerException.class.getName(), err);
throw new RpcServerException(err);
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
}
}
/**
* Process an RPC Request - the connection headers and context must
* have been already read
* @param header - RPC request header
* @param dis - stream to request payload
* @throws WrappedRpcServerException - due to fatal rpc layer issues such
* as invalid header or deserialization error. In this case a RPC fatal
* status response will later be sent back to client.
* @throws InterruptedException
*/
private void processRpcRequest(RpcRequestHeaderProto header,
DataInputStream dis) throws WrappedRpcServerException,
InterruptedException {
Class<? extends Writable> rpcRequestClass =
getRpcRequestWrapper(header.getRpcKind());
if (rpcRequestClass == null) {
@ -1745,9 +1818,8 @@ public abstract class Server {
" from client " + getHostAddress());
final String err = "Unknown rpc kind in rpc header" +
header.getRpcKind();
respondBadRpcHeader(new Call(header.getCallId(), null, this),
RpcServerException.class.getName(), err);
throw new RpcServerException(err);
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
}
Writable rpcRequest;
try { //Read the rpc request
@ -1757,17 +1829,9 @@ public abstract class Server {
LOG.warn("Unable to read call parameters for client " +
getHostAddress() + "on connection protocol " +
this.protocolName + " for rpcKind " + header.getRpcKind(), t);
final Call readParamsFailedCall =
new Call(header.getCallId(), null, this);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
String err = "IPC server unable to read call parameters: "+ t.getMessage();
setupResponse(responseBuffer, readParamsFailedCall,
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
null, t.getClass().getName(),
err);
responder.doRespond(readParamsFailedCall);
throw new RpcServerException(err, t);
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
}
Call call = new Call(header.getCallId(), rpcRequest, this,
@ -1776,7 +1840,59 @@ public abstract class Server {
incRpcCount(); // Increment the rpc count
}
private boolean authorizeConnection() throws IOException {
/**
* Establish RPC connection setup by negotiating SASL if required, then
* reading and authorizing the connection header
* @param header - RPC header
* @param dis - stream to request payload
* @throws WrappedRpcServerException - setup failed due to SASL
* negotiation failure, premature or invalid connection context,
* or other state errors
* @throws IOException - failed to send a response back to the client
* @throws InterruptedException
*/
private void processRpcOutOfBandRequest(RpcRequestHeaderProto header,
DataInputStream dis) throws WrappedRpcServerException, IOException,
InterruptedException {
final int callId = header.getCallId();
if (callId == CONNECTION_CONTEXT_CALL_ID) {
// SASL must be established prior to connection context
if (authProtocol == AuthProtocol.SASL && !saslContextEstablished) {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"Connection header sent during SASL negotiation");
}
// read and authorize the user
processConnectionContext(dis);
} else if (callId == AuthProtocol.SASL.callId) {
// if client was switched to simple, ignore first SASL message
if (authProtocol != AuthProtocol.SASL) {
if (!skipInitialSaslHandshake) {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"SASL protocol not requested by client");
}
skipInitialSaslHandshake = false;
return;
}
RpcSaslProto response = saslReadAndProcess(dis);
// send back response if any, may throw IOException
if (response != null) {
doSaslReply(response);
}
} else {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"Unknown out of band call #" + callId);
}
}
/**
* Authorize proxy users to access this server
* @throws WrappedRpcServerException - user is not allowed to proxy
*/
private void authorizeConnection() throws WrappedRpcServerException {
try {
// If auth method is DIGEST, the token was obtained by the
// real user for the effective user, therefore not required to
@ -1792,14 +1908,34 @@ public abstract class Server {
}
rpcMetrics.incrAuthorizationSuccesses();
} catch (AuthorizationException ae) {
LOG.info("Connection from " + this
+ " for protocol " + connectionContext.getProtocol()
+ " is unauthorized for user " + user);
rpcMetrics.incrAuthorizationFailures();
setupResponse(authFailedResponse, authFailedCall,
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNAUTHORIZED, null,
ae.getClass().getName(), ae.getMessage());
responder.doRespond(authFailedCall);
return false;
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_UNAUTHORIZED, ae);
}
}
/**
* Decode the a protobuf from the given input stream
* @param builder - Builder of the protobuf to decode
* @param dis - DataInputStream to read the protobuf
* @return Message - decoded protobuf
* @throws WrappedRpcServerException - deserialization failed
*/
@SuppressWarnings("unchecked")
private <T extends Message> T decodeProtobufFromStream(Builder builder,
DataInputStream dis) throws WrappedRpcServerException {
try {
builder.mergeDelimitedFrom(dis);
return (T)builder.build();
} catch (Exception ioe) {
Class<?> protoClass = builder.getDefaultInstanceForType().getClass();
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
"Error decoding " + protoClass.getSimpleName() + ": "+ ioe);
}
return true;
}
/**
@ -2206,18 +2342,6 @@ public abstract class Server {
}
private void respondBadRpcHeader(Call call, String errorClass, String error)
throws IOException
{
ByteArrayOutputStream responseBuf = new ByteArrayOutputStream();
setupResponse(responseBuf, call,
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
null, errorClass, error);
responder.doRespond(call);
return;
}
private void wrapWithSasl(ByteArrayOutputStream response, Call call)
throws IOException {
if (call.connection.saslServer != null) {

View File

@ -312,7 +312,7 @@ public class TestSaslRPC {
doDigestRpc(server, sm);
} catch (RemoteException e) {
LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
assertTrue(ERROR_MESSAGE.equals(e.getLocalizedMessage()));
assertEquals(ERROR_MESSAGE, e.getLocalizedMessage());
assertTrue(e.unwrapRemoteException() instanceof InvalidToken);
succeeded = true;
}
@ -818,6 +818,7 @@ public class TestSaslRPC {
}
try {
LOG.info("trying ugi:"+clientUgi+" tokens:"+clientUgi.getTokens());
return clientUgi.doAs(new PrivilegedExceptionAction<String>() {
@Override
public String run() throws IOException {