HADOOP-10940. RPC client does no bounds checking of responses. Contributed by Daryn Sharp.

(cherry picked from commit d4d076876a)
This commit is contained in:
Kihwal Lee 2016-09-09 10:46:37 -05:00
parent 72ea641468
commit d234990208
6 changed files with 249 additions and 84 deletions

View File

@ -47,7 +47,7 @@
-->
<Match>
<Class name="org.apache.hadoop.ipc.Client$Connection" />
<Field name="out" />
<Field name="ipcStreams" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!--
@ -346,13 +346,7 @@
<Method name="removeRenewAction" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
<!-- Inconsistent synchronization flagged by findbugs is not valid. -->
<Match>
<Class name="org.apache.hadoop.ipc.Client$Connection" />
<Field name="in" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!--
The switch condition for INITIATE is expected to fallthru to RESPONSE
to process initial sasl response token included in the INITIATE

View File

@ -75,12 +75,20 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
/** Default value for IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE */
public static final int IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT =
100;
/** Max request size a server will accept. */
public static final String IPC_MAXIMUM_DATA_LENGTH =
"ipc.maximum.data.length";
/** Default value for IPC_MAXIMUM_DATA_LENGTH. */
public static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 64 * 1024 * 1024;
/** Max response size a client will accept. */
public static final String IPC_MAXIMUM_RESPONSE_LENGTH =
"ipc.maximum.response.length";
/** Default value for IPC_MAXIMUM_RESPONSE_LENGTH. */
public static final int IPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT =
128 * 1024 * 1024;
/** How many calls per handler are allowed in the queue. */
public static final String IPC_SERVER_HANDLER_QUEUE_SIZE_KEY =
"ipc.server.handler.queue.size";

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
@ -415,8 +416,8 @@ private class Connection extends Thread {
private SaslRpcClient saslRpcClient;
private Socket socket = null; // connected socket
private DataInputStream in;
private DataOutputStream out;
private IpcStreams ipcStreams;
private final int maxResponseLength;
private final int rpcTimeout;
private int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs
@ -428,8 +429,8 @@ private class Connection extends Thread {
private final boolean doPing; //do we need to send ping message
private final int pingInterval; // how often sends ping to the server
private final int soTimeout; // used by ipc ping and rpc timeout
private ResponseBuffer pingRequest; // ping message
private byte[] pingRequest; // ping message
// currently active calls
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
@ -448,6 +449,9 @@ public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
0,
new UnknownHostException());
}
this.maxResponseLength = remoteId.conf.getInt(
CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH,
CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
this.rpcTimeout = remoteId.getRpcTimeout();
this.maxIdleTime = remoteId.getMaxIdleTime();
this.connectionRetryPolicy = remoteId.connectionRetryPolicy;
@ -458,12 +462,13 @@ public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
this.doPing = remoteId.getDoPing();
if (doPing) {
// construct a RPC header with the callId as the ping callId
pingRequest = new ResponseBuffer();
ResponseBuffer buf = new ResponseBuffer();
RpcRequestHeaderProto pingHeader = ProtoUtil
.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,
RpcConstants.INVALID_RETRY_COUNT, clientId);
pingHeader.writeDelimitedTo(pingRequest);
pingHeader.writeDelimitedTo(buf);
pingRequest = buf.toByteArray();
}
this.pingInterval = remoteId.getPingInterval();
if (rpcTimeout > 0) {
@ -598,15 +603,15 @@ private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
}
return false;
}
private synchronized AuthMethod setupSaslConnection(final InputStream in2,
final OutputStream out2) throws IOException {
private synchronized AuthMethod setupSaslConnection(IpcStreams streams)
throws IOException {
// Do not use Client.conf here! We must use ConnectionId.conf, since the
// Client object is cached and shared between all RPC clients, even those
// for separate services.
saslRpcClient = new SaslRpcClient(remoteId.getTicket(),
remoteId.getProtocol(), remoteId.getAddress(), remoteId.conf);
return saslRpcClient.saslConnect(in2, out2);
return saslRpcClient.saslConnect(streams);
}
/**
@ -772,12 +777,9 @@ private synchronized void setupIOstreams(
Random rand = null;
while (true) {
setupConnection();
InputStream inStream = NetUtils.getInputStream(socket);
OutputStream outStream = NetUtils.getOutputStream(socket);
writeConnectionHeader(outStream);
ipcStreams = new IpcStreams(socket, maxResponseLength);
writeConnectionHeader(ipcStreams);
if (authProtocol == AuthProtocol.SASL) {
final InputStream in2 = inStream;
final OutputStream out2 = outStream;
UserGroupInformation ticket = remoteId.getTicket();
if (ticket.getRealUser() != null) {
ticket = ticket.getRealUser();
@ -788,7 +790,7 @@ private synchronized void setupIOstreams(
@Override
public AuthMethod run()
throws IOException, InterruptedException {
return setupSaslConnection(in2, out2);
return setupSaslConnection(ipcStreams);
}
});
} catch (IOException ex) {
@ -807,8 +809,7 @@ public AuthMethod run()
}
if (authMethod != AuthMethod.SIMPLE) {
// Sasl connect is successful. Let's set up Sasl i/o streams.
inStream = saslRpcClient.getInputStream(inStream);
outStream = saslRpcClient.getOutputStream(outStream);
ipcStreams.setSaslClient(saslRpcClient);
// for testing
remoteId.saslQop =
(String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
@ -827,18 +828,11 @@ public AuthMethod run()
}
}
}
if (doPing) {
inStream = new PingInputStream(inStream);
}
this.in = new DataInputStream(new BufferedInputStream(inStream));
// SASL may have already buffered the stream
if (!(outStream instanceof BufferedOutputStream)) {
outStream = new BufferedOutputStream(outStream);
if (doPing) {
ipcStreams.setInputStream(new PingInputStream(ipcStreams.in));
}
this.out = new DataOutputStream(outStream);
writeConnectionContext(remoteId, authMethod);
// update last activity time
@ -952,17 +946,28 @@ private void handleConnectionFailure(int curRetries, IOException ioe
* | AuthProtocol (1 byte) |
* +----------------------------------+
*/
private void writeConnectionHeader(OutputStream outStream)
private void writeConnectionHeader(IpcStreams streams)
throws IOException {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
// Write out the header, version and authentication method
out.write(RpcConstants.HEADER.array());
out.write(RpcConstants.CURRENT_VERSION);
out.write(serviceClass);
out.write(authProtocol.callId);
out.flush();
// Write out the header, version and authentication method.
// The output stream is buffered but we must not flush it yet. The
// connection setup protocol requires the client to send multiple
// messages before reading a response.
//
// insecure: send header+context+call, read
// secure : send header+negotiate, read, (sasl), context+call, read
//
// The client must flush only when it's prepared to read. Otherwise
// "broken pipe" exceptions occur if the server closes the connection
// before all messages are sent.
final DataOutputStream out = streams.out;
synchronized (out) {
out.write(RpcConstants.HEADER.array());
out.write(RpcConstants.CURRENT_VERSION);
out.write(serviceClass);
out.write(authProtocol.callId);
}
}
/* Write the connection context header for each connection
* Out is not synchronized because only the first thread does this.
*/
@ -978,12 +983,17 @@ private void writeConnectionContext(ConnectionId remoteId,
.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
RpcConstants.INVALID_RETRY_COUNT, clientId);
// do not flush. the context and first ipc call request must be sent
// together to avoid possibility of broken pipes upon authz failure.
// see writeConnectionHeader
final ResponseBuffer buf = new ResponseBuffer();
connectionContextHeader.writeDelimitedTo(buf);
message.writeDelimitedTo(buf);
buf.writeTo(out);
synchronized (ipcStreams.out) {
ipcStreams.sendRequest(buf.toByteArray());
}
}
/* wait till someone signals us to start reading RPC response or
* it is idle too long, it is marked as to be closed,
* or the client is marked as not running.
@ -1026,9 +1036,9 @@ private synchronized void sendPing() throws IOException {
long curTime = Time.now();
if ( curTime - lastActivity.get() >= pingInterval) {
lastActivity.set(curTime);
synchronized (out) {
pingRequest.writeTo(out);
out.flush();
synchronized (ipcStreams.out) {
ipcStreams.sendRequest(pingRequest);
ipcStreams.flush();
}
}
}
@ -1094,15 +1104,16 @@ public void sendRpcRequest(final Call call)
@Override
public void run() {
try {
synchronized (Connection.this.out) {
synchronized (ipcStreams.out) {
if (shouldCloseConnection.get()) {
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + " sending #" + call.id);
}
buf.writeTo(out); // RpcRequestHeader + RpcRequest
out.flush();
// RpcRequestHeader + RpcRequest
ipcStreams.sendRequest(buf.toByteArray());
ipcStreams.flush();
}
} catch (IOException e) {
// exception at this point would leave the connection in an
@ -1143,10 +1154,7 @@ private void receiveRpcResponse() {
touch();
try {
int totalLen = in.readInt();
ByteBuffer bb = ByteBuffer.allocate(totalLen);
in.readFully(bb.array());
ByteBuffer bb = ipcStreams.readResponse();
RpcWritable.Buffer packet = RpcWritable.Buffer.wrap(bb);
RpcResponseHeaderProto header =
packet.getValue(RpcResponseHeaderProto.getDefaultInstance());
@ -1211,8 +1219,7 @@ private synchronized void close() {
connections.remove(remoteId, this);
// close the streams and therefore the socket
IOUtils.closeStream(out);
IOUtils.closeStream(in);
IOUtils.closeStream(ipcStreams);
disposeSasl();
// clean up all calls
@ -1741,4 +1748,75 @@ public static int nextCallId() {
public void close() throws Exception {
stop();
}
/** Manages the input and output streams for an IPC connection.
* Only exposed for use by SaslRpcClient.
*/
@InterfaceAudience.Private
public static class IpcStreams implements Closeable, Flushable {
private DataInputStream in;
public DataOutputStream out;
private int maxResponseLength;
private boolean firstResponse = true;
IpcStreams(Socket socket, int maxResponseLength) throws IOException {
this.maxResponseLength = maxResponseLength;
setInputStream(
new BufferedInputStream(NetUtils.getInputStream(socket)));
setOutputStream(
new BufferedOutputStream(NetUtils.getOutputStream(socket)));
}
void setSaslClient(SaslRpcClient client) throws IOException {
setInputStream(client.getInputStream(in));
setOutputStream(client.getOutputStream(out));
}
private void setInputStream(InputStream is) {
this.in = (is instanceof DataInputStream)
? (DataInputStream)is : new DataInputStream(is);
}
private void setOutputStream(OutputStream os) {
this.out = (os instanceof DataOutputStream)
? (DataOutputStream)os : new DataOutputStream(os);
}
public ByteBuffer readResponse() throws IOException {
int length = in.readInt();
if (firstResponse) {
firstResponse = false;
// pre-rpcv9 exception, almost certainly a version mismatch.
if (length == -1) {
in.readInt(); // ignore fatal/error status, it's fatal for us.
throw new RemoteException(WritableUtils.readString(in),
WritableUtils.readString(in));
}
}
if (length <= 0) {
throw new RpcException("RPC response has invalid length");
}
if (maxResponseLength > 0 && length > maxResponseLength) {
throw new RpcException("RPC response exceeds maximum data length");
}
ByteBuffer bb = ByteBuffer.allocate(length);
in.readFully(bb.array());
return bb;
}
public void sendRequest(byte[] buf) throws IOException {
out.write(buf);
}
@Override
public void flush() throws IOException {
out.flush();
}
@Override
public void close() {
IOUtils.closeStream(out);
IOUtils.closeStream(in);
}
}
}

