HADOOP-10940. RPC client does no bounds checking of responses. Contributed by Daryn Sharp.

(cherry picked from commit d4d076876a)
(cherry picked from commit d234990208)
This commit is contained in:
Kihwal Lee 2016-09-09 10:48:05 -05:00
parent 7a27b2a82f
commit 4b9845bc53
6 changed files with 249 additions and 84 deletions

View File

@ -47,7 +47,7 @@
--> -->
<Match> <Match>
<Class name="org.apache.hadoop.ipc.Client$Connection" /> <Class name="org.apache.hadoop.ipc.Client$Connection" />
<Field name="out" /> <Field name="ipcStreams" />
<Bug pattern="IS2_INCONSISTENT_SYNC" /> <Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match> </Match>
<!-- <!--
@ -346,13 +346,7 @@
<Method name="removeRenewAction" /> <Method name="removeRenewAction" />
<Bug pattern="BC_UNCONFIRMED_CAST" /> <Bug pattern="BC_UNCONFIRMED_CAST" />
</Match> </Match>
<!-- Inconsistent synchronization flagged by findbugs is not valid. -->
<Match>
<Class name="org.apache.hadoop.ipc.Client$Connection" />
<Field name="in" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!-- <!--
The switch condition for INITIATE is expected to fallthru to RESPONSE The switch condition for INITIATE is expected to fallthru to RESPONSE
to process initial sasl response token included in the INITIATE to process initial sasl response token included in the INITIATE

View File

