HADOOP-9820. RPCv9 wire protocol is insufficient to support multiplexing. Contributed by Daryn Sharp.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1512091 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e90afcc971
commit
c03c8fe199
|
@ -383,6 +383,8 @@ Release 2.1.0-beta - 2013-08-06
|
|||
|
||||
HADOOP-9832. [RPC v9] Add RPC header to client ping (daryn)
|
||||
|
||||
HADOOP-9820. [RPC v9] Wire protocol is insufficient to support multiplexing. (daryn via jitendra)
|
||||
|
||||
NEW FEATURES
|
||||
|
||||
HADOOP-9283. Add support for running the Hadoop client on AIX. (atm)
|
||||
|
|
|
@ -737,12 +737,16 @@ public class Client {
|
|||
}
|
||||
|
||||
if (doPing) {
|
||||
this.in = new DataInputStream(new BufferedInputStream(
|
||||
new PingInputStream(inStream)));
|
||||
} else {
|
||||
this.in = new DataInputStream(new BufferedInputStream(inStream));
|
||||
inStream = new PingInputStream(inStream);
|
||||
}
|
||||
this.out = new DataOutputStream(new BufferedOutputStream(outStream));
|
||||
this.in = new DataInputStream(new BufferedInputStream(inStream));
|
||||
|
||||
// SASL may have already buffered the stream
|
||||
if (!(outStream instanceof BufferedOutputStream)) {
|
||||
outStream = new BufferedOutputStream(outStream);
|
||||
}
|
||||
this.out = new DataOutputStream(outStream);
|
||||
|
||||
writeConnectionContext(remoteId, authMethod);
|
||||
|
||||
// update last activity time
|
||||
|
|
|
@ -73,6 +73,8 @@ import org.apache.hadoop.io.DataOutputBuffer;
|
|||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
import static org.apache.hadoop.ipc.RpcConstants.*;
|
||||
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
|
||||
import org.apache.hadoop.ipc.RPC.RpcInvoker;
|
||||
import org.apache.hadoop.ipc.RPC.VersionMismatch;
|
||||
|
@ -1275,6 +1277,26 @@ public abstract class Server {
|
|||
|
||||
private void saslReadAndProcess(DataInputStream dis) throws
|
||||
WrappedRpcServerException, IOException, InterruptedException {
|
||||
final RpcSaslProto saslMessage =
|
||||
decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
|
||||
switch (saslMessage.getState()) {
|
||||
case WRAP: {
|
||||
if (!saslContextEstablished || !useWrap) {
|
||||
throw new WrappedRpcServerException(
|
||||
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
||||
new SaslException("Server is not wrapping data"));
|
||||
}
|
||||
// loops over decoded data and calls processOneRpc
|
||||
unwrapPacketAndProcessRpcs(saslMessage.getToken().toByteArray());
|
||||
break;
|
||||
}
|
||||
default:
|
||||
saslProcess(saslMessage);
|
||||
}
|
||||
}
|
||||
|
||||
private void saslProcess(RpcSaslProto saslMessage)
|
||||
throws WrappedRpcServerException, IOException, InterruptedException {
|
||||
if (saslContextEstablished) {
|
||||
throw new WrappedRpcServerException(
|
||||
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
||||
|
@ -1283,7 +1305,7 @@ public abstract class Server {
|
|||
RpcSaslProto saslResponse = null;
|
||||
try {
|
||||
try {
|
||||
saslResponse = processSaslMessage(dis);
|
||||
saslResponse = processSaslMessage(saslMessage);
|
||||
} catch (IOException e) {
|
||||
IOException sendToClient = e;
|
||||
Throwable cause = e;
|
||||
|
@ -1328,14 +1350,14 @@ public abstract class Server {
|
|||
// do NOT enable wrapping until the last auth response is sent
|
||||
if (saslContextEstablished) {
|
||||
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
|
||||
// SASL wrapping is only used if the connection has a QOP, and
|
||||
// the value is not auth. ex. auth-int & auth-priv
|
||||
useWrap = (qop != null && !"auth".equalsIgnoreCase(qop));
|
||||
}
|
||||
}
|
||||
|
||||
private RpcSaslProto processSaslMessage(DataInputStream dis)
|
||||
private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage)
|
||||
throws IOException, InterruptedException {
|
||||
final RpcSaslProto saslMessage =
|
||||
decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
|
||||
RpcSaslProto saslResponse = null;
|
||||
final SaslState state = saslMessage.getState(); // required
|
||||
switch (state) {
|
||||
|
@ -1530,7 +1552,7 @@ public abstract class Server {
|
|||
dataLengthBuffer.clear();
|
||||
data.flip();
|
||||
boolean isHeaderRead = connectionContextRead;
|
||||
processRpcRequestPacket(data.array());
|
||||
processOneRpc(data.array());
|
||||
data = null;
|
||||
if (!isHeaderRead) {
|
||||
continue;
|
||||
|
@ -1693,29 +1715,19 @@ public abstract class Server {
|
|||
}
|
||||
|
||||
/**
|
||||
* Process a RPC Request - if SASL wrapping is enabled, unwrap the
|
||||
* requests and process each one, else directly process the request
|
||||
* @param buf - single request or SASL wrapped requests
|
||||
* @throws IOException - connection failed to authenticate or authorize,
|
||||
* or the request could not be decoded into a Call
|
||||
* Process a wrapped RPC Request - unwrap the SASL packet and process
|
||||
* each embedded RPC request
|
||||
* @param buf - SASL wrapped request of one or more RPCs
|
||||
* @throws IOException - SASL packet cannot be unwrapped
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
private void processRpcRequestPacket(byte[] buf)
|
||||
throws WrappedRpcServerException, IOException, InterruptedException {
|
||||
if (saslContextEstablished && useWrap) {
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Have read input token of size " + buf.length
|
||||
+ " for processing by saslServer.unwrap()");
|
||||
final byte[] plaintextData = saslServer.unwrap(buf, 0, buf.length);
|
||||
// loops over decoded data and calls processOneRpc
|
||||
unwrapPacketAndProcessRpcs(plaintextData);
|
||||
} else {
|
||||
processOneRpc(buf);
|
||||
}
|
||||
}
|
||||
|
||||
private void unwrapPacketAndProcessRpcs(byte[] inBuf)
|
||||
throws WrappedRpcServerException, IOException, InterruptedException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Have read input token of size " + inBuf.length
|
||||
+ " for processing by saslServer.unwrap()");
|
||||
}
|
||||
inBuf = saslServer.unwrap(inBuf, 0, inBuf.length);
|
||||
ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
|
||||
inBuf));
|
||||
// Read all RPCs contained in the inBuf, even partial ones
|
||||
|
@ -2378,9 +2390,21 @@ public abstract class Server {
|
|||
LOG.debug("Adding saslServer wrapped token of size " + token.length
|
||||
+ " as call response.");
|
||||
response.reset();
|
||||
DataOutputStream saslOut = new DataOutputStream(response);
|
||||
saslOut.writeInt(token.length);
|
||||
saslOut.write(token, 0, token.length);
|
||||
// rebuild with sasl header and payload
|
||||
RpcResponseHeaderProto saslHeader = RpcResponseHeaderProto.newBuilder()
|
||||
.setCallId(AuthProtocol.SASL.callId)
|
||||
.setStatus(RpcStatusProto.SUCCESS)
|
||||
.build();
|
||||
RpcSaslProto saslMessage = RpcSaslProto.newBuilder()
|
||||
.setState(SaslState.WRAP)
|
||||
.setToken(ByteString.copyFrom(token, 0, token.length))
|
||||
.build();
|
||||
RpcResponseMessageWrapper saslResponse =
|
||||
new RpcResponseMessageWrapper(saslHeader, saslMessage);
|
||||
|
||||
DataOutputStream out = new DataOutputStream(response);
|
||||
out.writeInt(saslResponse.getLength());
|
||||
saslResponse.write(out);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -20,12 +20,16 @@ package org.apache.hadoop.security;
|
|||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.FilterInputStream;
|
||||
import java.io.FilterOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -485,36 +489,139 @@ public class SaslRpcClient {
|
|||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a SASL wrapped InputStream. Can be called only after saslConnect() has
|
||||
* been called.
|
||||
*
|
||||
* @param in
|
||||
* the InputStream to wrap
|
||||
* @return a SASL wrapped InputStream
|
||||
* @throws IOException
|
||||
*/
|
||||
public InputStream getInputStream(InputStream in) throws IOException {
|
||||
if (!saslClient.isComplete()) {
|
||||
throw new IOException("Sasl authentication exchange hasn't completed yet");
|
||||
}
|
||||
return new SaslInputStream(in, saslClient);
|
||||
private boolean useWrap() {
|
||||
// getNegotiatedProperty throws if client isn't complete
|
||||
String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
|
||||
// SASL wrapping is only used if the connection has a QOP, and
|
||||
// the value is not auth. ex. auth-int & auth-priv
|
||||
return qop != null && !"auth".equalsIgnoreCase(qop);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a SASL wrapped OutputStream. Can be called only after saslConnect() has
|
||||
* been called.
|
||||
* Get SASL wrapped InputStream if SASL QoP requires unwrapping,
|
||||
* otherwise return original stream. Can be called only after
|
||||
* saslConnect() has been called.
|
||||
*
|
||||
* @param out
|
||||
* the OutputStream to wrap
|
||||
* @return a SASL wrapped OutputStream
|
||||
* @param in - InputStream used to make the connection
|
||||
* @return InputStream that may be using SASL unwrap
|
||||
* @throws IOException
|
||||
*/
|
||||
public InputStream getInputStream(InputStream in) throws IOException {
|
||||
if (useWrap()) {
|
||||
in = new WrappedInputStream(in);
|
||||
}
|
||||
return in;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get SASL wrapped OutputStream if SASL QoP requires wrapping,
|
||||
* otherwise return original stream. Can be called only after
|
||||
* saslConnect() has been called.
|
||||
*
|
||||
* @param in - InputStream used to make the connection
|
||||
* @return InputStream that may be using SASL unwrap
|
||||
* @throws IOException
|
||||
*/
|
||||
public OutputStream getOutputStream(OutputStream out) throws IOException {
|
||||
if (!saslClient.isComplete()) {
|
||||
throw new IOException("Sasl authentication exchange hasn't completed yet");
|
||||
if (useWrap()) {
|
||||
// the client and server negotiate a maximum buffer size that can be
|
||||
// wrapped
|
||||
String maxBuf = (String)saslClient.getNegotiatedProperty(Sasl.RAW_SEND_SIZE);
|
||||
out = new BufferedOutputStream(new WrappedOutputStream(out),
|
||||
Integer.parseInt(maxBuf));
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
// ideally this should be folded into the RPC decoding loop but it's
|
||||
// currently split across Client and SaslRpcClient...
|
||||
class WrappedInputStream extends FilterInputStream {
|
||||
private ByteBuffer unwrappedRpcBuffer = ByteBuffer.allocate(0);
|
||||
public WrappedInputStream(InputStream in) throws IOException {
|
||||
super(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
byte[] b = new byte[1];
|
||||
int n = read(b, 0, 1);
|
||||
return (n != -1) ? b[0] : -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte b[]) throws IOException {
|
||||
return read(b, 0, b.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] buf, int off, int len) throws IOException {
|
||||
synchronized(unwrappedRpcBuffer) {
|
||||
// fill the buffer with the next RPC message
|
||||
if (unwrappedRpcBuffer.remaining() == 0) {
|
||||
readNextRpcPacket();
|
||||
}
|
||||
// satisfy as much of the request as possible
|
||||
int readLen = Math.min(len, unwrappedRpcBuffer.remaining());
|
||||
unwrappedRpcBuffer.get(buf, off, readLen);
|
||||
return readLen;
|
||||
}
|
||||
}
|
||||
|
||||
// all messages must be RPC SASL wrapped, else an exception is thrown
|
||||
private void readNextRpcPacket() throws IOException {
|
||||
LOG.debug("reading next wrapped RPC packet");
|
||||
DataInputStream dis = new DataInputStream(in);
|
||||
int rpcLen = dis.readInt();
|
||||
byte[] rpcBuf = new byte[rpcLen];
|
||||
dis.readFully(rpcBuf);
|
||||
|
||||
// decode the RPC header
|
||||
ByteArrayInputStream bis = new ByteArrayInputStream(rpcBuf);
|
||||
RpcResponseHeaderProto.Builder headerBuilder =
|
||||
RpcResponseHeaderProto.newBuilder();
|
||||
headerBuilder.mergeDelimitedFrom(bis);
|
||||
|
||||
boolean isWrapped = false;
|
||||
// Must be SASL wrapped, verify and decode.
|
||||
if (headerBuilder.getCallId() == AuthProtocol.SASL.callId) {
|
||||
RpcSaslProto.Builder saslMessage = RpcSaslProto.newBuilder();
|
||||
saslMessage.mergeDelimitedFrom(bis);
|
||||
if (saslMessage.getState() == SaslState.WRAP) {
|
||||
isWrapped = true;
|
||||
byte[] token = saslMessage.getToken().toByteArray();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("unwrapping token of length:" + token.length);
|
||||
}
|
||||
token = saslClient.unwrap(token, 0, token.length);
|
||||
unwrappedRpcBuffer = ByteBuffer.wrap(token);
|
||||
}
|
||||
}
|
||||
if (!isWrapped) {
|
||||
throw new SaslException("Server sent non-wrapped response");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class WrappedOutputStream extends FilterOutputStream {
|
||||
public WrappedOutputStream(OutputStream out) throws IOException {
|
||||
super(out);
|
||||
}
|
||||
@Override
|
||||
public void write(byte[] buf, int off, int len) throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("wrapping token of length:" + len);
|
||||
}
|
||||
buf = saslClient.wrap(buf, off, len);
|
||||
RpcSaslProto saslMessage = RpcSaslProto.newBuilder()
|
||||
.setState(SaslState.WRAP)
|
||||
.setToken(ByteString.copyFrom(buf, 0, buf.length))
|
||||
.build();
|
||||
RpcRequestMessageWrapper request =
|
||||
new RpcRequestMessageWrapper(saslHeader, saslMessage);
|
||||
DataOutputStream dob = new DataOutputStream(out);
|
||||
dob.writeInt(request.getLength());
|
||||
request.write(dob);
|
||||
}
|
||||
return new SaslOutputStream(out, saslClient);
|
||||
}
|
||||
|
||||
/** Release resources used by wrapped saslClient */
|
||||
|
|
|
@ -141,6 +141,7 @@ message RpcSaslProto {
|
|||
INITIATE = 2;
|
||||
CHALLENGE = 3;
|
||||
RESPONSE = 4;
|
||||
WRAP = 5;
|
||||
}
|
||||
|
||||
message SaslAuth {
|
||||
|
|
Loading…
Reference in New Issue