HADOOP-9683. [RPC v9] Wrap IpcConnectionContext in RPC headers (daryn)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1503811 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Daryn Sharp 2013-07-16 17:59:39 +00:00
parent 32076136f7
commit a3a9d72e98
5 changed files with 275 additions and 138 deletions

View File

@ -354,6 +354,8 @@ Release 2.1.0-beta - 2013-07-02
HADOOP-9688. Add globally unique Client ID to RPC requests. (suresh) HADOOP-9688. Add globally unique Client ID to RPC requests. (suresh)
HADOOP-9683. [RPC v9] Wrap IpcConnectionContext in RPC headers (daryn)
NEW FEATURES NEW FEATURES
HADOOP-9283. Add support for running the Hadoop client on AIX. (atm) HADOOP-9283. Add support for running the Hadoop client on AIX. (atm)

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
@ -63,7 +65,10 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.Server.AuthProtocol; import org.apache.hadoop.ipc.Server.AuthProtocol;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@ -833,17 +838,20 @@ public class Client {
AuthMethod authMethod) AuthMethod authMethod)
throws IOException { throws IOException {
// Write out the ConnectionHeader // Write out the ConnectionHeader
DataOutputBuffer buf = new DataOutputBuffer(); IpcConnectionContextProto message = ProtoUtil.makeIpcConnectionContext(
ProtoUtil.makeIpcConnectionContext(
RPC.getProtocolName(remoteId.getProtocol()), RPC.getProtocolName(remoteId.getProtocol()),
remoteId.getTicket(), remoteId.getTicket(),
authMethod).writeTo(buf); authMethod);
RpcRequestHeaderProto connectionContextHeader =
ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
clientId);
RpcRequestMessageWrapper request =
new RpcRequestMessageWrapper(connectionContextHeader, message);
// Write out the packet length // Write out the packet length
int bufLen = buf.getLength(); out.writeInt(request.getLength());
request.write(out);
out.writeInt(bufLen);
out.write(buf.getData(), 0, bufLen);
} }
/* wait till someone signals us to start reading RPC response or /* wait till someone signals us to start reading RPC response or

View File

@ -33,6 +33,8 @@ public class RpcConstants {
public static final int INVALID_CALL_ID = -2; public static final int INVALID_CALL_ID = -2;
public static final int CONNECTION_CONTEXT_CALL_ID = -3;
/** /**
* The first four bytes of Hadoop RPC connections * The first four bytes of Hadoop RPC connections
*/ */

View File