@ -75,12 +75,20 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
/** Default value for IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE */ /** Default value for IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE */
public static final int IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT = public static final int IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT =
100; 100;
/** Max request size a server will accept. */
public static final String IPC_MAXIMUM_DATA_LENGTH = public static final String IPC_MAXIMUM_DATA_LENGTH =
"ipc.maximum.data.length"; "ipc.maximum.data.length";
/** Default value for IPC_MAXIMUM_DATA_LENGTH. */
public static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 64 * 1024 * 1024; public static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 64 * 1024 * 1024;
/** Max response size a client will accept. */
public static final String IPC_MAXIMUM_RESPONSE_LENGTH =
"ipc.maximum.response.length";
/** Default value for IPC_MAXIMUM_RESPONSE_LENGTH. */
public static final int IPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT =
128 * 1024 * 1024;
/** How many calls per handler are allowed in the queue. */ /** How many calls per handler are allowed in the queue. */
public static final String IPC_SERVER_HANDLER_QUEUE_SIZE_KEY = public static final String IPC_SERVER_HANDLER_QUEUE_SIZE_KEY =
"ipc.server.handler.queue.size"; "ipc.server.handler.queue.size";

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
@ -415,8 +416,8 @@ public class Client implements AutoCloseable {
private SaslRpcClient saslRpcClient; private SaslRpcClient saslRpcClient;
private Socket socket = null; // connected socket private Socket socket = null; // connected socket
private DataInputStream in; private IpcStreams ipcStreams;
private DataOutputStream out; private final int maxResponseLength;
private final int rpcTimeout; private final int rpcTimeout;
private int maxIdleTime; //connections will be culled if it was idle for private int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs //maxIdleTime msecs
@ -428,8 +429,8 @@ public class Client implements AutoCloseable {
private final boolean doPing; //do we need to send ping message private final boolean doPing; //do we need to send ping message
private final int pingInterval; // how often sends ping to the server private final int pingInterval; // how often sends ping to the server
private final int soTimeout; // used by ipc ping and rpc timeout private final int soTimeout; // used by ipc ping and rpc timeout
private ResponseBuffer pingRequest; // ping message private byte[] pingRequest; // ping message
// currently active calls // currently active calls
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>(); private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
private AtomicLong lastActivity = new AtomicLong();// last I/O activity time private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
@ -448,6 +449,9 @@ public class Client implements AutoCloseable {
0, 0,
new UnknownHostException()); new UnknownHostException());
} }
this.maxResponseLength = remoteId.conf.getInt(
CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH,
CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
this.rpcTimeout = remoteId.getRpcTimeout(); this.rpcTimeout = remoteId.getRpcTimeout();
this.maxIdleTime = remoteId.getMaxIdleTime(); this.maxIdleTime = remoteId.getMaxIdleTime();
this.connectionRetryPolicy = remoteId.connectionRetryPolicy; this.connectionRetryPolicy = remoteId.connectionRetryPolicy;
@ -458,12 +462,13 @@ public class Client implements AutoCloseable {
this.doPing = remoteId.getDoPing(); this.doPing = remoteId.getDoPing();
if (doPing) { if (doPing) {
// construct a RPC header with the callId as the ping callId // construct a RPC header with the callId as the ping callId
pingRequest = new ResponseBuffer(); ResponseBuffer buf = new ResponseBuffer();
RpcRequestHeaderProto pingHeader = ProtoUtil RpcRequestHeaderProto pingHeader = ProtoUtil
.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
OperationProto.RPC_FINAL_PACKET, PING_CALL_ID, OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,
RpcConstants.INVALID_RETRY_COUNT, clientId); RpcConstants.INVALID_RETRY_COUNT, clientId);
pingHeader.writeDelimitedTo(pingRequest); pingHeader.writeDelimitedTo(buf);
pingRequest = buf.toByteArray();
} }
this.pingInterval = remoteId.getPingInterval(); this.pingInterval = remoteId.getPingInterval();
if (rpcTimeout > 0) { if (rpcTimeout > 0) {
@ -598,15 +603,15 @@ public class Client implements AutoCloseable {
} }
return false; return false;
} }
private synchronized AuthMethod setupSaslConnection(final InputStream in2, private synchronized AuthMethod setupSaslConnection(IpcStreams streams)
final OutputStream out2) throws IOException { throws IOException {
// Do not use Client.conf here! We must use ConnectionId.conf, since the // Do not use Client.conf here! We must use ConnectionId.conf, since the
// Client object is cached and shared between all RPC clients, even those // Client object is cached and shared between all RPC clients, even those
// for separate services. // for separate services.
saslRpcClient = new SaslRpcClient(remoteId.getTicket(), saslRpcClient = new SaslRpcClient(remoteId.getTicket(),
remoteId.getProtocol(), remoteId.getAddress(), remoteId.conf); remoteId.getProtocol(), remoteId.getAddress(), remoteId.conf);
return saslRpcClient.saslConnect(in2, out2); return saslRpcClient.saslConnect(streams);
} }
/** /**
@ -771,12 +776,9 @@ public class Client implements AutoCloseable {
Random rand = null; Random rand = null;
while (true) { while (true) {
setupConnection(); setupConnection();
InputStream inStream = NetUtils.getInputStream(socket); ipcStreams = new IpcStreams(socket, maxResponseLength);
OutputStream outStream = NetUtils.getOutputStream(socket); writeConnectionHeader(ipcStreams);
writeConnectionHeader(outStream);
if (authProtocol == AuthProtocol.SASL) { if (authProtocol == AuthProtocol.SASL) {
final InputStream in2 = inStream;
final OutputStream out2 = outStream;
UserGroupInformation ticket = remoteId.getTicket(); UserGroupInformation ticket = remoteId.getTicket();
if (ticket.getRealUser() != null) { if (ticket.getRealUser() != null) {
ticket = ticket.getRealUser(); ticket = ticket.getRealUser();
@ -787,7 +789,7 @@ public class Client implements AutoCloseable {
@Override @Override
public AuthMethod run() public AuthMethod run()
throws IOException, InterruptedException { throws IOException, InterruptedException {
return setupSaslConnection(in2, out2); return setupSaslConnection(ipcStreams);
} }
}); });
} catch (IOException ex) { } catch (IOException ex) {
@ -806,8 +808,7 @@ public class Client implements AutoCloseable {
} }
if (authMethod != AuthMethod.SIMPLE) { if (authMethod != AuthMethod.SIMPLE) {
// Sasl connect is successful. Let's set up Sasl i/o streams. // Sasl connect is successful. Let's set up Sasl i/o streams.
inStream = saslRpcClient.getInputStream(inStream); ipcStreams.setSaslClient(saslRpcClient);
outStream = saslRpcClient.getOutputStream(outStream);
// for testing // for testing
remoteId.saslQop = remoteId.saslQop =
(String)saslRpcClient.getNegotiatedProperty(Sasl.QOP); (String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
@ -826,18 +827,11 @@ public class Client implements AutoCloseable {
} }
} }
} }
if (doPing) {
inStream = new PingInputStream(inStream);
}
this.in = new DataInputStream(new BufferedInputStream(inStream));
// SASL may have already buffered the stream if (doPing) {
if (!(outStream instanceof BufferedOutputStream)) { ipcStreams.setInputStream(new PingInputStream(ipcStreams.in));
outStream = new BufferedOutputStream(outStream);
} }
this.out = new DataOutputStream(outStream);
writeConnectionContext(remoteId, authMethod); writeConnectionContext(remoteId, authMethod);
// update last activity time // update last activity time
@ -951,17 +945,28 @@ public class Client implements AutoCloseable {
* | AuthProtocol (1 byte) | * | AuthProtocol (1 byte) |
* +----------------------------------+ * +----------------------------------+
*/ */
private void writeConnectionHeader(OutputStream outStream) private void writeConnectionHeader(IpcStreams streams)
throws IOException { throws IOException {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream)); // Write out the header, version and authentication method.
// Write out the header, version and authentication method // The output stream is buffered but we must not flush it yet. The
out.write(RpcConstants.HEADER.array()); // connection setup protocol requires the client to send multiple
out.write(RpcConstants.CURRENT_VERSION); // messages before reading a response.
out.write(serviceClass); //
out.write(authProtocol.callId); // insecure: send header+context+call, read
out.flush(); // secure : send header+negotiate, read, (sasl), context+call, read
//
// The client must flush only when it's prepared to read. Otherwise
// "broken pipe" exceptions occur if the server closes the connection
// before all messages are sent.
final DataOutputStream out = streams.out;
synchronized (out) {
out.write(RpcConstants.HEADER.array());
out.write(RpcConstants.CURRENT_VERSION);
out.write(serviceClass);
out.write(authProtocol.callId);
}
} }
/* Write the connection context header for each connection /* Write the connection context header for each connection
* Out is not synchronized because only the first thread does this. * Out is not synchronized because only the first thread does this.
*/ */
@ -977,12 +982,17 @@ public class Client implements AutoCloseable {
.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID, OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
RpcConstants.INVALID_RETRY_COUNT, clientId); RpcConstants.INVALID_RETRY_COUNT, clientId);
// do not flush. the context and first ipc call request must be sent
// together to avoid possibility of broken pipes upon authz failure.
// see writeConnectionHeader
final ResponseBuffer buf = new ResponseBuffer(); final ResponseBuffer buf = new ResponseBuffer();
connectionContextHeader.writeDelimitedTo(buf); connectionContextHeader.writeDelimitedTo(buf);
message.writeDelimitedTo(buf); message.writeDelimitedTo(buf);
buf.writeTo(out); synchronized (ipcStreams.out) {
ipcStreams.sendRequest(buf.toByteArray());
}
} }
/* wait till someone signals us to start reading RPC response or /* wait till someone signals us to start reading RPC response or
* it is idle too long, it is marked as to be closed, * it is idle too long, it is marked as to be closed,
* or the client is marked as not running. * or the client is marked as not running.
@ -1025,9 +1035,9 @@ public class Client implements AutoCloseable {
long curTime = Time.now(); long curTime = Time.now();
if ( curTime - lastActivity.get() >= pingInterval) { if ( curTime - lastActivity.get() >= pingInterval) {
lastActivity.set(curTime); lastActivity.set(curTime);
synchronized (out) { synchronized (ipcStreams.out) {
pingRequest.writeTo(out); ipcStreams.sendRequest(pingRequest);
out.flush(); ipcStreams.flush();
} }
} }
} }
@ -1093,15 +1103,16 @@ public class Client implements AutoCloseable {
@Override @Override
public void run() { public void run() {
try { try {
synchronized (Connection.this.out) { synchronized (ipcStreams.out) {
if (shouldCloseConnection.get()) { if (shouldCloseConnection.get()) {
return; return;
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(getName() + " sending #" + call.id); LOG.debug(getName() + " sending #" + call.id);
} }
buf.writeTo(out); // RpcRequestHeader + RpcRequest // RpcRequestHeader + RpcRequest
out.flush(); ipcStreams.sendRequest(buf.toByteArray());
ipcStreams.flush();
} }
} catch (IOException e) { } catch (IOException e) {
// exception at this point would leave the connection in an // exception at this point would leave the connection in an
@ -1142,10 +1153,7 @@ public class Client implements AutoCloseable {
touch(); touch();
try { try {
int totalLen = in.readInt(); ByteBuffer bb = ipcStreams.readResponse();
ByteBuffer bb = ByteBuffer.allocate(totalLen);
in.readFully(bb.array());
RpcWritable.Buffer packet = RpcWritable.Buffer.wrap(bb); RpcWritable.Buffer packet = RpcWritable.Buffer.wrap(bb);
RpcResponseHeaderProto header = RpcResponseHeaderProto header =
packet.getValue(RpcResponseHeaderProto.getDefaultInstance()); packet.getValue(RpcResponseHeaderProto.getDefaultInstance());
@ -1210,8 +1218,7 @@ public class Client implements AutoCloseable {
connections.remove(remoteId, this); connections.remove(remoteId, this);
// close the streams and therefore the socket // close the streams and therefore the socket
IOUtils.closeStream(out); IOUtils.closeStream(ipcStreams);
IOUtils.closeStream(in);
disposeSasl(); disposeSasl();
// clean up all calls // clean up all calls
@ -1740,4 +1747,75 @@ public class Client implements AutoCloseable {
public void close() throws Exception { public void close() throws Exception {
stop(); stop();
} }
/** Manages the input and output streams for an IPC connection.
* Only exposed for use by SaslRpcClient.
*/
@InterfaceAudience.Private
public static class IpcStreams implements Closeable, Flushable {
private DataInputStream in;
public DataOutputStream out;
private int maxResponseLength;
private boolean firstResponse = true;
IpcStreams(Socket socket, int maxResponseLength) throws IOException {
this.maxResponseLength = maxResponseLength;
setInputStream(
new BufferedInputStream(NetUtils.getInputStream(socket)));
setOutputStream(
new BufferedOutputStream(NetUtils.getOutputStream(socket)));
}
void setSaslClient(SaslRpcClient client) throws IOException {
setInputStream(client.getInputStream(in));
setOutputStream(client.getOutputStream(out));
}
private void setInputStream(InputStream is) {
this.in = (is instanceof DataInputStream)
? (DataInputStream)is : new DataInputStream(is);
}
private void setOutputStream(OutputStream os) {
this.out = (os instanceof DataOutputStream)
? (DataOutputStream)os : new DataOutputStream(os);
}
public ByteBuffer readResponse() throws IOException {
int length = in.readInt();
if (firstResponse) {
firstResponse = false;
// pre-rpcv9 exception, almost certainly a version mismatch.
if (length == -1) {
in.readInt(); // ignore fatal/error status, it's fatal for us.
throw new RemoteException(WritableUtils.readString(in),
WritableUtils.readString(in));
}
}
if (length <= 0) {
throw new RpcException("RPC response has invalid length");
}
if (maxResponseLength > 0 && length > maxResponseLength) {
throw new RpcException("RPC response exceeds maximum data length");
}
ByteBuffer bb = ByteBuffer.allocate(length);
in.readFully(bb.array());
return bb;
}
public void sendRequest(byte[] buf) throws IOException {
out.write(buf);
}
@Override
public void flush() throws IOException {
out.flush();
}
@Override
public void close() {
IOUtils.closeStream(out);
IOUtils.closeStream(in);
}
}
} }

