From 646fcc8a560e1e175b4cb0b7d22b015a1c9f7903 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Tue, 12 Nov 2013 01:56:56 +0000 Subject: [PATCH 1/7] HDFS-5495. Remove further JUnit3 usages from HDFS. Contributed by Jarek Jarcec Cecho. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1540914 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/TestHttpFSKerberosAuthenticationHandler.java | 2 +- .../hadoop/fs/http/server/TestHttpFSWithKerberos.java | 2 +- .../security/TestDelegationTokenManagerService.java | 2 +- .../java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java | 3 +-- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +++ .../test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java | 3 +-- .../test/java/org/apache/hadoop/hdfs/TestConnCache.java | 3 +-- .../org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java | 3 +-- .../java/org/apache/hadoop/hdfs/TestDFSOutputStream.java | 3 +-- .../org/apache/hadoop/hdfs/TestFileInputStreamCache.java | 2 +- .../hadoop/hdfs/server/datanode/TestBlockReport.java | 3 +-- .../hadoop/hdfs/server/datanode/TestCachingStrategy.java | 3 +-- .../hadoop/hdfs/server/datanode/TestFsDatasetCache.java | 2 +- .../server/datanode/TestMultipleNNDataBlockScanner.java | 6 +----- .../fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java | 3 +-- .../hdfs/server/namenode/TestPathBasedCacheRequests.java | 7 +++---- .../java/org/apache/hadoop/hdfs/util/TestXMLUtils.java | 3 +-- .../apache/hadoop/hdfs/web/TestURLConnectionFactory.java | 3 +-- .../java/org/apache/hadoop/net/TestNetworkTopology.java | 3 +-- 19 files changed, 23 insertions(+), 36 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSKerberosAuthenticationHandler.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSKerberosAuthenticationHandler.java index db4cdeeadb8..e56f88b45a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSKerberosAuthenticationHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSKerberosAuthenticationHandler.java @@ -18,7 +18,6 @@ package org.apache.hadoop.fs.http.server; -import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.http.client.HttpFSFileSystem; import org.apache.hadoop.fs.http.client.HttpFSKerberosAuthenticator; @@ -36,6 +35,7 @@ import org.apache.hadoop.test.TestDir; import org.apache.hadoop.test.TestDirHelper; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; +import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSWithKerberos.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSWithKerberos.java index 140f8661a59..45ce8ed7302 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSWithKerberos.java +++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSWithKerberos.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.fs.http.server; -import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.DelegationTokenRenewer; @@ -40,6 +39,7 @@ import 
org.apache.hadoop.test.TestJettyHelper; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.junit.After; +import org.junit.Assert; import org.junit.Test; import org.mortbay.jetty.Server; import org.mortbay.jetty.webapp.WebAppContext; diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/service/security/TestDelegationTokenManagerService.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/service/security/TestDelegationTokenManagerService.java index 480d04c8f42..c56e3c2d6bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/service/security/TestDelegationTokenManagerService.java +++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/service/security/TestDelegationTokenManagerService.java @@ -18,7 +18,6 @@ package org.apache.hadoop.lib.service.security; -import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.http.server.HttpFSServerWebApp; import org.apache.hadoop.lib.server.Server; @@ -30,6 +29,7 @@ import org.apache.hadoop.test.HTestCase; import org.apache.hadoop.test.TestDir; import org.apache.hadoop.test.TestDirHelper; import org.apache.hadoop.util.StringUtils; +import org.junit.Assert; import org.junit.Test; import java.net.InetAddress; diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java index c4451364564..1a808807f9a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java @@ -26,8 +26,6 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.concurrent.ConcurrentNavigableMap; -import junit.framework.Assert; - import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -49,6 +47,7 @@ import org.apache.hadoop.nfs.nfs3.response.CREATE3Response; import org.apache.hadoop.nfs.nfs3.response.READ3Response; import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.SecurityHandler; +import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index d61ef530991..83c53998a7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -190,6 +190,9 @@ Trunk (Unreleased) HDFS-5326. add modifyDirective to cacheAdmin. (cmccabe) + HDFS-5495. Remove further JUnit3 usages from HDFS. + (Jarek Jarcec Cecho via wang) + OPTIMIZATIONS HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. 
(cmccabe) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java index f5c3b4c1c1a..5370d792be4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java @@ -21,8 +21,6 @@ import static org.junit.Assert.fail; import java.io.IOException; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -33,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java index b140bae1109..837e5523210 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java @@ -22,8 +22,6 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; import java.net.InetSocketAddress; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; @@ -32,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.security.token.Token; +import org.junit.Assert; import org.junit.Test; import org.mockito.Matchers; import org.mockito.Mockito; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java index 3574323d545..08842b5618f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java @@ -23,8 +23,6 @@ import java.io.IOException; import java.io.OutputStream; import java.util.List; -import junit.framework.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -33,6 +31,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.util.ThreadUtil; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java index 86b732f992a..678a3b8d17e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java @@ -20,12 +20,11 @@ package org.apache.hadoop.hdfs; import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; -import 
junit.framework.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.internal.util.reflection.Whitebox; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java index 2d02f3bd172..7eef5383855 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java @@ -17,13 +17,13 @@ */ package org.apache.hadoop.hdfs; -import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.unix.TemporarySocketDirectory; +import org.junit.Assert; import org.junit.Test; import java.io.FileInputStream; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java index d1c3d9eccb1..4dd22c9faf5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java @@ -29,8 +29,6 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; @@ -61,6 +59,7 @@ import org.apache.hadoop.test.GenericTestUtils.DelayAnswer; import org.apache.hadoop.util.Time; import org.apache.log4j.Level; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java index b2e42e6ef5f..5a2a52a7883 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java @@ -22,8 +22,6 @@ import java.util.Arrays; import java.util.Map; import java.util.TreeMap; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -39,6 +37,7 @@ import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheTracker; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java index 7f5a9101b65..ccc829cf683 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java @@ -17,9 +17,9 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import static junit.framework.Assert.assertTrue; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java index 7a9b67fd39b..143ff283367 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java @@ -20,8 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.IOException; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; @@ -34,12 +32,10 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; -import org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner; import static org.apache.hadoop.hdfs.server.datanode.DataBlockScanner.SLEEP_PERIOD_MS; import org.apache.log4j.Level; -import org.apache.log4j.Logger; +import org.junit.Assert; import org.junit.Test; -import org.junit.Ignore; import static org.junit.Assert.fail; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java index cfc9954b72d..9326b6ad25f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java @@ -23,12 +23,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_AVAILABLE_SPACE_ import java.util.ArrayList; import java.util.List; -import junit.framework.Assert; - import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.ReflectionUtils; +import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java index 1b7c535380d..a367d69e4fd 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java @@ -23,10 +23,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.security.PrivilegedExceptionAction; @@ -35,8 +35,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -60,6 +58,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.GSet; import org.junit.After; +import org.junit.Assert; import org.junit.Assume; import org.junit.Before; import org.junit.Test; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestXMLUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestXMLUtils.java index 520107c0707..f3ab56c578c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestXMLUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestXMLUtils.java @@ -17,9 +17,8 @@ */ package org.apache.hadoop.hdfs.util; -import junit.framework.Assert; - import org.apache.hadoop.hdfs.util.XMLUtils.UnmanglingError; +import org.junit.Assert; import org.junit.Test; public class TestXMLUtils { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java index a8f8b2f7e2f..e8593b62163 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java @@ -22,9 +22,8 @@ import java.net.HttpURLConnection; import java.net.URL; import java.util.List; -import junit.framework.Assert; - import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; +import org.junit.Assert; import org.junit.Test; import com.google.common.collect.Lists; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java index af09595ff50..154014180a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java @@ -26,8 +26,6 @@ import static org.junit.Assert.fail; import java.util.HashMap; import java.util.Map; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -38,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; From 455269cf9f543d8917547e55b587ca778347af30 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Tue, 12 Nov 2013 02:06:21 +0000 Subject: [PATCH 2/7] Move HDFS-5495 to 2.3.0 in CHANGES.txt git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1540917 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 83c53998a7e..68c77e44a90 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -190,9 +190,6 @@ Trunk (Unreleased) HDFS-5326. add modifyDirective to cacheAdmin. (cmccabe) - HDFS-5495. Remove further JUnit3 usages from HDFS. - (Jarek Jarcec Cecho via wang) - OPTIMIZATIONS HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe) @@ -471,6 +468,9 @@ Release 2.3.0 - UNRELEASED HDFS-5467. Remove tab characters in hdfs-default.xml. (Shinichi Yamashita via Andrew Wang) + HDFS-5495. Remove further JUnit3 usages from HDFS. + (Jarek Jarcec Cecho via wang) + OPTIMIZATIONS HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn) From eeda370249d3f65a36718942a15867f79654ff66 Mon Sep 17 00:00:00 2001 From: Sanford Ryza Date: Tue, 12 Nov 2013 03:06:10 +0000 Subject: [PATCH 3/7] MAPREDUCE-1176. 
FixedLengthInputFormat and FixedLengthRecordReader (Mariappan Asokan and BitsOfInfo via Sandy Ryza) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1540931 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapred/FixedLengthInputFormat.java | 97 ++++ .../mapred/FixedLengthRecordReader.java | 89 ++++ .../lib/input/FixedLengthInputFormat.java | 90 ++++ .../lib/input/FixedLengthRecordReader.java | 220 +++++++++ .../mapred/TestFixedLengthInputFormat.java | 416 ++++++++++++++++ .../lib/input/TestFixedLengthInputFormat.java | 461 ++++++++++++++++++ 7 files changed, 1376 insertions(+) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 240a63a605b..6a0deb59f4f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -164,6 +164,9 @@ Release 2.3.0 - UNRELEASED MAPREDUCE-4421. Run MapReduce framework via the distributed cache (jlowe) + MAPREDUCE-1176. FixedLengthInputFormat and FixedLengthRecordReader + (Mariappan Asokan and BitsOfInfo via Sandy Ryza) + OPTIMIZATIONS MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java new file mode 100644 index 00000000000..60ba69c5db8 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; + +/** + * FixedLengthInputFormat is an input format used to read input files + * which contain fixed length records. The content of a record need not be + * text. It can be arbitrary binary data. Users must configure the record + * length property by calling: + * FixedLengthInputFormat.setRecordLength(conf, recordLength);
or + * conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength); + *
+ * @see FixedLengthRecordReader + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class FixedLengthInputFormat + extends FileInputFormat + implements JobConfigurable { + + private CompressionCodecFactory compressionCodecs = null; + + public static final String FIXED_RECORD_LENGTH = + "fixedlengthinputformat.record.length"; + + /** + * Set the length of each record + * @param conf configuration + * @param recordLength the length of a record + */ + public static void setRecordLength(Configuration conf, int recordLength) { + conf.setInt(FIXED_RECORD_LENGTH, recordLength); + } + + /** + * Get record length value + * @param conf configuration + * @return the record length, zero means none was set + */ + public static int getRecordLength(Configuration conf) { + return conf.getInt(FIXED_RECORD_LENGTH, 0); + } + + @Override + public void configure(JobConf conf) { + compressionCodecs = new CompressionCodecFactory(conf); + } + + @Override + public RecordReader + getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter) + throws IOException { + reporter.setStatus(genericSplit.toString()); + int recordLength = getRecordLength(job); + if (recordLength <= 0) { + throw new IOException("Fixed record length " + recordLength + + " is invalid. It should be set to a value greater than zero"); + } + return new FixedLengthRecordReader(job, (FileSplit)genericSplit, + recordLength); + } + + @Override + protected boolean isSplitable(FileSystem fs, Path file) { + final CompressionCodec codec = compressionCodecs.getCodec(file); + return(null == codec); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java new file mode 100644 index 00000000000..6d85fda5c30 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; + +/** + * A reader to read fixed length records from a split. Record offset is + * returned as key and the record as bytes is returned in value. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class FixedLengthRecordReader + implements RecordReader { + + private int recordLength; + // Make use of the new API implementation to avoid code duplication. + private org.apache.hadoop.mapreduce.lib.input.FixedLengthRecordReader reader; + + public FixedLengthRecordReader(Configuration job, FileSplit split, + int recordLength) throws IOException { + this.recordLength = recordLength; + reader = new org.apache.hadoop.mapreduce.lib.input.FixedLengthRecordReader( + recordLength); + reader.initialize(job, split.getStart(), split.getLength(), + split.getPath()); + } + + @Override + public LongWritable createKey() { + return new LongWritable(); + } + + @Override + public BytesWritable createValue() { + return new BytesWritable(new byte[recordLength]); + } + + @Override + public synchronized boolean next(LongWritable key, BytesWritable value) + throws IOException { + boolean dataRead = reader.nextKeyValue(); + if (dataRead) { + LongWritable newKey = reader.getCurrentKey(); + BytesWritable newValue = reader.getCurrentValue(); + key.set(newKey.get()); + value.set(newValue); + } + return dataRead; + } + + @Override + public float getProgress() throws IOException { + return reader.getProgress(); + } + + @Override + public synchronized long getPos() throws IOException { + return reader.getPos(); + } + + @Override + public void close() throws IOException { + reader.close(); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java new file mode 100644 index 00000000000..9c32e83c5c3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapreduce.lib.input; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * FixedLengthInputFormat is an input format used to read input files + * which contain fixed length records. The content of a record need not be + * text. It can be arbitrary binary data. Users must configure the record + * length property by calling: + * FixedLengthInputFormat.setRecordLength(conf, recordLength);
or + * conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength); + *
+ * @see FixedLengthRecordReader + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class FixedLengthInputFormat + extends FileInputFormat { + + public static final String FIXED_RECORD_LENGTH = + "fixedlengthinputformat.record.length"; + + /** + * Set the length of each record + * @param conf configuration + * @param recordLength the length of a record + */ + public static void setRecordLength(Configuration conf, int recordLength) { + conf.setInt(FIXED_RECORD_LENGTH, recordLength); + } + + /** + * Get record length value + * @param conf configuration + * @return the record length, zero means none was set + */ + public static int getRecordLength(Configuration conf) { + return conf.getInt(FIXED_RECORD_LENGTH, 0); + } + + @Override + public RecordReader + createRecordReader(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + int recordLength = getRecordLength(context.getConfiguration()); + if (recordLength <= 0) { + throw new IOException("Fixed record length " + recordLength + + " is invalid. It should be set to a value greater than zero"); + } + return new FixedLengthRecordReader(recordLength); + } + + @Override + protected boolean isSplitable(JobContext context, Path file) { + final CompressionCodec codec = + new CompressionCodecFactory(context.getConfiguration()).getCodec(file); + return (null == codec); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java new file mode 100644 index 00000000000..70fa7b2aec3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.lib.input; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.compress.CodecPool; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; + +/** + * A reader to read fixed length records from a split. Record offset is + * returned as key and the record as bytes is returned in value. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class FixedLengthRecordReader + extends RecordReader { + private static final Log LOG + = LogFactory.getLog(FixedLengthRecordReader.class); + + private int recordLength; + private long start; + private long pos; + private long end; + private long numRecordsRemainingInSplit; + private FSDataInputStream fileIn; + private Seekable filePosition; + private LongWritable key; + private BytesWritable value; + private boolean isCompressedInput; + private Decompressor decompressor; + private InputStream inputStream; + + public FixedLengthRecordReader(int recordLength) { + this.recordLength = recordLength; + } + + @Override + public void initialize(InputSplit genericSplit, + TaskAttemptContext context) throws IOException { + FileSplit split = (FileSplit) genericSplit; + Configuration job = context.getConfiguration(); + final Path file = split.getPath(); + initialize(job, split.getStart(), split.getLength(), file); + } + + // This is also called from the old FixedLengthRecordReader API implementation + public void initialize(Configuration job, long splitStart, long splitLength, + Path file) throws IOException { + start = splitStart; + end = start + splitLength; + long partialRecordLength = start % recordLength; + long numBytesToSkip = 0; + if (partialRecordLength != 0) { + numBytesToSkip = recordLength - partialRecordLength; + } + + // open the file and seek to the start of the split + final FileSystem fs = file.getFileSystem(job); + fileIn = fs.open(file); + + CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file); + if (null != codec) { + isCompressedInput = true; + decompressor = CodecPool.getDecompressor(codec); + CompressionInputStream cIn + = codec.createInputStream(fileIn, decompressor); + filePosition = cIn; + inputStream = cIn; + numRecordsRemainingInSplit = Long.MAX_VALUE; + LOG.info( + "Compressed input; cannot compute number of records in the split"); + } else { + fileIn.seek(start); + filePosition = fileIn; + inputStream = fileIn; + long splitSize = end - start - numBytesToSkip; + numRecordsRemainingInSplit = (splitSize + recordLength - 1)/recordLength; + if (numRecordsRemainingInSplit < 0) { + numRecordsRemainingInSplit = 0; + } + LOG.info("Expecting " + numRecordsRemainingInSplit + + " records each with a length of " + 
recordLength + + " bytes in the split with an effective size of " + + splitSize + " bytes"); + } + if (numBytesToSkip != 0) { + start += inputStream.skip(numBytesToSkip); + } + this.pos = start; + } + + @Override + public synchronized boolean nextKeyValue() throws IOException { + if (key == null) { + key = new LongWritable(); + } + if (value == null) { + value = new BytesWritable(new byte[recordLength]); + } + boolean dataRead = false; + value.setSize(recordLength); + byte[] record = value.getBytes(); + if (numRecordsRemainingInSplit > 0) { + key.set(pos); + int offset = 0; + int numBytesToRead = recordLength; + int numBytesRead = 0; + while (numBytesToRead > 0) { + numBytesRead = inputStream.read(record, offset, numBytesToRead); + if (numBytesRead == -1) { + // EOF + break; + } + offset += numBytesRead; + numBytesToRead -= numBytesRead; + } + numBytesRead = recordLength - numBytesToRead; + pos += numBytesRead; + if (numBytesRead > 0) { + dataRead = true; + if (numBytesRead >= recordLength) { + if (!isCompressedInput) { + numRecordsRemainingInSplit--; + } + } else { + throw new IOException("Partial record(length = " + numBytesRead + + ") found at the end of split."); + } + } else { + numRecordsRemainingInSplit = 0L; // End of input. + } + } + return dataRead; + } + + @Override + public LongWritable getCurrentKey() { + return key; + } + + @Override + public BytesWritable getCurrentValue() { + return value; + } + + @Override + public synchronized float getProgress() throws IOException { + if (start == end) { + return 0.0f; + } else { + return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start)); + } + } + + @Override + public synchronized void close() throws IOException { + try { + if (inputStream != null) { + inputStream.close(); + inputStream = null; + } + } finally { + if (decompressor != null) { + CodecPool.returnDecompressor(decompressor); + decompressor = null; + } + } + } + + // This is called from the old FixedLengthRecordReader API implementation. + public long getPos() { + return pos; + } + + private long getFilePosition() throws IOException { + long retVal; + if (isCompressedInput && null != filePosition) { + retVal = filePosition.getPos(); + } else { + retVal = pos; + } + return retVal; + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java new file mode 100644 index 00000000000..0c614eaf0c7 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java @@ -0,0 +1,416 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.compress.*; +import org.apache.hadoop.util.ReflectionUtils; + +import org.junit.BeforeClass; +import org.junit.Test; +import static junit.framework.Assert.*; + +public class TestFixedLengthInputFormat { + + private static Log LOG; + private static Configuration defaultConf; + private static FileSystem localFs; + private static Path workDir; + private static Reporter voidReporter; + + // some chars for the record data + private static char[] chars; + private static Random charRand; + + @BeforeClass + public static void onlyOnce() { + try { + LOG = LogFactory.getLog(TestFixedLengthInputFormat.class.getName()); + defaultConf = new Configuration(); + defaultConf.set("fs.defaultFS", "file:///"); + localFs = FileSystem.getLocal(defaultConf); + voidReporter = Reporter.NULL; + // our set of chars + chars = ("abcdefghijklmnopqrstuvABCDEFGHIJKLMN OPQRSTUVWXYZ1234567890)" + + "(*&^%$#@!-=> reader = + format.getRecordReader(split, job, voidReporter); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for not setting record length:", exceptionThrown); + } + + /** + * Test with record length set to 0 + */ + @Test (timeout=5000) + public void testZeroRecordLength() throws IOException { + localFs.delete(workDir, true); + Path file = new Path(workDir, new String("testFormat.txt")); + createFile(file, null, 10, 10); + // Set the fixed length record length config property + Configuration testConf = new Configuration(defaultConf); + JobConf job = new JobConf(testConf); + FileInputFormat.setInputPaths(job, workDir); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(job, 0); + format.configure(job); + InputSplit splits[] = format.getSplits(job, 1); + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + RecordReader reader = + format.getRecordReader(split, job, voidReporter); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for zero record length:", exceptionThrown); + } + + /** + * Test with record length set to a negative value + */ + @Test (timeout=5000) + public void testNegativeRecordLength() throws IOException { + localFs.delete(workDir, true); + Path file = new Path(workDir, new String("testFormat.txt")); + createFile(file, null, 10, 10); + // Set the fixed length record length config property + Configuration testConf = new Configuration(defaultConf); + JobConf job = new JobConf(testConf); + FileInputFormat.setInputPaths(job, workDir); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(job, -10); + format.configure(job); + InputSplit splits[] = format.getSplits(job, 1); + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + RecordReader 
reader = + format.getRecordReader(split, job, voidReporter); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for negative record length:", exceptionThrown); + } + + /** + * Test with partial record at the end of a compressed input file. + */ + @Test (timeout=5000) + public void testPartialRecordCompressedIn() throws IOException { + CompressionCodec gzip = new GzipCodec(); + runPartialRecordTest(gzip); + } + + /** + * Test with partial record at the end of an uncompressed input file. + */ + @Test (timeout=5000) + public void testPartialRecordUncompressedIn() throws IOException { + runPartialRecordTest(null); + } + + /** + * Test using the gzip codec with two input files. + */ + @Test (timeout=5000) + public void testGzipWithTwoInputs() throws IOException { + CompressionCodec gzip = new GzipCodec(); + localFs.delete(workDir, true); + // Create files with fixed length records with 5 byte long records. + writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, + "one two threefour five six seveneightnine ten "); + writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip, + "ten nine eightsevensix five four threetwo one "); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(defaultConf, 5); + JobConf job = new JobConf(defaultConf); + FileInputFormat.setInputPaths(job, workDir); + ReflectionUtils.setConf(gzip, job); + format.configure(job); + InputSplit[] splits = format.getSplits(job, 100); + assertEquals("compressed splits == 2", 2, splits.length); + FileSplit tmp = (FileSplit) splits[0]; + if (tmp.getPath().getName().equals("part2.txt.gz")) { + splits[0] = splits[1]; + splits[1] = tmp; + } + List results = readSplit(format, splits[0], job); + assertEquals("splits[0] length", 10, results.size()); + assertEquals("splits[0][5]", "six ", results.get(5)); + results = readSplit(format, splits[1], job); + assertEquals("splits[1] length", 10, results.size()); + assertEquals("splits[1][0]", "ten ", results.get(0)); + assertEquals("splits[1][1]", "nine ", results.get(1)); + } + + // Create a file containing fixed length records with random data + private ArrayList createFile(Path targetFile, CompressionCodec codec, + int recordLen, + int numRecords) throws IOException { + ArrayList recordList = new ArrayList(numRecords); + OutputStream ostream = localFs.create(targetFile); + if (codec != null) { + ostream = codec.createOutputStream(ostream); + } + Writer writer = new OutputStreamWriter(ostream); + try { + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < numRecords; i++) { + for (int j = 0; j < recordLen; j++) { + sb.append(chars[charRand.nextInt(chars.length)]); + } + String recordData = sb.toString(); + recordList.add(recordData); + writer.write(recordData); + sb.setLength(0); + } + } finally { + writer.close(); + } + return recordList; + } + + private void runRandomTests(CompressionCodec codec) throws IOException { + StringBuilder fileName = new StringBuilder("testFormat.txt"); + if (codec != null) { + fileName.append(".gz"); + } + localFs.delete(workDir, true); + Path file = new Path(workDir, fileName.toString()); + int seed = new Random().nextInt(); + LOG.info("Seed = " + seed); + Random random = new Random(seed); + int MAX_TESTS = 20; + LongWritable key = new LongWritable(); + BytesWritable value = new BytesWritable(); + + for (int i = 0; i < MAX_TESTS; i++) { + LOG.info("----------------------------------------------------------"); + // Maximum total 
records of 999 + int totalRecords = random.nextInt(999)+1; + // Test an empty file + if (i == 8) { + totalRecords = 0; + } + // Maximum bytes in a record of 100K + int recordLength = random.nextInt(1024*100)+1; + // For the 11th test, force a record length of 1 + if (i == 10) { + recordLength = 1; + } + // The total bytes in the test file + int fileSize = (totalRecords * recordLength); + LOG.info("totalRecords=" + totalRecords + " recordLength=" + + recordLength); + // Create the test file + ArrayList recordList + = createFile(file, codec, recordLength, totalRecords); + assertTrue(localFs.exists(file)); + // Set the fixed length record length config property + Configuration testConf = new Configuration(defaultConf); + FixedLengthInputFormat.setRecordLength(testConf, recordLength); + + int numSplits = 1; + // Arbitrarily set number of splits. + if (i > 0) { + if (i == (MAX_TESTS-1)) { + // Test a split size that is less than record len + numSplits = (int)(fileSize/Math.floor(recordLength/2)); + } else { + if (MAX_TESTS % i == 0) { + // Let us create a split size that is forced to be + // smaller than the end file itself, (ensures 1+ splits) + numSplits = fileSize/(fileSize - random.nextInt(fileSize)); + } else { + // Just pick a random split size with no upper bound + numSplits = Math.max(1, fileSize/random.nextInt(Integer.MAX_VALUE)); + } + } + LOG.info("Number of splits set to: " + numSplits); + } + + // Create the job, and setup the input path + JobConf job = new JobConf(testConf); + FileInputFormat.setInputPaths(job, workDir); + // Try splitting the file in a variety of sizes + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.configure(job); + InputSplit splits[] = format.getSplits(job, numSplits); + LOG.info("Actual number of splits = " + splits.length); + // Test combined split lengths = total file size + long recordOffset = 0; + int recordNumber = 0; + for (InputSplit split : splits) { + RecordReader reader = + format.getRecordReader(split, job, voidReporter); + Class clazz = reader.getClass(); + assertEquals("RecordReader class should be FixedLengthRecordReader:", + FixedLengthRecordReader.class, clazz); + // Plow through the records in this split + while (reader.next(key, value)) { + assertEquals("Checking key", (long)(recordNumber*recordLength), + key.get()); + String valueString = + new String(value.getBytes(), 0, value.getLength()); + assertEquals("Checking record length:", recordLength, + value.getLength()); + assertTrue("Checking for more records than expected:", + recordNumber < totalRecords); + String origRecord = recordList.get(recordNumber); + assertEquals("Checking record content:", origRecord, valueString); + recordNumber++; + } + reader.close(); + } + assertEquals("Total original records should be total read records:", + recordList.size(), recordNumber); + } + } + + private static void writeFile(FileSystem fs, Path name, + CompressionCodec codec, + String contents) throws IOException { + OutputStream stm; + if (codec == null) { + stm = fs.create(name); + } else { + stm = codec.createOutputStream(fs.create(name)); + } + stm.write(contents.getBytes()); + stm.close(); + } + + private static List readSplit(FixedLengthInputFormat format, + InputSplit split, + JobConf job) throws IOException { + List result = new ArrayList(); + RecordReader reader = + format.getRecordReader(split, job, voidReporter); + LongWritable key = reader.createKey(); + BytesWritable value = reader.createValue(); + while (reader.next(key, value)) { + result.add(new 
String(value.getBytes(), 0, value.getLength())); + } + reader.close(); + return result; + } + + private void runPartialRecordTest(CompressionCodec codec) throws IOException { + localFs.delete(workDir, true); + // Create a file with fixed length records with 5 byte long + // records with a partial record at the end. + StringBuilder fileName = new StringBuilder("testFormat.txt"); + if (codec != null) { + fileName.append(".gz"); + } + writeFile(localFs, new Path(workDir, fileName.toString()), codec, + "one two threefour five six seveneightnine ten"); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(defaultConf, 5); + JobConf job = new JobConf(defaultConf); + FileInputFormat.setInputPaths(job, workDir); + if (codec != null) { + ReflectionUtils.setConf(codec, job); + } + format.configure(job); + InputSplit[] splits = format.getSplits(job, 100); + if (codec != null) { + assertEquals("compressed splits == 1", 1, splits.length); + } + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + List results = readSplit(format, split, job); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for partial record:", exceptionThrown); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java new file mode 100644 index 00000000000..0c94a4d34ce --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java @@ -0,0 +1,461 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.lib.input; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.compress.*; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MapContext; +import org.apache.hadoop.mapreduce.MapReduceTestUtil; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.task.MapContextImpl; + +import org.apache.hadoop.util.ReflectionUtils; + +import org.junit.BeforeClass; +import org.junit.Test; +import static junit.framework.Assert.*; + +public class TestFixedLengthInputFormat { + + private static Log LOG; + private static Configuration defaultConf; + private static FileSystem localFs; + private static Path workDir; + + // some chars for the record data + private static char[] chars; + private static Random charRand; + + @BeforeClass + public static void onlyOnce() { + try { + LOG = LogFactory.getLog(TestFixedLengthInputFormat.class.getName()); + defaultConf = new Configuration(); + defaultConf.set("fs.defaultFS", "file:///"); + localFs = FileSystem.getLocal(defaultConf); + // our set of chars + chars = ("abcdefghijklmnopqrstuvABCDEFGHIJKLMN OPQRSTUVWXYZ1234567890)" + + "(*&^%$#@!-=> splits = format.getSplits(job); + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + TaskAttemptContext context = MapReduceTestUtil. 
+ createDummyMapTaskAttemptContext(job.getConfiguration()); + RecordReader reader = + format.createRecordReader(split, context); + MapContext + mcontext = + new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(), + reader, null, null, MapReduceTestUtil.createDummyReporter(), split); + reader.initialize(split, mcontext); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for not setting record length:", exceptionThrown); + } + + /** + * Test with record length set to 0 + */ + @Test (timeout=5000) + public void testZeroRecordLength() throws Exception { + localFs.delete(workDir, true); + Path file = new Path(workDir, new String("testFormat.txt")); + createFile(file, null, 10, 10); + // Set the fixed length record length config property + Configuration testConf = new Configuration(defaultConf); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(testConf, 0); + Job job = Job.getInstance(testConf); + FileInputFormat.setInputPaths(job, workDir); + List splits = format.getSplits(job); + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + TaskAttemptContext context = + MapReduceTestUtil.createDummyMapTaskAttemptContext( + job.getConfiguration()); + RecordReader reader = + format.createRecordReader(split, context); + MapContext + mcontext = + new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(), + reader, null, null, MapReduceTestUtil.createDummyReporter(), split); + reader.initialize(split, mcontext); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for zero record length:", exceptionThrown); + } + + /** + * Test with record length set to a negative value + */ + @Test (timeout=5000) + public void testNegativeRecordLength() throws Exception { + localFs.delete(workDir, true); + Path file = new Path(workDir, new String("testFormat.txt")); + createFile(file, null, 10, 10); + // Set the fixed length record length config property + Configuration testConf = new Configuration(defaultConf); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(testConf, -10); + Job job = Job.getInstance(testConf); + FileInputFormat.setInputPaths(job, workDir); + List splits = format.getSplits(job); + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + TaskAttemptContext context = MapReduceTestUtil. + createDummyMapTaskAttemptContext(job.getConfiguration()); + RecordReader reader = + format.createRecordReader(split, context); + MapContext + mcontext = + new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(), + reader, null, null, MapReduceTestUtil.createDummyReporter(), split); + reader.initialize(split, mcontext); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for negative record length:", exceptionThrown); + } + + /** + * Test with partial record at the end of a compressed input file. + */ + @Test (timeout=5000) + public void testPartialRecordCompressedIn() throws Exception { + CompressionCodec gzip = new GzipCodec(); + runPartialRecordTest(gzip); + } + + /** + * Test with partial record at the end of an uncompressed input file. 
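[Reviewer note] As a quick illustration of how the new-API FixedLengthInputFormat exercised by these tests is meant to be wired into a real job, a minimal driver could look like the sketch below. The input/output paths, the 20-byte record length, and the driver class name are illustrative assumptions, not part of this patch; only setRecordLength(Configuration, int) and the LongWritable/BytesWritable key/value types are taken from the test code above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class FixedLengthJobSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Every record in the input is exactly 20 bytes; keys are byte offsets
        // into the file, values are the raw record bytes.
        FixedLengthInputFormat.setRecordLength(conf, 20);

        Job job = Job.getInstance(conf, "fixed-length-demo");
        job.setJarByClass(FixedLengthJobSketch.class);
        job.setInputFormatClass(FixedLengthInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(BytesWritable.class);
        FileInputFormat.setInputPaths(job, new Path("/data/fixed"));
        FileOutputFormat.setOutputPath(job, new Path("/data/fixed-out"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }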
+ */ + @Test (timeout=5000) + public void testPartialRecordUncompressedIn() throws Exception { + runPartialRecordTest(null); + } + + /** + * Test using the gzip codec with two input files. + */ + @Test (timeout=5000) + public void testGzipWithTwoInputs() throws Exception { + CompressionCodec gzip = new GzipCodec(); + localFs.delete(workDir, true); + // Create files with fixed length records with 5 byte long records. + writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, + "one two threefour five six seveneightnine ten "); + writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip, + "ten nine eightsevensix five four threetwo one "); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(defaultConf, 5); + ReflectionUtils.setConf(gzip, defaultConf); + Job job = Job.getInstance(defaultConf); + FileInputFormat.setInputPaths(job, workDir); + List splits = format.getSplits(job); + assertEquals("compressed splits == 2", 2, splits.size()); + FileSplit tmp = (FileSplit) splits.get(0); + if (tmp.getPath().getName().equals("part2.txt.gz")) { + splits.set(0, splits.get(1)); + splits.set(1, tmp); + } + List results = readSplit(format, splits.get(0), job); + assertEquals("splits[0] length", 10, results.size()); + assertEquals("splits[0][5]", "six ", results.get(5)); + results = readSplit(format, splits.get(1), job); + assertEquals("splits[1] length", 10, results.size()); + assertEquals("splits[1][0]", "ten ", results.get(0)); + assertEquals("splits[1][1]", "nine ", results.get(1)); + } + + // Create a file containing fixed length records with random data + private ArrayList createFile(Path targetFile, CompressionCodec codec, + int recordLen, + int numRecords) throws IOException { + ArrayList recordList = new ArrayList(numRecords); + OutputStream ostream = localFs.create(targetFile); + if (codec != null) { + ostream = codec.createOutputStream(ostream); + } + Writer writer = new OutputStreamWriter(ostream); + try { + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < numRecords; i++) { + for (int j = 0; j < recordLen; j++) { + sb.append(chars[charRand.nextInt(chars.length)]); + } + String recordData = sb.toString(); + recordList.add(recordData); + writer.write(recordData); + sb.setLength(0); + } + } finally { + writer.close(); + } + return recordList; + } + + private void runRandomTests(CompressionCodec codec) throws Exception { + StringBuilder fileName = new StringBuilder("testFormat.txt"); + if (codec != null) { + fileName.append(".gz"); + } + localFs.delete(workDir, true); + Path file = new Path(workDir, fileName.toString()); + int seed = new Random().nextInt(); + LOG.info("Seed = " + seed); + Random random = new Random(seed); + int MAX_TESTS = 20; + LongWritable key; + BytesWritable value; + + for (int i = 0; i < MAX_TESTS; i++) { + LOG.info("----------------------------------------------------------"); + // Maximum total records of 999 + int totalRecords = random.nextInt(999)+1; + // Test an empty file + if (i == 8) { + totalRecords = 0; + } + // Maximum bytes in a record of 100K + int recordLength = random.nextInt(1024*100)+1; + // For the 11th test, force a record length of 1 + if (i == 10) { + recordLength = 1; + } + // The total bytes in the test file + int fileSize = (totalRecords * recordLength); + LOG.info("totalRecords=" + totalRecords + " recordLength=" + + recordLength); + // Create the test file + ArrayList recordList = + createFile(file, codec, recordLength, totalRecords); + assertTrue(localFs.exists(file)); + // Set the fixed length 
record length config property + Configuration testConf = new Configuration(defaultConf); + FixedLengthInputFormat.setRecordLength(testConf, recordLength); + + int numSplits = 1; + // Arbitrarily set number of splits. + if (i > 0) { + if (i == (MAX_TESTS-1)) { + // Test a split size that is less than record len + numSplits = (int)(fileSize/Math.floor(recordLength/2)); + } else { + if (MAX_TESTS % i == 0) { + // Let us create a split size that is forced to be + // smaller than the end file itself, (ensures 1+ splits) + numSplits = fileSize/(fileSize - random.nextInt(fileSize)); + } else { + // Just pick a random split size with no upper bound + numSplits = Math.max(1, fileSize/random.nextInt(Integer.MAX_VALUE)); + } + } + LOG.info("Number of splits set to: " + numSplits); + } + testConf.setLong("mapreduce.input.fileinputformat.split.maxsize", + (long)(fileSize/numSplits)); + + // Create the job, and setup the input path + Job job = Job.getInstance(testConf); + FileInputFormat.setInputPaths(job, workDir); + // Try splitting the file in a variety of sizes + FixedLengthInputFormat format = new FixedLengthInputFormat(); + List splits = format.getSplits(job); + LOG.info("Actual number of splits = " + splits.size()); + // Test combined split lengths = total file size + long recordOffset = 0; + int recordNumber = 0; + for (InputSplit split : splits) { + TaskAttemptContext context = MapReduceTestUtil. + createDummyMapTaskAttemptContext(job.getConfiguration()); + RecordReader reader = + format.createRecordReader(split, context); + MapContext + mcontext = + new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(), + reader, null, null, MapReduceTestUtil.createDummyReporter(), split); + reader.initialize(split, mcontext); + Class clazz = reader.getClass(); + assertEquals("RecordReader class should be FixedLengthRecordReader:", + FixedLengthRecordReader.class, clazz); + // Plow through the records in this split + while (reader.nextKeyValue()) { + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + assertEquals("Checking key", (long)(recordNumber*recordLength), + key.get()); + String valueString = new String(value.getBytes(), 0, + value.getLength()); + assertEquals("Checking record length:", recordLength, + value.getLength()); + assertTrue("Checking for more records than expected:", + recordNumber < totalRecords); + String origRecord = recordList.get(recordNumber); + assertEquals("Checking record content:", origRecord, valueString); + recordNumber++; + } + reader.close(); + } + assertEquals("Total original records should be total read records:", + recordList.size(), recordNumber); + } + } + + private static void writeFile(FileSystem fs, Path name, + CompressionCodec codec, + String contents) throws IOException { + OutputStream stm; + if (codec == null) { + stm = fs.create(name); + } else { + stm = codec.createOutputStream(fs.create(name)); + } + stm.write(contents.getBytes()); + stm.close(); + } + + private static List readSplit(FixedLengthInputFormat format, + InputSplit split, + Job job) throws Exception { + List result = new ArrayList(); + TaskAttemptContext context = MapReduceTestUtil. 
+ createDummyMapTaskAttemptContext(job.getConfiguration()); + RecordReader reader = + format.createRecordReader(split, context); + MapContext + mcontext = + new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(), + reader, null, null, MapReduceTestUtil.createDummyReporter(), split); + reader.initialize(split, mcontext); + LongWritable key; + BytesWritable value; + while (reader.nextKeyValue()) { + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + result.add(new String(value.getBytes(), 0, value.getLength())); + } + reader.close(); + return result; + } + + private void runPartialRecordTest(CompressionCodec codec) throws Exception { + localFs.delete(workDir, true); + // Create a file with fixed length records with 5 byte long + // records with a partial record at the end. + StringBuilder fileName = new StringBuilder("testFormat.txt"); + if (codec != null) { + fileName.append(".gz"); + ReflectionUtils.setConf(codec, defaultConf); + } + writeFile(localFs, new Path(workDir, fileName.toString()), codec, + "one two threefour five six seveneightnine ten"); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(defaultConf, 5); + Job job = Job.getInstance(defaultConf); + FileInputFormat.setInputPaths(job, workDir); + List splits = format.getSplits(job); + if (codec != null) { + assertEquals("compressed splits == 1", 1, splits.size()); + } + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + List results = readSplit(format, split, job); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for partial record:", exceptionThrown); + } + +} From 214fa10e52b749ae3f19b49b5d5ee7f360f11b12 Mon Sep 17 00:00:00 2001 From: Chris Nauroth Date: Tue, 12 Nov 2013 03:58:14 +0000 Subject: [PATCH 4/7] YARN-1400. yarn.cmd uses HADOOP_RESOURCEMANAGER_OPTS. Should be YARN_RESOURCEMANAGER_OPTS. Contributed by Raja Aluri. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1540938 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 +++ hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 1e982df92b1..19a59645810 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -124,6 +124,9 @@ Release 2.3.0 - UNRELEASED YARN-1395. Distributed shell application master launched with debug flag can hang waiting for external ls process. (cnauroth) + YARN-1400. yarn.cmd uses HADOOP_RESOURCEMANAGER_OPTS. Should be + YARN_RESOURCEMANAGER_OPTS. 
(Raja Aluri via cnauroth) + Release 2.2.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd b/hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd index ee03b1d0d67..8afc6ea0564 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd +++ b/hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd @@ -180,7 +180,7 @@ goto :eof :resourcemanager set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR%\rm-config\log4j.properties set CLASS=org.apache.hadoop.yarn.server.resourcemanager.ResourceManager - set YARN_OPTS=%YARN_OPTS% %HADOOP_RESOURCEMANAGER_OPTS% + set YARN_OPTS=%YARN_OPTS% %YARN_RESOURCEMANAGER_OPTS% if defined YARN_RESOURCEMANAGER_HEAPSIZE ( set JAVA_HEAP_MAX=-Xmx%YARN_RESOURCEMANAGER_HEAPSIZE%m ) From 8162fdcdbc23d749fdb188ae8419e173c59cb1ed Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Tue, 12 Nov 2013 22:10:09 +0000 Subject: [PATCH 5/7] HDFS-5425. Renaming underconstruction file with snapshots can make NN failure on restart. Contributed by Vinay and Jing Zhao. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1541261 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 ++ .../hdfs/server/namenode/FSEditLogLoader.java | 2 - .../hdfs/server/namenode/FSImageFormat.java | 7 ++- .../hdfs/server/namenode/FSNamesystem.java | 4 +- .../snapshot/INodeDirectoryWithSnapshot.java | 10 +++- .../snapshot/TestRenameWithSnapshots.java | 50 +++++++++++++++++++ 6 files changed, 70 insertions(+), 6 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 68c77e44a90..ad74d142de2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -529,6 +529,9 @@ Release 2.3.0 - UNRELEASED HDFS-5488. Clean up TestHftpURLTimeout. (Haohui Mai via jing9) + HDFS-5425. Renaming underconstruction file with snapshots can make NN failure on + restart. 
(jing9 and Vinay) + Release 2.2.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index be2aa1cb9a4..aaa82cfa870 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -30,7 +30,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -84,7 +83,6 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Co import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step; import org.apache.hadoop.hdfs.util.ChunkedArrayList; import org.apache.hadoop.hdfs.util.Holder; -import org.apache.jasper.tagplugins.jstl.core.Remove; import com.google.common.base.Joiner; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java index 2746c7872b2..4cf5e5ac57d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java @@ -824,7 +824,12 @@ public class FSImageFormat { final INodesInPath iip = fsDir.getLastINodeInPath(path); INodeFile oldnode = INodeFile.valueOf(iip.getINode(0), path); cons.setLocalName(oldnode.getLocalNameBytes()); - cons.setParent(oldnode.getParent()); + INodeReference parentRef = oldnode.getParentReference(); + if (parentRef != null) { + cons.setParentReference(parentRef); + } else { + cons.setParent(oldnode.getParent()); + } if (oldnode instanceof INodeFileWithSnapshot) { cons = new INodeFileUnderConstructionWithSnapshot(cons, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 5a47a9118e0..d4610827cd7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -2562,7 +2562,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, final INode[] inodes = analyzeFileState( src, fileId, clientName, previous, onRetryBlock).getINodes(); final INodeFileUnderConstruction pendingFile = - (INodeFileUnderConstruction) inodes[inodes.length - 1]; + (INodeFileUnderConstruction) inodes[inodes.length - 1].asFile(); if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) { // This is a retry. Just return the last block if having locations. 
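[Reviewer note] A minimal, self-contained reproduction of the scenario that the FSImageFormat and FSNamesystem changes above guard against might look like the sketch below; the actual regression tests (testRenameUCFileInSnapshot and testAppendFileAfterRenameInSnapshot) appear further down in this patch. The paths, snapshot name, and single-datanode cluster are illustrative assumptions. Without the parent-reference handling added above, the NameNode restart at the end fails while reloading the saved image.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;

    public class RenameUcInSnapshotRepro {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        MiniDFSCluster cluster =
            new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
        try {
          cluster.waitActive();
          DistributedFileSystem dfs = cluster.getFileSystem();

          Path test = new Path("/test");
          Path bar = new Path("/test/foo/bar");
          dfs.mkdirs(bar.getParent());
          // Keep the stream open so the file stays under construction.
          FSDataOutputStream out = dfs.create(bar);

          dfs.allowSnapshot(test);
          dfs.createSnapshot(test, "s0");

          // Rename the open (under-construction) file that is captured in the snapshot.
          dfs.rename(bar, new Path("/test/foo/bar2"));

          // Save the namespace and restart; before this fix the restart failed.
          dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
          dfs.saveNamespace();
          dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
          cluster.restartNameNode();
          out.close();
        } finally {
          cluster.shutdown();
        }
      }
    }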
@@ -2600,7 +2600,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, analyzeFileState(src, fileId, clientName, previous, onRetryBlock); INode[] inodes = inodesInPath.getINodes(); final INodeFileUnderConstruction pendingFile = - (INodeFileUnderConstruction) inodes[inodes.length - 1]; + (INodeFileUnderConstruction) inodes[inodes.length - 1].asFile(); if (onRetryBlock[0] != null) { if (onRetryBlock[0].getLocations().length > 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java index b5f3caa96c9..dfbf22c7c05 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java @@ -595,7 +595,15 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota { public void replaceChild(final INode oldChild, final INode newChild, final INodeMap inodeMap) { super.replaceChild(oldChild, newChild, inodeMap); - diffs.replaceChild(ListType.CREATED, oldChild, newChild); + if (oldChild.getParentReference() != null && !newChild.isReference()) { + // oldChild is referred by a Reference node. Thus we are replacing the + // referred inode, e.g., + // INodeFileWithSnapshot -> INodeFileUnderConstructionWithSnapshot + // in this case, we do not need to update the diff list + return; + } else { + diffs.replaceChild(ListType.CREATED, oldChild, newChild); + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java index 0524ab28e4a..32e8a0a1ccb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.spy; import java.io.File; import java.io.IOException; +import java.util.EnumSet; import java.util.List; import java.util.Random; @@ -40,9 +41,12 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; @@ -102,6 +106,7 @@ public class TestRenameWithSnapshots { @Before public void setUp() throws Exception { + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL).format(true) .build(); cluster.waitActive(); @@ -2289,4 +2294,49 @@ public class TestRenameWithSnapshots { assertEquals(0, diff.getChildrenDiff().getList(ListType.DELETED).size()); 
assertEquals(0, diff.getChildrenDiff().getList(ListType.CREATED).size()); } + + /** + * Rename of the underconstruction file in snapshot should not fail NN restart + * after checkpoint. Unit test for HDFS-5425. + */ + @Test + public void testRenameUCFileInSnapshot() throws Exception { + final Path test = new Path("/test"); + final Path foo = new Path(test, "foo"); + final Path bar = new Path(foo, "bar"); + hdfs.mkdirs(foo); + // create a file and keep it as underconstruction. + hdfs.create(bar); + SnapshotTestHelper.createSnapshot(hdfs, test, "s0"); + // rename bar --> bar2 + final Path bar2 = new Path(foo, "bar2"); + hdfs.rename(bar, bar2); + + // save namespace and restart + restartClusterAndCheckImage(true); + } + + /** + * Similar with testRenameUCFileInSnapshot, but do renaming first and then + * append file without closing it. Unit test for HDFS-5425. + */ + @Test + public void testAppendFileAfterRenameInSnapshot() throws Exception { + final Path test = new Path("/test"); + final Path foo = new Path(test, "foo"); + final Path bar = new Path(foo, "bar"); + DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPL, SEED); + SnapshotTestHelper.createSnapshot(hdfs, test, "s0"); + // rename bar --> bar2 + final Path bar2 = new Path(foo, "bar2"); + hdfs.rename(bar, bar2); + // append file and keep it as underconstruction. + FSDataOutputStream out = hdfs.append(bar2); + out.writeByte(0); + ((DFSOutputStream) out.getWrappedStream()).hsync( + EnumSet.of(SyncFlag.UPDATE_LENGTH)); + + // save namespace and restart + restartClusterAndCheckImage(true); + } } From ce35e0950cef9250ce2ceffb3b8bfcff533c6b92 Mon Sep 17 00:00:00 2001 From: Colin McCabe Date: Tue, 12 Nov 2013 23:52:23 +0000 Subject: [PATCH 6/7] HDFS-5471. CacheAdmin -listPools fails when user lacks permissions to view all pools (Andrew Wang via Colin Patrick McCabe) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1541323 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 + ...tion.java => InvalidRequestException.java} | 7 +- ...amenodeProtocolServerSideTranslatorPB.java | 46 +--- .../ClientNamenodeProtocolTranslatorPB.java | 34 +-- .../hadoop/hdfs/protocolPB/PBHelper.java | 41 +++ .../hdfs/server/namenode/CacheManager.java | 242 +++++++++--------- .../hdfs/server/namenode/CachePool.java | 24 +- .../server/namenode/FSPermissionChecker.java | 17 +- .../main/proto/ClientNamenodeProtocol.proto | 20 +- .../namenode/TestPathBasedCacheRequests.java | 92 +++++-- 10 files changed, 281 insertions(+), 245 deletions(-) rename hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/{IdNotFoundException.java => InvalidRequestException.java} (80%) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index e4783037e3f..ebe1d315a98 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -277,6 +277,9 @@ Trunk (Unreleased) HADOOP-9740. Fix FsShell '-text' command to be able to read Avro files stored in HDFS and other filesystems. (Allan Yan via cutting) + HDFS-5471. CacheAdmin -listPools fails when user lacks permissions to view + all pools (Andrew Wang via Colin Patrick McCabe) + OPTIMIZATIONS HADOOP-7761. Improve the performance of raw comparisons. 
(todd) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/IdNotFoundException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InvalidRequestException.java similarity index 80% rename from hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/IdNotFoundException.java rename to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InvalidRequestException.java index 059c2d1d7f7..437276d5c10 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/IdNotFoundException.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InvalidRequestException.java @@ -20,12 +20,13 @@ package org.apache.hadoop.fs; import java.io.IOException; /** - * Exception corresponding to ID not found - EINVAL + * Thrown when the user makes a malformed request, for example missing required + * parameters or parameters that are not valid. */ -public class IdNotFoundException extends IOException { +public class InvalidRequestException extends IOException { static final long serialVersionUID = 0L; - public IdNotFoundException(String str) { + public InvalidRequestException(String str) { super(str); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 8c905908494..4f0c4d99580 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -1104,21 +1104,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements public AddCachePoolResponseProto addCachePool(RpcController controller, AddCachePoolRequestProto request) throws ServiceException { try { - CachePoolInfo info = - new CachePoolInfo(request.getPoolName()); - if (request.hasOwnerName()) { - info.setOwnerName(request.getOwnerName()); - } - if (request.hasGroupName()) { - info.setGroupName(request.getGroupName()); - } - if (request.hasMode()) { - info.setMode(new FsPermission((short)request.getMode())); - } - if (request.hasWeight()) { - info.setWeight(request.getWeight()); - } - server.addCachePool(info); + server.addCachePool(PBHelper.convert(request.getInfo())); return AddCachePoolResponseProto.newBuilder().build(); } catch (IOException e) { throw new ServiceException(e); @@ -1129,21 +1115,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements public ModifyCachePoolResponseProto modifyCachePool(RpcController controller, ModifyCachePoolRequestProto request) throws ServiceException { try { - CachePoolInfo info = - new CachePoolInfo(request.getPoolName()); - if (request.hasOwnerName()) { - info.setOwnerName(request.getOwnerName()); - } - if (request.hasGroupName()) { - info.setGroupName(request.getGroupName()); - } - if (request.hasMode()) { - info.setMode(new FsPermission((short)request.getMode())); - } - if (request.hasWeight()) { - info.setWeight(request.getWeight()); - } - server.modifyCachePool(info); + server.modifyCachePool(PBHelper.convert(request.getInfo())); return ModifyCachePoolResponseProto.newBuilder().build(); } catch (IOException e) { throw new ServiceException(e); @@ -1174,19 +1146,7 @@ public class 
ClientNamenodeProtocolServerSideTranslatorPB implements CachePoolInfo pool = iter.next(); ListCachePoolsResponseElementProto.Builder elemBuilder = ListCachePoolsResponseElementProto.newBuilder(); - elemBuilder.setPoolName(pool.getPoolName()); - if (pool.getOwnerName() != null) { - elemBuilder.setOwnerName(pool.getOwnerName()); - } - if (pool.getGroupName() != null) { - elemBuilder.setGroupName(pool.getGroupName()); - } - if (pool.getMode() != null) { - elemBuilder.setMode(pool.getMode().toShort()); - } - if (pool.getWeight() != null) { - elemBuilder.setWeight(pool.getWeight()); - } + elemBuilder.setInfo(PBHelper.convert(pool)); responseBuilder.addElements(elemBuilder.build()); prevPoolName = pool.getPoolName(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index c570917f52e..129ac8c717a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -1109,19 +1109,7 @@ public class ClientNamenodeProtocolTranslatorPB implements public void addCachePool(CachePoolInfo info) throws IOException { AddCachePoolRequestProto.Builder builder = AddCachePoolRequestProto.newBuilder(); - builder.setPoolName(info.getPoolName()); - if (info.getOwnerName() != null) { - builder.setOwnerName(info.getOwnerName()); - } - if (info.getGroupName() != null) { - builder.setGroupName(info.getGroupName()); - } - if (info.getMode() != null) { - builder.setMode(info.getMode().toShort()); - } - if (info.getWeight() != null) { - builder.setWeight(info.getWeight()); - } + builder.setInfo(PBHelper.convert(info)); try { rpcProxy.addCachePool(null, builder.build()); } catch (ServiceException e) { @@ -1133,19 +1121,7 @@ public class ClientNamenodeProtocolTranslatorPB implements public void modifyCachePool(CachePoolInfo req) throws IOException { ModifyCachePoolRequestProto.Builder builder = ModifyCachePoolRequestProto.newBuilder(); - builder.setPoolName(req.getPoolName()); - if (req.getOwnerName() != null) { - builder.setOwnerName(req.getOwnerName()); - } - if (req.getGroupName() != null) { - builder.setGroupName(req.getGroupName()); - } - if (req.getMode() != null) { - builder.setMode(req.getMode().toShort()); - } - if (req.getWeight() != null) { - builder.setWeight(req.getWeight()); - } + builder.setInfo(PBHelper.convert(req)); try { rpcProxy.modifyCachePool(null, builder.build()); } catch (ServiceException e) { @@ -1175,11 +1151,7 @@ public class ClientNamenodeProtocolTranslatorPB implements @Override public CachePoolInfo get(int i) { ListCachePoolsResponseElementProto elem = proto.getElements(i); - return new CachePoolInfo(elem.getPoolName()). - setOwnerName(elem.getOwnerName()). - setGroupName(elem.getGroupName()). - setMode(new FsPermission((short)elem.getMode())). 
- setWeight(elem.getWeight()); + return PBHelper.convert(elem.getInfo()); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 1ef07d5d563..f2d9a9f1b23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.protocolPB; +import static com.google.common.base.Preconditions.checkNotNull; + import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -34,6 +36,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.DatanodeID; @@ -53,6 +56,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto; @@ -1602,6 +1606,43 @@ public class PBHelper { return builder.build(); } + public static CachePoolInfoProto convert(CachePoolInfo info) { + CachePoolInfoProto.Builder builder = CachePoolInfoProto.newBuilder(); + builder.setPoolName(info.getPoolName()); + if (info.getOwnerName() != null) { + builder.setOwnerName(info.getOwnerName()); + } + if (info.getGroupName() != null) { + builder.setGroupName(info.getGroupName()); + } + if (info.getMode() != null) { + builder.setMode(info.getMode().toShort()); + } + if (info.getWeight() != null) { + builder.setWeight(info.getWeight()); + } + return builder.build(); + } + + public static CachePoolInfo convert (CachePoolInfoProto proto) { + // Pool name is a required field, the rest are optional + String poolName = checkNotNull(proto.getPoolName()); + CachePoolInfo info = new CachePoolInfo(poolName); + if (proto.hasOwnerName()) { + info.setOwnerName(proto.getOwnerName()); + } + if (proto.hasGroupName()) { + info.setGroupName(proto.getGroupName()); + } + if (proto.hasMode()) { + info.setMode(new FsPermission((short)proto.getMode())); + } + if (proto.hasWeight()) { + info.setWeight(proto.getWeight()); + } + return info; + } + public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) { return HdfsProtos.ChecksumTypeProto.valueOf(type.id); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java index a74a8b41aba..0dd46e8696d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java @@ -43,7 +43,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.IdNotFoundException; +import org.apache.hadoop.fs.InvalidRequestException; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.hdfs.DFSUtil; @@ -250,11 +250,87 @@ public final class CacheManager { private long getNextEntryId() throws IOException { assert namesystem.hasWriteLock(); if (nextEntryId == Long.MAX_VALUE) { - throw new IOException("No more available IDs"); + throw new IOException("No more available IDs."); } return nextEntryId++; } + // Helper getter / validation methods + + private static void checkWritePermission(FSPermissionChecker pc, + CachePool pool) throws AccessControlException { + if ((pc != null)) { + pc.checkPermission(pool, FsAction.WRITE); + } + } + + private static String validatePoolName(PathBasedCacheDirective directive) + throws InvalidRequestException { + String pool = directive.getPool(); + if (pool == null) { + throw new InvalidRequestException("No pool specified."); + } + if (pool.isEmpty()) { + throw new InvalidRequestException("Invalid empty pool name."); + } + return pool; + } + + private static String validatePath(PathBasedCacheDirective directive) + throws InvalidRequestException { + if (directive.getPath() == null) { + throw new InvalidRequestException("No path specified."); + } + String path = directive.getPath().toUri().getPath(); + if (!DFSUtil.isValidName(path)) { + throw new InvalidRequestException("Invalid path '" + path + "'."); + } + return path; + } + + private static short validateReplication(PathBasedCacheDirective directive, + short defaultValue) throws InvalidRequestException { + short repl = (directive.getReplication() != null) + ? directive.getReplication() : defaultValue; + if (repl <= 0) { + throw new InvalidRequestException("Invalid replication factor " + repl + + " <= 0"); + } + return repl; + } + + /** + * Get a PathBasedCacheEntry by ID, validating the ID and that the entry + * exists. + */ + private PathBasedCacheEntry getById(long id) throws InvalidRequestException { + // Check for invalid IDs. + if (id <= 0) { + throw new InvalidRequestException("Invalid negative ID."); + } + // Find the entry. + PathBasedCacheEntry entry = entriesById.get(id); + if (entry == null) { + throw new InvalidRequestException("No directive with ID " + id + + " found."); + } + return entry; + } + + /** + * Get a CachePool by name, validating that it exists. 
+ */ + private CachePool getCachePool(String poolName) + throws InvalidRequestException { + CachePool pool = cachePools.get(poolName); + if (pool == null) { + throw new InvalidRequestException("Unknown pool " + poolName); + } + return pool; + } + + // RPC handlers + private void addInternal(PathBasedCacheEntry entry) { entriesById.put(entry.getEntryId(), entry); String path = entry.getPath(); @@ -272,34 +348,10 @@ public final class CacheManager { assert namesystem.hasWriteLock(); PathBasedCacheEntry entry; try { - if (directive.getPool() == null) { - throw new IdNotFoundException("addDirective: no pool was specified."); - } - if (directive.getPool().isEmpty()) { - throw new IdNotFoundException("addDirective: pool name was empty."); - } - CachePool pool = cachePools.get(directive.getPool()); - if (pool == null) { - throw new IdNotFoundException("addDirective: no such pool as " + - directive.getPool()); - } - if ((pc != null) && (!pc.checkPermission(pool, FsAction.WRITE))) { - throw new AccessControlException("addDirective: write " + - "permission denied for pool " + directive.getPool()); - } - if (directive.getPath() == null) { - throw new IOException("addDirective: no path was specified."); - } - String path = directive.getPath().toUri().getPath(); - if (!DFSUtil.isValidName(path)) { - throw new IOException("addDirective: path '" + path + "' is invalid."); - } - short replication = directive.getReplication() == null ? - (short)1 : directive.getReplication(); - if (replication <= 0) { - throw new IOException("addDirective: replication " + replication + - " is invalid."); - } + CachePool pool = getCachePool(validatePoolName(directive)); + checkWritePermission(pc, pool); + String path = validatePath(directive); + short replication = validateReplication(directive, (short)1); long id; if (directive.getId() != null) { // We are loading an entry from the edit log. @@ -312,10 +364,10 @@ public final class CacheManager { entry = new PathBasedCacheEntry(id, path, replication, pool); addInternal(entry); } catch (IOException e) { - LOG.warn("addDirective " + directive + ": failed.", e); + LOG.warn("addDirective of " + directive + " failed: ", e); throw e; } - LOG.info("addDirective " + directive + ": succeeded."); + LOG.info("addDirective of " + directive + " successful."); if (monitor != null) { monitor.kick(); } @@ -332,75 +384,43 @@ public final class CacheManager { // Check for invalid IDs. Long id = directive.getId(); if (id == null) { - throw new IdNotFoundException("modifyDirective: " + - "no ID to modify was supplied."); - } - if (id <= 0) { - throw new IdNotFoundException("modifyDirective " + id + - ": invalid non-positive directive ID."); - } - // Find the entry. 
- PathBasedCacheEntry prevEntry = entriesById.get(id); - if (prevEntry == null) { - throw new IdNotFoundException("modifyDirective " + id + - ": id not found."); - } - if ((pc != null) && - (!pc.checkPermission(prevEntry.getPool(), FsAction.WRITE))) { - throw new AccessControlException("modifyDirective " + id + - ": permission denied for initial pool " + prevEntry.getPool()); + throw new InvalidRequestException("Must supply an ID."); } + PathBasedCacheEntry prevEntry = getById(id); + checkWritePermission(pc, prevEntry.getPool()); String path = prevEntry.getPath(); if (directive.getPath() != null) { - path = directive.getPath().toUri().getPath(); - if (!DFSUtil.isValidName(path)) { - throw new IOException("modifyDirective " + id + ": new path " + - path + " is not valid."); - } + path = validatePath(directive); } - short replication = (directive.getReplication() != null) ? - directive.getReplication() : prevEntry.getReplication(); - if (replication <= 0) { - throw new IOException("modifyDirective: replication " + replication + - " is invalid."); + short replication = prevEntry.getReplication(); + if (directive.getReplication() != null) { + replication = validateReplication(directive, replication); } CachePool pool = prevEntry.getPool(); if (directive.getPool() != null) { - pool = cachePools.get(directive.getPool()); - if (pool == null) { - throw new IdNotFoundException("modifyDirective " + id + - ": pool " + directive.getPool() + " not found."); - } - if (directive.getPool().isEmpty()) { - throw new IdNotFoundException("modifyDirective: pool name was " + - "empty."); - } - if ((pc != null) && - (!pc.checkPermission(pool, FsAction.WRITE))) { - throw new AccessControlException("modifyDirective " + id + - ": permission denied for target pool " + pool); - } + pool = getCachePool(validatePoolName(directive)); + checkWritePermission(pc, pool); } removeInternal(prevEntry); PathBasedCacheEntry newEntry = new PathBasedCacheEntry(id, path, replication, pool); addInternal(newEntry); } catch (IOException e) { - LOG.warn("modifyDirective " + idString + ": failed.", e); + LOG.warn("modifyDirective of " + idString + " failed: ", e); throw e; } - LOG.info("modifyDirective " + idString + ": successfully applied " + - directive); + LOG.info("modifyDirective of " + idString + " successfully applied " + + directive + "."); } public void removeInternal(PathBasedCacheEntry existing) - throws IOException { + throws InvalidRequestException { assert namesystem.hasWriteLock(); // Remove the corresponding entry in entriesByPath. String path = existing.getPath(); List entries = entriesByPath.get(path); if (entries == null || !entries.remove(existing)) { - throw new IdNotFoundException("removeInternal: failed to locate entry " + + throw new InvalidRequestException("Failed to locate entry " + existing.getEntryId() + " by path " + existing.getPath()); } if (entries.size() == 0) { @@ -413,32 +433,17 @@ public final class CacheManager { throws IOException { assert namesystem.hasWriteLock(); try { - // Check for invalid IDs. - if (id <= 0) { - throw new IdNotFoundException("removeDirective " + id + ": invalid " + - "non-positive directive ID."); - } - // Find the entry. 
- PathBasedCacheEntry existing = entriesById.get(id); - if (existing == null) { - throw new IdNotFoundException("removeDirective " + id + - ": id not found."); - } - if ((pc != null) && - (!pc.checkPermission(existing.getPool(), FsAction.WRITE))) { - throw new AccessControlException("removeDirective " + id + - ": write permission denied on pool " + - existing.getPool().getPoolName()); - } + PathBasedCacheEntry existing = getById(id); + checkWritePermission(pc, existing.getPool()); removeInternal(existing); } catch (IOException e) { - LOG.warn("removeDirective " + id + " failed.", e); + LOG.warn("removeDirective of " + id + " failed: ", e); throw e; } if (monitor != null) { monitor.kick(); } - LOG.info("removeDirective " + id + ": succeeded."); + LOG.info("removeDirective of " + id + " successful."); } public BatchedListEntries @@ -449,18 +454,13 @@ public final class CacheManager { final int NUM_PRE_ALLOCATED_ENTRIES = 16; String filterPath = null; if (filter.getId() != null) { - throw new IOException("we currently don't support filtering by ID"); + throw new IOException("Filtering by ID is unsupported."); } if (filter.getPath() != null) { - filterPath = filter.getPath().toUri().getPath(); - if (!DFSUtil.isValidName(filterPath)) { - throw new IOException("listPathBasedCacheDirectives: invalid " + - "path name '" + filterPath + "'"); - } + filterPath = validatePath(filter); } if (filter.getReplication() != null) { - throw new IOException("we currently don't support filtering " + - "by replication"); + throw new IOException("Filtering by replication is unsupported."); } ArrayList replies = new ArrayList(NUM_PRE_ALLOCATED_ENTRIES); @@ -481,8 +481,15 @@ public final class CacheManager { !directive.getPath().toUri().getPath().equals(filterPath)) { continue; } - if ((pc == null) || - (pc.checkPermission(curEntry.getPool(), FsAction.READ))) { + boolean hasPermission = true; + if (pc != null) { + try { + pc.checkPermission(curEntry.getPool(), FsAction.READ); + } catch (AccessControlException e) { + hasPermission = false; + } + } + if (hasPermission) { replies.add(cur.getValue().toDirective()); numReplies++; } @@ -505,12 +512,13 @@ public final class CacheManager { String poolName = info.getPoolName(); CachePool pool = cachePools.get(poolName); if (pool != null) { - throw new IOException("cache pool " + poolName + " already exists."); + throw new InvalidRequestException("Cache pool " + poolName + + " already exists."); } pool = CachePool.createFromInfoAndDefaults(info); cachePools.put(pool.getPoolName(), pool); - LOG.info("created new cache pool " + pool); - return pool.getInfo(true); + LOG.info("Created new cache pool " + pool); + return pool.getInfo(null); } /** @@ -528,7 +536,8 @@ public final class CacheManager { String poolName = info.getPoolName(); CachePool pool = cachePools.get(poolName); if (pool == null) { - throw new IOException("cache pool " + poolName + " does not exist."); + throw new InvalidRequestException("Cache pool " + poolName + + " does not exist."); } StringBuilder bld = new StringBuilder(); String prefix = ""; @@ -575,7 +584,8 @@ public final class CacheManager { CachePoolInfo.validateName(poolName); CachePool pool = cachePools.remove(poolName); if (pool == null) { - throw new IOException("can't remove non-existent cache pool " + poolName); + throw new InvalidRequestException( + "Cannot remove non-existent cache pool " + poolName); } // Remove entries using this pool @@ -607,11 +617,7 @@ public final class CacheManager { if (numListed++ >= maxListCachePoolsResponses) { return 
new BatchedListEntries(results, true); } - if (pc == null) { - results.add(cur.getValue().getInfo(true)); - } else { - results.add(cur.getValue().getInfo(pc)); - } + results.add(cur.getValue().getInfo(pc)); } return new BatchedListEntries(results, false); } @@ -755,7 +761,7 @@ public final class CacheManager { Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); out.writeInt(cachePools.size()); for (CachePool pool: cachePools.values()) { - pool.getInfo(true).writeTo(out); + pool.getInfo(null).writeTo(out); counter.increment(); } prog.endStep(Phase.SAVING_CHECKPOINT, step); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java index 0bc5bb4c6a3..c8d4221c386 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import com.google.common.base.Preconditions; @@ -162,7 +163,7 @@ public final class CachePool { } /** - * Get information about this cache pool. + * Get either full or partial information about this CachePool. * * @param fullInfo * If true, only the name will be returned (i.e., what you @@ -170,7 +171,7 @@ public final class CachePool { * @return * Cache pool information. */ - public CachePoolInfo getInfo(boolean fullInfo) { + private CachePoolInfo getInfo(boolean fullInfo) { CachePoolInfo info = new CachePoolInfo(poolName); if (!fullInfo) { return info; @@ -181,8 +182,25 @@ public final class CachePool { setWeight(weight); } + /** + * Returns a CachePoolInfo describing this CachePool based on the permissions + * of the calling user. Unprivileged users will see only minimal descriptive + * information about the pool. 
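[Reviewer note] The permission-sensitive getInfo(FSPermissionChecker) introduced here is the server-side half of the fix: try the READ check and fall back to a name-only CachePoolInfo on AccessControlException. The idiom can be sketched in isolation as below; PoolReadCheck and the owner/group/mode/weight values are hypothetical stand-ins for the internal CachePool fields, not actual HDFS types.

    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
    import org.apache.hadoop.security.AccessControlException;

    /** Hypothetical stand-in for the server-side READ permission check. */
    interface PoolReadCheck {
      void checkRead() throws AccessControlException;
    }

    class PoolInfoSketch {
      // Full details only if the READ check passes; otherwise just the pool name.
      static CachePoolInfo describe(String poolName, PoolReadCheck check) {
        CachePoolInfo info = new CachePoolInfo(poolName);
        try {
          check.checkRead();
        } catch (AccessControlException e) {
          return info;  // name-only view for unprivileged callers
        }
        return info.setOwnerName("hdfs")            // illustrative values
                   .setGroupName("hadoop")
                   .setMode(new FsPermission((short) 0755))
                   .setWeight(100);
      }
    }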
+ * + * @param pc Permission checker to be used to validate the user's permissions, + * or null + * @return CachePoolInfo describing this CachePool + */ public CachePoolInfo getInfo(FSPermissionChecker pc) { - return getInfo(pc.checkPermission(this, FsAction.READ)); + boolean hasPermission = true; + if (pc != null) { + try { + pc.checkPermission(this, FsAction.READ); + } catch (AccessControlException e) { + hasPermission = false; + } + } + return getInfo(hasPermission); } public String toString() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java index 644a6397f93..17b43239c83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java @@ -261,24 +261,27 @@ class FSPermissionChecker { * * @param pool CachePool being accessed * @param access type of action being performed on the cache pool - * @return if the pool can be accessed + * @throws AccessControlException if pool cannot be accessed */ - public boolean checkPermission(CachePool pool, FsAction access) { + public void checkPermission(CachePool pool, FsAction access) + throws AccessControlException { FsPermission mode = pool.getMode(); if (isSuperUser()) { - return true; + return; } if (user.equals(pool.getOwnerName()) && mode.getUserAction().implies(access)) { - return true; + return; } if (groups.contains(pool.getGroupName()) && mode.getGroupAction().implies(access)) { - return true; + return; } if (mode.getOtherAction().implies(access)) { - return true; + return; } - return false; + throw new AccessControlException("Permission denied while accessing pool " + + pool.getPoolName() + ": user " + user + " does not have " + + access.toString() + " permissions."); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto index 8217b611ad9..b98fd473168 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto @@ -406,23 +406,23 @@ message ListPathBasedCacheDirectivesResponseProto { required bool hasMore = 2; } -message AddCachePoolRequestProto { - required string poolName = 1; +message CachePoolInfoProto { + optional string poolName = 1; optional string ownerName = 2; optional string groupName = 3; optional int32 mode = 4; optional int32 weight = 5; } +message AddCachePoolRequestProto { + required CachePoolInfoProto info = 1; +} + message AddCachePoolResponseProto { // void response } message ModifyCachePoolRequestProto { - required string poolName = 1; - optional string ownerName = 2; - optional string groupName = 3; - optional int32 mode = 4; - optional int32 weight = 5; + required CachePoolInfoProto info = 1; } message ModifyCachePoolResponseProto { // void response @@ -445,11 +445,7 @@ message ListCachePoolsResponseProto { } message ListCachePoolsResponseElementProto { - required string poolName = 1; - required string ownerName = 2; - required string groupName = 3; - required int32 mode = 4; - required int32 weight = 5; + required CachePoolInfoProto info = 1; } message GetFileLinkInfoRequestProto { diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java index a367d69e4fd..a62a0d73c49 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java @@ -17,14 +17,16 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -40,11 +42,12 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemTestHelper; -import org.apache.hadoop.fs.IdNotFoundException; +import org.apache.hadoop.fs.InvalidRequestException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -58,7 +61,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.GSet; import org.junit.After; -import org.junit.Assert; import org.junit.Assume; import org.junit.Before; import org.junit.Test; @@ -186,15 +188,15 @@ public class TestPathBasedCacheRequests { fail("expected to get an exception when " + "removing a non-existent pool."); } catch (IOException ioe) { - GenericTestUtils.assertExceptionContains("can't remove " + + GenericTestUtils.assertExceptionContains("Cannot remove " + "non-existent cache pool", ioe); } try { dfs.removeCachePool(poolName); - Assert.fail("expected to get an exception when " + + fail("expected to get an exception when " + "removing a non-existent pool."); } catch (IOException ioe) { - GenericTestUtils.assertExceptionContains("can't remove " + + GenericTestUtils.assertExceptionContains("Cannot remove " + "non-existent cache pool", ioe); } try { @@ -271,18 +273,18 @@ public class TestPathBasedCacheRequests { try { proto.removeCachePool("pool99"); - Assert.fail("expected to get an exception when " + + fail("expected to get an exception when " + "removing a non-existent pool."); } catch (IOException ioe) { - GenericTestUtils.assertExceptionContains("can't remove non-existent", + GenericTestUtils.assertExceptionContains("Cannot 
remove non-existent", ioe); } try { proto.removeCachePool(poolName); - Assert.fail("expected to get an exception when " + + fail("expected to get an exception when " + "removing a non-existent pool."); } catch (IOException ioe) { - GenericTestUtils.assertExceptionContains("can't remove non-existent", + GenericTestUtils.assertExceptionContains("Cannot remove non-existent", ioe); } @@ -350,8 +352,8 @@ public class TestPathBasedCacheRequests { setPool("no_such_pool"). build()); fail("expected an error when adding to a non-existent pool."); - } catch (IdNotFoundException ioe) { - GenericTestUtils.assertExceptionContains("no such pool as", ioe); + } catch (InvalidRequestException ioe) { + GenericTestUtils.assertExceptionContains("Unknown pool", ioe); } try { @@ -363,7 +365,7 @@ public class TestPathBasedCacheRequests { "mode 0 (no permissions for anyone)."); } catch (AccessControlException e) { GenericTestUtils. - assertExceptionContains("permission denied for pool", e); + assertExceptionContains("Permission denied while accessing pool", e); } try { @@ -383,10 +385,10 @@ public class TestPathBasedCacheRequests { setReplication((short)1). setPool(""). build()); - Assert.fail("expected an error when adding a PathBasedCache " + + fail("expected an error when adding a PathBasedCache " + "directive with an empty pool name."); - } catch (IdNotFoundException e) { - GenericTestUtils.assertExceptionContains("pool name was empty", e); + } catch (InvalidRequestException e) { + GenericTestUtils.assertExceptionContains("Invalid empty pool name", e); } long deltaId = addAsUnprivileged(delta); @@ -404,7 +406,7 @@ public class TestPathBasedCacheRequests { validateListAll(iter, alphaId, alphaId2, betaId, deltaId, relativeId ); iter = dfs.listPathBasedCacheDirectives( new PathBasedCacheDirective.Builder().setPool("pool3").build()); - Assert.assertFalse(iter.hasNext()); + assertFalse(iter.hasNext()); iter = dfs.listPathBasedCacheDirectives( new PathBasedCacheDirective.Builder().setPool("pool1").build()); validateListAll(iter, alphaId, alphaId2, deltaId, relativeId ); @@ -415,27 +417,27 @@ public class TestPathBasedCacheRequests { dfs.removePathBasedCacheDirective(betaId); iter = dfs.listPathBasedCacheDirectives( new PathBasedCacheDirective.Builder().setPool("pool2").build()); - Assert.assertFalse(iter.hasNext()); + assertFalse(iter.hasNext()); try { dfs.removePathBasedCacheDirective(betaId); - Assert.fail("expected an error when removing a non-existent ID"); - } catch (IdNotFoundException e) { - GenericTestUtils.assertExceptionContains("id not found", e); + fail("expected an error when removing a non-existent ID"); + } catch (InvalidRequestException e) { + GenericTestUtils.assertExceptionContains("No directive with ID", e); } try { proto.removePathBasedCacheDirective(-42l); - Assert.fail("expected an error when removing a negative ID"); - } catch (IdNotFoundException e) { + fail("expected an error when removing a negative ID"); + } catch (InvalidRequestException e) { GenericTestUtils.assertExceptionContains( - "invalid non-positive directive ID", e); + "Invalid negative ID", e); } try { proto.removePathBasedCacheDirective(43l); - Assert.fail("expected an error when removing a non-existent ID"); - } catch (IdNotFoundException e) { - GenericTestUtils.assertExceptionContains("id not found", e); + fail("expected an error when removing a non-existent ID"); + } catch (InvalidRequestException e) { + GenericTestUtils.assertExceptionContains("No directive with ID", e); } dfs.removePathBasedCacheDirective(alphaId); @@ 
-744,4 +746,38 @@ public class TestPathBasedCacheRequests { } } + @Test(timeout=60000) + public void testListCachePoolPermissions() throws Exception { + final UserGroupInformation myUser = UserGroupInformation + .createRemoteUser("myuser"); + final DistributedFileSystem myDfs = + (DistributedFileSystem)DFSTestUtil.getFileSystemAs(myUser, conf); + final String poolName = "poolparty"; + dfs.addCachePool(new CachePoolInfo(poolName) + .setMode(new FsPermission((short)0700))); + // Should only see partial info + RemoteIterator<CachePoolInfo> it = myDfs.listCachePools(); + CachePoolInfo info = it.next(); + assertFalse(it.hasNext()); + assertEquals("Expected pool name", poolName, info.getPoolName()); + assertNull("Unexpected owner name", info.getOwnerName()); + assertNull("Unexpected group name", info.getGroupName()); + assertNull("Unexpected mode", info.getMode()); + assertNull("Unexpected weight", info.getWeight()); + // Modify the pool so myuser is now the owner + dfs.modifyCachePool(new CachePoolInfo(poolName) + .setOwnerName(myUser.getShortUserName()) + .setWeight(99)); + // Should see full info + it = myDfs.listCachePools(); + info = it.next(); + assertFalse(it.hasNext()); + assertEquals("Expected pool name", poolName, info.getPoolName()); + assertEquals("Mismatched owner name", myUser.getShortUserName(), + info.getOwnerName()); + assertNotNull("Expected group name", info.getGroupName()); + assertEquals("Mismatched mode", (short) 0700, + info.getMode().toShort()); + assertEquals("Mismatched weight", 99, (int)info.getWeight()); + } } From 9d06631719466679135840d50e55ad62332b3866 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Wed, 13 Nov 2013 00:31:55 +0000 Subject: [PATCH 7/7] HDFS-5450. better API for getting the cached blocks locations. Contributed by Andrew Wang.
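As a usage illustration of the new API (not part of the patch itself): the sketch below assumes an HDFS client configuration (core-site.xml/hdfs-site.xml) on the classpath and uses a placeholder file path; it reads per-block cached-replica hosts through the generic FileSystem interface via the BlockLocation#getCachedHosts() accessor introduced by this change, alongside the existing getHosts().

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CachedLocationsSketch {
  public static void main(String[] args) throws Exception {
    // Picks up *-site.xml from the classpath; the path below is a placeholder.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("/tmp/example-file");
    FileStatus stat = fs.getFileStatus(file);
    BlockLocation[] locs = fs.getFileBlockLocations(stat, 0, stat.getLen());
    for (BlockLocation loc : locs) {
      // getCachedHosts() lists datanodes holding a cached replica of this
      // block; it may be empty when no cache directive covers the file.
      System.out.println("offset=" + loc.getOffset()
          + " hosts=" + Arrays.toString(loc.getHosts())
          + " cachedHosts=" + Arrays.toString(loc.getCachedHosts()));
    }
  }
}

Before this patch, cached locations were exposed only on the HDFS-side LocatedBlock (getCachedLocations()); the DFSUtil change below copies that information into BlockLocation so plain FileSystem clients can see it.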
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1541338 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/hadoop/fs/BlockLocation.java | 101 +++++++++----- .../apache/hadoop/fs/TestBlockLocation.java | 108 +++++++++++++++ hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../apache/hadoop/fs/HdfsBlockLocation.java | 3 +- .../java/org/apache/hadoop/hdfs/DFSUtil.java | 8 +- .../org/apache/hadoop/hdfs/web/JsonUtil.java | 6 +- .../server/datanode/TestFsDatasetCache.java | 7 + .../namenode/TestPathBasedCacheRequests.java | 124 ++++++++++++++++++ 8 files changed, 320 insertions(+), 39 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestBlockLocation.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BlockLocation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BlockLocation.java index fa095343c51..286d8514d6f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BlockLocation.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BlockLocation.java @@ -31,17 +31,33 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceStability.Stable public class BlockLocation { private String[] hosts; // Datanode hostnames + private String[] cachedHosts; // Datanode hostnames with a cached replica private String[] names; // Datanode IP:xferPort for accessing the block private String[] topologyPaths; // Full path name in network topology private long offset; // Offset of the block in the file private long length; private boolean corrupt; + private static final String[] EMPTY_STR_ARRAY = new String[0]; + /** * Default Constructor */ public BlockLocation() { - this(new String[0], new String[0], 0L, 0L); + this(EMPTY_STR_ARRAY, EMPTY_STR_ARRAY, 0L, 0L); + } + + /** + * Copy constructor + */ + public BlockLocation(BlockLocation that) { + this.hosts = that.hosts; + this.cachedHosts = that.cachedHosts; + this.names = that.names; + this.topologyPaths = that.topologyPaths; + this.offset = that.offset; + this.length = that.length; + this.corrupt = that.corrupt; } /** @@ -57,20 +73,7 @@ public class BlockLocation { */ public BlockLocation(String[] names, String[] hosts, long offset, long length, boolean corrupt) { - if (names == null) { - this.names = new String[0]; - } else { - this.names = names; - } - if (hosts == null) { - this.hosts = new String[0]; - } else { - this.hosts = hosts; - } - this.offset = offset; - this.length = length; - this.topologyPaths = new String[0]; - this.corrupt = corrupt; + this(names, hosts, null, offset, length, corrupt); } /** @@ -87,34 +90,55 @@ public class BlockLocation { */ public BlockLocation(String[] names, String[] hosts, String[] topologyPaths, long offset, long length, boolean corrupt) { - this(names, hosts, offset, length, corrupt); + this(names, hosts, null, topologyPaths, offset, length, corrupt); + } + + public BlockLocation(String[] names, String[] hosts, String[] cachedHosts, + String[] topologyPaths, long offset, long length, boolean corrupt) { + if (names == null) { + this.names = EMPTY_STR_ARRAY; + } else { + this.names = names; + } + if (hosts == null) { + this.hosts = EMPTY_STR_ARRAY; + } else { + this.hosts = hosts; + } + if (cachedHosts == null) { + this.cachedHosts = EMPTY_STR_ARRAY; + } else { + this.cachedHosts = cachedHosts; + } if (topologyPaths == null) { - this.topologyPaths = new String[0]; + this.topologyPaths = EMPTY_STR_ARRAY; } else { 
this.topologyPaths = topologyPaths; } + this.offset = offset; + this.length = length; + this.corrupt = corrupt; } /** * Get the list of hosts (hostname) hosting this block */ public String[] getHosts() throws IOException { - if (hosts == null || hosts.length == 0) { - return new String[0]; - } else { - return hosts; - } + return hosts; + } + + /** + * Get the list of hosts (hostname) hosting a cached replica of the block + */ + public String[] getCachedHosts() { + return cachedHosts; } /** * Get the list of names (IP:xferPort) hosting this block */ public String[] getNames() throws IOException { - if (names == null || names.length == 0) { - return new String[0]; - } else { - return names; - } + return names; } /** @@ -122,11 +146,7 @@ public class BlockLocation { * The last component of the path is the "name" (IP:xferPort). */ public String[] getTopologyPaths() throws IOException { - if (topologyPaths == null || topologyPaths.length == 0) { - return new String[0]; - } else { - return topologyPaths; - } + return topologyPaths; } /** @@ -176,18 +196,29 @@ public class BlockLocation { */ public void setHosts(String[] hosts) throws IOException { if (hosts == null) { - this.hosts = new String[0]; + this.hosts = EMPTY_STR_ARRAY; } else { this.hosts = hosts; } } + /** + * Set the hosts hosting a cached replica of this block + */ + public void setCachedHosts(String[] cachedHosts) { + if (cachedHosts == null) { + this.cachedHosts = EMPTY_STR_ARRAY; + } else { + this.cachedHosts = cachedHosts; + } + } + /** * Set the names (host:port) hosting this block */ public void setNames(String[] names) throws IOException { if (names == null) { - this.names = new String[0]; + this.names = EMPTY_STR_ARRAY; } else { this.names = names; } @@ -198,7 +229,7 @@ public class BlockLocation { */ public void setTopologyPaths(String[] topologyPaths) throws IOException { if (topologyPaths == null) { - this.topologyPaths = new String[0]; + this.topologyPaths = EMPTY_STR_ARRAY; } else { this.topologyPaths = topologyPaths; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestBlockLocation.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestBlockLocation.java new file mode 100644 index 00000000000..3cb608a971f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestBlockLocation.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.fs; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import org.junit.Test; + +public class TestBlockLocation { + + private static final String[] EMPTY_STR_ARRAY = new String[0]; + + private static void checkBlockLocation(final BlockLocation loc) + throws Exception { + checkBlockLocation(loc, 0, 0, false); + } + + private static void checkBlockLocation(final BlockLocation loc, + final long offset, final long length, final boolean corrupt) + throws Exception { + checkBlockLocation(loc, EMPTY_STR_ARRAY, EMPTY_STR_ARRAY, EMPTY_STR_ARRAY, + EMPTY_STR_ARRAY, offset, length, corrupt); + } + + private static void checkBlockLocation(final BlockLocation loc, + String[] names, String[] hosts, String[] cachedHosts, + String[] topologyPaths, final long offset, final long length, + final boolean corrupt) throws Exception { + assertNotNull(loc.getHosts()); + assertNotNull(loc.getCachedHosts()); + assertNotNull(loc.getNames()); + assertNotNull(loc.getTopologyPaths()); + + assertArrayEquals(hosts, loc.getHosts()); + assertArrayEquals(cachedHosts, loc.getCachedHosts()); + assertArrayEquals(names, loc.getNames()); + assertArrayEquals(topologyPaths, loc.getTopologyPaths()); + + assertEquals(offset, loc.getOffset()); + assertEquals(length, loc.getLength()); + assertEquals(corrupt, loc.isCorrupt()); + } + + /** + * Call all the constructors and verify the delegation is working properly + */ + @Test(timeout = 5000) + public void testBlockLocationConstructors() throws Exception { + // + BlockLocation loc; + loc = new BlockLocation(); + checkBlockLocation(loc); + loc = new BlockLocation(null, null, 1, 2); + checkBlockLocation(loc, 1, 2, false); + loc = new BlockLocation(null, null, null, 1, 2); + checkBlockLocation(loc, 1, 2, false); + loc = new BlockLocation(null, null, null, 1, 2, true); + checkBlockLocation(loc, 1, 2, true); + loc = new BlockLocation(null, null, null, null, 1, 2, true); + checkBlockLocation(loc, 1, 2, true); + } + + /** + * Call each of the setters and verify + */ + @Test(timeout = 5000) + public void testBlockLocationSetters() throws Exception { + BlockLocation loc; + loc = new BlockLocation(); + // Test that null sets the empty array + loc.setHosts(null); + loc.setCachedHosts(null); + loc.setNames(null); + loc.setTopologyPaths(null); + checkBlockLocation(loc); + // Test that not-null gets set properly + String[] names = new String[] { "name" }; + String[] hosts = new String[] { "host" }; + String[] cachedHosts = new String[] { "cachedHost" }; + String[] topologyPaths = new String[] { "path" }; + loc.setNames(names); + loc.setHosts(hosts); + loc.setCachedHosts(cachedHosts); + loc.setTopologyPaths(topologyPaths); + loc.setOffset(1); + loc.setLength(2); + loc.setCorrupt(true); + checkBlockLocation(loc, names, hosts, cachedHosts, topologyPaths, 1, 2, + true); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index ad74d142de2..bc333e6e9dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -190,6 +190,8 @@ Trunk (Unreleased) HDFS-5326. add modifyDirective to cacheAdmin. (cmccabe) + HDFS-5450. Better API for getting the cached blocks locations. (wang) + OPTIMIZATIONS HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. 
(cmccabe) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java index f736d9637eb..0ccacda8d84 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java @@ -37,8 +37,7 @@ public class HdfsBlockLocation extends BlockLocation { public HdfsBlockLocation(BlockLocation loc, LocatedBlock block) throws IOException { // Initialize with data from passed in BlockLocation - super(loc.getNames(), loc.getHosts(), loc.getTopologyPaths(), - loc.getOffset(), loc.getLength(), loc.isCorrupt()); + super(loc); this.block = block; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index 27c0059b791..a4b6810985e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -419,7 +419,13 @@ public class DFSUtil { locations[hCnt].getNetworkLocation()); racks[hCnt] = node.toString(); } - blkLocations[idx] = new BlockLocation(xferAddrs, hosts, racks, + DatanodeInfo[] cachedLocations = blk.getCachedLocations(); + String[] cachedHosts = new String[cachedLocations.length]; + for (int i=0; i)m.get("blockToken"))); return locatedblock; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java index ccc829cf683..b0e907fc1f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock.Mlocker; import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; @@ -87,6 +88,8 @@ public class TestFsDatasetCache { private static DatanodeProtocolClientSideTranslatorPB spyNN; private static PageRounder rounder = new PageRounder(); + private Mlocker mlocker; + @Before public void setUp() throws Exception { assumeTrue(!Path.WINDOWS); @@ -110,6 +113,8 @@ public class TestFsDatasetCache { fsd = dn.getFSDataset(); spyNN = DataNodeTestUtils.spyOnBposToNN(dn, nn); + // Save the current mlocker and replace it at the end of the test + mlocker = MappableBlock.mlocker; } @After @@ -120,6 +125,8 @@ public class TestFsDatasetCache { if (cluster != null) { cluster.shutdown(); } + // Restore the original mlocker + MappableBlock.mlocker = mlocker; } private static void setHeartbeatResponse(DatanodeCommand[] cmds) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java index a62a0d73c49..890ccfb525c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java @@ -31,6 +31,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.nio.MappedByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Iterator; @@ -40,6 +41,8 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.InvalidRequestException; @@ -54,6 +57,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.security.AccessControlException; @@ -78,6 +82,15 @@ public class TestPathBasedCacheRequests { static private DistributedFileSystem dfs; static private NamenodeProtocols proto; + static { + MappableBlock.mlocker = new MappableBlock.Mlocker() { + @Override + public void mlock(MappedByteBuffer mmap, long length) throws IOException { + // Stubbed out for testing + } + }; + } + @Before public void setup() throws Exception { conf = new HdfsConfiguration(); @@ -530,6 +543,14 @@ public class TestPathBasedCacheRequests { assertFalse("Unexpected # of cache directives found", dit.hasNext()); } + /** + * Wait for the NameNode to have an expected number of cached blocks + * and replicas. 
+ * @param nn NameNode + * @param expectedCachedBlocks + * @param expectedCachedReplicas + * @throws Exception + */ private static void waitForCachedBlocks(NameNode nn, final int expectedCachedBlocks, final int expectedCachedReplicas) throws Exception { @@ -570,6 +591,37 @@ public class TestPathBasedCacheRequests { }, 500, 60000); } + private static void checkNumCachedReplicas(final DistributedFileSystem dfs, + final List<Path> paths, final int expectedBlocks, + final int expectedReplicas) + throws Exception { + int numCachedBlocks = 0; + int numCachedReplicas = 0; + for (Path p: paths) { + final FileStatus f = dfs.getFileStatus(p); + final long len = f.getLen(); + final long blockSize = f.getBlockSize(); + // round it up to full blocks + final long numBlocks = (len + blockSize - 1) / blockSize; + BlockLocation[] locs = dfs.getFileBlockLocations(p, 0, len); + assertEquals("Unexpected number of block locations for path " + p, + numBlocks, locs.length); + for (BlockLocation l: locs) { + if (l.getCachedHosts().length > 0) { + numCachedBlocks++; + } + numCachedReplicas += l.getCachedHosts().length; + } + } + LOG.info("Found " + numCachedBlocks + " of " + expectedBlocks + " blocks"); + LOG.info("Found " + numCachedReplicas + " of " + expectedReplicas + + " replicas"); + assertEquals("Unexpected number of cached blocks", expectedBlocks, + numCachedBlocks); + assertEquals("Unexpected number of cached replicas", expectedReplicas, + numCachedReplicas); + } + private static final long BLOCK_SIZE = 512; private static final int NUM_DATANODES = 4; @@ -746,6 +798,78 @@ public class TestPathBasedCacheRequests { } } + /** + * Tests stepping the cache replication factor up and down, checking the + * number of cached replicas and blocks as well as the advertised locations. + * @throws Exception + */ + @Test(timeout=120000) + public void testReplicationFactor() throws Exception { + Assume.assumeTrue(canTestDatanodeCaching()); + HdfsConfiguration conf = createCachingConf(); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build(); + + try { + cluster.waitActive(); + DistributedFileSystem dfs = cluster.getFileSystem(); + NameNode namenode = cluster.getNameNode(); + // Create the pool + final String pool = "friendlyPool"; + dfs.addCachePool(new CachePoolInfo(pool)); + // Create some test files + final List<Path> paths = new LinkedList<Path>(); + paths.add(new Path("/foo/bar")); + paths.add(new Path("/foo/baz")); + paths.add(new Path("/foo2/bar2")); + paths.add(new Path("/foo2/baz2")); + dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault()); + dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault()); + final int numBlocksPerFile = 2; + for (Path path : paths) { + FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile, + (int)BLOCK_SIZE, (short)3, false); + } + waitForCachedBlocks(namenode, 0, 0); + checkNumCachedReplicas(dfs, paths, 0, 0); + // cache directory + long id = dfs.addPathBasedCacheDirective( + new PathBasedCacheDirective.Builder(). + setPath(new Path("/foo")). + setReplication((short)1). + setPool(pool). + build()); + waitForCachedBlocks(namenode, 4, 4); + checkNumCachedReplicas(dfs, paths, 4, 4); + // step up the replication factor + for (int i=2; i<=3; i++) { + dfs.modifyPathBasedCacheDirective( + new PathBasedCacheDirective.Builder(). + setId(id). + setReplication((short)i).
+ build()); + waitForCachedBlocks(namenode, 4, 4*i); + checkNumCachedReplicas(dfs, paths, 4, 4*i); + } + // step it down + for (int i=2; i>=1; i--) { + dfs.modifyPathBasedCacheDirective( + new PathBasedCacheDirective.Builder(). + setId(id). + setReplication((short)i). + build()); + waitForCachedBlocks(namenode, 4, 4*i); + checkNumCachedReplicas(dfs, paths, 4, 4*i); + } + // remove and watch numCached go to 0 + dfs.removePathBasedCacheDirective(id); + waitForCachedBlocks(namenode, 0, 0); + checkNumCachedReplicas(dfs, paths, 0, 0); + } finally { + cluster.shutdown(); + } + } + @Test(timeout=60000) public void testListCachePoolPermissions() throws Exception { final UserGroupInformation myUser = UserGroupInformation