@ -73,7 +73,7 @@ import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION; import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper; import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper; import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.VersionMismatch; import org.apache.hadoop.ipc.RPC.VersionMismatch;
@ -110,6 +110,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream; import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import com.google.protobuf.Message.Builder;
/** An abstract IPC service. IPC calls take a single {@link Writable} as a /** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on * parameter, and return a {@link Writable} as their value. A service runs on
@ -776,9 +777,10 @@ public abstract class Server {
LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo); LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
throw ieo; throw ieo;
} catch (Exception e) { } catch (Exception e) {
LOG.info(getName() + ": readAndProcess threw exception " + e + // log stack trace for "interesting" exceptions not sent to client
" from client " + c.getHostAddress() + LOG.info(getName() + ": readAndProcess from client " +
". Count of bytes read: " + count, e); c.getHostAddress() + " threw exception [" + e + "]",
(e instanceof WrappedRpcServerException) ? null : e);
count = -1; //so that the (count < 0) block is executed count = -1; //so that the (count < 0) block is executed
} }
if (count < 0) { if (count < 0) {
@ -1098,6 +1100,32 @@ public abstract class Server {
} }
}; };
/**
* Wrapper for RPC IOExceptions to be returned to the client. Used to
* let exceptions bubble up to top of processOneRpc where the correct
* callId can be associated with the response. Also used to prevent
* unnecessary stack trace logging if it's not an internal server error.
*/
@SuppressWarnings("serial")
private static class WrappedRpcServerException extends RpcServerException {
private final RpcErrorCodeProto errCode;
public WrappedRpcServerException(RpcErrorCodeProto errCode, IOException ioe) {
super(ioe.toString(), ioe);
this.errCode = errCode;
}
public WrappedRpcServerException(RpcErrorCodeProto errCode, String message) {
this(errCode, new RpcServerException(message));
}
@Override
public RpcErrorCodeProto getRpcErrorCodeProto() {
return errCode;
}
@Override
public String toString() {
return getCause().toString();
}
}
/** Reads calls from a connection and queues them for handling. */ /** Reads calls from a connection and queues them for handling. */
public class Connection { public class Connection {
private boolean connectionHeaderRead = false; // connection header is read? private boolean connectionHeaderRead = false; // connection header is read?
@ -1135,6 +1163,7 @@ public abstract class Server {
// Fake 'call' for failed authorization response // Fake 'call' for failed authorization response
private static final int AUTHORIZATION_FAILED_CALLID = -1; private static final int AUTHORIZATION_FAILED_CALLID = -1;
private final Call authFailedCall = private final Call authFailedCall =
new Call(AUTHORIZATION_FAILED_CALLID, null, this); new Call(AUTHORIZATION_FAILED_CALLID, null, this);
private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream(); private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
@ -1215,7 +1244,7 @@ public abstract class Server {
} }
private UserGroupInformation getAuthorizedUgi(String authorizedId) private UserGroupInformation getAuthorizedUgi(String authorizedId)
throws IOException { throws InvalidToken, AccessControlException {
if (authMethod == AuthMethod.TOKEN) { if (authMethod == AuthMethod.TOKEN) {
TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authorizedId, TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authorizedId,
secretManager); secretManager);
@ -1231,12 +1260,17 @@ public abstract class Server {
} }
} }
private void saslReadAndProcess(byte[] saslToken) throws IOException, private RpcSaslProto saslReadAndProcess(DataInputStream dis) throws
InterruptedException { WrappedRpcServerException, InterruptedException {
if (!saslContextEstablished) { if (saslContextEstablished) {
RpcSaslProto saslResponse; throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
new SaslException("Negotiation is already complete"));
}
RpcSaslProto saslResponse = null;
try {
try { try {
saslResponse = processSaslMessage(saslToken); saslResponse = processSaslMessage(dis);
} catch (IOException e) { } catch (IOException e) {
IOException sendToClient = e; IOException sendToClient = e;
Throwable cause = e; Throwable cause = e;
@ -1252,9 +1286,7 @@ public abstract class Server {
// attempting user could be null // attempting user could be null
AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser + AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser +
" (" + e.getLocalizedMessage() + ")"); " (" + e.getLocalizedMessage() + ")");
// wait to send response until failure is logged throw sendToClient;
doSaslReply(sendToClient);
throw e;
} }
if (saslServer != null && saslServer.isComplete()) { if (saslServer != null && saslServer.isComplete()) {
@ -1272,37 +1304,19 @@ public abstract class Server {
AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user); AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
saslContextEstablished = true; saslContextEstablished = true;
} }
// send reply here to avoid a successful auth being logged as a } catch (WrappedRpcServerException wrse) { // don't re-wrap
// failure if response can't be sent throw wrse;
doSaslReply(saslResponse); } catch (IOException ioe) {
} else { throw new WrappedRpcServerException(
if (LOG.isDebugEnabled()) RpcErrorCodeProto.FATAL_UNAUTHORIZED, ioe);
LOG.debug("Have read input token of size " + saslToken.length
+ " for processing by saslServer.unwrap()");
if (!useWrap) {
processOneRpc(saslToken);
} else {
byte[] plaintextData = saslServer.unwrap(saslToken, 0,
saslToken.length);
processUnwrappedData(plaintextData);
}
} }
return saslResponse;
} }
private RpcSaslProto processSaslMessage(byte[] buf) private RpcSaslProto processSaslMessage(DataInputStream dis)
throws IOException, InterruptedException { throws IOException, InterruptedException {
final DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
RpcRequestMessageWrapper requestWrapper = new RpcRequestMessageWrapper();
requestWrapper.readFields(dis);
final RpcRequestHeaderProto rpcHeader = requestWrapper.requestHeader;
if (rpcHeader.getCallId() != AuthProtocol.SASL.callId) {
throw new SaslException("Client sent non-SASL request");
}
final RpcSaslProto saslMessage = final RpcSaslProto saslMessage =
RpcSaslProto.parseFrom(requestWrapper.theRequestRead); decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
RpcSaslProto saslResponse = null; RpcSaslProto saslResponse = null;
final SaslState state = saslMessage.getState(); // required final SaslState state = saslMessage.getState(); // required
switch (state) { switch (state) {
@ -1352,8 +1366,7 @@ public abstract class Server {
return saslResponse; return saslResponse;
} }
private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken) private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken) {
throws IOException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Will send " + state + " token of size " LOG.debug("Will send " + state + " token of size "
+ ((replyToken != null) ? replyToken.length : null) + ((replyToken != null) ? replyToken.length : null)
@ -1367,8 +1380,7 @@ public abstract class Server {
return response.build(); return response.build();
} }
private void doSaslReply(Message message) private void doSaslReply(Message message) throws IOException {
throws IOException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Sending sasl message "+message); LOG.debug("Sending sasl message "+message);
} }
@ -1481,16 +1493,7 @@ public abstract class Server {
dataLengthBuffer.clear(); dataLengthBuffer.clear();
data.flip(); data.flip();
boolean isHeaderRead = connectionContextRead; boolean isHeaderRead = connectionContextRead;
if (authProtocol == AuthProtocol.SASL) { processRpcRequestPacket(data.array());
// switch to simple must ignore next negotiate or initiate
if (skipInitialSaslHandshake) {
authProtocol = AuthProtocol.NONE;
} else {
saslReadAndProcess(data.array());
}
} else {
processOneRpc(data.array());
}
data = null; data = null;
if (!isHeaderRead) { if (!isHeaderRead) {
continue; continue;
@ -1525,6 +1528,7 @@ public abstract class Server {
// switch to simple hack, but don't switch if other auths are // switch to simple hack, but don't switch if other auths are
// supported, ex. tokens // supported, ex. tokens
if (isSimpleEnabled && enabledAuthMethods.size() == 1) { if (isSimpleEnabled && enabledAuthMethods.size() == 1) {
authProtocol = AuthProtocol.NONE;
skipInitialSaslHandshake = true; skipInitialSaslHandshake = true;
doSaslReply(buildSaslResponse(SaslState.SUCCESS, null)); doSaslReply(buildSaslResponse(SaslState.SUCCESS, null));
} }
@ -1624,11 +1628,21 @@ public abstract class Server {
responder.doRespond(fakeCall); responder.doRespond(fakeCall);
} }
/** Reads the connection context following the connection header */ /** Reads the connection context following the connection header
private void processConnectionContext(byte[] buf) throws IOException { * @param dis - DataInputStream from which to read the header
DataInputStream in = * @throws WrappedRpcServerException - if the header cannot be
new DataInputStream(new ByteArrayInputStream(buf)); * deserialized, or the user is not authorized
connectionContext = IpcConnectionContextProto.parseFrom(in); */
private void processConnectionContext(DataInputStream dis)
throws WrappedRpcServerException {
// allow only one connection context during a session
if (connectionContextRead) {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"Connection context already processed");
}
connectionContext = decodeProtobufFromStream(
IpcConnectionContextProto.newBuilder(), dis);
protocolName = connectionContext.hasProtocol() ? connectionContext protocolName = connectionContext.hasProtocol() ? connectionContext
.getProtocol() : null; .getProtocol() : null;
@ -1645,9 +1659,11 @@ public abstract class Server {
&& (!protocolUser.getUserName().equals(user.getUserName()))) { && (!protocolUser.getUserName().equals(user.getUserName()))) {
if (authMethod == AuthMethod.TOKEN) { if (authMethod == AuthMethod.TOKEN) {
// Not allowed to doAs if token authentication is used // Not allowed to doAs if token authentication is used
throw new AccessControlException("Authenticated user (" + user throw new WrappedRpcServerException(
+ ") doesn't match what the client claims to be (" RpcErrorCodeProto.FATAL_UNAUTHORIZED,
+ protocolUser + ")"); new AccessControlException("Authenticated user (" + user
+ ") doesn't match what the client claims to be ("
+ protocolUser + ")"));
} else { } else {
// Effective user can be different from authenticated user // Effective user can be different from authenticated user
// for simple auth or kerberos auth // for simple auth or kerberos auth
@ -1658,9 +1674,34 @@ public abstract class Server {
} }
} }
} }
authorizeConnection();
// don't set until after authz because connection isn't established
connectionContextRead = true;
} }
private void processUnwrappedData(byte[] inBuf) throws IOException, /**
* Process a RPC Request - if SASL wrapping is enabled, unwrap the
* requests and process each one, else directly process the request
* @param buf - single request or SASL wrapped requests
* @throws IOException - connection failed to authenticate or authorize,
* or the request could not be decoded into a Call
* @throws InterruptedException
*/
private void processRpcRequestPacket(byte[] buf) throws IOException,
InterruptedException {
if (saslContextEstablished && useWrap) {
if (LOG.isDebugEnabled())
LOG.debug("Have read input token of size " + buf.length
+ " for processing by saslServer.unwrap()");
final byte[] plaintextData = saslServer.unwrap(buf, 0, buf.length);
// loops over decoded data and calls processOneRpc
unwrapPacketAndProcessRpcs(plaintextData);
} else {
processOneRpc(buf);
}
}
private void unwrapPacketAndProcessRpcs(byte[] inBuf) throws IOException,
InterruptedException { InterruptedException {
ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream( ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
inBuf)); inBuf));
@ -1699,61 +1740,93 @@ public abstract class Server {
} }
} }
private void processOneRpc(byte[] buf) throws IOException, /**
InterruptedException { * Process an RPC Request - handle connection setup and decoding of
if (connectionContextRead) { * request into a Call
processRpcRequest(buf); * @param buf - contains the RPC request header and the rpc request
} else { * @throws IOException - internal error that should not be returned to
processConnectionContext(buf); * client, typically failure to respond to client
connectionContextRead = true; * @throws WrappedRpcServerException - an exception to be sent back to
if (!authorizeConnection()) { * the client that does not require verbose logging by the
throw new AccessControlException("Connection from " + this * Listener thread
+ " for protocol " + connectionContext.getProtocol() * @throws InterruptedException
+ " is unauthorized for user " + user); */
private void processOneRpc(byte[] buf)
throws IOException, WrappedRpcServerException, InterruptedException {
int callId = -1;
try {
final DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
final RpcRequestHeaderProto header =
decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);
callId = header.getCallId();
if (LOG.isDebugEnabled()) {
LOG.debug(" got #" + callId);
} }
checkRpcHeaders(header);
if (callId < 0) { // callIds typically used during connection setup
processRpcOutOfBandRequest(header, dis);
} else if (!connectionContextRead) {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"Connection context not established");
} else {
processRpcRequest(header, dis);
}
} catch (WrappedRpcServerException wrse) { // inform client of error
Throwable ioe = wrse.getCause();
final Call call = new Call(callId, null, this);
setupResponse(authFailedResponse, call,
RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
ioe.getClass().getName(), ioe.getMessage());
responder.doRespond(call);
throw wrse;
} }
} }
/** /**
* Process an RPC Request - the connection headers and context have been * Verify RPC header is valid
* read * @param header - RPC request header
* @param buf - contains the RPC request header and the rpc request * @throws WrappedRpcServerException - header contains invalid values
* @throws RpcServerException due to fatal rpc layer issues such as
* invalid header. In this case a RPC fatal status response is sent back
* to client.
*/ */
private void checkRpcHeaders(RpcRequestHeaderProto header)
private void processRpcRequest(byte[] buf) throws WrappedRpcServerException {
throws RpcServerException, IOException, InterruptedException {
DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
RpcRequestHeaderProto header = RpcRequestHeaderProto.parseDelimitedFrom(dis);
if (LOG.isDebugEnabled())
LOG.debug(" got #" + header.getCallId());
if (!header.hasRpcOp()) { if (!header.hasRpcOp()) {
String err = " IPC Server: No rpc op in rpcRequestHeader"; String err = " IPC Server: No rpc op in rpcRequestHeader";
respondBadRpcHeader(new Call(header.getCallId(), null, this), throw new WrappedRpcServerException(
RpcServerException.class.getName(), err); RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
throw new RpcServerException(err);
} }
if (header.getRpcOp() != if (header.getRpcOp() !=
RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET) { RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET) {
String err = "IPC Server does not implement rpc header operation" + String err = "IPC Server does not implement rpc header operation" +
header.getRpcOp(); header.getRpcOp();
respondBadRpcHeader(new Call(header.getCallId(), null, this), throw new WrappedRpcServerException(
RpcServerException.class.getName(), err); RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
throw new RpcServerException(err);
} }
// If we know the rpc kind, get its class so that we can deserialize // If we know the rpc kind, get its class so that we can deserialize
// (Note it would make more sense to have the handler deserialize but // (Note it would make more sense to have the handler deserialize but
// we continue with this original design. // we continue with this original design.
if (!header.hasRpcKind()) { if (!header.hasRpcKind()) {
String err = " IPC Server: No rpc kind in rpcRequestHeader"; String err = " IPC Server: No rpc kind in rpcRequestHeader";
respondBadRpcHeader(new Call(header.getCallId(), null, this), throw new WrappedRpcServerException(
RpcServerException.class.getName(), err); RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
throw new RpcServerException(err);
} }
}
/**
* Process an RPC Request - the connection headers and context must
* have been already read
* @param header - RPC request header
* @param dis - stream to request payload
* @throws WrappedRpcServerException - due to fatal rpc layer issues such
* as invalid header or deserialization error. In this case a RPC fatal
* status response will later be sent back to client.
* @throws InterruptedException
*/
private void processRpcRequest(RpcRequestHeaderProto header,
DataInputStream dis) throws WrappedRpcServerException,
InterruptedException {
Class<? extends Writable> rpcRequestClass = Class<? extends Writable> rpcRequestClass =
getRpcRequestWrapper(header.getRpcKind()); getRpcRequestWrapper(header.getRpcKind());
if (rpcRequestClass == null) { if (rpcRequestClass == null) {
@ -1761,9 +1834,8 @@ public abstract class Server {
" from client " + getHostAddress()); " from client " + getHostAddress());
final String err = "Unknown rpc kind in rpc header" + final String err = "Unknown rpc kind in rpc header" +
header.getRpcKind(); header.getRpcKind();
respondBadRpcHeader(new Call(header.getCallId(), null, this), throw new WrappedRpcServerException(
RpcServerException.class.getName(), err); RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
throw new RpcServerException(err);
} }
Writable rpcRequest; Writable rpcRequest;
try { //Read the rpc request try { //Read the rpc request
@ -1773,17 +1845,9 @@ public abstract class Server {
LOG.warn("Unable to read call parameters for client " + LOG.warn("Unable to read call parameters for client " +
getHostAddress() + "on connection protocol " + getHostAddress() + "on connection protocol " +
this.protocolName + " for rpcKind " + header.getRpcKind(), t); this.protocolName + " for rpcKind " + header.getRpcKind(), t);
final Call readParamsFailedCall =
new Call(header.getCallId(), null, this);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
String err = "IPC server unable to read call parameters: "+ t.getMessage(); String err = "IPC server unable to read call parameters: "+ t.getMessage();
throw new WrappedRpcServerException(
setupResponse(responseBuffer, readParamsFailedCall, RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
null, t.getClass().getName(),
err);
responder.doRespond(readParamsFailedCall);
throw new RpcServerException(err, t);
} }
Call call = new Call(header.getCallId(), rpcRequest, this, Call call = new Call(header.getCallId(), rpcRequest, this,
@ -1793,7 +1857,59 @@ public abstract class Server {
incRpcCount(); // Increment the rpc count incRpcCount(); // Increment the rpc count
} }
private boolean authorizeConnection() throws IOException {
/**
* Establish RPC connection setup by negotiating SASL if required, then
* reading and authorizing the connection header
* @param header - RPC header
* @param dis - stream to request payload
* @throws WrappedRpcServerException - setup failed due to SASL
* negotiation failure, premature or invalid connection context,
* or other state errors
* @throws IOException - failed to send a response back to the client
* @throws InterruptedException
*/
private void processRpcOutOfBandRequest(RpcRequestHeaderProto header,
DataInputStream dis) throws WrappedRpcServerException, IOException,
InterruptedException {
final int callId = header.getCallId();
if (callId == CONNECTION_CONTEXT_CALL_ID) {
// SASL must be established prior to connection context
if (authProtocol == AuthProtocol.SASL && !saslContextEstablished) {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"Connection header sent during SASL negotiation");
}
// read and authorize the user
processConnectionContext(dis);
} else if (callId == AuthProtocol.SASL.callId) {
// if client was switched to simple, ignore first SASL message
if (authProtocol != AuthProtocol.SASL) {
if (!skipInitialSaslHandshake) {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"SASL protocol not requested by client");
}
skipInitialSaslHandshake = false;
return;
}
RpcSaslProto response = saslReadAndProcess(dis);
// send back response if any, may throw IOException
if (response != null) {
doSaslReply(response);
}
} else {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"Unknown out of band call #" + callId);
}
}
/**
* Authorize proxy users to access this server
* @throws WrappedRpcServerException - user is not allowed to proxy
*/
private void authorizeConnection() throws WrappedRpcServerException {
try { try {
// If auth method is DIGEST, the token was obtained by the // If auth method is DIGEST, the token was obtained by the
// real user for the effective user, therefore not required to // real user for the effective user, therefore not required to
@ -1809,14 +1925,34 @@ public abstract class Server {
} }
rpcMetrics.incrAuthorizationSuccesses(); rpcMetrics.incrAuthorizationSuccesses();
} catch (AuthorizationException ae) { } catch (AuthorizationException ae) {
LOG.info("Connection from " + this
+ " for protocol " + connectionContext.getProtocol()
+ " is unauthorized for user " + user);
rpcMetrics.incrAuthorizationFailures(); rpcMetrics.incrAuthorizationFailures();
setupResponse(authFailedResponse, authFailedCall, throw new WrappedRpcServerException(
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNAUTHORIZED, null, RpcErrorCodeProto.FATAL_UNAUTHORIZED, ae);
ae.getClass().getName(), ae.getMessage()); }
responder.doRespond(authFailedCall); }
return false;
/**
* Decode the a protobuf from the given input stream
* @param builder - Builder of the protobuf to decode
* @param dis - DataInputStream to read the protobuf
* @return Message - decoded protobuf
* @throws WrappedRpcServerException - deserialization failed
*/
@SuppressWarnings("unchecked")
private <T extends Message> T decodeProtobufFromStream(Builder builder,
DataInputStream dis) throws WrappedRpcServerException {
try {
builder.mergeDelimitedFrom(dis);
return (T)builder.build();
} catch (Exception ioe) {
Class<?> protoClass = builder.getDefaultInstanceForType().getClass();
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
"Error decoding " + protoClass.getSimpleName() + ": "+ ioe);
} }
return true;
} }
/** /**
@ -2223,18 +2359,6 @@ public abstract class Server {
} }
private void respondBadRpcHeader(Call call, String errorClass, String error)
throws IOException
{
ByteArrayOutputStream responseBuf = new ByteArrayOutputStream();
setupResponse(responseBuf, call,
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
null, errorClass, error);
responder.doRespond(call);
return;
}
private void wrapWithSasl(ByteArrayOutputStream response, Call call) private void wrapWithSasl(ByteArrayOutputStream response, Call call)
throws IOException { throws IOException {
if (call.connection.saslServer != null) { if (call.connection.saslServer != null) {

View File

@ -312,7 +312,7 @@ public class TestSaslRPC {
doDigestRpc(server, sm); doDigestRpc(server, sm);
} catch (RemoteException e) { } catch (RemoteException e) {
LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage()); LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
assertTrue(ERROR_MESSAGE.equals(e.getLocalizedMessage())); assertEquals(ERROR_MESSAGE, e.getLocalizedMessage());
assertTrue(e.unwrapRemoteException() instanceof InvalidToken); assertTrue(e.unwrapRemoteException() instanceof InvalidToken);
succeeded = true; succeeded = true;
} }
@ -818,6 +818,7 @@ public class TestSaslRPC {
} }
try { try {
LOG.info("trying ugi:"+clientUgi+" tokens:"+clientUgi.getTokens());
return clientUgi.doAs(new PrivilegedExceptionAction<String>() { return clientUgi.doAs(new PrivilegedExceptionAction<String>() {
@Override @Override
public String run() throws IOException { public String run() throws IOException {