View File

@ -18,11 +18,9 @@
package org.apache.hadoop.security; package org.apache.hadoop.security;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream; import java.io.FilterInputStream;
import java.io.FilterOutputStream; import java.io.FilterOutputStream;
import java.io.IOException; import java.io.IOException;
@ -53,6 +51,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.GlobPattern; import org.apache.hadoop.fs.GlobPattern;
import org.apache.hadoop.ipc.Client.IpcStreams;
import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.ResponseBuffer; import org.apache.hadoop.ipc.ResponseBuffer;
@ -352,24 +351,17 @@ public class SaslRpcClient {
* @return AuthMethod used to negotiate the connection * @return AuthMethod used to negotiate the connection
* @throws IOException * @throws IOException
*/ */
public AuthMethod saslConnect(InputStream inS, OutputStream outS) public AuthMethod saslConnect(IpcStreams ipcStreams) throws IOException {
throws IOException {
DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS));
DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(
outS));
// redefined if/when a SASL negotiation starts, can be queried if the // redefined if/when a SASL negotiation starts, can be queried if the
// negotiation fails // negotiation fails
authMethod = AuthMethod.SIMPLE; authMethod = AuthMethod.SIMPLE;
sendSaslMessage(outStream, negotiateRequest); sendSaslMessage(ipcStreams.out, negotiateRequest);
// loop until sasl is complete or a rpc error occurs // loop until sasl is complete or a rpc error occurs
boolean done = false; boolean done = false;
do { do {
int rpcLen = inStream.readInt(); ByteBuffer bb = ipcStreams.readResponse();
ByteBuffer bb = ByteBuffer.allocate(rpcLen);
inStream.readFully(bb.array());
RpcWritable.Buffer saslPacket = RpcWritable.Buffer.wrap(bb); RpcWritable.Buffer saslPacket = RpcWritable.Buffer.wrap(bb);
RpcResponseHeaderProto header = RpcResponseHeaderProto header =
@ -446,7 +438,7 @@ public class SaslRpcClient {
} }
} }
if (response != null) { if (response != null) {
sendSaslMessage(outStream, response.build()); sendSaslMessage(ipcStreams.out, response.build());
} }
} while (!done); } while (!done);
return authMethod; return authMethod;
@ -460,8 +452,10 @@ public class SaslRpcClient {
ResponseBuffer buf = new ResponseBuffer(); ResponseBuffer buf = new ResponseBuffer();
saslHeader.writeDelimitedTo(buf); saslHeader.writeDelimitedTo(buf);
message.writeDelimitedTo(buf); message.writeDelimitedTo(buf);
buf.writeTo(out); synchronized (out) {
out.flush(); buf.writeTo(out);
out.flush();
}
} }
/** /**

View File

@ -1215,10 +1215,19 @@
<name>ipc.maximum.data.length</name> <name>ipc.maximum.data.length</name>
<value>67108864</value> <value>67108864</value>
<description>This indicates the maximum IPC message length (bytes) that can be <description>This indicates the maximum IPC message length (bytes) that can be
accepted by the server. Messages larger than this value are rejected by accepted by the server. Messages larger than this value are rejected by the
server immediately. This setting should rarely need to be changed. It merits immediately to avoid possible OOMs. This setting should rarely need to be
investigating whether the cause of long RPC messages can be fixed instead, changed.
e.g. by splitting into smaller messages. </description>
</property>
<property>
<name>ipc.maximum.response.length</name>
<value>134217728</value>
<description>This indicates the maximum IPC message length (bytes) that can be
accepted by the client. Messages larger than this value are rejected
immediately to avoid possible OOMs. This setting should rarely need to be
changed. Set to 0 to disable.
</description> </description>
</property> </property>

View File

@ -38,6 +38,7 @@ import java.io.OutputStream;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.util.ArrayList; import java.util.ArrayList;
@ -47,6 +48,8 @@ import java.util.Random;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -73,6 +76,9 @@ import org.apache.hadoop.ipc.Server.Connection;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -108,6 +114,8 @@ public class TestIPC {
public void setupConf() { public void setupConf() {
conf = new Configuration(); conf = new Configuration();
Client.setPingInterval(conf, PING_INTERVAL); Client.setPingInterval(conf, PING_INTERVAL);
// tests may enable security, so disable before each test
UserGroupInformation.setConfiguration(conf);
} }
static final Random RANDOM = new Random(); static final Random RANDOM = new Random();
@ -119,8 +127,8 @@ public class TestIPC {
static ConnectionId getConnectionId(InetSocketAddress addr, int rpcTimeout, static ConnectionId getConnectionId(InetSocketAddress addr, int rpcTimeout,
Configuration conf) throws IOException { Configuration conf) throws IOException {
return ConnectionId.getConnectionId(addr, null, null, rpcTimeout, null, return ConnectionId.getConnectionId(addr, null,
conf); UserGroupInformation.getCurrentUser(), rpcTimeout, null, conf);
} }
static Writable call(Client client, InetSocketAddress addr, static Writable call(Client client, InetSocketAddress addr,
@ -1396,6 +1404,80 @@ public class TestIPC {
client.stop(); client.stop();
} }
@Test(timeout=4000)
public void testInsecureVersionMismatch() throws IOException {
checkVersionMismatch();
}
@Test(timeout=4000)
public void testSecureVersionMismatch() throws IOException {
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
UserGroupInformation.setConfiguration(conf);
checkVersionMismatch();
}
private void checkVersionMismatch() throws IOException {
try (final ServerSocket listenSocket = new ServerSocket()) {
listenSocket.bind(null);
InetSocketAddress addr =
(InetSocketAddress) listenSocket.getLocalSocketAddress();
// open a socket that accepts a client and immediately returns
// a version mismatch exception.
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(new Runnable(){
@Override
public void run() {
try {
Socket socket = listenSocket.accept();
socket.getOutputStream().write(
NetworkTraces.RESPONSE_TO_HADOOP_0_20_3_RPC);
socket.close();
} catch (Throwable t) {
// ignore.
}
}
});
try {
Client client = new Client(LongWritable.class, conf);
call(client, 0, addr, conf);
} catch (RemoteException re) {
Assert.assertEquals(RPC.VersionMismatch.class.getName(),
re.getClassName());
Assert.assertEquals(NetworkTraces.HADOOP0_20_ERROR_MSG,
re.getMessage());
return;
}
Assert.fail("didn't get version mismatch");
}
}
@Test
public void testRpcResponseLimit() throws Throwable {
Server server = new TestServer(1, false);
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH, 0);
Client client = new Client(LongWritable.class, conf);
call(client, 0, addr, conf);
conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH, 4);
client = new Client(LongWritable.class, conf);
try {
call(client, 0, addr, conf);
} catch (IOException ioe) {
Throwable t = ioe.getCause();
Assert.assertNotNull(t);
Assert.assertEquals(RpcException.class, t.getClass());
Assert.assertEquals("RPC response exceeds maximum data length",
t.getMessage());
return;
}
Assert.fail("didn't get limit exceeded");
}
private void doIpcVersionTest( private void doIpcVersionTest(
byte[] requestData, byte[] requestData,
byte[] expectedResponse) throws IOException { byte[] expectedResponse) throws IOException {