View File

@ -18,11 +18,9 @@
package org.apache.hadoop.security;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
@ -53,6 +51,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.GlobPattern;
import org.apache.hadoop.ipc.Client.IpcStreams;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.ResponseBuffer;
@ -352,24 +351,17 @@ String getServerPrincipal(SaslAuth authType) throws IOException {
* @return AuthMethod used to negotiate the connection
* @throws IOException
*/
public AuthMethod saslConnect(InputStream inS, OutputStream outS)
throws IOException {
DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS));
DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(
outS));
public AuthMethod saslConnect(IpcStreams ipcStreams) throws IOException {
// redefined if/when a SASL negotiation starts, can be queried if the
// negotiation fails
authMethod = AuthMethod.SIMPLE;
sendSaslMessage(outStream, negotiateRequest);
sendSaslMessage(ipcStreams.out, negotiateRequest);
// loop until sasl is complete or a rpc error occurs
boolean done = false;
do {
int rpcLen = inStream.readInt();
ByteBuffer bb = ByteBuffer.allocate(rpcLen);
inStream.readFully(bb.array());
ByteBuffer bb = ipcStreams.readResponse();
RpcWritable.Buffer saslPacket = RpcWritable.Buffer.wrap(bb);
RpcResponseHeaderProto header =
@ -446,7 +438,7 @@ public AuthMethod saslConnect(InputStream inS, OutputStream outS)
}
}
if (response != null) {
sendSaslMessage(outStream, response.build());
sendSaslMessage(ipcStreams.out, response.build());
}
} while (!done);
return authMethod;
@ -460,8 +452,10 @@ private void sendSaslMessage(OutputStream out, RpcSaslProto message)
ResponseBuffer buf = new ResponseBuffer();
saslHeader.writeDelimitedTo(buf);
message.writeDelimitedTo(buf);
buf.writeTo(out);
out.flush();
synchronized (out) {
buf.writeTo(out);
out.flush();
}
}
/**

View File

@ -1322,10 +1322,19 @@
<name>ipc.maximum.data.length</name>
<value>67108864</value>
<description>This indicates the maximum IPC message length (bytes) that can be
accepted by the server. Messages larger than this value are rejected by
server immediately. This setting should rarely need to be changed. It merits
investigating whether the cause of long RPC messages can be fixed instead,
e.g. by splitting into smaller messages.
accepted by the server. Messages larger than this value are rejected by the
immediately to avoid possible OOMs. This setting should rarely need to be
changed.
</description>
</property>
<property>
<name>ipc.maximum.response.length</name>
<value>134217728</value>
<description>This indicates the maximum IPC message length (bytes) that can be
accepted by the client. Messages larger than this value are rejected
immediately to avoid possible OOMs. This setting should rarely need to be
changed. Set to 0 to disable.
</description>
</property>

View File

@ -40,6 +40,7 @@
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
@ -49,6 +50,8 @@
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -76,6 +79,9 @@
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
@ -112,6 +118,8 @@ public class TestIPC {
public void setupConf() {
conf = new Configuration();
Client.setPingInterval(conf, PING_INTERVAL);
// tests may enable security, so disable before each test
UserGroupInformation.setConfiguration(conf);
}
static final Random RANDOM = new Random();
@ -123,8 +131,8 @@ public void setupConf() {
static ConnectionId getConnectionId(InetSocketAddress addr, int rpcTimeout,
Configuration conf) throws IOException {
return ConnectionId.getConnectionId(addr, null, null, rpcTimeout, null,
conf);
return ConnectionId.getConnectionId(addr, null,
UserGroupInformation.getCurrentUser(), rpcTimeout, null, conf);
}
static Writable call(Client client, InetSocketAddress addr,
@ -1402,6 +1410,80 @@ private void assertRetriesOnSocketTimeouts(Configuration conf,
client.stop();
}
@Test(timeout=4000)
public void testInsecureVersionMismatch() throws IOException {
checkVersionMismatch();
}
@Test(timeout=4000)
public void testSecureVersionMismatch() throws IOException {
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
UserGroupInformation.setConfiguration(conf);
checkVersionMismatch();
}
private void checkVersionMismatch() throws IOException {
try (final ServerSocket listenSocket = new ServerSocket()) {
listenSocket.bind(null);
InetSocketAddress addr =
(InetSocketAddress) listenSocket.getLocalSocketAddress();
// open a socket that accepts a client and immediately returns
// a version mismatch exception.
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(new Runnable(){
@Override
public void run() {
try {
Socket socket = listenSocket.accept();
socket.getOutputStream().write(
NetworkTraces.RESPONSE_TO_HADOOP_0_20_3_RPC);
socket.close();
} catch (Throwable t) {
// ignore.
}
}
});
try {
Client client = new Client(LongWritable.class, conf);
call(client, 0, addr, conf);
} catch (RemoteException re) {
Assert.assertEquals(RPC.VersionMismatch.class.getName(),
re.getClassName());
Assert.assertEquals(NetworkTraces.HADOOP0_20_ERROR_MSG,
re.getMessage());
return;
}
Assert.fail("didn't get version mismatch");
}
}
@Test
public void testRpcResponseLimit() throws Throwable {
Server server = new TestServer(1, false);
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH, 0);
Client client = new Client(LongWritable.class, conf);
call(client, 0, addr, conf);
conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH, 4);
client = new Client(LongWritable.class, conf);
try {
call(client, 0, addr, conf);
} catch (IOException ioe) {
Throwable t = ioe.getCause();
Assert.assertNotNull(t);
Assert.assertEquals(RpcException.class, t.getClass());
Assert.assertEquals("RPC response exceeds maximum data length",
t.getMessage());
return;
}
Assert.fail("didn't get limit exceeded");
}
private void doIpcVersionTest(
byte[] requestData,
byte[] expectedResponse) throws IOException {