Merge -c 1512091 from trunk for HADOOP-9820. RPCv9 wire protocol is insufficient to support multiplexing. Contributed by Daryn Sharp.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1512094 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jitendra Nath Pandey 2013-08-08 23:11:10 +00:00
parent 25038197b5
commit 203fbd2c86
5 changed files with 188 additions and 50 deletions

View File

@ -110,6 +110,8 @@ Release 2.1.0-beta - 2013-08-06
HADOOP-9832. [RPC v9] Add RPC header to client ping (daryn) HADOOP-9832. [RPC v9] Add RPC header to client ping (daryn)
HADOOP-9820. [RPC v9] Wire protocol is insufficient to support multiplexing. (daryn via jitendra)
NEW FEATURES NEW FEATURES
HADOOP-9283. Add support for running the Hadoop client on AIX. (atm) HADOOP-9283. Add support for running the Hadoop client on AIX. (atm)

View File

@ -737,12 +737,16 @@ public class Client {
} }
if (doPing) { if (doPing) {
this.in = new DataInputStream(new BufferedInputStream( inStream = new PingInputStream(inStream);
new PingInputStream(inStream)));
} else {
this.in = new DataInputStream(new BufferedInputStream(inStream));
} }
this.out = new DataOutputStream(new BufferedOutputStream(outStream)); this.in = new DataInputStream(new BufferedInputStream(inStream));
// SASL may have already buffered the stream
if (!(outStream instanceof BufferedOutputStream)) {
outStream = new BufferedOutputStream(outStream);
}
this.out = new DataOutputStream(outStream);
writeConnectionContext(remoteId, authMethod); writeConnectionContext(remoteId, authMethod);
// update last activity time // update last activity time

View File

