diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java index 83d250981f7..5416d269368 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java @@ -30,12 +30,14 @@ import java.util.Random; import javax.crypto.SecretKey; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -52,6 +54,7 @@ import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; @@ -66,6 +69,7 @@ class Application handler; @@ -133,6 +137,13 @@ class Application expectedClosedCount == cleaner.getCloseSocketCount(), 100, 5000); + } + + @Test + public void testSocketTimeout() throws Exception { + ServerSocket serverSocket = setupServerSocket(); + SocketCleaner cleaner = setupCleaner(serverSocket, 100); + try { + new Socket(serverSocket.getInetAddress(), serverSocket.getLocalPort()); + Thread.sleep(1000); + } catch (Exception exception) { + // ignored... + } + GenericTestUtils.waitFor(() -> 1 == cleaner.getCloseSocketCount(), 100, + 5000); + } + + private SocketCleaner setupCleaner(ServerSocket serverSocket) { + return setupCleaner(serverSocket, + CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT); + } + + private SocketCleaner setupCleaner(ServerSocket serverSocket, int soTimeout) { + // start socket cleaner. + SocketCleaner cleaner = new SocketCleaner("test-ping-socket-cleaner", + serverSocket, soTimeout); + cleaner.setDaemon(true); + cleaner.start(); + + return cleaner; + } + + private static class SocketCleaner extends PingSocketCleaner { + private int closeSocketCount = 0; + + SocketCleaner(String name, ServerSocket serverSocket, int soTimeout) { + super(name, serverSocket, soTimeout); + } + + @Override + public void run() { + super.run(); + } + + protected void closeSocketInternal(Socket clientSocket) { + if (!clientSocket.isClosed()) { + closeSocketCount++; + } + super.closeSocketInternal(clientSocket); + } + + public int getCloseSocketCount() { + return closeSocketCount; + } + } + + private ServerSocket setupServerSocket() throws Exception { + return new ServerSocket(0, 1); + } + /** * clean previous std error and outs */