From 192f6b86cc3ee02d5d3737e40d40698627bea178 Mon Sep 17 00:00:00 2001 From: Daryn Sharp Date: Tue, 16 Jul 2013 18:59:29 +0000 Subject: [PATCH] HADOOP-9683. [RPC v9] Wrap IpcConnectionContext in RPC headers (daryn) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1503830 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 2 + .../java/org/apache/hadoop/ipc/Client.java | 21 +- .../java/org/apache/hadoop/ipc/Server.java | 384 ++++++++++++------ .../org/apache/hadoop/ipc/TestSaslRPC.java | 3 +- 4 files changed, 272 insertions(+), 138 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 21481ed17d3..b76aa96f94a 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -78,6 +78,8 @@ Release 2.1.0-beta - 2013-07-02 HADOOP-9421. [RPC v9] Convert SASL to use ProtoBuf and provide negotiation capabilities (daryn) + HADOOP-9683. [RPC v9] Wrap IpcConnectionContext in RPC headers (daryn) + NEW FEATURES HADOOP-9283. Add support for running the Hadoop client on AIX. (atm) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 6265c92b326..94e111d5727 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -62,7 +62,10 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; +import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper; +import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.Server.AuthProtocol; +import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; @@ -113,6 +116,8 @@ public class Client { private final boolean fallbackAllowed; final static int PING_CALL_ID = -1; + + final static int CONNECTION_CONTEXT_CALL_ID = -3; /** * Executor on which IPC calls' parameters are sent. @@ -832,17 +837,19 @@ public class Client { AuthMethod authMethod) throws IOException { // Write out the ConnectionHeader - DataOutputBuffer buf = new DataOutputBuffer(); - ProtoUtil.makeIpcConnectionContext( + IpcConnectionContextProto message = ProtoUtil.makeIpcConnectionContext( RPC.getProtocolName(remoteId.getProtocol()), remoteId.getTicket(), - authMethod).writeTo(buf); + authMethod); + RpcRequestHeaderProto connectionContextHeader = + ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, + OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID); + RpcRequestMessageWrapper request = + new RpcRequestMessageWrapper(connectionContextHeader, message); // Write out the packet length - int bufLen = buf.getLength(); - - out.writeInt(bufLen); - out.write(buf.getData(), 0, bufLen); + out.writeInt(request.getLength()); + request.write(out); } /* wait till someone signals us to start reading RPC response or diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 5d3946e8c58..ee633b137fa 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -71,7 +71,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper; import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper; import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RPC.VersionMismatch; @@ -108,6 +107,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Message; +import com.google.protobuf.Message.Builder; /** An abstract IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on @@ -761,9 +761,10 @@ public abstract class Server { LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo); throw ieo; } catch (Exception e) { - LOG.info(getName() + ": readAndProcess threw exception " + e + - " from client " + c.getHostAddress() + - ". Count of bytes read: " + count, e); + // log stack trace for "interesting" exceptions not sent to client + LOG.info(getName() + ": readAndProcess from client " + + c.getHostAddress() + " threw exception [" + e + "]", + (e instanceof WrappedRpcServerException) ? null : e); count = -1; //so that the (count < 0) block is executed } if (count < 0) { @@ -1083,6 +1084,32 @@ public abstract class Server { } }; + /** + * Wrapper for RPC IOExceptions to be returned to the client. Used to + * let exceptions bubble up to top of processOneRpc where the correct + * callId can be associated with the response. Also used to prevent + * unnecessary stack trace logging if it's not an internal server error. + */ + @SuppressWarnings("serial") + private static class WrappedRpcServerException extends RpcServerException { + private final RpcErrorCodeProto errCode; + public WrappedRpcServerException(RpcErrorCodeProto errCode, IOException ioe) { + super(ioe.toString(), ioe); + this.errCode = errCode; + } + public WrappedRpcServerException(RpcErrorCodeProto errCode, String message) { + this(errCode, new RpcServerException(message)); + } + @Override + public RpcErrorCodeProto getRpcErrorCodeProto() { + return errCode; + } + @Override + public String toString() { + return getCause().toString(); + } + } + /** Reads calls from a connection and queues them for handling. */ public class Connection { private boolean connectionHeaderRead = false; // connection header is read? @@ -1120,6 +1147,8 @@ public abstract class Server { // Fake 'call' for failed authorization response private static final int AUTHORIZATION_FAILED_CALLID = -1; + private static final int CONNECTION_CONTEXT_CALL_ID = -3; + private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, this); private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream(); @@ -1200,7 +1229,7 @@ public abstract class Server { } private UserGroupInformation getAuthorizedUgi(String authorizedId) - throws IOException { + throws InvalidToken, AccessControlException { if (authMethod == AuthMethod.TOKEN) { TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authorizedId, secretManager); @@ -1216,12 +1245,17 @@ public abstract class Server { } } - private void saslReadAndProcess(byte[] saslToken) throws IOException, - InterruptedException { - if (!saslContextEstablished) { - RpcSaslProto saslResponse; + private RpcSaslProto saslReadAndProcess(DataInputStream dis) throws + WrappedRpcServerException, InterruptedException { + if (saslContextEstablished) { + throw new WrappedRpcServerException( + RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, + new SaslException("Negotiation is already complete")); + } + RpcSaslProto saslResponse = null; + try { try { - saslResponse = processSaslMessage(saslToken); + saslResponse = processSaslMessage(dis); } catch (IOException e) { IOException sendToClient = e; Throwable cause = e; @@ -1237,9 +1271,7 @@ public abstract class Server { // attempting user could be null AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser + " (" + e.getLocalizedMessage() + ")"); - // wait to send response until failure is logged - doSaslReply(sendToClient); - throw e; + throw sendToClient; } if (saslServer != null && saslServer.isComplete()) { @@ -1257,37 +1289,19 @@ public abstract class Server { AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user); saslContextEstablished = true; } - // send reply here to avoid a successful auth being logged as a - // failure if response can't be sent - doSaslReply(saslResponse); - } else { - if (LOG.isDebugEnabled()) - LOG.debug("Have read input token of size " + saslToken.length - + " for processing by saslServer.unwrap()"); - - if (!useWrap) { - processOneRpc(saslToken); - } else { - byte[] plaintextData = saslServer.unwrap(saslToken, 0, - saslToken.length); - processUnwrappedData(plaintextData); - } + } catch (WrappedRpcServerException wrse) { // don't re-wrap + throw wrse; + } catch (IOException ioe) { + throw new WrappedRpcServerException( + RpcErrorCodeProto.FATAL_UNAUTHORIZED, ioe); } + return saslResponse; } - private RpcSaslProto processSaslMessage(byte[] buf) + private RpcSaslProto processSaslMessage(DataInputStream dis) throws IOException, InterruptedException { - final DataInputStream dis = - new DataInputStream(new ByteArrayInputStream(buf)); - RpcRequestMessageWrapper requestWrapper = new RpcRequestMessageWrapper(); - requestWrapper.readFields(dis); - - final RpcRequestHeaderProto rpcHeader = requestWrapper.requestHeader; - if (rpcHeader.getCallId() != AuthProtocol.SASL.callId) { - throw new SaslException("Client sent non-SASL request"); - } final RpcSaslProto saslMessage = - RpcSaslProto.parseFrom(requestWrapper.theRequestRead); + decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis); RpcSaslProto saslResponse = null; final SaslState state = saslMessage.getState(); // required switch (state) { @@ -1337,8 +1351,7 @@ public abstract class Server { return saslResponse; } - private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken) - throws IOException { + private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken) { if (LOG.isDebugEnabled()) { LOG.debug("Will send " + state + " token of size " + ((replyToken != null) ? replyToken.length : null) @@ -1352,8 +1365,7 @@ public abstract class Server { return response.build(); } - private void doSaslReply(Message message) - throws IOException { + private void doSaslReply(Message message) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Sending sasl message "+message); } @@ -1465,16 +1477,7 @@ public abstract class Server { dataLengthBuffer.clear(); data.flip(); boolean isHeaderRead = connectionContextRead; - if (authProtocol == AuthProtocol.SASL) { - // switch to simple must ignore next negotiate or initiate - if (skipInitialSaslHandshake) { - authProtocol = AuthProtocol.NONE; - } else { - saslReadAndProcess(data.array()); - } - } else { - processOneRpc(data.array()); - } + processRpcRequestPacket(data.array()); data = null; if (!isHeaderRead) { continue; @@ -1509,6 +1512,7 @@ public abstract class Server { // switch to simple hack, but don't switch if other auths are // supported, ex. tokens if (isSimpleEnabled && enabledAuthMethods.size() == 1) { + authProtocol = AuthProtocol.NONE; skipInitialSaslHandshake = true; doSaslReply(buildSaslResponse(SaslState.SUCCESS, null)); } @@ -1608,11 +1612,21 @@ public abstract class Server { responder.doRespond(fakeCall); } - /** Reads the connection context following the connection header */ - private void processConnectionContext(byte[] buf) throws IOException { - DataInputStream in = - new DataInputStream(new ByteArrayInputStream(buf)); - connectionContext = IpcConnectionContextProto.parseFrom(in); + /** Reads the connection context following the connection header + * @param dis - DataInputStream from which to read the header + * @throws WrappedRpcServerException - if the header cannot be + * deserialized, or the user is not authorized + */ + private void processConnectionContext(DataInputStream dis) + throws WrappedRpcServerException { + // allow only one connection context during a session + if (connectionContextRead) { + throw new WrappedRpcServerException( + RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, + "Connection context already processed"); + } + connectionContext = decodeProtobufFromStream( + IpcConnectionContextProto.newBuilder(), dis); protocolName = connectionContext.hasProtocol() ? connectionContext .getProtocol() : null; @@ -1629,9 +1643,11 @@ public abstract class Server { && (!protocolUser.getUserName().equals(user.getUserName()))) { if (authMethod == AuthMethod.TOKEN) { // Not allowed to doAs if token authentication is used - throw new AccessControlException("Authenticated user (" + user - + ") doesn't match what the client claims to be (" - + protocolUser + ")"); + throw new WrappedRpcServerException( + RpcErrorCodeProto.FATAL_UNAUTHORIZED, + new AccessControlException("Authenticated user (" + user + + ") doesn't match what the client claims to be (" + + protocolUser + ")")); } else { // Effective user can be different from authenticated user // for simple auth or kerberos auth @@ -1642,9 +1658,34 @@ public abstract class Server { } } } + authorizeConnection(); + // don't set until after authz because connection isn't established + connectionContextRead = true; } - private void processUnwrappedData(byte[] inBuf) throws IOException, + /** + * Process a RPC Request - if SASL wrapping is enabled, unwrap the + * requests and process each one, else directly process the request + * @param buf - single request or SASL wrapped requests + * @throws IOException - connection failed to authenticate or authorize, + * or the request could not be decoded into a Call + * @throws InterruptedException + */ + private void processRpcRequestPacket(byte[] buf) throws IOException, + InterruptedException { + if (saslContextEstablished && useWrap) { + if (LOG.isDebugEnabled()) + LOG.debug("Have read input token of size " + buf.length + + " for processing by saslServer.unwrap()"); + final byte[] plaintextData = saslServer.unwrap(buf, 0, buf.length); + // loops over decoded data and calls processOneRpc + unwrapPacketAndProcessRpcs(plaintextData); + } else { + processOneRpc(buf); + } + } + + private void unwrapPacketAndProcessRpcs(byte[] inBuf) throws IOException, InterruptedException { ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream( inBuf)); @@ -1683,61 +1724,93 @@ public abstract class Server { } } - private void processOneRpc(byte[] buf) throws IOException, - InterruptedException { - if (connectionContextRead) { - processRpcRequest(buf); - } else { - processConnectionContext(buf); - connectionContextRead = true; - if (!authorizeConnection()) { - throw new AccessControlException("Connection from " + this - + " for protocol " + connectionContext.getProtocol() - + " is unauthorized for user " + user); + /** + * Process an RPC Request - handle connection setup and decoding of + * request into a Call + * @param buf - contains the RPC request header and the rpc request + * @throws IOException - internal error that should not be returned to + * client, typically failure to respond to client + * @throws WrappedRpcServerException - an exception to be sent back to + * the client that does not require verbose logging by the + * Listener thread + * @throws InterruptedException + */ + private void processOneRpc(byte[] buf) + throws IOException, WrappedRpcServerException, InterruptedException { + int callId = -1; + try { + final DataInputStream dis = + new DataInputStream(new ByteArrayInputStream(buf)); + final RpcRequestHeaderProto header = + decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis); + callId = header.getCallId(); + if (LOG.isDebugEnabled()) { + LOG.debug(" got #" + callId); } + checkRpcHeaders(header); + + if (callId < 0) { // callIds typically used during connection setup + processRpcOutOfBandRequest(header, dis); + } else if (!connectionContextRead) { + throw new WrappedRpcServerException( + RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, + "Connection context not established"); + } else { + processRpcRequest(header, dis); + } + } catch (WrappedRpcServerException wrse) { // inform client of error + Throwable ioe = wrse.getCause(); + final Call call = new Call(callId, null, this); + setupResponse(authFailedResponse, call, + RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null, + ioe.getClass().getName(), ioe.getMessage()); + responder.doRespond(call); + throw wrse; } } - + /** - * Process an RPC Request - the connection headers and context have been - * read - * @param buf - contains the RPC request header and the rpc request - * @throws RpcServerException due to fatal rpc layer issues such as - * invalid header. In this case a RPC fatal status response is sent back - * to client. + * Verify RPC header is valid + * @param header - RPC request header + * @throws WrappedRpcServerException - header contains invalid values */ - - private void processRpcRequest(byte[] buf) - throws RpcServerException, IOException, InterruptedException { - DataInputStream dis = - new DataInputStream(new ByteArrayInputStream(buf)); - RpcRequestHeaderProto header = RpcRequestHeaderProto.parseDelimitedFrom(dis); - - if (LOG.isDebugEnabled()) - LOG.debug(" got #" + header.getCallId()); + private void checkRpcHeaders(RpcRequestHeaderProto header) + throws WrappedRpcServerException { if (!header.hasRpcOp()) { String err = " IPC Server: No rpc op in rpcRequestHeader"; - respondBadRpcHeader(new Call(header.getCallId(), null, this), - RpcServerException.class.getName(), err); - throw new RpcServerException(err); + throw new WrappedRpcServerException( + RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err); } if (header.getRpcOp() != RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET) { String err = "IPC Server does not implement rpc header operation" + header.getRpcOp(); - respondBadRpcHeader(new Call(header.getCallId(), null, this), - RpcServerException.class.getName(), err); - throw new RpcServerException(err); + throw new WrappedRpcServerException( + RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err); } // If we know the rpc kind, get its class so that we can deserialize // (Note it would make more sense to have the handler deserialize but // we continue with this original design. if (!header.hasRpcKind()) { String err = " IPC Server: No rpc kind in rpcRequestHeader"; - respondBadRpcHeader(new Call(header.getCallId(), null, this), - RpcServerException.class.getName(), err); - throw new RpcServerException(err); + throw new WrappedRpcServerException( + RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err); } + } + + /** + * Process an RPC Request - the connection headers and context must + * have been already read + * @param header - RPC request header + * @param dis - stream to request payload + * @throws WrappedRpcServerException - due to fatal rpc layer issues such + * as invalid header or deserialization error. In this case a RPC fatal + * status response will later be sent back to client. + * @throws InterruptedException + */ + private void processRpcRequest(RpcRequestHeaderProto header, + DataInputStream dis) throws WrappedRpcServerException, + InterruptedException { Class rpcRequestClass = getRpcRequestWrapper(header.getRpcKind()); if (rpcRequestClass == null) { @@ -1745,9 +1818,8 @@ public abstract class Server { " from client " + getHostAddress()); final String err = "Unknown rpc kind in rpc header" + header.getRpcKind(); - respondBadRpcHeader(new Call(header.getCallId(), null, this), - RpcServerException.class.getName(), err); - throw new RpcServerException(err); + throw new WrappedRpcServerException( + RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err); } Writable rpcRequest; try { //Read the rpc request @@ -1757,17 +1829,9 @@ public abstract class Server { LOG.warn("Unable to read call parameters for client " + getHostAddress() + "on connection protocol " + this.protocolName + " for rpcKind " + header.getRpcKind(), t); - final Call readParamsFailedCall = - new Call(header.getCallId(), null, this); - ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); String err = "IPC server unable to read call parameters: "+ t.getMessage(); - - setupResponse(responseBuffer, readParamsFailedCall, - RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, - null, t.getClass().getName(), - err); - responder.doRespond(readParamsFailedCall); - throw new RpcServerException(err, t); + throw new WrappedRpcServerException( + RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err); } Call call = new Call(header.getCallId(), rpcRequest, this, @@ -1776,7 +1840,59 @@ public abstract class Server { incRpcCount(); // Increment the rpc count } - private boolean authorizeConnection() throws IOException { + + /** + * Establish RPC connection setup by negotiating SASL if required, then + * reading and authorizing the connection header + * @param header - RPC header + * @param dis - stream to request payload + * @throws WrappedRpcServerException - setup failed due to SASL + * negotiation failure, premature or invalid connection context, + * or other state errors + * @throws IOException - failed to send a response back to the client + * @throws InterruptedException + */ + private void processRpcOutOfBandRequest(RpcRequestHeaderProto header, + DataInputStream dis) throws WrappedRpcServerException, IOException, + InterruptedException { + final int callId = header.getCallId(); + if (callId == CONNECTION_CONTEXT_CALL_ID) { + // SASL must be established prior to connection context + if (authProtocol == AuthProtocol.SASL && !saslContextEstablished) { + throw new WrappedRpcServerException( + RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, + "Connection header sent during SASL negotiation"); + } + // read and authorize the user + processConnectionContext(dis); + } else if (callId == AuthProtocol.SASL.callId) { + // if client was switched to simple, ignore first SASL message + if (authProtocol != AuthProtocol.SASL) { + if (!skipInitialSaslHandshake) { + throw new WrappedRpcServerException( + RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, + "SASL protocol not requested by client"); + } + skipInitialSaslHandshake = false; + return; + } + RpcSaslProto response = saslReadAndProcess(dis); + // send back response if any, may throw IOException + if (response != null) { + doSaslReply(response); + } + } else { + throw new WrappedRpcServerException( + RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, + "Unknown out of band call #" + callId); + } + } + + /** + * Authorize proxy users to access this server + * @throws WrappedRpcServerException - user is not allowed to proxy + */ + private void authorizeConnection() throws WrappedRpcServerException { try { // If auth method is DIGEST, the token was obtained by the // real user for the effective user, therefore not required to @@ -1792,16 +1908,36 @@ public abstract class Server { } rpcMetrics.incrAuthorizationSuccesses(); } catch (AuthorizationException ae) { + LOG.info("Connection from " + this + + " for protocol " + connectionContext.getProtocol() + + " is unauthorized for user " + user); rpcMetrics.incrAuthorizationFailures(); - setupResponse(authFailedResponse, authFailedCall, - RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNAUTHORIZED, null, - ae.getClass().getName(), ae.getMessage()); - responder.doRespond(authFailedCall); - return false; + throw new WrappedRpcServerException( + RpcErrorCodeProto.FATAL_UNAUTHORIZED, ae); } - return true; } + /** + * Decode the a protobuf from the given input stream + * @param builder - Builder of the protobuf to decode + * @param dis - DataInputStream to read the protobuf + * @return Message - decoded protobuf + * @throws WrappedRpcServerException - deserialization failed + */ + @SuppressWarnings("unchecked") + private T decodeProtobufFromStream(Builder builder, + DataInputStream dis) throws WrappedRpcServerException { + try { + builder.mergeDelimitedFrom(dis); + return (T)builder.build(); + } catch (Exception ioe) { + Class protoClass = builder.getDefaultInstanceForType().getClass(); + throw new WrappedRpcServerException( + RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, + "Error decoding " + protoClass.getSimpleName() + ": "+ ioe); + } + } + /** * Get service class for connection * @return the serviceClass @@ -2206,18 +2342,6 @@ public abstract class Server { } - private void respondBadRpcHeader(Call call, String errorClass, String error) - throws IOException - { - ByteArrayOutputStream responseBuf = new ByteArrayOutputStream(); - setupResponse(responseBuf, call, - RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, - null, errorClass, error); - responder.doRespond(call); - return; - - } - private void wrapWithSasl(ByteArrayOutputStream response, Call call) throws IOException { if (call.connection.saslServer != null) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java index 6d67f73e537..08f9672be00 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java @@ -312,7 +312,7 @@ public class TestSaslRPC { doDigestRpc(server, sm); } catch (RemoteException e) { LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage()); - assertTrue(ERROR_MESSAGE.equals(e.getLocalizedMessage())); + assertEquals(ERROR_MESSAGE, e.getLocalizedMessage()); assertTrue(e.unwrapRemoteException() instanceof InvalidToken); succeeded = true; } @@ -818,6 +818,7 @@ public class TestSaslRPC { } try { + LOG.info("trying ugi:"+clientUgi+" tokens:"+clientUgi.getTokens()); return clientUgi.doAs(new PrivilegedExceptionAction() { @Override public String run() throws IOException {