diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index e4783037e3f..ebe1d315a98 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -277,6 +277,9 @@ Trunk (Unreleased) HADOOP-9740. Fix FsShell '-text' command to be able to read Avro files stored in HDFS and other filesystems. (Allan Yan via cutting) + HDFS-5471. CacheAdmin -listPools fails when user lacks permissions to view + all pools (Andrew Wang via Colin Patrick McCabe) + OPTIMIZATIONS HADOOP-7761. Improve the performance of raw comparisons. (todd) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BlockLocation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BlockLocation.java index fa095343c51..286d8514d6f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BlockLocation.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BlockLocation.java @@ -31,17 +31,33 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceStability.Stable public class BlockLocation { private String[] hosts; // Datanode hostnames + private String[] cachedHosts; // Datanode hostnames with a cached replica private String[] names; // Datanode IP:xferPort for accessing the block private String[] topologyPaths; // Full path name in network topology private long offset; // Offset of the block in the file private long length; private boolean corrupt; + private static final String[] EMPTY_STR_ARRAY = new String[0]; + /** * Default Constructor */ public BlockLocation() { - this(new String[0], new String[0], 0L, 0L); + this(EMPTY_STR_ARRAY, EMPTY_STR_ARRAY, 0L, 0L); + } + + /** + * Copy constructor + */ + public BlockLocation(BlockLocation that) { + this.hosts = that.hosts; + this.cachedHosts = that.cachedHosts; + this.names = that.names; + this.topologyPaths = that.topologyPaths; + this.offset = that.offset; + this.length = that.length; + this.corrupt = that.corrupt; } /** @@ -57,20 +73,7 @@ public class BlockLocation { */ public BlockLocation(String[] names, String[] hosts, long offset, long length, boolean corrupt) { - if (names == null) { - this.names = new String[0]; - } else { - this.names = names; - } - if (hosts == null) { - this.hosts = new String[0]; - } else { - this.hosts = hosts; - } - this.offset = offset; - this.length = length; - this.topologyPaths = new String[0]; - this.corrupt = corrupt; + this(names, hosts, null, offset, length, corrupt); } /** @@ -87,34 +90,55 @@ public class BlockLocation { */ public BlockLocation(String[] names, String[] hosts, String[] topologyPaths, long offset, long length, boolean corrupt) { - this(names, hosts, offset, length, corrupt); + this(names, hosts, null, topologyPaths, offset, length, corrupt); + } + + public BlockLocation(String[] names, String[] hosts, String[] cachedHosts, + String[] topologyPaths, long offset, long length, boolean corrupt) { + if (names == null) { + this.names = EMPTY_STR_ARRAY; + } else { + this.names = names; + } + if (hosts == null) { + this.hosts = EMPTY_STR_ARRAY; + } else { + this.hosts = hosts; + } + if (cachedHosts == null) { + this.cachedHosts = EMPTY_STR_ARRAY; + } else { + this.cachedHosts = cachedHosts; + } if (topologyPaths == null) { - this.topologyPaths = new String[0]; + this.topologyPaths = EMPTY_STR_ARRAY; } else { this.topologyPaths = topologyPaths; } + this.offset = offset; + this.length = length; + this.corrupt 
= corrupt; } /** * Get the list of hosts (hostname) hosting this block */ public String[] getHosts() throws IOException { - if (hosts == null || hosts.length == 0) { - return new String[0]; - } else { - return hosts; - } + return hosts; + } + + /** + * Get the list of hosts (hostname) hosting a cached replica of the block + */ + public String[] getCachedHosts() { + return cachedHosts; } /** * Get the list of names (IP:xferPort) hosting this block */ public String[] getNames() throws IOException { - if (names == null || names.length == 0) { - return new String[0]; - } else { - return names; - } + return names; } /** @@ -122,11 +146,7 @@ public class BlockLocation { * The last component of the path is the "name" (IP:xferPort). */ public String[] getTopologyPaths() throws IOException { - if (topologyPaths == null || topologyPaths.length == 0) { - return new String[0]; - } else { - return topologyPaths; - } + return topologyPaths; } /** @@ -176,18 +196,29 @@ public class BlockLocation { */ public void setHosts(String[] hosts) throws IOException { if (hosts == null) { - this.hosts = new String[0]; + this.hosts = EMPTY_STR_ARRAY; } else { this.hosts = hosts; } } + /** + * Set the hosts hosting a cached replica of this block + */ + public void setCachedHosts(String[] cachedHosts) { + if (cachedHosts == null) { + this.cachedHosts = EMPTY_STR_ARRAY; + } else { + this.cachedHosts = cachedHosts; + } + } + /** * Set the names (host:port) hosting this block */ public void setNames(String[] names) throws IOException { if (names == null) { - this.names = new String[0]; + this.names = EMPTY_STR_ARRAY; } else { this.names = names; } @@ -198,7 +229,7 @@ public class BlockLocation { */ public void setTopologyPaths(String[] topologyPaths) throws IOException { if (topologyPaths == null) { - this.topologyPaths = new String[0]; + this.topologyPaths = EMPTY_STR_ARRAY; } else { this.topologyPaths = topologyPaths; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/IdNotFoundException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InvalidRequestException.java similarity index 80% rename from hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/IdNotFoundException.java rename to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InvalidRequestException.java index 059c2d1d7f7..437276d5c10 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/IdNotFoundException.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InvalidRequestException.java @@ -20,12 +20,13 @@ package org.apache.hadoop.fs; import java.io.IOException; /** - * Exception corresponding to ID not found - EINVAL + * Thrown when the user makes a malformed request, for example missing required + * parameters or parameters that are not valid. 
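For context on the cached-host API added to BlockLocation above, here is a minimal client-side sketch of reading the new information. It assumes an already-configured FileSystem and an existing HDFS file; the path is illustrative only.

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CachedHostsExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path file = new Path("/example/data");          // hypothetical path
    FileStatus stat = fs.getFileStatus(file);
    for (BlockLocation loc : fs.getFileBlockLocations(stat, 0, stat.getLen())) {
      // getHosts() lists every datanode storing the block;
      // getCachedHosts() lists the subset holding a cached replica.
      System.out.println("hosts=" + Arrays.toString(loc.getHosts())
          + " cached=" + Arrays.toString(loc.getCachedHosts()));
    }
  }
}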
*/ -public class IdNotFoundException extends IOException { +public class InvalidRequestException extends IOException { static final long serialVersionUID = 0L; - public IdNotFoundException(String str) { + public InvalidRequestException(String str) { super(str); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestBlockLocation.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestBlockLocation.java new file mode 100644 index 00000000000..3cb608a971f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestBlockLocation.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import org.junit.Test; + +public class TestBlockLocation { + + private static final String[] EMPTY_STR_ARRAY = new String[0]; + + private static void checkBlockLocation(final BlockLocation loc) + throws Exception { + checkBlockLocation(loc, 0, 0, false); + } + + private static void checkBlockLocation(final BlockLocation loc, + final long offset, final long length, final boolean corrupt) + throws Exception { + checkBlockLocation(loc, EMPTY_STR_ARRAY, EMPTY_STR_ARRAY, EMPTY_STR_ARRAY, + EMPTY_STR_ARRAY, offset, length, corrupt); + } + + private static void checkBlockLocation(final BlockLocation loc, + String[] names, String[] hosts, String[] cachedHosts, + String[] topologyPaths, final long offset, final long length, + final boolean corrupt) throws Exception { + assertNotNull(loc.getHosts()); + assertNotNull(loc.getCachedHosts()); + assertNotNull(loc.getNames()); + assertNotNull(loc.getTopologyPaths()); + + assertArrayEquals(hosts, loc.getHosts()); + assertArrayEquals(cachedHosts, loc.getCachedHosts()); + assertArrayEquals(names, loc.getNames()); + assertArrayEquals(topologyPaths, loc.getTopologyPaths()); + + assertEquals(offset, loc.getOffset()); + assertEquals(length, loc.getLength()); + assertEquals(corrupt, loc.isCorrupt()); + } + + /** + * Call all the constructors and verify the delegation is working properly + */ + @Test(timeout = 5000) + public void testBlockLocationConstructors() throws Exception { + // + BlockLocation loc; + loc = new BlockLocation(); + checkBlockLocation(loc); + loc = new BlockLocation(null, null, 1, 2); + checkBlockLocation(loc, 1, 2, false); + loc = new BlockLocation(null, null, null, 1, 2); + checkBlockLocation(loc, 1, 2, false); + loc = new BlockLocation(null, null, null, 1, 2, true); + checkBlockLocation(loc, 1, 2, true); + loc = new BlockLocation(null, null, null, null, 1, 2, true); + checkBlockLocation(loc, 1, 2, true); + } + + /** + * 
Call each of the setters and verify + */ + @Test(timeout = 5000) + public void testBlockLocationSetters() throws Exception { + BlockLocation loc; + loc = new BlockLocation(); + // Test that null sets the empty array + loc.setHosts(null); + loc.setCachedHosts(null); + loc.setNames(null); + loc.setTopologyPaths(null); + checkBlockLocation(loc); + // Test that not-null gets set properly + String[] names = new String[] { "name" }; + String[] hosts = new String[] { "host" }; + String[] cachedHosts = new String[] { "cachedHost" }; + String[] topologyPaths = new String[] { "path" }; + loc.setNames(names); + loc.setHosts(hosts); + loc.setCachedHosts(cachedHosts); + loc.setTopologyPaths(topologyPaths); + loc.setOffset(1); + loc.setLength(2); + loc.setCorrupt(true); + checkBlockLocation(loc, names, hosts, cachedHosts, topologyPaths, 1, 2, + true); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSKerberosAuthenticationHandler.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSKerberosAuthenticationHandler.java index db4cdeeadb8..e56f88b45a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSKerberosAuthenticationHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSKerberosAuthenticationHandler.java @@ -18,7 +18,6 @@ package org.apache.hadoop.fs.http.server; -import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.http.client.HttpFSFileSystem; import org.apache.hadoop.fs.http.client.HttpFSKerberosAuthenticator; @@ -36,6 +35,7 @@ import org.apache.hadoop.test.TestDir; import org.apache.hadoop.test.TestDirHelper; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; +import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSWithKerberos.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSWithKerberos.java index 140f8661a59..45ce8ed7302 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSWithKerberos.java +++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSWithKerberos.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.fs.http.server; -import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.DelegationTokenRenewer; @@ -40,6 +39,7 @@ import org.apache.hadoop.test.TestJettyHelper; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.junit.After; +import org.junit.Assert; import org.junit.Test; import org.mortbay.jetty.Server; import org.mortbay.jetty.webapp.WebAppContext; diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/service/security/TestDelegationTokenManagerService.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/service/security/TestDelegationTokenManagerService.java index 480d04c8f42..c56e3c2d6bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/service/security/TestDelegationTokenManagerService.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/service/security/TestDelegationTokenManagerService.java @@ -18,7 +18,6 @@ package org.apache.hadoop.lib.service.security; -import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.http.server.HttpFSServerWebApp; import org.apache.hadoop.lib.server.Server; @@ -30,6 +29,7 @@ import org.apache.hadoop.test.HTestCase; import org.apache.hadoop.test.TestDir; import org.apache.hadoop.test.TestDirHelper; import org.apache.hadoop.util.StringUtils; +import org.junit.Assert; import org.junit.Test; import java.net.InetAddress; diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java index c4451364564..1a808807f9a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java @@ -26,8 +26,6 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.concurrent.ConcurrentNavigableMap; -import junit.framework.Assert; - import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -49,6 +47,7 @@ import org.apache.hadoop.nfs.nfs3.response.CREATE3Response; import org.apache.hadoop.nfs.nfs3.response.READ3Response; import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.SecurityHandler; +import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index d61ef530991..bc333e6e9dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -190,6 +190,8 @@ Trunk (Unreleased) HDFS-5326. add modifyDirective to cacheAdmin. (cmccabe) + HDFS-5450. Better API for getting the cached blocks locations. (wang) + OPTIMIZATIONS HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe) @@ -468,6 +470,9 @@ Release 2.3.0 - UNRELEASED HDFS-5467. Remove tab characters in hdfs-default.xml. (Shinichi Yamashita via Andrew Wang) + HDFS-5495. Remove further JUnit3 usages from HDFS. + (Jarek Jarcec Cecho via wang) + OPTIMIZATIONS HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn) @@ -526,6 +531,9 @@ Release 2.3.0 - UNRELEASED HDFS-5488. Clean up TestHftpURLTimeout. (Haohui Mai via jing9) + HDFS-5425. Renaming underconstruction file with snapshots can make NN failure on + restart. 
(jing9 and Vinay) + Release 2.2.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java index f736d9637eb..0ccacda8d84 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java @@ -37,8 +37,7 @@ public class HdfsBlockLocation extends BlockLocation { public HdfsBlockLocation(BlockLocation loc, LocatedBlock block) throws IOException { // Initialize with data from passed in BlockLocation - super(loc.getNames(), loc.getHosts(), loc.getTopologyPaths(), - loc.getOffset(), loc.getLength(), loc.isCorrupt()); + super(loc); this.block = block; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index b58b54d756c..5756151a10b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -436,7 +436,13 @@ public class DFSUtil { locations[hCnt].getNetworkLocation()); racks[hCnt] = node.toString(); } - blkLocations[idx] = new BlockLocation(xferAddrs, hosts, racks, + DatanodeInfo[] cachedLocations = blk.getCachedLocations(); + String[] cachedHosts = new String[cachedLocations.length]; + for (int i=0; i entries = entriesByPath.get(path); if (entries == null || !entries.remove(existing)) { - throw new IdNotFoundException("removeInternal: failed to locate entry " + + throw new InvalidRequestException("Failed to locate entry " + existing.getEntryId() + " by path " + existing.getPath()); } if (entries.size() == 0) { @@ -413,32 +433,17 @@ public final class CacheManager { throws IOException { assert namesystem.hasWriteLock(); try { - // Check for invalid IDs. - if (id <= 0) { - throw new IdNotFoundException("removeDirective " + id + ": invalid " + - "non-positive directive ID."); - } - // Find the entry. 
- PathBasedCacheEntry existing = entriesById.get(id); - if (existing == null) { - throw new IdNotFoundException("removeDirective " + id + - ": id not found."); - } - if ((pc != null) && - (!pc.checkPermission(existing.getPool(), FsAction.WRITE))) { - throw new AccessControlException("removeDirective " + id + - ": write permission denied on pool " + - existing.getPool().getPoolName()); - } + PathBasedCacheEntry existing = getById(id); + checkWritePermission(pc, existing.getPool()); removeInternal(existing); } catch (IOException e) { - LOG.warn("removeDirective " + id + " failed.", e); + LOG.warn("removeDirective of " + id + " failed: ", e); throw e; } if (monitor != null) { monitor.kick(); } - LOG.info("removeDirective " + id + ": succeeded."); + LOG.info("removeDirective of " + id + " successful."); } public BatchedListEntries @@ -449,18 +454,13 @@ public final class CacheManager { final int NUM_PRE_ALLOCATED_ENTRIES = 16; String filterPath = null; if (filter.getId() != null) { - throw new IOException("we currently don't support filtering by ID"); + throw new IOException("Filtering by ID is unsupported."); } if (filter.getPath() != null) { - filterPath = filter.getPath().toUri().getPath(); - if (!DFSUtil.isValidName(filterPath)) { - throw new IOException("listPathBasedCacheDirectives: invalid " + - "path name '" + filterPath + "'"); - } + filterPath = validatePath(filter); } if (filter.getReplication() != null) { - throw new IOException("we currently don't support filtering " + - "by replication"); + throw new IOException("Filtering by replication is unsupported."); } ArrayList replies = new ArrayList(NUM_PRE_ALLOCATED_ENTRIES); @@ -481,8 +481,15 @@ public final class CacheManager { !directive.getPath().toUri().getPath().equals(filterPath)) { continue; } - if ((pc == null) || - (pc.checkPermission(curEntry.getPool(), FsAction.READ))) { + boolean hasPermission = true; + if (pc != null) { + try { + pc.checkPermission(curEntry.getPool(), FsAction.READ); + } catch (AccessControlException e) { + hasPermission = false; + } + } + if (hasPermission) { replies.add(cur.getValue().toDirective()); numReplies++; } @@ -505,12 +512,13 @@ public final class CacheManager { String poolName = info.getPoolName(); CachePool pool = cachePools.get(poolName); if (pool != null) { - throw new IOException("cache pool " + poolName + " already exists."); + throw new InvalidRequestException("Cache pool " + poolName + + " already exists."); } pool = CachePool.createFromInfoAndDefaults(info); cachePools.put(pool.getPoolName(), pool); - LOG.info("created new cache pool " + pool); - return pool.getInfo(true); + LOG.info("Created new cache pool " + pool); + return pool.getInfo(null); } /** @@ -528,7 +536,8 @@ public final class CacheManager { String poolName = info.getPoolName(); CachePool pool = cachePools.get(poolName); if (pool == null) { - throw new IOException("cache pool " + poolName + " does not exist."); + throw new InvalidRequestException("Cache pool " + poolName + + " does not exist."); } StringBuilder bld = new StringBuilder(); String prefix = ""; @@ -575,7 +584,8 @@ public final class CacheManager { CachePoolInfo.validateName(poolName); CachePool pool = cachePools.remove(poolName); if (pool == null) { - throw new IOException("can't remove non-existent cache pool " + poolName); + throw new InvalidRequestException( + "Cannot remove non-existent cache pool " + poolName); } // Remove entries using this pool @@ -607,11 +617,7 @@ public final class CacheManager { if (numListed++ >= maxListCachePoolsResponses) { return 
new BatchedListEntries(results, true); } - if (pc == null) { - results.add(cur.getValue().getInfo(true)); - } else { - results.add(cur.getValue().getInfo(pc)); - } + results.add(cur.getValue().getInfo(pc)); } return new BatchedListEntries(results, false); } @@ -755,7 +761,7 @@ public final class CacheManager { Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); out.writeInt(cachePools.size()); for (CachePool pool: cachePools.values()) { - pool.getInfo(true).writeTo(out); + pool.getInfo(null).writeTo(out); counter.increment(); } prog.endStep(Phase.SAVING_CHECKPOINT, step); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java index 0bc5bb4c6a3..c8d4221c386 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import com.google.common.base.Preconditions; @@ -162,7 +163,7 @@ public final class CachePool { } /** - * Get information about this cache pool. + * Get either full or partial information about this CachePool. * * @param fullInfo * If true, only the name will be returned (i.e., what you @@ -170,7 +171,7 @@ public final class CachePool { * @return * Cache pool information. */ - public CachePoolInfo getInfo(boolean fullInfo) { + private CachePoolInfo getInfo(boolean fullInfo) { CachePoolInfo info = new CachePoolInfo(poolName); if (!fullInfo) { return info; @@ -181,8 +182,25 @@ public final class CachePool { setWeight(weight); } + /** + * Returns a CachePoolInfo describing this CachePool based on the permissions + * of the calling user. Unprivileged users will see only minimal descriptive + * information about the pool. 
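A sketch of the user-visible effect of the listCachePools permission change above: an unprivileged caller now receives a pool entry with only the name populated instead of the whole listing failing. This assumes a DistributedFileSystem handle and uses only the CachePoolInfo accessors exercised by the tests later in this patch.

import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;

public class ListCachePoolsExample {
  // Lists all cache pools; pools the caller cannot READ come back with
  // owner/group/mode/weight left null rather than raising an error.
  static void listPools(DistributedFileSystem dfs) throws Exception {
    RemoteIterator<CachePoolInfo> it = dfs.listCachePools();
    while (it.hasNext()) {
      CachePoolInfo info = it.next();
      System.out.println(info.getPoolName() + " owner=" + info.getOwnerName()
          + " mode=" + info.getMode());
    }
  }
}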
+ * + * @param pc Permission checker to be used to validate the user's permissions, + * or null + * @return CachePoolInfo describing this CachePool + */ public CachePoolInfo getInfo(FSPermissionChecker pc) { - return getInfo(pc.checkPermission(this, FsAction.READ)); + boolean hasPermission = true; + if (pc != null) { + try { + pc.checkPermission(this, FsAction.READ); + } catch (AccessControlException e) { + hasPermission = false; + } + } + return getInfo(hasPermission); } public String toString() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index be2aa1cb9a4..aaa82cfa870 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -30,7 +30,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -84,7 +83,6 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Co import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step; import org.apache.hadoop.hdfs.util.ChunkedArrayList; import org.apache.hadoop.hdfs.util.Holder; -import org.apache.jasper.tagplugins.jstl.core.Remove; import com.google.common.base.Joiner; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java index 2746c7872b2..4cf5e5ac57d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java @@ -824,7 +824,12 @@ public class FSImageFormat { final INodesInPath iip = fsDir.getLastINodeInPath(path); INodeFile oldnode = INodeFile.valueOf(iip.getINode(0), path); cons.setLocalName(oldnode.getLocalNameBytes()); - cons.setParent(oldnode.getParent()); + INodeReference parentRef = oldnode.getParentReference(); + if (parentRef != null) { + cons.setParentReference(parentRef); + } else { + cons.setParent(oldnode.getParent()); + } if (oldnode instanceof INodeFileWithSnapshot) { cons = new INodeFileUnderConstructionWithSnapshot(cons, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index af7d87845ea..fddeb6f5ec3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -2557,7 +2557,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, final INode[] inodes = analyzeFileState( src, fileId, clientName, previous, onRetryBlock).getINodes(); final INodeFileUnderConstruction pendingFile = - (INodeFileUnderConstruction) inodes[inodes.length - 1]; + 
(INodeFileUnderConstruction) inodes[inodes.length - 1].asFile(); if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) { // This is a retry. Just return the last block if having locations. @@ -2595,7 +2595,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, analyzeFileState(src, fileId, clientName, previous, onRetryBlock); INode[] inodes = inodesInPath.getINodes(); final INodeFileUnderConstruction pendingFile = - (INodeFileUnderConstruction) inodes[inodes.length - 1]; + (INodeFileUnderConstruction) inodes[inodes.length - 1].asFile(); if (onRetryBlock[0] != null) { if (onRetryBlock[0].getLocations().length > 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java index 644a6397f93..17b43239c83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java @@ -261,24 +261,27 @@ class FSPermissionChecker { * * @param pool CachePool being accessed * @param access type of action being performed on the cache pool - * @return if the pool can be accessed + * @throws AccessControlException if pool cannot be accessed */ - public boolean checkPermission(CachePool pool, FsAction access) { + public void checkPermission(CachePool pool, FsAction access) + throws AccessControlException { FsPermission mode = pool.getMode(); if (isSuperUser()) { - return true; + return; } if (user.equals(pool.getOwnerName()) && mode.getUserAction().implies(access)) { - return true; + return; } if (groups.contains(pool.getGroupName()) && mode.getGroupAction().implies(access)) { - return true; + return; } if (mode.getOtherAction().implies(access)) { - return true; + return; } - return false; + throw new AccessControlException("Permission denied while accessing pool " + + pool.getPoolName() + ": user " + user + " does not have " + + access.toString() + " permissions."); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java index b5f3caa96c9..dfbf22c7c05 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java @@ -595,7 +595,15 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota { public void replaceChild(final INode oldChild, final INode newChild, final INodeMap inodeMap) { super.replaceChild(oldChild, newChild, inodeMap); - diffs.replaceChild(ListType.CREATED, oldChild, newChild); + if (oldChild.getParentReference() != null && !newChild.isReference()) { + // oldChild is referred by a Reference node. 
Thus we are replacing the + // referred inode, e.g., + // INodeFileWithSnapshot -> INodeFileUnderConstructionWithSnapshot + // in this case, we do not need to update the diff list + return; + } else { + diffs.replaceChild(ListType.CREATED, oldChild, newChild); + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java index e9c20e4199f..58fafdee07f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java @@ -395,6 +395,7 @@ public class JsonUtil { m.put("startOffset", locatedblock.getStartOffset()); m.put("block", toJsonMap(locatedblock.getBlock())); m.put("locations", toJsonArray(locatedblock.getLocations())); + m.put("cachedLocations", toJsonArray(locatedblock.getCachedLocations())); return m; } @@ -409,8 +410,11 @@ public class JsonUtil { (Object[])m.get("locations")); final long startOffset = (Long)m.get("startOffset"); final boolean isCorrupt = (Boolean)m.get("isCorrupt"); + final DatanodeInfo[] cachedLocations = toDatanodeInfoArray( + (Object[])m.get("cachedLocations")); - final LocatedBlock locatedblock = new LocatedBlock(b, locations, startOffset, isCorrupt); + final LocatedBlock locatedblock = new LocatedBlock(b, locations, + startOffset, isCorrupt, cachedLocations); locatedblock.setBlockToken(toBlockToken((Map)m.get("blockToken"))); return locatedblock; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto index 30bcd865159..3f58efec385 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto @@ -407,23 +407,23 @@ message ListPathBasedCacheDirectivesResponseProto { required bool hasMore = 2; } -message AddCachePoolRequestProto { - required string poolName = 1; +message CachePoolInfoProto { + optional string poolName = 1; optional string ownerName = 2; optional string groupName = 3; optional int32 mode = 4; optional int32 weight = 5; } +message AddCachePoolRequestProto { + required CachePoolInfoProto info = 1; +} + message AddCachePoolResponseProto { // void response } message ModifyCachePoolRequestProto { - required string poolName = 1; - optional string ownerName = 2; - optional string groupName = 3; - optional int32 mode = 4; - optional int32 weight = 5; + required CachePoolInfoProto info = 1; } message ModifyCachePoolResponseProto { // void response @@ -446,11 +446,7 @@ message ListCachePoolsResponseProto { } message ListCachePoolsResponseElementProto { - required string poolName = 1; - required string ownerName = 2; - required string groupName = 3; - required int32 mode = 4; - required int32 weight = 5; + required CachePoolInfoProto info = 1; } message GetFileLinkInfoRequestProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java index f5c3b4c1c1a..5370d792be4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java @@ -21,8 +21,6 @@ import static org.junit.Assert.fail; import 
java.io.IOException; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -33,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java index b140bae1109..837e5523210 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java @@ -22,8 +22,6 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; import java.net.InetSocketAddress; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; @@ -32,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.security.token.Token; +import org.junit.Assert; import org.junit.Test; import org.mockito.Matchers; import org.mockito.Mockito; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java index 3574323d545..08842b5618f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java @@ -23,8 +23,6 @@ import java.io.IOException; import java.io.OutputStream; import java.util.List; -import junit.framework.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -33,6 +31,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.util.ThreadUtil; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java index 86b732f992a..678a3b8d17e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java @@ -20,12 +20,11 @@ package org.apache.hadoop.hdfs; import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; -import junit.framework.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.internal.util.reflection.Whitebox; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java index 2d02f3bd172..7eef5383855 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java @@ -17,13 +17,13 @@ */ package org.apache.hadoop.hdfs; -import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.unix.TemporarySocketDirectory; +import org.junit.Assert; import org.junit.Test; import java.io.FileInputStream; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java index 16c15345e32..53b47e7d840 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java @@ -32,8 +32,6 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; @@ -64,6 +62,7 @@ import org.apache.hadoop.test.GenericTestUtils.DelayAnswer; import org.apache.hadoop.util.Time; import org.apache.log4j.Level; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java index b2e42e6ef5f..5a2a52a7883 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java @@ -22,8 +22,6 @@ import java.util.Arrays; import java.util.Map; import java.util.TreeMap; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -39,6 +37,7 @@ import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheTracker; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java index 7f5a9101b65..b0e907fc1f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java @@ -17,9 +17,9 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import static 
junit.framework.Assert.assertTrue; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; @@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock.Mlocker; import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; @@ -87,6 +88,8 @@ public class TestFsDatasetCache { private static DatanodeProtocolClientSideTranslatorPB spyNN; private static PageRounder rounder = new PageRounder(); + private Mlocker mlocker; + @Before public void setUp() throws Exception { assumeTrue(!Path.WINDOWS); @@ -110,6 +113,8 @@ public class TestFsDatasetCache { fsd = dn.getFSDataset(); spyNN = DataNodeTestUtils.spyOnBposToNN(dn, nn); + // Save the current mlocker and replace it at the end of the test + mlocker = MappableBlock.mlocker; } @After @@ -120,6 +125,8 @@ public class TestFsDatasetCache { if (cluster != null) { cluster.shutdown(); } + // Restore the original mlocker + MappableBlock.mlocker = mlocker; } private static void setHeartbeatResponse(DatanodeCommand[] cmds) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java index 7a9b67fd39b..143ff283367 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java @@ -20,8 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.IOException; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; @@ -34,12 +32,10 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; -import org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner; import static org.apache.hadoop.hdfs.server.datanode.DataBlockScanner.SLEEP_PERIOD_MS; import org.apache.log4j.Level; -import org.apache.log4j.Logger; +import org.junit.Assert; import org.junit.Test; -import org.junit.Ignore; import static org.junit.Assert.fail; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java index cfc9954b72d..9326b6ad25f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java @@ -23,12 +23,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_AVAILABLE_SPACE_ import java.util.ArrayList; import java.util.List; -import junit.framework.Assert; - import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.ReflectionUtils; +import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java index 1b7c535380d..890ccfb525c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java @@ -17,42 +17,47 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; +import java.nio.MappedByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemTestHelper; -import org.apache.hadoop.fs.IdNotFoundException; +import org.apache.hadoop.fs.InvalidRequestException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock; import 
org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.security.AccessControlException; @@ -77,6 +82,15 @@ public class TestPathBasedCacheRequests { static private DistributedFileSystem dfs; static private NamenodeProtocols proto; + static { + MappableBlock.mlocker = new MappableBlock.Mlocker() { + @Override + public void mlock(MappedByteBuffer mmap, long length) throws IOException { + // Stubbed out for testing + } + }; + } + @Before public void setup() throws Exception { conf = new HdfsConfiguration(); @@ -187,15 +201,15 @@ public class TestPathBasedCacheRequests { fail("expected to get an exception when " + "removing a non-existent pool."); } catch (IOException ioe) { - GenericTestUtils.assertExceptionContains("can't remove " + + GenericTestUtils.assertExceptionContains("Cannot remove " + "non-existent cache pool", ioe); } try { dfs.removeCachePool(poolName); - Assert.fail("expected to get an exception when " + + fail("expected to get an exception when " + "removing a non-existent pool."); } catch (IOException ioe) { - GenericTestUtils.assertExceptionContains("can't remove " + + GenericTestUtils.assertExceptionContains("Cannot remove " + "non-existent cache pool", ioe); } try { @@ -272,18 +286,18 @@ public class TestPathBasedCacheRequests { try { proto.removeCachePool("pool99"); - Assert.fail("expected to get an exception when " + + fail("expected to get an exception when " + "removing a non-existent pool."); } catch (IOException ioe) { - GenericTestUtils.assertExceptionContains("can't remove non-existent", + GenericTestUtils.assertExceptionContains("Cannot remove non-existent", ioe); } try { proto.removeCachePool(poolName); - Assert.fail("expected to get an exception when " + + fail("expected to get an exception when " + "removing a non-existent pool."); } catch (IOException ioe) { - GenericTestUtils.assertExceptionContains("can't remove non-existent", + GenericTestUtils.assertExceptionContains("Cannot remove non-existent", ioe); } @@ -351,8 +365,8 @@ public class TestPathBasedCacheRequests { setPool("no_such_pool"). build()); fail("expected an error when adding to a non-existent pool."); - } catch (IdNotFoundException ioe) { - GenericTestUtils.assertExceptionContains("no such pool as", ioe); + } catch (InvalidRequestException ioe) { + GenericTestUtils.assertExceptionContains("Unknown pool", ioe); } try { @@ -364,7 +378,7 @@ public class TestPathBasedCacheRequests { "mode 0 (no permissions for anyone)."); } catch (AccessControlException e) { GenericTestUtils. - assertExceptionContains("permission denied for pool", e); + assertExceptionContains("Permission denied while accessing pool", e); } try { @@ -384,10 +398,10 @@ public class TestPathBasedCacheRequests { setReplication((short)1). setPool(""). 
build()); - Assert.fail("expected an error when adding a PathBasedCache " + + fail("expected an error when adding a PathBasedCache " + "directive with an empty pool name."); - } catch (IdNotFoundException e) { - GenericTestUtils.assertExceptionContains("pool name was empty", e); + } catch (InvalidRequestException e) { + GenericTestUtils.assertExceptionContains("Invalid empty pool name", e); } long deltaId = addAsUnprivileged(delta); @@ -405,7 +419,7 @@ public class TestPathBasedCacheRequests { validateListAll(iter, alphaId, alphaId2, betaId, deltaId, relativeId ); iter = dfs.listPathBasedCacheDirectives( new PathBasedCacheDirective.Builder().setPool("pool3").build()); - Assert.assertFalse(iter.hasNext()); + assertFalse(iter.hasNext()); iter = dfs.listPathBasedCacheDirectives( new PathBasedCacheDirective.Builder().setPool("pool1").build()); validateListAll(iter, alphaId, alphaId2, deltaId, relativeId ); @@ -416,27 +430,27 @@ public class TestPathBasedCacheRequests { dfs.removePathBasedCacheDirective(betaId); iter = dfs.listPathBasedCacheDirectives( new PathBasedCacheDirective.Builder().setPool("pool2").build()); - Assert.assertFalse(iter.hasNext()); + assertFalse(iter.hasNext()); try { dfs.removePathBasedCacheDirective(betaId); - Assert.fail("expected an error when removing a non-existent ID"); - } catch (IdNotFoundException e) { - GenericTestUtils.assertExceptionContains("id not found", e); + fail("expected an error when removing a non-existent ID"); + } catch (InvalidRequestException e) { + GenericTestUtils.assertExceptionContains("No directive with ID", e); } try { proto.removePathBasedCacheDirective(-42l); - Assert.fail("expected an error when removing a negative ID"); - } catch (IdNotFoundException e) { + fail("expected an error when removing a negative ID"); + } catch (InvalidRequestException e) { GenericTestUtils.assertExceptionContains( - "invalid non-positive directive ID", e); + "Invalid negative ID", e); } try { proto.removePathBasedCacheDirective(43l); - Assert.fail("expected an error when removing a non-existent ID"); - } catch (IdNotFoundException e) { - GenericTestUtils.assertExceptionContains("id not found", e); + fail("expected an error when removing a non-existent ID"); + } catch (InvalidRequestException e) { + GenericTestUtils.assertExceptionContains("No directive with ID", e); } dfs.removePathBasedCacheDirective(alphaId); @@ -529,6 +543,14 @@ public class TestPathBasedCacheRequests { assertFalse("Unexpected # of cache directives found", dit.hasNext()); } + /** + * Wait for the NameNode to have an expected number of cached blocks + * and replicas. 
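The tests above and below drive the directive lifecycle through the builder API; condensed into a small sketch here, with the pool name, path, and replication factors purely illustrative.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;

public class CacheDirectiveExample {
  static void cacheAndUncache(DistributedFileSystem dfs) throws Exception {
    dfs.addCachePool(new CachePoolInfo("examplePool"));
    long id = dfs.addPathBasedCacheDirective(
        new PathBasedCacheDirective.Builder()
            .setPath(new Path("/example/dir"))
            .setReplication((short) 2)
            .setPool("examplePool")
            .build());
    // Later: raise the cache replication factor, then drop the directive.
    dfs.modifyPathBasedCacheDirective(
        new PathBasedCacheDirective.Builder()
            .setId(id)
            .setReplication((short) 3)
            .build());
    dfs.removePathBasedCacheDirective(id);
  }
}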
+ * @param nn NameNode + * @param expectedCachedBlocks + * @param expectedCachedReplicas + * @throws Exception + */ private static void waitForCachedBlocks(NameNode nn, final int expectedCachedBlocks, final int expectedCachedReplicas) throws Exception { @@ -569,6 +591,37 @@ public class TestPathBasedCacheRequests { }, 500, 60000); } + private static void checkNumCachedReplicas(final DistributedFileSystem dfs, + final List paths, final int expectedBlocks, + final int expectedReplicas) + throws Exception { + int numCachedBlocks = 0; + int numCachedReplicas = 0; + for (Path p: paths) { + final FileStatus f = dfs.getFileStatus(p); + final long len = f.getLen(); + final long blockSize = f.getBlockSize(); + // round it up to full blocks + final long numBlocks = (len + blockSize - 1) / blockSize; + BlockLocation[] locs = dfs.getFileBlockLocations(p, 0, len); + assertEquals("Unexpected number of block locations for path " + p, + numBlocks, locs.length); + for (BlockLocation l: locs) { + if (l.getCachedHosts().length > 0) { + numCachedBlocks++; + } + numCachedReplicas += l.getCachedHosts().length; + } + } + LOG.info("Found " + numCachedBlocks + " of " + expectedBlocks + " blocks"); + LOG.info("Found " + numCachedReplicas + " of " + expectedReplicas + + " replicas"); + assertEquals("Unexpected number of cached blocks", expectedBlocks, + numCachedBlocks); + assertEquals("Unexpected number of cached replicas", expectedReplicas, + numCachedReplicas); + } + private static final long BLOCK_SIZE = 512; private static final int NUM_DATANODES = 4; @@ -745,4 +798,110 @@ public class TestPathBasedCacheRequests { } } + /** + * Tests stepping the cache replication factor up and down, checking the + * number of cached replicas and blocks as well as the advertised locations. + * @throws Exception + */ + @Test(timeout=120000) + public void testReplicationFactor() throws Exception { + Assume.assumeTrue(canTestDatanodeCaching()); + HdfsConfiguration conf = createCachingConf(); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build(); + + try { + cluster.waitActive(); + DistributedFileSystem dfs = cluster.getFileSystem(); + NameNode namenode = cluster.getNameNode(); + // Create the pool + final String pool = "friendlyPool"; + dfs.addCachePool(new CachePoolInfo(pool)); + // Create some test files + final List paths = new LinkedList(); + paths.add(new Path("/foo/bar")); + paths.add(new Path("/foo/baz")); + paths.add(new Path("/foo2/bar2")); + paths.add(new Path("/foo2/baz2")); + dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault()); + dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault()); + final int numBlocksPerFile = 2; + for (Path path : paths) { + FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile, + (int)BLOCK_SIZE, (short)3, false); + } + waitForCachedBlocks(namenode, 0, 0); + checkNumCachedReplicas(dfs, paths, 0, 0); + // cache directory + long id = dfs.addPathBasedCacheDirective( + new PathBasedCacheDirective.Builder(). + setPath(new Path("/foo")). + setReplication((short)1). + setPool(pool). + build()); + waitForCachedBlocks(namenode, 4, 4); + checkNumCachedReplicas(dfs, paths, 4, 4); + // step up the replication factor + for (int i=2; i<=3; i++) { + dfs.modifyPathBasedCacheDirective( + new PathBasedCacheDirective.Builder(). + setId(id). + setReplication((short)i). 
+ build()); + waitForCachedBlocks(namenode, 4, 4*i); + checkNumCachedReplicas(dfs, paths, 4, 4*i); + } + // step it down + for (int i=2; i>=1; i--) { + dfs.modifyPathBasedCacheDirective( + new PathBasedCacheDirective.Builder(). + setId(id). + setReplication((short)i). + build()); + waitForCachedBlocks(namenode, 4, 4*i); + checkNumCachedReplicas(dfs, paths, 4, 4*i); + } + // remove and watch numCached go to 0 + dfs.removePathBasedCacheDirective(id); + waitForCachedBlocks(namenode, 0, 0); + checkNumCachedReplicas(dfs, paths, 0, 0); + } finally { + cluster.shutdown(); + } + } + + @Test(timeout=60000) + public void testListCachePoolPermissions() throws Exception { + final UserGroupInformation myUser = UserGroupInformation + .createRemoteUser("myuser"); + final DistributedFileSystem myDfs = + (DistributedFileSystem)DFSTestUtil.getFileSystemAs(myUser, conf); + final String poolName = "poolparty"; + dfs.addCachePool(new CachePoolInfo(poolName) + .setMode(new FsPermission((short)0700))); + // Should only see partial info + RemoteIterator it = myDfs.listCachePools(); + CachePoolInfo info = it.next(); + assertFalse(it.hasNext()); + assertEquals("Expected pool name", poolName, info.getPoolName()); + assertNull("Unexpected owner name", info.getOwnerName()); + assertNull("Unexpected group name", info.getGroupName()); + assertNull("Unexpected mode", info.getMode()); + assertNull("Unexpected weight", info.getWeight()); + // Modify the pool so myuser is now the owner + dfs.modifyCachePool(new CachePoolInfo(poolName) + .setOwnerName(myUser.getShortUserName()) + .setWeight(99)); + // Should see full info + it = myDfs.listCachePools(); + info = it.next(); + assertFalse(it.hasNext()); + assertEquals("Expected pool name", poolName, info.getPoolName()); + assertEquals("Mismatched owner name", myUser.getShortUserName(), + info.getOwnerName()); + assertNotNull("Expected group name", info.getGroupName()); + assertEquals("Mismatched mode", (short) 0700, + info.getMode().toShort()); + assertEquals("Mismatched weight", 99, (int)info.getWeight()); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java index 0524ab28e4a..32e8a0a1ccb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.spy; import java.io.File; import java.io.IOException; +import java.util.EnumSet; import java.util.List; import java.util.Random; @@ -40,9 +41,12 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; @@ -102,6 +106,7 @@ public class TestRenameWithSnapshots { @Before public 
void setUp() throws Exception { + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL).format(true) .build(); cluster.waitActive(); @@ -2289,4 +2294,49 @@ public class TestRenameWithSnapshots { assertEquals(0, diff.getChildrenDiff().getList(ListType.DELETED).size()); assertEquals(0, diff.getChildrenDiff().getList(ListType.CREATED).size()); } + + /** + * Rename of the underconstruction file in snapshot should not fail NN restart + * after checkpoint. Unit test for HDFS-5425. + */ + @Test + public void testRenameUCFileInSnapshot() throws Exception { + final Path test = new Path("/test"); + final Path foo = new Path(test, "foo"); + final Path bar = new Path(foo, "bar"); + hdfs.mkdirs(foo); + // create a file and keep it as underconstruction. + hdfs.create(bar); + SnapshotTestHelper.createSnapshot(hdfs, test, "s0"); + // rename bar --> bar2 + final Path bar2 = new Path(foo, "bar2"); + hdfs.rename(bar, bar2); + + // save namespace and restart + restartClusterAndCheckImage(true); + } + + /** + * Similar with testRenameUCFileInSnapshot, but do renaming first and then + * append file without closing it. Unit test for HDFS-5425. + */ + @Test + public void testAppendFileAfterRenameInSnapshot() throws Exception { + final Path test = new Path("/test"); + final Path foo = new Path(test, "foo"); + final Path bar = new Path(foo, "bar"); + DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPL, SEED); + SnapshotTestHelper.createSnapshot(hdfs, test, "s0"); + // rename bar --> bar2 + final Path bar2 = new Path(foo, "bar2"); + hdfs.rename(bar, bar2); + // append file and keep it as underconstruction. + FSDataOutputStream out = hdfs.append(bar2); + out.writeByte(0); + ((DFSOutputStream) out.getWrappedStream()).hsync( + EnumSet.of(SyncFlag.UPDATE_LENGTH)); + + // save namespace and restart + restartClusterAndCheckImage(true); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestXMLUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestXMLUtils.java index 520107c0707..f3ab56c578c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestXMLUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestXMLUtils.java @@ -17,9 +17,8 @@ */ package org.apache.hadoop.hdfs.util; -import junit.framework.Assert; - import org.apache.hadoop.hdfs.util.XMLUtils.UnmanglingError; +import org.junit.Assert; import org.junit.Test; public class TestXMLUtils { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java index a8f8b2f7e2f..e8593b62163 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java @@ -22,9 +22,8 @@ import java.net.HttpURLConnection; import java.net.URL; import java.util.List; -import junit.framework.Assert; - import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; +import org.junit.Assert; import org.junit.Test; import com.google.common.collect.Lists; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java index 
af09595ff50..154014180a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java @@ -26,8 +26,6 @@ import static org.junit.Assert.fail; import java.util.HashMap; import java.util.Map; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -38,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 240a63a605b..6a0deb59f4f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -164,6 +164,9 @@ Release 2.3.0 - UNRELEASED MAPREDUCE-4421. Run MapReduce framework via the distributed cache (jlowe) + MAPREDUCE-1176. FixedLengthInputFormat and FixedLengthRecordReader + (Mariappan Asokan and BitsOfInfo via Sandy Ryza) + OPTIMIZATIONS MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java new file mode 100644 index 00000000000..60ba69c5db8 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; + +/** + * FixedLengthInputFormat is an input format used to read input files + * which contain fixed length records. The content of a record need not be + * text. It can be arbitrary binary data. 
Users must configure the record + * length property by calling: + * FixedLengthInputFormat.setRecordLength(conf, recordLength);<br/><br/> or + * conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength); + * <br/><br/>
+ * @see FixedLengthRecordReader + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class FixedLengthInputFormat + extends FileInputFormat + implements JobConfigurable { + + private CompressionCodecFactory compressionCodecs = null; + + public static final String FIXED_RECORD_LENGTH = + "fixedlengthinputformat.record.length"; + + /** + * Set the length of each record + * @param conf configuration + * @param recordLength the length of a record + */ + public static void setRecordLength(Configuration conf, int recordLength) { + conf.setInt(FIXED_RECORD_LENGTH, recordLength); + } + + /** + * Get record length value + * @param conf configuration + * @return the record length, zero means none was set + */ + public static int getRecordLength(Configuration conf) { + return conf.getInt(FIXED_RECORD_LENGTH, 0); + } + + @Override + public void configure(JobConf conf) { + compressionCodecs = new CompressionCodecFactory(conf); + } + + @Override + public RecordReader + getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter) + throws IOException { + reporter.setStatus(genericSplit.toString()); + int recordLength = getRecordLength(job); + if (recordLength <= 0) { + throw new IOException("Fixed record length " + recordLength + + " is invalid. It should be set to a value greater than zero"); + } + return new FixedLengthRecordReader(job, (FileSplit)genericSplit, + recordLength); + } + + @Override + protected boolean isSplitable(FileSystem fs, Path file) { + final CompressionCodec codec = compressionCodecs.getCodec(file); + return(null == codec); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java new file mode 100644 index 00000000000..6d85fda5c30 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; + +/** + * A reader to read fixed length records from a split. Record offset is + * returned as key and the record as bytes is returned in value. 
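+ * This implementation delegates to the new-API + * org.apache.hadoop.mapreduce.lib.input.FixedLengthRecordReader so the + * record-reading logic is not duplicated.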
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class FixedLengthRecordReader + implements RecordReader { + + private int recordLength; + // Make use of the new API implementation to avoid code duplication. + private org.apache.hadoop.mapreduce.lib.input.FixedLengthRecordReader reader; + + public FixedLengthRecordReader(Configuration job, FileSplit split, + int recordLength) throws IOException { + this.recordLength = recordLength; + reader = new org.apache.hadoop.mapreduce.lib.input.FixedLengthRecordReader( + recordLength); + reader.initialize(job, split.getStart(), split.getLength(), + split.getPath()); + } + + @Override + public LongWritable createKey() { + return new LongWritable(); + } + + @Override + public BytesWritable createValue() { + return new BytesWritable(new byte[recordLength]); + } + + @Override + public synchronized boolean next(LongWritable key, BytesWritable value) + throws IOException { + boolean dataRead = reader.nextKeyValue(); + if (dataRead) { + LongWritable newKey = reader.getCurrentKey(); + BytesWritable newValue = reader.getCurrentValue(); + key.set(newKey.get()); + value.set(newValue); + } + return dataRead; + } + + @Override + public float getProgress() throws IOException { + return reader.getProgress(); + } + + @Override + public synchronized long getPos() throws IOException { + return reader.getPos(); + } + + @Override + public void close() throws IOException { + reader.close(); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java new file mode 100644 index 00000000000..9c32e83c5c3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapreduce.lib.input; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * FixedLengthInputFormat is an input format used to read input files + * which contain fixed length records. The content of a record need not be + * text. It can be arbitrary binary data. Users must configure the record + * length property by calling: + * FixedLengthInputFormat.setRecordLength(conf, recordLength);<br/><br/> or + * conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength); + * <br/><br/>
+ * @see FixedLengthRecordReader + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class FixedLengthInputFormat + extends FileInputFormat { + + public static final String FIXED_RECORD_LENGTH = + "fixedlengthinputformat.record.length"; + + /** + * Set the length of each record + * @param conf configuration + * @param recordLength the length of a record + */ + public static void setRecordLength(Configuration conf, int recordLength) { + conf.setInt(FIXED_RECORD_LENGTH, recordLength); + } + + /** + * Get record length value + * @param conf configuration + * @return the record length, zero means none was set + */ + public static int getRecordLength(Configuration conf) { + return conf.getInt(FIXED_RECORD_LENGTH, 0); + } + + @Override + public RecordReader + createRecordReader(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + int recordLength = getRecordLength(context.getConfiguration()); + if (recordLength <= 0) { + throw new IOException("Fixed record length " + recordLength + + " is invalid. It should be set to a value greater than zero"); + } + return new FixedLengthRecordReader(recordLength); + } + + @Override + protected boolean isSplitable(JobContext context, Path file) { + final CompressionCodec codec = + new CompressionCodecFactory(context.getConfiguration()).getCodec(file); + return (null == codec); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java new file mode 100644 index 00000000000..70fa7b2aec3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.lib.input; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.compress.CodecPool; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; + +/** + * A reader to read fixed length records from a split. Record offset is + * returned as key and the record as bytes is returned in value. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class FixedLengthRecordReader + extends RecordReader { + private static final Log LOG + = LogFactory.getLog(FixedLengthRecordReader.class); + + private int recordLength; + private long start; + private long pos; + private long end; + private long numRecordsRemainingInSplit; + private FSDataInputStream fileIn; + private Seekable filePosition; + private LongWritable key; + private BytesWritable value; + private boolean isCompressedInput; + private Decompressor decompressor; + private InputStream inputStream; + + public FixedLengthRecordReader(int recordLength) { + this.recordLength = recordLength; + } + + @Override + public void initialize(InputSplit genericSplit, + TaskAttemptContext context) throws IOException { + FileSplit split = (FileSplit) genericSplit; + Configuration job = context.getConfiguration(); + final Path file = split.getPath(); + initialize(job, split.getStart(), split.getLength(), file); + } + + // This is also called from the old FixedLengthRecordReader API implementation + public void initialize(Configuration job, long splitStart, long splitLength, + Path file) throws IOException { + start = splitStart; + end = start + splitLength; + long partialRecordLength = start % recordLength; + long numBytesToSkip = 0; + if (partialRecordLength != 0) { + numBytesToSkip = recordLength - partialRecordLength; + } + + // open the file and seek to the start of the split + final FileSystem fs = file.getFileSystem(job); + fileIn = fs.open(file); + + CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file); + if (null != codec) { + isCompressedInput = true; + decompressor = CodecPool.getDecompressor(codec); + CompressionInputStream cIn + = codec.createInputStream(fileIn, decompressor); + filePosition = cIn; + inputStream = cIn; + numRecordsRemainingInSplit = Long.MAX_VALUE; + LOG.info( + "Compressed input; cannot compute number of records in the split"); + } else { + fileIn.seek(start); + filePosition = fileIn; + inputStream = fileIn; + long splitSize = end - start - numBytesToSkip; + numRecordsRemainingInSplit = (splitSize + recordLength - 1)/recordLength; + if (numRecordsRemainingInSplit < 0) { + numRecordsRemainingInSplit = 0; + } + LOG.info("Expecting " + numRecordsRemainingInSplit + + " records each with a length of " + 
recordLength + + " bytes in the split with an effective size of " + + splitSize + " bytes"); + } + if (numBytesToSkip != 0) { + start += inputStream.skip(numBytesToSkip); + } + this.pos = start; + } + + @Override + public synchronized boolean nextKeyValue() throws IOException { + if (key == null) { + key = new LongWritable(); + } + if (value == null) { + value = new BytesWritable(new byte[recordLength]); + } + boolean dataRead = false; + value.setSize(recordLength); + byte[] record = value.getBytes(); + if (numRecordsRemainingInSplit > 0) { + key.set(pos); + int offset = 0; + int numBytesToRead = recordLength; + int numBytesRead = 0; + while (numBytesToRead > 0) { + numBytesRead = inputStream.read(record, offset, numBytesToRead); + if (numBytesRead == -1) { + // EOF + break; + } + offset += numBytesRead; + numBytesToRead -= numBytesRead; + } + numBytesRead = recordLength - numBytesToRead; + pos += numBytesRead; + if (numBytesRead > 0) { + dataRead = true; + if (numBytesRead >= recordLength) { + if (!isCompressedInput) { + numRecordsRemainingInSplit--; + } + } else { + throw new IOException("Partial record(length = " + numBytesRead + + ") found at the end of split."); + } + } else { + numRecordsRemainingInSplit = 0L; // End of input. + } + } + return dataRead; + } + + @Override + public LongWritable getCurrentKey() { + return key; + } + + @Override + public BytesWritable getCurrentValue() { + return value; + } + + @Override + public synchronized float getProgress() throws IOException { + if (start == end) { + return 0.0f; + } else { + return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start)); + } + } + + @Override + public synchronized void close() throws IOException { + try { + if (inputStream != null) { + inputStream.close(); + inputStream = null; + } + } finally { + if (decompressor != null) { + CodecPool.returnDecompressor(decompressor); + decompressor = null; + } + } + } + + // This is called from the old FixedLengthRecordReader API implementation. + public long getPos() { + return pos; + } + + private long getFilePosition() throws IOException { + long retVal; + if (isCompressedInput && null != filePosition) { + retVal = filePosition.getPos(); + } else { + retVal = pos; + } + return retVal; + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java new file mode 100644 index 00000000000..0c614eaf0c7 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java @@ -0,0 +1,416 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.compress.*; +import org.apache.hadoop.util.ReflectionUtils; + +import org.junit.BeforeClass; +import org.junit.Test; +import static junit.framework.Assert.*; + +public class TestFixedLengthInputFormat { + + private static Log LOG; + private static Configuration defaultConf; + private static FileSystem localFs; + private static Path workDir; + private static Reporter voidReporter; + + // some chars for the record data + private static char[] chars; + private static Random charRand; + + @BeforeClass + public static void onlyOnce() { + try { + LOG = LogFactory.getLog(TestFixedLengthInputFormat.class.getName()); + defaultConf = new Configuration(); + defaultConf.set("fs.defaultFS", "file:///"); + localFs = FileSystem.getLocal(defaultConf); + voidReporter = Reporter.NULL; + // our set of chars + chars = ("abcdefghijklmnopqrstuvABCDEFGHIJKLMN OPQRSTUVWXYZ1234567890)" + + "(*&^%$#@!-=> reader = + format.getRecordReader(split, job, voidReporter); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for not setting record length:", exceptionThrown); + } + + /** + * Test with record length set to 0 + */ + @Test (timeout=5000) + public void testZeroRecordLength() throws IOException { + localFs.delete(workDir, true); + Path file = new Path(workDir, new String("testFormat.txt")); + createFile(file, null, 10, 10); + // Set the fixed length record length config property + Configuration testConf = new Configuration(defaultConf); + JobConf job = new JobConf(testConf); + FileInputFormat.setInputPaths(job, workDir); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(job, 0); + format.configure(job); + InputSplit splits[] = format.getSplits(job, 1); + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + RecordReader reader = + format.getRecordReader(split, job, voidReporter); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for zero record length:", exceptionThrown); + } + + /** + * Test with record length set to a negative value + */ + @Test (timeout=5000) + public void testNegativeRecordLength() throws IOException { + localFs.delete(workDir, true); + Path file = new Path(workDir, new String("testFormat.txt")); + createFile(file, null, 10, 10); + // Set the fixed length record length config property + Configuration testConf = new Configuration(defaultConf); + JobConf job = new JobConf(testConf); + FileInputFormat.setInputPaths(job, workDir); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(job, -10); + format.configure(job); + InputSplit splits[] = format.getSplits(job, 1); + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + RecordReader 
reader = + format.getRecordReader(split, job, voidReporter); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for negative record length:", exceptionThrown); + } + + /** + * Test with partial record at the end of a compressed input file. + */ + @Test (timeout=5000) + public void testPartialRecordCompressedIn() throws IOException { + CompressionCodec gzip = new GzipCodec(); + runPartialRecordTest(gzip); + } + + /** + * Test with partial record at the end of an uncompressed input file. + */ + @Test (timeout=5000) + public void testPartialRecordUncompressedIn() throws IOException { + runPartialRecordTest(null); + } + + /** + * Test using the gzip codec with two input files. + */ + @Test (timeout=5000) + public void testGzipWithTwoInputs() throws IOException { + CompressionCodec gzip = new GzipCodec(); + localFs.delete(workDir, true); + // Create files with fixed length records with 5 byte long records. + writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, + "one two threefour five six seveneightnine ten "); + writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip, + "ten nine eightsevensix five four threetwo one "); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(defaultConf, 5); + JobConf job = new JobConf(defaultConf); + FileInputFormat.setInputPaths(job, workDir); + ReflectionUtils.setConf(gzip, job); + format.configure(job); + InputSplit[] splits = format.getSplits(job, 100); + assertEquals("compressed splits == 2", 2, splits.length); + FileSplit tmp = (FileSplit) splits[0]; + if (tmp.getPath().getName().equals("part2.txt.gz")) { + splits[0] = splits[1]; + splits[1] = tmp; + } + List results = readSplit(format, splits[0], job); + assertEquals("splits[0] length", 10, results.size()); + assertEquals("splits[0][5]", "six ", results.get(5)); + results = readSplit(format, splits[1], job); + assertEquals("splits[1] length", 10, results.size()); + assertEquals("splits[1][0]", "ten ", results.get(0)); + assertEquals("splits[1][1]", "nine ", results.get(1)); + } + + // Create a file containing fixed length records with random data + private ArrayList createFile(Path targetFile, CompressionCodec codec, + int recordLen, + int numRecords) throws IOException { + ArrayList recordList = new ArrayList(numRecords); + OutputStream ostream = localFs.create(targetFile); + if (codec != null) { + ostream = codec.createOutputStream(ostream); + } + Writer writer = new OutputStreamWriter(ostream); + try { + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < numRecords; i++) { + for (int j = 0; j < recordLen; j++) { + sb.append(chars[charRand.nextInt(chars.length)]); + } + String recordData = sb.toString(); + recordList.add(recordData); + writer.write(recordData); + sb.setLength(0); + } + } finally { + writer.close(); + } + return recordList; + } + + private void runRandomTests(CompressionCodec codec) throws IOException { + StringBuilder fileName = new StringBuilder("testFormat.txt"); + if (codec != null) { + fileName.append(".gz"); + } + localFs.delete(workDir, true); + Path file = new Path(workDir, fileName.toString()); + int seed = new Random().nextInt(); + LOG.info("Seed = " + seed); + Random random = new Random(seed); + int MAX_TESTS = 20; + LongWritable key = new LongWritable(); + BytesWritable value = new BytesWritable(); + + for (int i = 0; i < MAX_TESTS; i++) { + LOG.info("----------------------------------------------------------"); + // Maximum total 
records of 999 + int totalRecords = random.nextInt(999)+1; + // Test an empty file + if (i == 8) { + totalRecords = 0; + } + // Maximum bytes in a record of 100K + int recordLength = random.nextInt(1024*100)+1; + // For the 11th test, force a record length of 1 + if (i == 10) { + recordLength = 1; + } + // The total bytes in the test file + int fileSize = (totalRecords * recordLength); + LOG.info("totalRecords=" + totalRecords + " recordLength=" + + recordLength); + // Create the test file + ArrayList recordList + = createFile(file, codec, recordLength, totalRecords); + assertTrue(localFs.exists(file)); + // Set the fixed length record length config property + Configuration testConf = new Configuration(defaultConf); + FixedLengthInputFormat.setRecordLength(testConf, recordLength); + + int numSplits = 1; + // Arbitrarily set number of splits. + if (i > 0) { + if (i == (MAX_TESTS-1)) { + // Test a split size that is less than record len + numSplits = (int)(fileSize/Math.floor(recordLength/2)); + } else { + if (MAX_TESTS % i == 0) { + // Let us create a split size that is forced to be + // smaller than the end file itself, (ensures 1+ splits) + numSplits = fileSize/(fileSize - random.nextInt(fileSize)); + } else { + // Just pick a random split size with no upper bound + numSplits = Math.max(1, fileSize/random.nextInt(Integer.MAX_VALUE)); + } + } + LOG.info("Number of splits set to: " + numSplits); + } + + // Create the job, and setup the input path + JobConf job = new JobConf(testConf); + FileInputFormat.setInputPaths(job, workDir); + // Try splitting the file in a variety of sizes + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.configure(job); + InputSplit splits[] = format.getSplits(job, numSplits); + LOG.info("Actual number of splits = " + splits.length); + // Test combined split lengths = total file size + long recordOffset = 0; + int recordNumber = 0; + for (InputSplit split : splits) { + RecordReader reader = + format.getRecordReader(split, job, voidReporter); + Class clazz = reader.getClass(); + assertEquals("RecordReader class should be FixedLengthRecordReader:", + FixedLengthRecordReader.class, clazz); + // Plow through the records in this split + while (reader.next(key, value)) { + assertEquals("Checking key", (long)(recordNumber*recordLength), + key.get()); + String valueString = + new String(value.getBytes(), 0, value.getLength()); + assertEquals("Checking record length:", recordLength, + value.getLength()); + assertTrue("Checking for more records than expected:", + recordNumber < totalRecords); + String origRecord = recordList.get(recordNumber); + assertEquals("Checking record content:", origRecord, valueString); + recordNumber++; + } + reader.close(); + } + assertEquals("Total original records should be total read records:", + recordList.size(), recordNumber); + } + } + + private static void writeFile(FileSystem fs, Path name, + CompressionCodec codec, + String contents) throws IOException { + OutputStream stm; + if (codec == null) { + stm = fs.create(name); + } else { + stm = codec.createOutputStream(fs.create(name)); + } + stm.write(contents.getBytes()); + stm.close(); + } + + private static List readSplit(FixedLengthInputFormat format, + InputSplit split, + JobConf job) throws IOException { + List result = new ArrayList(); + RecordReader reader = + format.getRecordReader(split, job, voidReporter); + LongWritable key = reader.createKey(); + BytesWritable value = reader.createValue(); + while (reader.next(key, value)) { + result.add(new 
String(value.getBytes(), 0, value.getLength())); + } + reader.close(); + return result; + } + + private void runPartialRecordTest(CompressionCodec codec) throws IOException { + localFs.delete(workDir, true); + // Create a file with fixed length records with 5 byte long + // records with a partial record at the end. + StringBuilder fileName = new StringBuilder("testFormat.txt"); + if (codec != null) { + fileName.append(".gz"); + } + writeFile(localFs, new Path(workDir, fileName.toString()), codec, + "one two threefour five six seveneightnine ten"); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(defaultConf, 5); + JobConf job = new JobConf(defaultConf); + FileInputFormat.setInputPaths(job, workDir); + if (codec != null) { + ReflectionUtils.setConf(codec, job); + } + format.configure(job); + InputSplit[] splits = format.getSplits(job, 100); + if (codec != null) { + assertEquals("compressed splits == 1", 1, splits.length); + } + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + List results = readSplit(format, split, job); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for partial record:", exceptionThrown); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java new file mode 100644 index 00000000000..0c94a4d34ce --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java @@ -0,0 +1,461 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.lib.input; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.compress.*; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MapContext; +import org.apache.hadoop.mapreduce.MapReduceTestUtil; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.task.MapContextImpl; + +import org.apache.hadoop.util.ReflectionUtils; + +import org.junit.BeforeClass; +import org.junit.Test; +import static junit.framework.Assert.*; + +public class TestFixedLengthInputFormat { + + private static Log LOG; + private static Configuration defaultConf; + private static FileSystem localFs; + private static Path workDir; + + // some chars for the record data + private static char[] chars; + private static Random charRand; + + @BeforeClass + public static void onlyOnce() { + try { + LOG = LogFactory.getLog(TestFixedLengthInputFormat.class.getName()); + defaultConf = new Configuration(); + defaultConf.set("fs.defaultFS", "file:///"); + localFs = FileSystem.getLocal(defaultConf); + // our set of chars + chars = ("abcdefghijklmnopqrstuvABCDEFGHIJKLMN OPQRSTUVWXYZ1234567890)" + + "(*&^%$#@!-=> splits = format.getSplits(job); + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + TaskAttemptContext context = MapReduceTestUtil. 
+ createDummyMapTaskAttemptContext(job.getConfiguration()); + RecordReader reader = + format.createRecordReader(split, context); + MapContext + mcontext = + new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(), + reader, null, null, MapReduceTestUtil.createDummyReporter(), split); + reader.initialize(split, mcontext); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for not setting record length:", exceptionThrown); + } + + /** + * Test with record length set to 0 + */ + @Test (timeout=5000) + public void testZeroRecordLength() throws Exception { + localFs.delete(workDir, true); + Path file = new Path(workDir, new String("testFormat.txt")); + createFile(file, null, 10, 10); + // Set the fixed length record length config property + Configuration testConf = new Configuration(defaultConf); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(testConf, 0); + Job job = Job.getInstance(testConf); + FileInputFormat.setInputPaths(job, workDir); + List splits = format.getSplits(job); + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + TaskAttemptContext context = + MapReduceTestUtil.createDummyMapTaskAttemptContext( + job.getConfiguration()); + RecordReader reader = + format.createRecordReader(split, context); + MapContext + mcontext = + new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(), + reader, null, null, MapReduceTestUtil.createDummyReporter(), split); + reader.initialize(split, mcontext); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for zero record length:", exceptionThrown); + } + + /** + * Test with record length set to a negative value + */ + @Test (timeout=5000) + public void testNegativeRecordLength() throws Exception { + localFs.delete(workDir, true); + Path file = new Path(workDir, new String("testFormat.txt")); + createFile(file, null, 10, 10); + // Set the fixed length record length config property + Configuration testConf = new Configuration(defaultConf); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(testConf, -10); + Job job = Job.getInstance(testConf); + FileInputFormat.setInputPaths(job, workDir); + List splits = format.getSplits(job); + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + TaskAttemptContext context = MapReduceTestUtil. + createDummyMapTaskAttemptContext(job.getConfiguration()); + RecordReader reader = + format.createRecordReader(split, context); + MapContext + mcontext = + new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(), + reader, null, null, MapReduceTestUtil.createDummyReporter(), split); + reader.initialize(split, mcontext); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for negative record length:", exceptionThrown); + } + + /** + * Test with partial record at the end of a compressed input file. + */ + @Test (timeout=5000) + public void testPartialRecordCompressedIn() throws Exception { + CompressionCodec gzip = new GzipCodec(); + runPartialRecordTest(gzip); + } + + /** + * Test with partial record at the end of an uncompressed input file. 
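+ * The record reader is expected to fail with an IOException reporting the + * partial trailing record instead of returning a truncated value.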
+ */ + @Test (timeout=5000) + public void testPartialRecordUncompressedIn() throws Exception { + runPartialRecordTest(null); + } + + /** + * Test using the gzip codec with two input files. + */ + @Test (timeout=5000) + public void testGzipWithTwoInputs() throws Exception { + CompressionCodec gzip = new GzipCodec(); + localFs.delete(workDir, true); + // Create files with fixed length records with 5 byte long records. + writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, + "one two threefour five six seveneightnine ten "); + writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip, + "ten nine eightsevensix five four threetwo one "); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(defaultConf, 5); + ReflectionUtils.setConf(gzip, defaultConf); + Job job = Job.getInstance(defaultConf); + FileInputFormat.setInputPaths(job, workDir); + List splits = format.getSplits(job); + assertEquals("compressed splits == 2", 2, splits.size()); + FileSplit tmp = (FileSplit) splits.get(0); + if (tmp.getPath().getName().equals("part2.txt.gz")) { + splits.set(0, splits.get(1)); + splits.set(1, tmp); + } + List results = readSplit(format, splits.get(0), job); + assertEquals("splits[0] length", 10, results.size()); + assertEquals("splits[0][5]", "six ", results.get(5)); + results = readSplit(format, splits.get(1), job); + assertEquals("splits[1] length", 10, results.size()); + assertEquals("splits[1][0]", "ten ", results.get(0)); + assertEquals("splits[1][1]", "nine ", results.get(1)); + } + + // Create a file containing fixed length records with random data + private ArrayList createFile(Path targetFile, CompressionCodec codec, + int recordLen, + int numRecords) throws IOException { + ArrayList recordList = new ArrayList(numRecords); + OutputStream ostream = localFs.create(targetFile); + if (codec != null) { + ostream = codec.createOutputStream(ostream); + } + Writer writer = new OutputStreamWriter(ostream); + try { + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < numRecords; i++) { + for (int j = 0; j < recordLen; j++) { + sb.append(chars[charRand.nextInt(chars.length)]); + } + String recordData = sb.toString(); + recordList.add(recordData); + writer.write(recordData); + sb.setLength(0); + } + } finally { + writer.close(); + } + return recordList; + } + + private void runRandomTests(CompressionCodec codec) throws Exception { + StringBuilder fileName = new StringBuilder("testFormat.txt"); + if (codec != null) { + fileName.append(".gz"); + } + localFs.delete(workDir, true); + Path file = new Path(workDir, fileName.toString()); + int seed = new Random().nextInt(); + LOG.info("Seed = " + seed); + Random random = new Random(seed); + int MAX_TESTS = 20; + LongWritable key; + BytesWritable value; + + for (int i = 0; i < MAX_TESTS; i++) { + LOG.info("----------------------------------------------------------"); + // Maximum total records of 999 + int totalRecords = random.nextInt(999)+1; + // Test an empty file + if (i == 8) { + totalRecords = 0; + } + // Maximum bytes in a record of 100K + int recordLength = random.nextInt(1024*100)+1; + // For the 11th test, force a record length of 1 + if (i == 10) { + recordLength = 1; + } + // The total bytes in the test file + int fileSize = (totalRecords * recordLength); + LOG.info("totalRecords=" + totalRecords + " recordLength=" + + recordLength); + // Create the test file + ArrayList recordList = + createFile(file, codec, recordLength, totalRecords); + assertTrue(localFs.exists(file)); + // Set the fixed length 
record length config property + Configuration testConf = new Configuration(defaultConf); + FixedLengthInputFormat.setRecordLength(testConf, recordLength); + + int numSplits = 1; + // Arbitrarily set number of splits. + if (i > 0) { + if (i == (MAX_TESTS-1)) { + // Test a split size that is less than record len + numSplits = (int)(fileSize/Math.floor(recordLength/2)); + } else { + if (MAX_TESTS % i == 0) { + // Let us create a split size that is forced to be + // smaller than the end file itself, (ensures 1+ splits) + numSplits = fileSize/(fileSize - random.nextInt(fileSize)); + } else { + // Just pick a random split size with no upper bound + numSplits = Math.max(1, fileSize/random.nextInt(Integer.MAX_VALUE)); + } + } + LOG.info("Number of splits set to: " + numSplits); + } + testConf.setLong("mapreduce.input.fileinputformat.split.maxsize", + (long)(fileSize/numSplits)); + + // Create the job, and setup the input path + Job job = Job.getInstance(testConf); + FileInputFormat.setInputPaths(job, workDir); + // Try splitting the file in a variety of sizes + FixedLengthInputFormat format = new FixedLengthInputFormat(); + List splits = format.getSplits(job); + LOG.info("Actual number of splits = " + splits.size()); + // Test combined split lengths = total file size + long recordOffset = 0; + int recordNumber = 0; + for (InputSplit split : splits) { + TaskAttemptContext context = MapReduceTestUtil. + createDummyMapTaskAttemptContext(job.getConfiguration()); + RecordReader reader = + format.createRecordReader(split, context); + MapContext + mcontext = + new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(), + reader, null, null, MapReduceTestUtil.createDummyReporter(), split); + reader.initialize(split, mcontext); + Class clazz = reader.getClass(); + assertEquals("RecordReader class should be FixedLengthRecordReader:", + FixedLengthRecordReader.class, clazz); + // Plow through the records in this split + while (reader.nextKeyValue()) { + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + assertEquals("Checking key", (long)(recordNumber*recordLength), + key.get()); + String valueString = new String(value.getBytes(), 0, + value.getLength()); + assertEquals("Checking record length:", recordLength, + value.getLength()); + assertTrue("Checking for more records than expected:", + recordNumber < totalRecords); + String origRecord = recordList.get(recordNumber); + assertEquals("Checking record content:", origRecord, valueString); + recordNumber++; + } + reader.close(); + } + assertEquals("Total original records should be total read records:", + recordList.size(), recordNumber); + } + } + + private static void writeFile(FileSystem fs, Path name, + CompressionCodec codec, + String contents) throws IOException { + OutputStream stm; + if (codec == null) { + stm = fs.create(name); + } else { + stm = codec.createOutputStream(fs.create(name)); + } + stm.write(contents.getBytes()); + stm.close(); + } + + private static List readSplit(FixedLengthInputFormat format, + InputSplit split, + Job job) throws Exception { + List result = new ArrayList(); + TaskAttemptContext context = MapReduceTestUtil. 
+ createDummyMapTaskAttemptContext(job.getConfiguration()); + RecordReader reader = + format.createRecordReader(split, context); + MapContext + mcontext = + new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(), + reader, null, null, MapReduceTestUtil.createDummyReporter(), split); + reader.initialize(split, mcontext); + LongWritable key; + BytesWritable value; + while (reader.nextKeyValue()) { + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + result.add(new String(value.getBytes(), 0, value.getLength())); + } + reader.close(); + return result; + } + + private void runPartialRecordTest(CompressionCodec codec) throws Exception { + localFs.delete(workDir, true); + // Create a file with fixed length records with 5 byte long + // records with a partial record at the end. + StringBuilder fileName = new StringBuilder("testFormat.txt"); + if (codec != null) { + fileName.append(".gz"); + ReflectionUtils.setConf(codec, defaultConf); + } + writeFile(localFs, new Path(workDir, fileName.toString()), codec, + "one two threefour five six seveneightnine ten"); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(defaultConf, 5); + Job job = Job.getInstance(defaultConf); + FileInputFormat.setInputPaths(job, workDir); + List splits = format.getSplits(job); + if (codec != null) { + assertEquals("compressed splits == 1", 1, splits.size()); + } + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + List results = readSplit(format, split, job); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for partial record:", exceptionThrown); + } + +} diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 1e982df92b1..19a59645810 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -124,6 +124,9 @@ Release 2.3.0 - UNRELEASED YARN-1395. Distributed shell application master launched with debug flag can hang waiting for external ls process. (cnauroth) + YARN-1400. yarn.cmd uses HADOOP_RESOURCEMANAGER_OPTS. Should be + YARN_RESOURCEMANAGER_OPTS. (Raja Aluri via cnauroth) + Release 2.2.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd b/hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd index ee03b1d0d67..8afc6ea0564 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd +++ b/hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd @@ -180,7 +180,7 @@ goto :eof :resourcemanager set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR%\rm-config\log4j.properties set CLASS=org.apache.hadoop.yarn.server.resourcemanager.ResourceManager - set YARN_OPTS=%YARN_OPTS% %HADOOP_RESOURCEMANAGER_OPTS% + set YARN_OPTS=%YARN_OPTS% %YARN_RESOURCEMANAGER_OPTS% if defined YARN_RESOURCEMANAGER_HEAPSIZE ( set JAVA_HEAP_MAX=-Xmx%YARN_RESOURCEMANAGER_HEAPSIZE%m )
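For reference on the MAPREDUCE-1176 addition above, a minimal driver that wires the new mapreduce-API FixedLengthInputFormat into a job could look roughly like the sketch below. The record length of 32, the input and output paths, the identity Mapper, and the SequenceFile output are illustrative choices only, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class FixedLengthDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Every input record is exactly 32 bytes (placeholder value).
    FixedLengthInputFormat.setRecordLength(conf, 32);
    Job job = Job.getInstance(conf, "fixed-length-read");
    job.setJarByClass(FixedLengthDriver.class);
    job.setInputFormatClass(FixedLengthInputFormat.class);
    // Identity map only: keys are record offsets, values are the record bytes.
    job.setMapperClass(Mapper.class);
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(BytesWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Since the record reader skips any leading partial record at a split boundary and then reads whole records, no extra split tuning is needed for uncompressed input; compressed files are treated as a single unsplittable stream.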