diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
index 2bc8faaff0a..6fec97706ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -72,6 +71,7 @@ public class ProvidedStorageMap {
   private final DatanodeStorageInfo providedStorageInfo;
   private boolean providedEnabled;
   private long capacity;
+  private int defaultReplication;
 
   ProvidedStorageMap(RwLock lock, BlockManager bm, Configuration conf)
       throws IOException {
@@ -95,6 +95,8 @@ public class ProvidedStorageMap {
         storageId, State.NORMAL, StorageType.PROVIDED);
     providedDescriptor = new ProvidedDescriptor();
     providedStorageInfo = providedDescriptor.createProvidedStorage(ds);
+    this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+        DFSConfigKeys.DFS_REPLICATION_DEFAULT);
 
     this.bm = bm;
     this.lock = lock;
@@ -198,63 +200,72 @@ public class ProvidedStorageMap {
    */
   class ProvidedBlocksBuilder extends LocatedBlockBuilder {
 
-    private ShadowDatanodeInfoWithStorage pending;
-    private boolean hasProvidedLocations;
-
     ProvidedBlocksBuilder(int maxBlocks) {
       super(maxBlocks);
-      pending = new ShadowDatanodeInfoWithStorage(
-          providedDescriptor, storageId);
-      hasProvidedLocations = false;
+    }
+
+    private DatanodeDescriptor chooseProvidedDatanode(
+        Set<String> excludedUUids) {
+      DatanodeDescriptor dn = providedDescriptor.choose(null, excludedUUids);
+      if (dn == null) {
+        dn = providedDescriptor.choose(null);
+      }
+      return dn;
     }
 
     @Override
     LocatedBlock newLocatedBlock(ExtendedBlock eb,
        DatanodeStorageInfo[] storages, long pos, boolean isCorrupt) {
-      DatanodeInfoWithStorage[] locs =
-          new DatanodeInfoWithStorage[storages.length];
-      String[] sids = new String[storages.length];
-      StorageType[] types = new StorageType[storages.length];
+      List<DatanodeInfoWithStorage> locs = new ArrayList<>();
+      List<String> sids = new ArrayList<>();
+      List<StorageType> types = new ArrayList<>();
+      boolean isProvidedBlock = false;
+      Set<String> excludedUUids = new HashSet<>();
+
       for (int i = 0; i < storages.length; ++i) {
-        sids[i] = storages[i].getStorageID();
-        types[i] = storages[i].getStorageType();
-        if (StorageType.PROVIDED.equals(storages[i].getStorageType())) {
-          locs[i] = pending;
-          hasProvidedLocations = true;
+        DatanodeStorageInfo currInfo = storages[i];
+        StorageType storageType = currInfo.getStorageType();
+        sids.add(currInfo.getStorageID());
+        types.add(storageType);
+        if (StorageType.PROVIDED.equals(storageType)) {
+          DatanodeDescriptor dn = chooseProvidedDatanode(excludedUUids);
+          locs.add(
+              new DatanodeInfoWithStorage(
+                  dn, currInfo.getStorageID(), currInfo.getStorageType()));
+          excludedUUids.add(dn.getDatanodeUuid());
+          isProvidedBlock = true;
         } else {
-          locs[i] = new DatanodeInfoWithStorage(
-              storages[i].getDatanodeDescriptor(), sids[i], types[i]);
+          locs.add(new DatanodeInfoWithStorage(
+              currInfo.getDatanodeDescriptor(),
+              currInfo.getStorageID(), storageType));
+          excludedUUids.add(currInfo.getDatanodeDescriptor().getDatanodeUuid());
         }
       }
-      return new LocatedBlock(eb, locs, sids, types, pos, isCorrupt, null);
+
+      int numLocations = locs.size();
+      if (isProvidedBlock) {
+        // add more replicas until we reach the defaultReplication
+        for (int count = numLocations + 1;
+            count <= defaultReplication && count <= providedDescriptor
+                .activeProvidedDatanodes(); count++) {
+          DatanodeDescriptor dn = chooseProvidedDatanode(excludedUUids);
+          locs.add(new DatanodeInfoWithStorage(
+              dn, storageId, StorageType.PROVIDED));
+          sids.add(storageId);
+          types.add(StorageType.PROVIDED);
+          excludedUUids.add(dn.getDatanodeUuid());
+        }
+      }
+      return new LocatedBlock(eb,
+          locs.toArray(new DatanodeInfoWithStorage[locs.size()]),
+          sids.toArray(new String[sids.size()]),
+          types.toArray(new StorageType[types.size()]),
+          pos, isCorrupt, null);
     }
 
     @Override
     LocatedBlocks build(DatanodeDescriptor client) {
-      // TODO: to support multiple provided storages, need to pass/maintain map
-      if (hasProvidedLocations) {
-        // set all fields of pending DatanodeInfo
-        List<String> excludedUUids = new ArrayList<String>();
-        for (LocatedBlock b : blocks) {
-          DatanodeInfo[] infos = b.getLocations();
-          StorageType[] types = b.getStorageTypes();
-
-          for (int i = 0; i < types.length; i++) {
-            if (!StorageType.PROVIDED.equals(types[i])) {
-              excludedUUids.add(infos[i].getDatanodeUuid());
-            }
-          }
-        }
-
-        DatanodeDescriptor dn =
-            providedDescriptor.choose(client, excludedUUids);
-        if (dn == null) {
-          dn = providedDescriptor.choose(client);
-        }
-        pending.replaceInternal(dn);
-      }
-
       return new LocatedBlocks(
           flen, isUC, blocks, last, lastComplete, feInfo, ecPolicy);
     }
@@ -265,53 +276,6 @@ public class ProvidedStorageMap {
     }
   }
 
-  /**
-   * An abstract {@link DatanodeInfoWithStorage} to represent provided storage.
-   */
-  static class ShadowDatanodeInfoWithStorage extends DatanodeInfoWithStorage {
-    private String shadowUuid;
-
-    ShadowDatanodeInfoWithStorage(DatanodeDescriptor d, String storageId) {
-      super(d, storageId, StorageType.PROVIDED);
-    }
-
-    @Override
-    public String getDatanodeUuid() {
-      return shadowUuid;
-    }
-
-    public void setDatanodeUuid(String uuid) {
-      shadowUuid = uuid;
-    }
-
-    void replaceInternal(DatanodeDescriptor dn) {
-      updateRegInfo(dn); // overwrite DatanodeID (except UUID)
-      setDatanodeUuid(dn.getDatanodeUuid());
-      setCapacity(dn.getCapacity());
-      setDfsUsed(dn.getDfsUsed());
-      setRemaining(dn.getRemaining());
-      setBlockPoolUsed(dn.getBlockPoolUsed());
-      setCacheCapacity(dn.getCacheCapacity());
-      setCacheUsed(dn.getCacheUsed());
-      setLastUpdate(dn.getLastUpdate());
-      setLastUpdateMonotonic(dn.getLastUpdateMonotonic());
-      setXceiverCount(dn.getXceiverCount());
-      setNetworkLocation(dn.getNetworkLocation());
-      adminState = dn.getAdminState();
-      setUpgradeDomain(dn.getUpgradeDomain());
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      return super.equals(obj);
-    }
-
-    @Override
-    public int hashCode() {
-      return super.hashCode();
-    }
-  }
-
   /**
    * An abstract DatanodeDescriptor to track datanodes with provided storages.
    * NOTE: never resolved through registerDatanode, so not in the topology.
@@ -336,6 +300,7 @@ public class ProvidedStorageMap {
 
     DatanodeStorageInfo getProvidedStorage(
         DatanodeDescriptor dn, DatanodeStorage s) {
+      LOG.info("Adding Datanode {}.", dn.getDatanodeUuid());
       dns.put(dn.getDatanodeUuid(), dn);
       // TODO: maintain separate RPC ident per dn
       return storageMap.get(s.getStorageID());
@@ -352,7 +317,7 @@ public class ProvidedStorageMap {
     DatanodeDescriptor choose(DatanodeDescriptor client) {
       // exact match for now
       DatanodeDescriptor dn = client != null ?
-          dns.get(client.getDatanodeUuid()) : null;
+        dns.get(client.getDatanodeUuid()) : null;
       if (null == dn) {
         dn = chooseRandom();
       }
@@ -360,10 +325,10 @@ public class ProvidedStorageMap {
     }
 
     DatanodeDescriptor choose(DatanodeDescriptor client,
-        List<String> excludedUUids) {
+        Set<String> excludedUUids) {
       // exact match for now
       DatanodeDescriptor dn = client != null ?
-          dns.get(client.getDatanodeUuid()) : null;
+        dns.get(client.getDatanodeUuid()) : null;
 
       if (null == dn || excludedUUids.contains(client.getDatanodeUuid())) {
         dn = null;
diff --git a/hadoop-tools/hadoop-fs2img/src/main/java/org/apache/hadoop/hdfs/server/namenode/FixedBlockResolver.java b/hadoop-tools/hadoop-fs2img/src/main/java/org/apache/hadoop/hdfs/server/namenode/FixedBlockResolver.java
index 8ff9695998c..4b3a01f8e97 100644
--- a/hadoop-tools/hadoop-fs2img/src/main/java/org/apache/hadoop/hdfs/server/namenode/FixedBlockResolver.java
+++ b/hadoop-tools/hadoop-fs2img/src/main/java/org/apache/hadoop/hdfs/server/namenode/FixedBlockResolver.java
@@ -34,6 +34,7 @@ public class FixedBlockResolver extends BlockResolver implements Configurable {
       "hdfs.image.writer.resolver.fixed.block.size";
   public static final String START_BLOCK =
       "hdfs.image.writer.resolver.fixed.block.start";
+  public static final long BLOCKSIZE_DEFAULT = 256 * (1L << 20);
 
   private Configuration conf;
   private long blocksize = 256 * (1L << 20);
@@ -42,7 +43,7 @@ public class FixedBlockResolver extends BlockResolver implements Configurable {
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
-    blocksize = conf.getLong(BLOCKSIZE, 256 * (1L << 20));
+    blocksize = conf.getLong(BLOCKSIZE, BLOCKSIZE_DEFAULT);
     blockIds.set(conf.getLong(START_BLOCK, (1L << 30)));
   }
 
diff --git a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
index f6d38f68216..9c82967310d 100644
--- a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
+++ b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
@@ -474,12 +474,12 @@ public class TestNameNodeProvidedImplementation {
   }
 
   private DatanodeInfo[] getAndCheckBlockLocations(DFSClient client,
-      String filename, int expectedLocations) throws IOException {
-    LocatedBlocks locatedBlocks = client.getLocatedBlocks(
-        filename, 0, baseFileLen);
-    //given the start and length in the above call,
-    //only one LocatedBlock in LocatedBlocks
-    assertEquals(1, locatedBlocks.getLocatedBlocks().size());
+      String filename, long fileLen, long expectedBlocks, int expectedLocations)
+      throws IOException {
+    LocatedBlocks locatedBlocks = client.getLocatedBlocks(filename, 0, fileLen);
+    // given the start and length in the above call, expect
+    // expectedBlocks LocatedBlocks to be returned
+    assertEquals(expectedBlocks, locatedBlocks.getLocatedBlocks().size());
     LocatedBlock locatedBlock = locatedBlocks.getLocatedBlocks().get(0);
     assertEquals(expectedLocations, locatedBlock.getLocations().length);
     return locatedBlock.getLocations();
   }
@@ -513,17 +513,20 @@ public class TestNameNodeProvidedImplementation {
         file, newReplication, 10000);
     DFSClient client = new DFSClient(new InetSocketAddress("localhost",
         cluster.getNameNodePort()), cluster.getConfiguration(0));
-    getAndCheckBlockLocations(client, filename, newReplication);
+    getAndCheckBlockLocations(client, filename, baseFileLen, 1, newReplication);
 
     // set the replication back to 1
     newReplication = 1;
     LOG.info("Setting replication of file {} back to {}",
         filename, newReplication);
     fs.setReplication(file, newReplication);
+    // defaultReplication number of replicas should be returned
+    int defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+        DFSConfigKeys.DFS_REPLICATION_DEFAULT);
     DFSTestUtil.waitForReplication((DistributedFileSystem) fs,
-        file, newReplication, 10000);
-    // the only replica left should be the PROVIDED datanode
-    getAndCheckBlockLocations(client, filename, newReplication);
+        file, (short) defaultReplication, 10000);
+    getAndCheckBlockLocations(client, filename, baseFileLen, 1,
+        defaultReplication);
   }
 
   @Test(timeout=30000)
@@ -545,8 +548,9 @@ public class TestNameNodeProvidedImplementation {
 
     if (numFiles >= 1) {
       String filename = "/" + filePrefix + (numFiles - 1) + fileSuffix;
-
-      DatanodeInfo[] dnInfos = getAndCheckBlockLocations(client, filename, 1);
+      // 2 locations returned as there are 2 PROVIDED datanodes
+      DatanodeInfo[] dnInfos =
+          getAndCheckBlockLocations(client, filename, baseFileLen, 1, 2);
       //the location should be one of the provided DNs available
       assertTrue(
           dnInfos[0].getDatanodeUuid().equals(
@@ -564,7 +568,7 @@ public class TestNameNodeProvidedImplementation {
           providedDatanode1.getDatanodeId().getXferAddr());
 
       //should find the block on the 2nd provided datanode
-      dnInfos = getAndCheckBlockLocations(client, filename, 1);
+      dnInfos = getAndCheckBlockLocations(client, filename, baseFileLen, 1, 1);
       assertEquals(providedDatanode2.getDatanodeUuid(),
           dnInfos[0].getDatanodeUuid());
 
@@ -575,14 +579,14 @@ public class TestNameNodeProvidedImplementation {
       BlockManagerTestUtil.noticeDeadDatanode(
          cluster.getNameNode(),
          providedDatanode2.getDatanodeId().getXferAddr());
-      getAndCheckBlockLocations(client, filename, 0);
+      getAndCheckBlockLocations(client, filename, baseFileLen, 1, 0);
 
       //restart the provided datanode
       cluster.restartDataNode(providedDNProperties1, true);
       cluster.waitActive();
 
       //should find the block on the 1st provided datanode now
-      dnInfos = getAndCheckBlockLocations(client, filename, 1);
+      dnInfos = getAndCheckBlockLocations(client, filename, baseFileLen, 1, 1);
       //not comparing UUIDs as the datanode can now have a different one.
       assertEquals(providedDatanode1.getDatanodeId().getXferAddr(),
           dnInfos[0].getXferAddr());
     }
   }
 
@@ -593,20 +597,18 @@ public class TestNameNodeProvidedImplementation {
   public void testTransientDeadDatanodes() throws Exception {
     createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
         FixedBlockResolver.class);
-    // 2 Datanodes, 1 PROVIDED and other DISK
-    startCluster(NNDIRPATH, 2, null,
+    // 3 Datanodes, 2 PROVIDED and other DISK
+    startCluster(NNDIRPATH, 3, null,
         new StorageType[][] {
+            {StorageType.PROVIDED, StorageType.DISK},
             {StorageType.PROVIDED, StorageType.DISK},
             {StorageType.DISK}},
         false);
 
     DataNode providedDatanode = cluster.getDataNodes().get(0);
-
-    DFSClient client = new DFSClient(new InetSocketAddress("localhost",
-        cluster.getNameNodePort()), cluster.getConfiguration(0));
-
     for (int i= 0; i < numFiles; i++) {
-      verifyFileLocation(i);
+      // expect to have 2 locations as we have 2 provided datanodes
+      verifyFileLocation(i, 2);
       // NameNode thinks the datanode is down
       BlockManagerTestUtil.noticeDeadDatanode(
           cluster.getNameNode(),
@@ -614,7 +616,7 @@ public class TestNameNodeProvidedImplementation {
       cluster.waitActive();
       cluster.triggerHeartbeats();
       Thread.sleep(1000);
-      verifyFileLocation(i);
+      verifyFileLocation(i, 2);
     }
   }
 
@@ -622,17 +624,18 @@ public class TestNameNodeProvidedImplementation {
   public void testNamenodeRestart() throws Exception {
     createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
         FixedBlockResolver.class);
-    // 2 Datanodes, 1 PROVIDED and other DISK
-    startCluster(NNDIRPATH, 2, null,
+    // 3 Datanodes, 2 PROVIDED and other DISK
+    startCluster(NNDIRPATH, 3, null,
         new StorageType[][] {
+            {StorageType.PROVIDED, StorageType.DISK},
             {StorageType.PROVIDED, StorageType.DISK},
             {StorageType.DISK}},
         false);
 
-    verifyFileLocation(numFiles - 1);
+    verifyFileLocation(numFiles - 1, 2);
     cluster.restartNameNodes();
     cluster.waitActive();
-    verifyFileLocation(numFiles - 1);
+    verifyFileLocation(numFiles - 1, 2);
   }
 
   /**
@@ -640,18 +643,22 @@ public class TestNameNodeProvidedImplementation {
    * @param fileIndex the index of the file to verify.
+   * @param replication the expected number of locations for each block.
    * @throws Exception
    */
-  private void verifyFileLocation(int fileIndex)
+  private void verifyFileLocation(int fileIndex, int replication)
       throws Exception {
-    DataNode providedDatanode = cluster.getDataNodes().get(0);
     DFSClient client = new DFSClient(
         new InetSocketAddress("localhost", cluster.getNameNodePort()),
         cluster.getConfiguration(0));
-    if (fileIndex <= numFiles && fileIndex >= 0) {
-      String filename = "/" + filePrefix + fileIndex + fileSuffix;
-      DatanodeInfo[] dnInfos = getAndCheckBlockLocations(client, filename, 1);
-      // location should be the provided DN
-      assertEquals(providedDatanode.getDatanodeUuid(),
-          dnInfos[0].getDatanodeUuid());
+    if (fileIndex < numFiles && fileIndex >= 0) {
+      String filename = filePrefix + fileIndex + fileSuffix;
+      File file = new File(new Path(NAMEPATH, filename).toUri());
+      long fileLen = file.length();
+      long blockSize = conf.getLong(FixedBlockResolver.BLOCKSIZE,
+          FixedBlockResolver.BLOCKSIZE_DEFAULT);
+      long numLocatedBlocks =
+          fileLen == 0 ? 1 : (long) Math.ceil(fileLen * 1.0 / blockSize);
+      getAndCheckBlockLocations(client, "/" + filename, fileLen,
+          numLocatedBlocks, replication);
     }
   }
 
@@ -669,4 +675,55 @@ public class TestNameNodeProvidedImplementation {
     NameNode nn = cluster.getNameNode();
     assertEquals(clusterID, nn.getNamesystem().getClusterId());
   }
+
+  @Test(timeout=30000)
+  public void testNumberOfProvidedLocations() throws Exception {
+    // set default replication to 4
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 4);
+    createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
+        FixedBlockResolver.class);
+    // start with 4 PROVIDED locations
+    startCluster(NNDIRPATH, 4,
+        new StorageType[]{
+            StorageType.PROVIDED, StorageType.DISK},
+        null,
+        false);
+    int expectedLocations = 4;
+    for (int i = 0; i < numFiles; i++) {
+      verifyFileLocation(i, expectedLocations);
+    }
+    // stop 2 datanodes, one after the other, and verify the locations
+    for (int i = 1; i <= 2; i++) {
+      DataNode dn = cluster.getDataNodes().get(0);
+      cluster.stopDataNode(0);
+      // make NameNode detect that the datanode is down
+      BlockManagerTestUtil.noticeDeadDatanode(cluster.getNameNode(),
+          dn.getDatanodeId().getXferAddr());
+
+      expectedLocations = 4 - i;
+      for (int j = 0; j < numFiles; j++) {
+        verifyFileLocation(j, expectedLocations);
+      }
+    }
+  }
+
+  @Test(timeout=30000)
+  public void testNumberOfProvidedLocationsManyBlocks() throws Exception {
+    // increase the number of blocks per file to at least 10
+    conf.setLong(FixedBlockResolver.BLOCKSIZE, baseFileLen / 10);
+    // set default replication to 4
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 4);
+    createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
+        FixedBlockResolver.class);
+    // start with 4 PROVIDED locations
+    startCluster(NNDIRPATH, 4,
+        new StorageType[]{
+            StorageType.PROVIDED, StorageType.DISK},
+        null,
+        false);
+    int expectedLocations = 4;
+    for (int i = 0; i < numFiles; i++) {
+      verifyFileLocation(i, expectedLocations);
+    }
+  }
 }
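The gist of the ProvidedStorageMap change above: a block on PROVIDED storage is now reported with up to min(dfs.replication, number of active provided datanodes) distinct locations, with a set of already-used UUIDs preventing duplicate datanodes. The standalone Java sketch below (not part of the patch; all names in it are hypothetical) mirrors the selection loop in ProvidedBlocksBuilder#newLocatedBlock and can be compiled and run on its own:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Standalone sketch (not part of the patch) of the location-expansion
 * behavior: a PROVIDED block is reported with up to
 * min(defaultReplication, active provided datanodes) distinct locations.
 */
public class ProvidedLocationSketch {

  /** Pick any active datanode UUID not yet excluded; null if none remain. */
  private static String choose(List<String> activeUuids,
      Set<String> excluded) {
    for (String uuid : activeUuids) {
      if (!excluded.contains(uuid)) {
        return uuid;
      }
    }
    return null;
  }

  static List<String> locationsForProvidedBlock(List<String> activeUuids,
      int defaultReplication) {
    List<String> locs = new ArrayList<>();
    Set<String> excludedUUids = new HashSet<>();
    // mirror the patch: keep adding distinct replicas until we reach
    // defaultReplication or run out of active provided datanodes
    for (int count = 1;
        count <= defaultReplication && count <= activeUuids.size(); count++) {
      String uuid = choose(activeUuids, excludedUUids);
      locs.add(uuid);
      excludedUUids.add(uuid);
    }
    return locs;
  }

  public static void main(String[] args) {
    List<String> active = Arrays.asList("dn-1", "dn-2", "dn-3");
    // defaultReplication is 4 but only 3 provided datanodes are active,
    // so exactly 3 distinct locations come back: [dn-1, dn-2, dn-3]
    System.out.println(locationsForProvidedBlock(active, 4));
  }
}

This is the same invariant the new tests exercise: testNumberOfProvidedLocations stops provided datanodes one at a time and expects the location count to drop from 4 to 3 to 2, since a UUID is never repeated.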