diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 595f4678ce6..9ddcdf8bfa8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1084,6 +1084,7 @@ Release 0.22.0 - Unreleased (jghoman) HDFS-1330. Make RPCs to DataNodes timeout. (hairong) + Added additional unit tests per HADOOP-6889. (John George via mattf) HDFS-202. HDFS support of listLocatedStatus introduced in HADOOP-6870. HDFS piggyback block locations to each file status when listing a diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index 05fa648653a..714bce7045e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -25,7 +25,12 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import java.net.SocketTimeoutException; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.LongWritable; import java.io.IOException; +import java.net.InetSocketAddress; import java.io.InputStream; import java.io.OutputStream; import java.security.MessageDigest; @@ -44,6 +49,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -52,6 +59,11 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; import org.mockito.internal.stubbing.answers.ThrowsException; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -61,9 +73,51 @@ import org.mockito.stubbing.Answer; * properly in case of errors. */ public class TestDFSClientRetries extends TestCase { + private static final String ADDRESS = "0.0.0.0"; + final static private int PING_INTERVAL = 1000; + final static private int MIN_SLEEP_TIME = 1000; public static final Log LOG = LogFactory.getLog(TestDFSClientRetries.class.getName()); - + final static private Configuration conf = new HdfsConfiguration(); + + private static class TestServer extends Server { + private boolean sleep; + private Class responseClass; + + public TestServer(int handlerCount, boolean sleep) throws IOException { + this(handlerCount, sleep, LongWritable.class, null); + } + + public TestServer(int handlerCount, boolean sleep, + Class paramClass, + Class responseClass) + throws IOException { + super(ADDRESS, 0, paramClass, handlerCount, conf); + this.sleep = sleep; + this.responseClass = responseClass; + } + + @Override + public Writable call(Class protocol, Writable param, long receiveTime) + throws IOException { + if (sleep) { + // sleep a bit + try { + Thread.sleep(PING_INTERVAL + MIN_SLEEP_TIME); + } catch (InterruptedException e) {} + } + if (responseClass != null) { + try { + return responseClass.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + return param; // echo param as result + } + } + } + // writes 'len' bytes of data to out. private static void writeData(OutputStream out, int len) throws IOException { byte [] buf = new byte[4096*16]; @@ -80,8 +134,6 @@ public class TestDFSClientRetries extends TestCase { */ public void testWriteTimeoutAtDataNode() throws IOException, InterruptedException { - Configuration conf = new HdfsConfiguration(); - final int writeTimeout = 100; //milliseconds. // set a very short write timeout for datanode, so that tests runs fast. conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, writeTimeout); @@ -136,7 +188,6 @@ public class TestDFSClientRetries extends TestCase { { final String exceptionMsg = "Nope, not replicated yet..."; final int maxRetries = 1; // Allow one retry (total of two calls) - Configuration conf = new HdfsConfiguration(); conf.setInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, maxRetries); NameNode mockNN = mock(NameNode.class); @@ -182,7 +233,6 @@ public class TestDFSClientRetries extends TestCase { long fileSize = 4096; Path file = new Path("/testFile"); - Configuration conf = new Configuration(); // Set short retry timeout so this test runs faster conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); @@ -379,7 +429,6 @@ public class TestDFSClientRetries extends TestCase { long blockSize = 128*1024*1024; // DFS block size int bufferSize = 4096; - Configuration conf = new HdfsConfiguration(); conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, xcievers); conf.setInt(DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, retries); @@ -540,7 +589,6 @@ public class TestDFSClientRetries extends TestCase { final String f = "/testGetFileChecksum"; final Path p = new Path(f); - final Configuration conf = new Configuration(); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); try { cluster.waitActive(); @@ -566,5 +614,39 @@ public class TestDFSClientRetries extends TestCase { cluster.shutdown(); } } + + /** Test that timeout occurs when DN does not respond to RPC. + * Start up a server and ask it to sleep for n seconds. Make an + * RPC to the server and set rpcTimeout to less than n and ensure + * that socketTimeoutException is obtained + */ + public void testClientDNProtocolTimeout() throws IOException { + final Server server = new TestServer(1, true); + server.start(); + + final InetSocketAddress addr = NetUtils.getConnectAddress(server); + DatanodeID fakeDnId = new DatanodeID( + "localhost:" + addr.getPort(), "fake-storage", 0, addr.getPort()); + + ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L)); + LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[0]); + + ClientDatanodeProtocol proxy = null; + + try { + proxy = DFSUtil.createClientDatanodeProtocolProxy( + fakeDnId, conf, 500, fakeBlock); + + proxy.getReplicaVisibleLength(null); + fail ("Did not get expected exception: SocketTimeoutException"); + } catch (SocketTimeoutException e) { + LOG.info("Got the expected Exception: SocketTimeoutException"); + } finally { + if (proxy != null) { + RPC.stopProxy(proxy); + } + server.stop(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java index eb58f7f195a..b1f1fc911c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java @@ -22,6 +22,20 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.List; +import java.net.InetSocketAddress; + +import java.net.SocketTimeoutException; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; + +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -38,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; @@ -48,6 +63,50 @@ import org.junit.Test; * This tests InterDataNodeProtocol for block handling. */ public class TestInterDatanodeProtocol { + private static final String ADDRESS = "0.0.0.0"; + final static private int PING_INTERVAL = 1000; + final static private int MIN_SLEEP_TIME = 1000; + private static Configuration conf = new HdfsConfiguration(); + + + private static class TestServer extends Server { + private boolean sleep; + private Class responseClass; + + public TestServer(int handlerCount, boolean sleep) throws IOException { + this(handlerCount, sleep, LongWritable.class, null); + } + + public TestServer(int handlerCount, boolean sleep, + Class paramClass, + Class responseClass) + throws IOException { + super(ADDRESS, 0, paramClass, handlerCount, conf); + this.sleep = sleep; + this.responseClass = responseClass; + } + + @Override + public Writable call(Class protocol, Writable param, long receiveTime) + throws IOException { + if (sleep) { + // sleep a bit + try { + Thread.sleep(PING_INTERVAL + MIN_SLEEP_TIME); + } catch (InterruptedException e) {} + } + if (responseClass != null) { + try { + return responseClass.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + return param; // echo param as result + } + } + } + public static void checkMetaInfo(ExtendedBlock b, DataNode dn) throws IOException { Block metainfo = dn.data.getStoredBlock(b.getBlockPoolId(), b.getBlockId()); Assert.assertEquals(b.getBlockId(), metainfo.getBlockId()); @@ -73,7 +132,6 @@ public class TestInterDatanodeProtocol { */ @Test public void testBlockMetaDataInfo() throws Exception { - Configuration conf = new HdfsConfiguration(); MiniDFSCluster cluster = null; try { @@ -222,7 +280,6 @@ public class TestInterDatanodeProtocol { * */ @Test public void testUpdateReplicaUnderRecovery() throws IOException { - final Configuration conf = new HdfsConfiguration(); MiniDFSCluster cluster = null; try { @@ -291,4 +348,33 @@ public class TestInterDatanodeProtocol { if (cluster != null) cluster.shutdown(); } } + + /** Test to verify that InterDatanode RPC timesout as expected when + * the server DN does not respond. + */ + @Test + public void testInterDNProtocolTimeout() throws Exception { + final Server server = new TestServer(1, true); + server.start(); + + final InetSocketAddress addr = NetUtils.getConnectAddress(server); + DatanodeID fakeDnId = new DatanodeID( + "localhost:" + addr.getPort(), "fake-storage", 0, addr.getPort()); + DatanodeInfo dInfo = new DatanodeInfo(fakeDnId); + InterDatanodeProtocol proxy = null; + + try { + proxy = DataNode.createInterDataNodeProtocolProxy( + dInfo, conf, 500); + proxy.initReplicaRecovery(null); + fail ("Expected SocketTimeoutException exception, but did not get."); + } catch (SocketTimeoutException e) { + DataNode.LOG.info("Got expected Exception: SocketTimeoutException" + e); + } finally { + if (proxy != null) { + RPC.stopProxy(proxy); + } + server.stop(); + } + } }