diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 1f2431076c3..a34e508f396 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -110,6 +110,8 @@ Release 2.1.0-beta - 2013-08-06 HADOOP-9832. [RPC v9] Add RPC header to client ping (daryn) + HADOOP-9820. [RPC v9] Wire protocol is insufficient to support multiplexing. (daryn via jitendra) + NEW FEATURES HADOOP-9283. Add support for running the Hadoop client on AIX. (atm) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 08e86c8d4c1..1ed72017d6a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -737,12 +737,16 @@ public class Client { } if (doPing) { - this.in = new DataInputStream(new BufferedInputStream( - new PingInputStream(inStream))); - } else { - this.in = new DataInputStream(new BufferedInputStream(inStream)); + inStream = new PingInputStream(inStream); } - this.out = new DataOutputStream(new BufferedOutputStream(outStream)); + this.in = new DataInputStream(new BufferedInputStream(inStream)); + + // SASL may have already buffered the stream + if (!(outStream instanceof BufferedOutputStream)) { + outStream = new BufferedOutputStream(outStream); + } + this.out = new DataOutputStream(outStream); + writeConnectionContext(remoteId, authMethod); // update last activity time diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index d4c70de18e1..5e32e7068bb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -72,6 +72,8 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import static org.apache.hadoop.ipc.RpcConstants.*; + +import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper; import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper; import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RPC.VersionMismatch; @@ -1271,7 +1273,27 @@ public abstract class Server { } private void saslReadAndProcess(DataInputStream dis) throws - WrappedRpcServerException, IOException, InterruptedException { + WrappedRpcServerException, IOException, InterruptedException { + final RpcSaslProto saslMessage = + decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis); + switch (saslMessage.getState()) { + case WRAP: { + if (!saslContextEstablished || !useWrap) { + throw new WrappedRpcServerException( + RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, + new SaslException("Server is not wrapping data")); + } + // loops over decoded data and calls processOneRpc + unwrapPacketAndProcessRpcs(saslMessage.getToken().toByteArray()); + break; + } + default: + saslProcess(saslMessage); + } + } + + private void saslProcess(RpcSaslProto saslMessage) + throws WrappedRpcServerException, IOException, InterruptedException { if (saslContextEstablished) { throw new WrappedRpcServerException( RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, @@ -1280,7 +1302,7 @@ public abstract class Server { RpcSaslProto saslResponse = null; try { try { - saslResponse = processSaslMessage(dis); + saslResponse = processSaslMessage(saslMessage); } catch (IOException e) { IOException sendToClient = e; Throwable cause = e; @@ -1325,14 +1347,14 @@ public abstract class Server { // do NOT enable wrapping until the last auth response is sent if (saslContextEstablished) { String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); + // SASL wrapping is only used if the connection has a QOP, and + // the value is not auth. ex. auth-int & auth-priv useWrap = (qop != null && !"auth".equalsIgnoreCase(qop)); } } - private RpcSaslProto processSaslMessage(DataInputStream dis) + private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage) throws IOException, InterruptedException { - final RpcSaslProto saslMessage = - decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis); RpcSaslProto saslResponse = null; final SaslState state = saslMessage.getState(); // required switch (state) { @@ -1527,7 +1549,7 @@ public abstract class Server { dataLengthBuffer.clear(); data.flip(); boolean isHeaderRead = connectionContextRead; - processRpcRequestPacket(data.array()); + processOneRpc(data.array()); data = null; if (!isHeaderRead) { continue; @@ -1690,29 +1712,19 @@ public abstract class Server { } /** - * Process a RPC Request - if SASL wrapping is enabled, unwrap the - * requests and process each one, else directly process the request - * @param buf - single request or SASL wrapped requests - * @throws IOException - connection failed to authenticate or authorize, - * or the request could not be decoded into a Call + * Process a wrapped RPC Request - unwrap the SASL packet and process + * each embedded RPC request + * @param buf - SASL wrapped request of one or more RPCs + * @throws IOException - SASL packet cannot be unwrapped * @throws InterruptedException */ - private void processRpcRequestPacket(byte[] buf) - throws WrappedRpcServerException, IOException, InterruptedException { - if (saslContextEstablished && useWrap) { - if (LOG.isDebugEnabled()) - LOG.debug("Have read input token of size " + buf.length - + " for processing by saslServer.unwrap()"); - final byte[] plaintextData = saslServer.unwrap(buf, 0, buf.length); - // loops over decoded data and calls processOneRpc - unwrapPacketAndProcessRpcs(plaintextData); - } else { - processOneRpc(buf); - } - } - private void unwrapPacketAndProcessRpcs(byte[] inBuf) throws WrappedRpcServerException, IOException, InterruptedException { + if (LOG.isDebugEnabled()) { + LOG.debug("Have read input token of size " + inBuf.length + + " for processing by saslServer.unwrap()"); + } + inBuf = saslServer.unwrap(inBuf, 0, inBuf.length); ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream( inBuf)); // Read all RPCs contained in the inBuf, even partial ones @@ -2375,9 +2387,21 @@ public abstract class Server { LOG.debug("Adding saslServer wrapped token of size " + token.length + " as call response."); response.reset(); - DataOutputStream saslOut = new DataOutputStream(response); - saslOut.writeInt(token.length); - saslOut.write(token, 0, token.length); + // rebuild with sasl header and payload + RpcResponseHeaderProto saslHeader = RpcResponseHeaderProto.newBuilder() + .setCallId(AuthProtocol.SASL.callId) + .setStatus(RpcStatusProto.SUCCESS) + .build(); + RpcSaslProto saslMessage = RpcSaslProto.newBuilder() + .setState(SaslState.WRAP) + .setToken(ByteString.copyFrom(token, 0, token.length)) + .build(); + RpcResponseMessageWrapper saslResponse = + new RpcResponseMessageWrapper(saslHeader, saslMessage); + + DataOutputStream out = new DataOutputStream(response); + out.writeInt(saslResponse.getLength()); + saslResponse.write(out); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java index da8d474b5bd..b5e8c32f9de 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java @@ -20,12 +20,16 @@ package org.apache.hadoop.security; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.FilterInputStream; +import java.io.FilterOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -485,38 +489,141 @@ public class SaslRpcClient { return response; } + private boolean useWrap() { + // getNegotiatedProperty throws if client isn't complete + String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); + // SASL wrapping is only used if the connection has a QOP, and + // the value is not auth. ex. auth-int & auth-priv + return qop != null && !"auth".equalsIgnoreCase(qop); + } + /** - * Get a SASL wrapped InputStream. Can be called only after saslConnect() has - * been called. + * Get SASL wrapped InputStream if SASL QoP requires unwrapping, + * otherwise return original stream. Can be called only after + * saslConnect() has been called. * - * @param in - * the InputStream to wrap - * @return a SASL wrapped InputStream + * @param in - InputStream used to make the connection + * @return InputStream that may be using SASL unwrap * @throws IOException */ public InputStream getInputStream(InputStream in) throws IOException { - if (!saslClient.isComplete()) { - throw new IOException("Sasl authentication exchange hasn't completed yet"); + if (useWrap()) { + in = new WrappedInputStream(in); } - return new SaslInputStream(in, saslClient); + return in; } /** - * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has - * been called. + * Get SASL wrapped OutputStream if SASL QoP requires wrapping, + * otherwise return original stream. Can be called only after + * saslConnect() has been called. * - * @param out - * the OutputStream to wrap - * @return a SASL wrapped OutputStream + * @param in - InputStream used to make the connection + * @return InputStream that may be using SASL unwrap * @throws IOException */ public OutputStream getOutputStream(OutputStream out) throws IOException { - if (!saslClient.isComplete()) { - throw new IOException("Sasl authentication exchange hasn't completed yet"); + if (useWrap()) { + // the client and server negotiate a maximum buffer size that can be + // wrapped + String maxBuf = (String)saslClient.getNegotiatedProperty(Sasl.RAW_SEND_SIZE); + out = new BufferedOutputStream(new WrappedOutputStream(out), + Integer.parseInt(maxBuf)); } - return new SaslOutputStream(out, saslClient); + return out; } + // ideally this should be folded into the RPC decoding loop but it's + // currently split across Client and SaslRpcClient... + class WrappedInputStream extends FilterInputStream { + private ByteBuffer unwrappedRpcBuffer = ByteBuffer.allocate(0); + public WrappedInputStream(InputStream in) throws IOException { + super(in); + } + + @Override + public int read() throws IOException { + byte[] b = new byte[1]; + int n = read(b, 0, 1); + return (n != -1) ? b[0] : -1; + } + + @Override + public int read(byte b[]) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(byte[] buf, int off, int len) throws IOException { + synchronized(unwrappedRpcBuffer) { + // fill the buffer with the next RPC message + if (unwrappedRpcBuffer.remaining() == 0) { + readNextRpcPacket(); + } + // satisfy as much of the request as possible + int readLen = Math.min(len, unwrappedRpcBuffer.remaining()); + unwrappedRpcBuffer.get(buf, off, readLen); + return readLen; + } + } + + // all messages must be RPC SASL wrapped, else an exception is thrown + private void readNextRpcPacket() throws IOException { + LOG.debug("reading next wrapped RPC packet"); + DataInputStream dis = new DataInputStream(in); + int rpcLen = dis.readInt(); + byte[] rpcBuf = new byte[rpcLen]; + dis.readFully(rpcBuf); + + // decode the RPC header + ByteArrayInputStream bis = new ByteArrayInputStream(rpcBuf); + RpcResponseHeaderProto.Builder headerBuilder = + RpcResponseHeaderProto.newBuilder(); + headerBuilder.mergeDelimitedFrom(bis); + + boolean isWrapped = false; + // Must be SASL wrapped, verify and decode. + if (headerBuilder.getCallId() == AuthProtocol.SASL.callId) { + RpcSaslProto.Builder saslMessage = RpcSaslProto.newBuilder(); + saslMessage.mergeDelimitedFrom(bis); + if (saslMessage.getState() == SaslState.WRAP) { + isWrapped = true; + byte[] token = saslMessage.getToken().toByteArray(); + if (LOG.isDebugEnabled()) { + LOG.debug("unwrapping token of length:" + token.length); + } + token = saslClient.unwrap(token, 0, token.length); + unwrappedRpcBuffer = ByteBuffer.wrap(token); + } + } + if (!isWrapped) { + throw new SaslException("Server sent non-wrapped response"); + } + } + } + + class WrappedOutputStream extends FilterOutputStream { + public WrappedOutputStream(OutputStream out) throws IOException { + super(out); + } + @Override + public void write(byte[] buf, int off, int len) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("wrapping token of length:" + len); + } + buf = saslClient.wrap(buf, off, len); + RpcSaslProto saslMessage = RpcSaslProto.newBuilder() + .setState(SaslState.WRAP) + .setToken(ByteString.copyFrom(buf, 0, buf.length)) + .build(); + RpcRequestMessageWrapper request = + new RpcRequestMessageWrapper(saslHeader, saslMessage); + DataOutputStream dob = new DataOutputStream(out); + dob.writeInt(request.getLength()); + request.write(dob); + } + } + /** Release resources used by wrapped saslClient */ public void dispose() throws SaslException { if (saslClient != null) { @@ -572,4 +679,4 @@ public class SaslRpcClient { } } } -} \ No newline at end of file +} diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto index 2f61d986829..673883b23a5 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto @@ -141,6 +141,7 @@ message RpcSaslProto { INITIATE = 2; CHALLENGE = 3; RESPONSE = 4; + WRAP = 5; } message SaslAuth {