@ -72,6 +72,8 @@ import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import static org.apache.hadoop.ipc.RpcConstants.*; import static org.apache.hadoop.ipc.RpcConstants.*;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper; import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.VersionMismatch; import org.apache.hadoop.ipc.RPC.VersionMismatch;
@ -1272,6 +1274,26 @@ public abstract class Server {
private void saslReadAndProcess(DataInputStream dis) throws private void saslReadAndProcess(DataInputStream dis) throws
WrappedRpcServerException, IOException, InterruptedException { WrappedRpcServerException, IOException, InterruptedException {
final RpcSaslProto saslMessage =
decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
switch (saslMessage.getState()) {
case WRAP: {
if (!saslContextEstablished || !useWrap) {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
new SaslException("Server is not wrapping data"));
}
// loops over decoded data and calls processOneRpc
unwrapPacketAndProcessRpcs(saslMessage.getToken().toByteArray());
break;
}
default:
saslProcess(saslMessage);
}
}
private void saslProcess(RpcSaslProto saslMessage)
throws WrappedRpcServerException, IOException, InterruptedException {
if (saslContextEstablished) { if (saslContextEstablished) {
throw new WrappedRpcServerException( throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
@ -1280,7 +1302,7 @@ public abstract class Server {
RpcSaslProto saslResponse = null; RpcSaslProto saslResponse = null;
try { try {
try { try {
saslResponse = processSaslMessage(dis); saslResponse = processSaslMessage(saslMessage);
} catch (IOException e) { } catch (IOException e) {
IOException sendToClient = e; IOException sendToClient = e;
Throwable cause = e; Throwable cause = e;
@ -1325,14 +1347,14 @@ public abstract class Server {
// do NOT enable wrapping until the last auth response is sent // do NOT enable wrapping until the last auth response is sent
if (saslContextEstablished) { if (saslContextEstablished) {
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
// SASL wrapping is only used if the connection has a QOP, and
// the value is not auth. ex. auth-int & auth-priv
useWrap = (qop != null && !"auth".equalsIgnoreCase(qop)); useWrap = (qop != null && !"auth".equalsIgnoreCase(qop));
} }
} }
private RpcSaslProto processSaslMessage(DataInputStream dis) private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage)
throws IOException, InterruptedException { throws IOException, InterruptedException {
final RpcSaslProto saslMessage =
decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
RpcSaslProto saslResponse = null; RpcSaslProto saslResponse = null;
final SaslState state = saslMessage.getState(); // required final SaslState state = saslMessage.getState(); // required
switch (state) { switch (state) {
@ -1527,7 +1549,7 @@ public abstract class Server {
dataLengthBuffer.clear(); dataLengthBuffer.clear();
data.flip(); data.flip();
boolean isHeaderRead = connectionContextRead; boolean isHeaderRead = connectionContextRead;
processRpcRequestPacket(data.array()); processOneRpc(data.array());
data = null; data = null;
if (!isHeaderRead) { if (!isHeaderRead) {
continue; continue;
@ -1690,29 +1712,19 @@ public abstract class Server {
} }
/** /**
* Process a RPC Request - if SASL wrapping is enabled, unwrap the * Process a wrapped RPC Request - unwrap the SASL packet and process
* requests and process each one, else directly process the request * each embedded RPC request
* @param buf - single request or SASL wrapped requests * @param buf - SASL wrapped request of one or more RPCs
* @throws IOException - connection failed to authenticate or authorize, * @throws IOException - SASL packet cannot be unwrapped
* or the request could not be decoded into a Call
* @throws InterruptedException * @throws InterruptedException
*/ */
private void processRpcRequestPacket(byte[] buf)
throws WrappedRpcServerException, IOException, InterruptedException {
if (saslContextEstablished && useWrap) {
if (LOG.isDebugEnabled())
LOG.debug("Have read input token of size " + buf.length
+ " for processing by saslServer.unwrap()");
final byte[] plaintextData = saslServer.unwrap(buf, 0, buf.length);
// loops over decoded data and calls processOneRpc
unwrapPacketAndProcessRpcs(plaintextData);
} else {
processOneRpc(buf);
}
}
private void unwrapPacketAndProcessRpcs(byte[] inBuf) private void unwrapPacketAndProcessRpcs(byte[] inBuf)
throws WrappedRpcServerException, IOException, InterruptedException { throws WrappedRpcServerException, IOException, InterruptedException {
if (LOG.isDebugEnabled()) {
LOG.debug("Have read input token of size " + inBuf.length
+ " for processing by saslServer.unwrap()");
}
inBuf = saslServer.unwrap(inBuf, 0, inBuf.length);
ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream( ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
inBuf)); inBuf));
// Read all RPCs contained in the inBuf, even partial ones // Read all RPCs contained in the inBuf, even partial ones
@ -2375,9 +2387,21 @@ public abstract class Server {
LOG.debug("Adding saslServer wrapped token of size " + token.length LOG.debug("Adding saslServer wrapped token of size " + token.length
+ " as call response."); + " as call response.");
response.reset(); response.reset();
DataOutputStream saslOut = new DataOutputStream(response); // rebuild with sasl header and payload
saslOut.writeInt(token.length); RpcResponseHeaderProto saslHeader = RpcResponseHeaderProto.newBuilder()
saslOut.write(token, 0, token.length); .setCallId(AuthProtocol.SASL.callId)
.setStatus(RpcStatusProto.SUCCESS)
.build();
RpcSaslProto saslMessage = RpcSaslProto.newBuilder()
.setState(SaslState.WRAP)
.setToken(ByteString.copyFrom(token, 0, token.length))
.build();
RpcResponseMessageWrapper saslResponse =
new RpcResponseMessageWrapper(saslHeader, saslMessage);
DataOutputStream out = new DataOutputStream(response);
out.writeInt(saslResponse.getLength());
saslResponse.write(out);
} }
} }

View File

@ -20,12 +20,16 @@ package org.apache.hadoop.security;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.FilterOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -485,36 +489,139 @@ public class SaslRpcClient {
return response; return response;
} }
/** private boolean useWrap() {
* Get a SASL wrapped InputStream. Can be called only after saslConnect() has // getNegotiatedProperty throws if client isn't complete
* been called. String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
* // SASL wrapping is only used if the connection has a QOP, and
* @param in // the value is not auth. ex. auth-int & auth-priv
* the InputStream to wrap return qop != null && !"auth".equalsIgnoreCase(qop);
* @return a SASL wrapped InputStream
* @throws IOException
*/
public InputStream getInputStream(InputStream in) throws IOException {
if (!saslClient.isComplete()) {
throw new IOException("Sasl authentication exchange hasn't completed yet");
}
return new SaslInputStream(in, saslClient);
} }
/** /**
* Get a SASL wrapped OutputStream. Can be called only after saslConnect() has * Get SASL wrapped InputStream if SASL QoP requires unwrapping,
* been called. * otherwise return original stream. Can be called only after
* saslConnect() has been called.
* *
* @param out * @param in - InputStream used to make the connection
* the OutputStream to wrap * @return InputStream that may be using SASL unwrap
* @return a SASL wrapped OutputStream * @throws IOException
*/
public InputStream getInputStream(InputStream in) throws IOException {
if (useWrap()) {
in = new WrappedInputStream(in);
}
return in;
}
/**
* Get SASL wrapped OutputStream if SASL QoP requires wrapping,
* otherwise return original stream. Can be called only after
* saslConnect() has been called.
*
* @param in - InputStream used to make the connection
* @return InputStream that may be using SASL unwrap
* @throws IOException * @throws IOException
*/ */
public OutputStream getOutputStream(OutputStream out) throws IOException { public OutputStream getOutputStream(OutputStream out) throws IOException {
if (!saslClient.isComplete()) { if (useWrap()) {
throw new IOException("Sasl authentication exchange hasn't completed yet"); // the client and server negotiate a maximum buffer size that can be
// wrapped
String maxBuf = (String)saslClient.getNegotiatedProperty(Sasl.RAW_SEND_SIZE);
out = new BufferedOutputStream(new WrappedOutputStream(out),
Integer.parseInt(maxBuf));
}
return out;
}
// ideally this should be folded into the RPC decoding loop but it's
// currently split across Client and SaslRpcClient...
class WrappedInputStream extends FilterInputStream {
private ByteBuffer unwrappedRpcBuffer = ByteBuffer.allocate(0);
public WrappedInputStream(InputStream in) throws IOException {
super(in);
}
@Override
public int read() throws IOException {
byte[] b = new byte[1];
int n = read(b, 0, 1);
return (n != -1) ? b[0] : -1;
}
@Override
public int read(byte b[]) throws IOException {
return read(b, 0, b.length);
}
@Override
public int read(byte[] buf, int off, int len) throws IOException {
synchronized(unwrappedRpcBuffer) {
// fill the buffer with the next RPC message
if (unwrappedRpcBuffer.remaining() == 0) {
readNextRpcPacket();
}
// satisfy as much of the request as possible
int readLen = Math.min(len, unwrappedRpcBuffer.remaining());
unwrappedRpcBuffer.get(buf, off, readLen);
return readLen;
}
}
// all messages must be RPC SASL wrapped, else an exception is thrown
private void readNextRpcPacket() throws IOException {
LOG.debug("reading next wrapped RPC packet");
DataInputStream dis = new DataInputStream(in);
int rpcLen = dis.readInt();
byte[] rpcBuf = new byte[rpcLen];
dis.readFully(rpcBuf);
// decode the RPC header
ByteArrayInputStream bis = new ByteArrayInputStream(rpcBuf);
RpcResponseHeaderProto.Builder headerBuilder =
RpcResponseHeaderProto.newBuilder();
headerBuilder.mergeDelimitedFrom(bis);
boolean isWrapped = false;
// Must be SASL wrapped, verify and decode.
if (headerBuilder.getCallId() == AuthProtocol.SASL.callId) {
RpcSaslProto.Builder saslMessage = RpcSaslProto.newBuilder();
saslMessage.mergeDelimitedFrom(bis);
if (saslMessage.getState() == SaslState.WRAP) {
isWrapped = true;
byte[] token = saslMessage.getToken().toByteArray();
if (LOG.isDebugEnabled()) {
LOG.debug("unwrapping token of length:" + token.length);
}
token = saslClient.unwrap(token, 0, token.length);
unwrappedRpcBuffer = ByteBuffer.wrap(token);
}
}
if (!isWrapped) {
throw new SaslException("Server sent non-wrapped response");
}
}
}
class WrappedOutputStream extends FilterOutputStream {
public WrappedOutputStream(OutputStream out) throws IOException {
super(out);
}
@Override
public void write(byte[] buf, int off, int len) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("wrapping token of length:" + len);
}
buf = saslClient.wrap(buf, off, len);
RpcSaslProto saslMessage = RpcSaslProto.newBuilder()
.setState(SaslState.WRAP)
.setToken(ByteString.copyFrom(buf, 0, buf.length))
.build();
RpcRequestMessageWrapper request =
new RpcRequestMessageWrapper(saslHeader, saslMessage);
DataOutputStream dob = new DataOutputStream(out);
dob.writeInt(request.getLength());
request.write(dob);
} }
return new SaslOutputStream(out, saslClient);
} }
/** Release resources used by wrapped saslClient */ /** Release resources used by wrapped saslClient */

View File

@ -141,6 +141,7 @@ message RpcSaslProto {
INITIATE = 2; INITIATE = 2;
CHALLENGE = 3; CHALLENGE = 3;
RESPONSE = 4; RESPONSE = 4;
WRAP = 5;
} }
message SaslAuth { message SaslAuth {