diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BlockLocation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BlockLocation.java
index fa095343c51..286d8514d6f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BlockLocation.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BlockLocation.java
@@ -31,17 +31,33 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceStability.Stable
 public class BlockLocation {
   private String[] hosts; // Datanode hostnames
+  private String[] cachedHosts; // Datanode hostnames with a cached replica
   private String[] names; // Datanode IP:xferPort for accessing the block
   private String[] topologyPaths; // Full path name in network topology
   private long offset; // Offset of the block in the file
   private long length;
   private boolean corrupt;
 
+  private static final String[] EMPTY_STR_ARRAY = new String[0];
+
   /**
    * Default Constructor
    */
   public BlockLocation() {
-    this(new String[0], new String[0], 0L, 0L);
+    this(EMPTY_STR_ARRAY, EMPTY_STR_ARRAY, 0L, 0L);
+  }
+
+  /**
+   * Copy constructor
+   */
+  public BlockLocation(BlockLocation that) {
+    this.hosts = that.hosts;
+    this.cachedHosts = that.cachedHosts;
+    this.names = that.names;
+    this.topologyPaths = that.topologyPaths;
+    this.offset = that.offset;
+    this.length = that.length;
+    this.corrupt = that.corrupt;
   }
 
   /**
@@ -57,20 +73,7 @@ public class BlockLocation {
    */
   public BlockLocation(String[] names, String[] hosts, long offset,
                        long length, boolean corrupt) {
-    if (names == null) {
-      this.names = new String[0];
-    } else {
-      this.names = names;
-    }
-    if (hosts == null) {
-      this.hosts = new String[0];
-    } else {
-      this.hosts = hosts;
-    }
-    this.offset = offset;
-    this.length = length;
-    this.topologyPaths = new String[0];
-    this.corrupt = corrupt;
+    this(names, hosts, null, offset, length, corrupt);
   }
 
   /**
@@ -87,34 +90,55 @@ public class BlockLocation {
    */
   public BlockLocation(String[] names, String[] hosts, String[] topologyPaths,
                        long offset, long length, boolean corrupt) {
-    this(names, hosts, offset, length, corrupt);
+    this(names, hosts, null, topologyPaths, offset, length, corrupt);
+  }
+
+  public BlockLocation(String[] names, String[] hosts, String[] cachedHosts,
+      String[] topologyPaths, long offset, long length, boolean corrupt) {
+    if (names == null) {
+      this.names = EMPTY_STR_ARRAY;
+    } else {
+      this.names = names;
+    }
+    if (hosts == null) {
+      this.hosts = EMPTY_STR_ARRAY;
+    } else {
+      this.hosts = hosts;
+    }
+    if (cachedHosts == null) {
+      this.cachedHosts = EMPTY_STR_ARRAY;
+    } else {
+      this.cachedHosts = cachedHosts;
+    }
     if (topologyPaths == null) {
-      this.topologyPaths = new String[0];
+      this.topologyPaths = EMPTY_STR_ARRAY;
     } else {
       this.topologyPaths = topologyPaths;
     }
+    this.offset = offset;
+    this.length = length;
+    this.corrupt = corrupt;
   }
 
   /**
    * Get the list of hosts (hostname) hosting this block
    */
   public String[] getHosts() throws IOException {
-    if (hosts == null || hosts.length == 0) {
-      return new String[0];
-    } else {
-      return hosts;
-    }
+    return hosts;
+  }
+
+  /**
+   * Get the list of hosts (hostname) hosting a cached replica of the block
+   */
+  public String[] getCachedHosts() {
+    return cachedHosts;
   }
 
   /**
    * Get the list of names (IP:xferPort) hosting this block
    */
   public String[] getNames() throws IOException {
-    if (names == null || names.length == 0) {
-      return new String[0];
-    } else {
-      return names;
-    }
+    return names;
   }
 
   /**
@@ -122,11 +146,7 @@ public class BlockLocation {
    * The last component of the path is the "name" (IP:xferPort).
    */
   public String[] getTopologyPaths() throws IOException {
-    if (topologyPaths == null || topologyPaths.length == 0) {
-      return new String[0];
-    } else {
-      return topologyPaths;
-    }
+    return topologyPaths;
   }
 
   /**
@@ -176,18 +196,29 @@ public class BlockLocation {
    */
   public void setHosts(String[] hosts) throws IOException {
     if (hosts == null) {
-      this.hosts = new String[0];
+      this.hosts = EMPTY_STR_ARRAY;
     } else {
       this.hosts = hosts;
     }
   }
 
+  /**
+   * Set the hosts hosting a cached replica of this block
+   */
+  public void setCachedHosts(String[] cachedHosts) {
+    if (cachedHosts == null) {
+      this.cachedHosts = EMPTY_STR_ARRAY;
+    } else {
+      this.cachedHosts = cachedHosts;
+    }
+  }
+
   /**
    * Set the names (host:port) hosting this block
    */
   public void setNames(String[] names) throws IOException {
     if (names == null) {
-      this.names = new String[0];
+      this.names = EMPTY_STR_ARRAY;
     } else {
       this.names = names;
     }
@@ -198,7 +229,7 @@ public class BlockLocation {
    */
   public void setTopologyPaths(String[] topologyPaths) throws IOException {
     if (topologyPaths == null) {
-      this.topologyPaths = new String[0];
+      this.topologyPaths = EMPTY_STR_ARRAY;
     } else {
       this.topologyPaths = topologyPaths;
    }
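
For orientation, a client that wants to schedule work on nodes already holding a cached replica can combine the existing FileSystem.getFileBlockLocations() call with the new getCachedHosts() accessor. The following is only an illustrative sketch (the class name and the scheduling print-out are invented, not part of the patch); note that after this change the getters never return null, so an empty-array check is sufficient:

    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical client-side helper, shown only to illustrate the new accessor.
    public class CachedHostsExample {
      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        Path path = new Path(args[0]);
        FileStatus stat = fs.getFileStatus(path);
        // One BlockLocation per block in the requested byte range
        BlockLocation[] locs = fs.getFileBlockLocations(path, 0, stat.getLen());
        for (BlockLocation loc : locs) {
          // Empty when no replica of this block is cached on any datanode
          String[] cached = loc.getCachedHosts();
          String[] preferred = cached.length > 0 ? cached : loc.getHosts();
          System.out.println("block at offset " + loc.getOffset()
              + " -> prefer " + Arrays.toString(preferred));
        }
      }
    }

Since the getters now hand back the internal arrays directly instead of allocating fresh empty arrays per call, callers should treat the returned arrays as read-only.
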
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestBlockLocation.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestBlockLocation.java
new file mode 100644
index 00000000000..3cb608a971f
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestBlockLocation.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.junit.Test;
+
+public class TestBlockLocation {
+
+  private static final String[] EMPTY_STR_ARRAY = new String[0];
+
+  private static void checkBlockLocation(final BlockLocation loc)
+      throws Exception {
+    checkBlockLocation(loc, 0, 0, false);
+  }
+
+  private static void checkBlockLocation(final BlockLocation loc,
+      final long offset, final long length, final boolean corrupt)
+      throws Exception {
+    checkBlockLocation(loc, EMPTY_STR_ARRAY, EMPTY_STR_ARRAY, EMPTY_STR_ARRAY,
+        EMPTY_STR_ARRAY, offset, length, corrupt);
+  }
+
+  private static void checkBlockLocation(final BlockLocation loc,
+      String[] names, String[] hosts, String[] cachedHosts,
+      String[] topologyPaths, final long offset, final long length,
+      final boolean corrupt) throws Exception {
+    assertNotNull(loc.getHosts());
+    assertNotNull(loc.getCachedHosts());
+    assertNotNull(loc.getNames());
+    assertNotNull(loc.getTopologyPaths());
+
+    assertArrayEquals(hosts, loc.getHosts());
+    assertArrayEquals(cachedHosts, loc.getCachedHosts());
+    assertArrayEquals(names, loc.getNames());
+    assertArrayEquals(topologyPaths, loc.getTopologyPaths());
+
+    assertEquals(offset, loc.getOffset());
+    assertEquals(length, loc.getLength());
+    assertEquals(corrupt, loc.isCorrupt());
+  }
+
+  /**
+   * Call all the constructors and verify the delegation is working properly
+   */
+  @Test(timeout = 5000)
+  public void testBlockLocationConstructors() throws Exception {
+    //
+    BlockLocation loc;
+    loc = new BlockLocation();
+    checkBlockLocation(loc);
+    loc = new BlockLocation(null, null, 1, 2);
+    checkBlockLocation(loc, 1, 2, false);
+    loc = new BlockLocation(null, null, null, 1, 2);
+    checkBlockLocation(loc, 1, 2, false);
+    loc = new BlockLocation(null, null, null, 1, 2, true);
+    checkBlockLocation(loc, 1, 2, true);
+    loc = new BlockLocation(null, null, null, null, 1, 2, true);
+    checkBlockLocation(loc, 1, 2, true);
+  }
+
+  /**
+   * Call each of the setters and verify
+   */
+  @Test(timeout = 5000)
+  public void testBlockLocationSetters() throws Exception {
+    BlockLocation loc;
+    loc = new BlockLocation();
+    // Test that null sets the empty array
+    loc.setHosts(null);
+    loc.setCachedHosts(null);
+    loc.setNames(null);
+    loc.setTopologyPaths(null);
+    checkBlockLocation(loc);
+    // Test that not-null gets set properly
+    String[] names = new String[] { "name" };
+    String[] hosts = new String[] { "host" };
+    String[] cachedHosts = new String[] { "cachedHost" };
+    String[] topologyPaths = new String[] { "path" };
+    loc.setNames(names);
+    loc.setHosts(hosts);
+    loc.setCachedHosts(cachedHosts);
+    loc.setTopologyPaths(topologyPaths);
+    loc.setOffset(1);
+    loc.setLength(2);
+    loc.setCorrupt(true);
+    checkBlockLocation(loc, names, hosts, cachedHosts, topologyPaths, 1, 2,
+        true);
+  }
+}
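
One detail the new test does not exercise: the copy constructor added above copies array references rather than cloning them, so a copy and its original share the same backing arrays. A minimal sketch of that behavior (illustrative only, the class name is invented and this is not part of the patch):

    import org.apache.hadoop.fs.BlockLocation;

    // Demonstrates that BlockLocation(BlockLocation) is a shallow copy.
    public class CopyConstructorSketch {
      public static void main(String[] args) throws Exception {
        String[] hosts = new String[] { "host1" };
        BlockLocation original = new BlockLocation(
            new String[] { "10.0.0.1:50010" }, hosts, 0L, 512L);
        BlockLocation copy = new BlockLocation(original);
        // Mutating the original's array is visible through the copy
        hosts[0] = "host2";
        System.out.println(copy.getHosts()[0]); // prints "host2"
      }
    }

This is cheap and matches how HdfsBlockLocation (below) uses the copy constructor, but it does mean callers should not mutate the arrays they pass in or receive.
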
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index ad74d142de2..bc333e6e9dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -190,6 +190,8 @@ Trunk (Unreleased)
 
     HDFS-5326. add modifyDirective to cacheAdmin. (cmccabe)
 
+    HDFS-5450. Better API for getting the cached blocks locations. (wang)
+
   OPTIMIZATIONS
 
     HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only.
     (cmccabe)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
index f736d9637eb..0ccacda8d84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
@@ -37,8 +37,7 @@ public class HdfsBlockLocation extends BlockLocation {
   public HdfsBlockLocation(BlockLocation loc, LocatedBlock block)
       throws IOException {
     // Initialize with data from passed in BlockLocation
-    super(loc.getNames(), loc.getHosts(), loc.getTopologyPaths(),
-        loc.getOffset(), loc.getLength(), loc.isCorrupt());
+    super(loc);
     this.block = block;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 27c0059b791..a4b6810985e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -419,7 +419,13 @@ public class DFSUtil {
             locations[hCnt].getNetworkLocation());
         racks[hCnt] = node.toString();
       }
-      blkLocations[idx] = new BlockLocation(xferAddrs, hosts, racks,
+      DatanodeInfo[] cachedLocations = blk.getCachedLocations();
+      String[] cachedHosts = new String[cachedLocations.length];
+      for (int i=0; i)m.get("blockToken"))); return locatedblock; }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index ccc829cf683..b0e907fc1f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock.Mlocker;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
@@ -87,6 +88,8 @@ public class TestFsDatasetCache {
   private static DatanodeProtocolClientSideTranslatorPB spyNN;
   private static PageRounder rounder = new PageRounder();
 
+  private Mlocker mlocker;
+
   @Before
   public void setUp() throws Exception {
     assumeTrue(!Path.WINDOWS);
@@ -110,6 +113,8 @@ public class TestFsDatasetCache {
     fsd = dn.getFSDataset();
 
     spyNN = DataNodeTestUtils.spyOnBposToNN(dn, nn);
+    // Save the current mlocker and replace it at the end of the test
+    mlocker = MappableBlock.mlocker;
   }
 
   @After
@@ -120,6 +125,8 @@ public class TestFsDatasetCache {
     if (cluster != null) {
       cluster.shutdown();
     }
+    // Restore the original mlocker
+    MappableBlock.mlocker = mlocker;
   }
 
   private static void setHeartbeatResponse(DatanodeCommand[] cmds)
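
The DFSUtil hunk above is truncated in this copy, but its visible intent is to map each cached DatanodeInfo returned by LocatedBlock.getCachedLocations() to a hostname and pass the result as the new cachedHosts argument of BlockLocation. A standalone sketch of that mapping follows; the class and method names are invented, and getHostName() is an assumption about the accessor used, since the original loop body is not visible here:

    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    // Hypothetical helper, not the patch's code.
    class CachedHostsSketch {
      static String[] cachedHostsOf(LocatedBlock blk) {
        DatanodeInfo[] cachedLocations = blk.getCachedLocations();
        String[] cachedHosts = new String[cachedLocations.length];
        for (int i = 0; i < cachedLocations.length; i++) {
          // Assumed: hostnames are taken from the datanode descriptors
          cachedHosts[i] = cachedLocations[i].getHostName();
        }
        return cachedHosts;
      }
    }

The resulting array would then be handed to the seven-argument BlockLocation constructor introduced earlier, alongside the existing names, hosts, and racks arrays.
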
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java
index a62a0d73c49..890ccfb525c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java
@@ -31,6 +31,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.nio.MappedByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -40,6 +41,8 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.InvalidRequestException;
@@ -54,6 +57,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.security.AccessControlException;
@@ -78,6 +82,15 @@ public class TestPathBasedCacheRequests {
   static private DistributedFileSystem dfs;
   static private NamenodeProtocols proto;
 
+  static {
+    MappableBlock.mlocker = new MappableBlock.Mlocker() {
+      @Override
+      public void mlock(MappedByteBuffer mmap, long length) throws IOException {
+        // Stubbed out for testing
+      }
+    };
+  }
+
   @Before
   public void setup() throws Exception {
     conf = new HdfsConfiguration();
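
For orientation before the new test methods below, the cache-directive API they exercise boils down to a handful of DistributedFileSystem calls. The following condenses those calls into one hypothetical helper (the class and method names are invented; the pool name, path, and builder calls are taken from the test code further down and assume a running cluster):

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
    import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;

    // Hypothetical helper summarizing the directive lifecycle used by the tests.
    class CacheDirectiveSketch {
      static void cacheAndRelease(DistributedFileSystem dfs) throws IOException {
        dfs.addCachePool(new CachePoolInfo("friendlyPool"));
        long id = dfs.addPathBasedCacheDirective(
            new PathBasedCacheDirective.Builder().
                setPath(new Path("/foo")).
                setReplication((short) 1).
                setPool("friendlyPool").
                build());
        // Raise the cache replication without re-adding the directive
        dfs.modifyPathBasedCacheDirective(
            new PathBasedCacheDirective.Builder().
                setId(id).
                setReplication((short) 3).
                build());
        // Remove the directive; cached replicas are then released
        dfs.removePathBasedCacheDirective(id);
      }
    }
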
@@ -530,6 +543,14 @@
     assertFalse("Unexpected # of cache directives found", dit.hasNext());
   }
 
+  /**
+   * Wait for the NameNode to have an expected number of cached blocks
+   * and replicas.
+   * @param nn NameNode
+   * @param expectedCachedBlocks
+   * @param expectedCachedReplicas
+   * @throws Exception
+   */
   private static void waitForCachedBlocks(NameNode nn,
       final int expectedCachedBlocks, final int expectedCachedReplicas)
       throws Exception {
@@ -570,6 +591,37 @@
     }, 500, 60000);
   }
 
+  private static void checkNumCachedReplicas(final DistributedFileSystem dfs,
+      final List<Path> paths, final int expectedBlocks,
+      final int expectedReplicas)
+      throws Exception {
+    int numCachedBlocks = 0;
+    int numCachedReplicas = 0;
+    for (Path p: paths) {
+      final FileStatus f = dfs.getFileStatus(p);
+      final long len = f.getLen();
+      final long blockSize = f.getBlockSize();
+      // round it up to full blocks
+      final long numBlocks = (len + blockSize - 1) / blockSize;
+      BlockLocation[] locs = dfs.getFileBlockLocations(p, 0, len);
+      assertEquals("Unexpected number of block locations for path " + p,
+          numBlocks, locs.length);
+      for (BlockLocation l: locs) {
+        if (l.getCachedHosts().length > 0) {
+          numCachedBlocks++;
+        }
+        numCachedReplicas += l.getCachedHosts().length;
+      }
+    }
+    LOG.info("Found " + numCachedBlocks + " of " + expectedBlocks + " blocks");
+    LOG.info("Found " + numCachedReplicas + " of " + expectedReplicas +
+        " replicas");
+    assertEquals("Unexpected number of cached blocks", expectedBlocks,
+        numCachedBlocks);
+    assertEquals("Unexpected number of cached replicas", expectedReplicas,
+        numCachedReplicas);
+  }
+
   private static final long BLOCK_SIZE = 512;
   private static final int NUM_DATANODES = 4;
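
To make the expected counts in testReplicationFactor() below easier to follow: the directive caches /foo only, which holds two of the four files created, each written with two blocks, so four blocks are cached; at cache replication i that gives 4*i cached replicas. A quick, illustrative sanity check of that arithmetic (not part of the patch):

    // Illustrative arithmetic for the expectations used in the test below.
    public class CachedReplicaMath {
      public static void main(String[] args) {
        int filesUnderFoo = 2;    // /foo/bar and /foo/baz fall under the cached path
        int blocksPerFile = 2;    // numBlocksPerFile in the test
        int cachedBlocks = filesUnderFoo * blocksPerFile;     // = 4
        for (int repl = 1; repl <= 3; repl++) {
          System.out.println("cache replication " + repl + " -> "
              + (cachedBlocks * repl) + " cached replicas");  // 4, 8, 12
        }
      }
    }
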
@@ -746,6 +798,78 @@
     }
   }
 
+  /**
+   * Tests stepping the cache replication factor up and down, checking the
+   * number of cached replicas and blocks as well as the advertised locations.
+   * @throws Exception
+   */
+  @Test(timeout=120000)
+  public void testReplicationFactor() throws Exception {
+    Assume.assumeTrue(canTestDatanodeCaching());
+    HdfsConfiguration conf = createCachingConf();
+    MiniDFSCluster cluster =
+      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+
+    try {
+      cluster.waitActive();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      NameNode namenode = cluster.getNameNode();
+      // Create the pool
+      final String pool = "friendlyPool";
+      dfs.addCachePool(new CachePoolInfo(pool));
+      // Create some test files
+      final List<Path> paths = new LinkedList<Path>();
+      paths.add(new Path("/foo/bar"));
+      paths.add(new Path("/foo/baz"));
+      paths.add(new Path("/foo2/bar2"));
+      paths.add(new Path("/foo2/baz2"));
+      dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
+      dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
+      final int numBlocksPerFile = 2;
+      for (Path path : paths) {
+        FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
+            (int)BLOCK_SIZE, (short)3, false);
+      }
+      waitForCachedBlocks(namenode, 0, 0);
+      checkNumCachedReplicas(dfs, paths, 0, 0);
+      // cache directory
+      long id = dfs.addPathBasedCacheDirective(
+          new PathBasedCacheDirective.Builder().
+            setPath(new Path("/foo")).
+            setReplication((short)1).
+            setPool(pool).
+            build());
+      waitForCachedBlocks(namenode, 4, 4);
+      checkNumCachedReplicas(dfs, paths, 4, 4);
+      // step up the replication factor
+      for (int i=2; i<=3; i++) {
+        dfs.modifyPathBasedCacheDirective(
+            new PathBasedCacheDirective.Builder().
+              setId(id).
+              setReplication((short)i).
+              build());
+        waitForCachedBlocks(namenode, 4, 4*i);
+        checkNumCachedReplicas(dfs, paths, 4, 4*i);
+      }
+      // step it down
+      for (int i=2; i>=1; i--) {
+        dfs.modifyPathBasedCacheDirective(
+            new PathBasedCacheDirective.Builder().
+              setId(id).
+              setReplication((short)i).
+              build());
+        waitForCachedBlocks(namenode, 4, 4*i);
+        checkNumCachedReplicas(dfs, paths, 4, 4*i);
+      }
+      // remove and watch numCached go to 0
+      dfs.removePathBasedCacheDirective(id);
+      waitForCachedBlocks(namenode, 0, 0);
+      checkNumCachedReplicas(dfs, paths, 0, 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   @Test(timeout=60000)
   public void testListCachePoolPermissions() throws Exception {
     final UserGroupInformation myUser = UserGroupInformation