From 0c90bd9fa3fc25866e1cde7ac8ffb627c6c2cae5 Mon Sep 17 00:00:00 2001
From: Tsz-wo Sze
Date: Thu, 15 Mar 2012 18:29:13 +0000
Subject: [PATCH] svn merge -c 1301127 from trunk for HDFS-3005.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1301130 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt |   2 +
 .../hdfs/server/datanode/FSDataset.java     | 104 +++++++++---------
 2 files changed, 57 insertions(+), 49 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 180977baea6..727652bba98 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -208,6 +208,8 @@ Release 0.23.3 - UNRELEASED
     HDFS-3093. Fix bug where namenode -format interpreted the -force flag in
     reverse. (todd)
 
+    HDFS-3005. FSVolume.decDfsUsed(..) should be synchronized. (szetszwo)
+
   BREAKDOWN OF HDFS-1623 SUBTASKS
 
     HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
index da46a0534b6..4c7ea40d342 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
@@ -95,15 +95,18 @@ class FSDataset implements FSDatasetInterface {
    * A node type that can be built into a tree reflecting the
    * hierarchy of blocks on the local disk.
    */
-  private class FSDir {
+  private static class FSDir {
+    final int maxBlocksPerDir;
     final File dir;
     int numBlocks = 0;
     FSDir children[];
     int lastChildIdx = 0;
 
-    private FSDir(File dir)
+    private FSDir(File dir, int maxBlocksPerDir)
       throws IOException {
       this.dir = dir;
+      this.maxBlocksPerDir = maxBlocksPerDir;
+      this.children = null;
 
       if (!dir.exists()) {
         if (!dir.mkdirs()) {
@@ -115,7 +118,7 @@ class FSDataset implements FSDatasetInterface {
         List<FSDir> dirList = new ArrayList<FSDir>();
         for (int idx = 0; idx < files.length; idx++) {
           if (files[idx].isDirectory()) {
-            dirList.add(new FSDir(files[idx]));
+            dirList.add(new FSDir(files[idx], maxBlocksPerDir));
           } else if (Block.isBlockFilename(files[idx])) {
             numBlocks++;
           }
@@ -165,7 +168,8 @@ class FSDataset implements FSDatasetInterface {
       if (children == null || children.length == 0) {
         children = new FSDir[maxBlocksPerDir];
         for (int idx = 0; idx < maxBlocksPerDir; idx++) {
-          children[idx] = new FSDir(new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx));
+          final File sub = new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx);
+          children[idx] = new FSDir(sub, maxBlocksPerDir);
         }
       }
 
@@ -297,8 +301,10 @@ class FSDataset implements FSDatasetInterface {
    * A BlockPoolSlice represents a portion of a BlockPool stored on a volume.
    * Taken together, all BlockPoolSlices sharing a block pool ID across a
    * cluster represent a single block pool.
+   *
+   * This class is synchronized by {@link FSVolume}.
    */
-  private class BlockPoolSlice {
+  private static class BlockPoolSlice {
     private final String bpid;
     private final FSVolume volume; // volume to which this BlockPool belongs to
     private final File currentDir; // StorageDirectory/current/bpid/current
@@ -335,10 +341,16 @@ class FSDataset implements FSDatasetInterface {
         FileUtil.fullyDelete(tmpDir);
       }
       this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
+      final boolean supportAppends = conf.getBoolean(
+          DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
+          DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
       if (rbwDir.exists() && !supportAppends) {
         FileUtil.fullyDelete(rbwDir);
       }
-      this.finalizedDir = new FSDir(finalizedDir);
+      final int maxBlocksPerDir = conf.getInt(
+          DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
+          DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
+      this.finalizedDir = new FSDir(finalizedDir, maxBlocksPerDir);
       if (!rbwDir.mkdirs()) {  // create rbw directory if not exist
         if (!rbwDir.isDirectory()) {
           throw new IOException("Mkdirs failed to create " + rbwDir.toString());
         }
@@ -365,12 +377,12 @@ class FSDataset implements FSDatasetInterface {
       return rbwDir;
     }
 
+    /**
+     * This should be used only by {@link FSVolume#decDfsUsed(String, long)}
+     * and it will be synchronized there.
+     */
     void decDfsUsed(long value) {
-      // The caller to this method (BlockFileDeleteTask.run()) does
-      // not have locked FSDataset.this yet.
-      synchronized(FSDataset.this) {
-        dfsUsage.decDfsUsed(value);
-      }
+      dfsUsage.decDfsUsed(value);
     }
 
     long getDfsUsed() throws IOException {
@@ -530,14 +542,22 @@ class FSDataset implements FSDatasetInterface {
         dfsUsage.shutdown();
       }
     }
-  
+
+  /**
+   * The underlying volume used to store replicas.
+   *
+   * It uses the {@link FSDataset} object for synchronization.
+   */
-  class FSVolume implements FSVolumeInterface {
+  static class FSVolume implements FSVolumeInterface {
+    private final FSDataset dataset;
     private final Map<String, BlockPoolSlice> map = new HashMap<String, BlockPoolSlice>();
     private final File currentDir;    // <StorageDirectory>/current
     private final DF usage;
     private final long reserved;
 
-    FSVolume(File currentDir, Configuration conf) throws IOException {
+    FSVolume(FSDataset dataset, File currentDir, Configuration conf
+        ) throws IOException {
+      this.dataset = dataset;
       this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
           DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
       this.currentDir = currentDir;
@@ -555,9 +575,7 @@ class FSDataset implements FSDatasetInterface {
     }
 
     void decDfsUsed(String bpid, long value) {
-      // The caller to this method (BlockFileDeleteTask.run()) does
-      // not have locked FSDataset.this yet.
-      synchronized(FSDataset.this) {
+      synchronized(dataset) {
         BlockPoolSlice bp = map.get(bpid);
         if (bp != null) {
           bp.decDfsUsed(value);
@@ -566,11 +584,11 @@ class FSDataset implements FSDatasetInterface {
     }
 
     long getDfsUsed() throws IOException {
-      // TODO valid synchronization
       long dfsUsed = 0;
-      Set<Entry<String, BlockPoolSlice>> set = map.entrySet();
-      for (Entry<String, BlockPoolSlice> entry : set) {
-        dfsUsed += entry.getValue().getDfsUsed();
+      synchronized(dataset) {
+        for(BlockPoolSlice s : map.values()) {
+          dfsUsed += s.getDfsUsed();
+        }
       }
       return dfsUsed;
     }
@@ -630,11 +648,11 @@ class FSDataset implements FSDatasetInterface {
       */
     @Override
     public String[] getBlockPoolList() {
-      synchronized(FSDataset.this) {
+      synchronized(dataset) {
         return map.keySet().toArray(new String[map.keySet().size()]);
       }
     }
-    
+
     /**
      * Temporary files. They get moved to the finalized block directory when
      * the block is finalized.
@@ -658,14 +676,17 @@ class FSDataset implements FSDatasetInterface {
       return bp.addBlock(b, f);
     }
 
+    /**
+     * This should be used only by {@link FSVolumeSet#checkDirs()}
+     * and it will be synchronized there.
+     */
     void checkDirs() throws DiskErrorException {
       // TODO:FEDERATION valid synchronization
-      Set<Entry<String, BlockPoolSlice>> set = map.entrySet();
-      for (Entry<String, BlockPoolSlice> entry : set) {
-        entry.getValue().checkDirs();
+      for(BlockPoolSlice s : map.values()) {
+        s.checkDirs();
       }
     }
-    
+
     void getVolumeMap(ReplicasMap volumeMap) throws IOException {
       Set<Entry<String, BlockPoolSlice>> set = map.entrySet();
       for (Entry<String, BlockPoolSlice> entry : set) {
@@ -877,31 +898,25 @@ class FSDataset implements FSDatasetInterface {
       // Make a copy of volumes for performing modification
       final List<FSVolume> volumeList = new ArrayList<FSVolume>(volumes);
 
-      for (int idx = 0; idx < volumeList.size(); idx++) {
-        FSVolume fsv = volumeList.get(idx);
+      for(Iterator<FSVolume> i = volumeList.iterator(); i.hasNext(); ) {
+        final FSVolume fsv = i.next();
         try {
           fsv.checkDirs();
         } catch (DiskErrorException e) {
           DataNode.LOG.warn("Removing failed volume " + fsv + ": ",e);
           if (removedVols == null) {
-            removedVols = new ArrayList<FSVolume>(1);
+            removedVols = new ArrayList<FSVolume>(2);
           }
           removedVols.add(fsv);
           fsv.shutdown();
-          volumeList.set(idx, null); // Remove the volume
+          i.remove(); // Remove the volume
           numFailedVolumes++;
         }
       }
 
-      // Remove null volumes from the volumes array
       if (removedVols != null && removedVols.size() > 0) {
-        final List<FSVolume> newVols = new ArrayList<FSVolume>();
-        for (FSVolume vol : volumeList) {
-          if (vol != null) {
-            newVols.add(vol);
-          }
-        }
-        volumes = Collections.unmodifiableList(newVols); // Replace volume list
+        // Replace volume list
+        volumes = Collections.unmodifiableList(volumeList);
         DataNode.LOG.info("Completed FSVolumeSet.checkDirs. Removed "
             + removedVols.size() + " volumes. List of current volumes: "
             + this);
@@ -1048,7 +1063,6 @@ class FSDataset implements FSDatasetInterface {
 
   private final DataNode datanode;
   final FSVolumeSet volumes;
-  private final int maxBlocksPerDir;
   final ReplicasMap volumeMap;
   final FSDatasetAsyncDiskService asyncDiskService;
   private final int validVolsRequired;
@@ -1056,20 +1070,12 @@ class FSDataset implements FSDatasetInterface {
   // Used for synchronizing access to usage stats
   private final Object statsLock = new Object();
 
-  final boolean supportAppends;
-
   /**
    * An FSDataset has a directory where it loads its data files.
    */
   private FSDataset(DataNode datanode, DataStorage storage, Configuration conf
       ) throws IOException {
     this.datanode = datanode;
-    this.maxBlocksPerDir =
-      conf.getInt(DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
-                  DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
-    this.supportAppends =
-      conf.getBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
-                      DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
     // The number of volumes required for operation is the total number
     // of volumes minus the number of failed volumes we can tolerate.
     final int volFailuresTolerated =
@@ -1098,7 +1104,7 @@ class FSDataset implements FSDatasetInterface {
         storage.getNumStorageDirs());
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       final File dir = storage.getStorageDir(idx).getCurrentDir();
-      volArray.add(new FSVolume(dir, conf));
+      volArray.add(new FSVolume(this, dir, conf));
      DataNode.LOG.info("FSDataset added volume - " + dir);
     }
     volumeMap = new ReplicasMap(this);
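
Notes (reviewer sketch, not part of the patch): the core of this change is the locking
pattern in FSVolume.decDfsUsed(String, long). The asynchronous block-deletion path
updates per-block-pool usage counters without already holding the dataset lock, so the
patch turns the inner classes into static nested classes that keep an explicit
FSDataset reference and synchronize on it, instead of relying on the implicit
FSDataset.this of an inner class. A minimal, self-contained sketch of that pattern
follows; the names Dataset, Volume, and Slice are illustrative placeholders, not the
actual HDFS types.

    import java.util.HashMap;
    import java.util.Map;

    class Dataset {
      /** Usage counters for one block pool; callers must hold the Dataset lock. */
      static class Slice {
        private long dfsUsed;

        void decDfsUsed(long value) {
          dfsUsed -= value;
        }
      }

      /** Static nested class holding an explicit reference to the shared lock. */
      static class Volume {
        private final Dataset dataset; // replaces the implicit Dataset.this
        private final Map<String, Slice> map = new HashMap<String, Slice>();

        Volume(Dataset dataset) {
          this.dataset = dataset;
        }

        /**
         * Safe to call from a thread that does not yet hold the Dataset lock
         * (e.g. an asynchronous block-deletion task): the monitor guards both
         * the map lookup and the counter update.
         */
        void decDfsUsed(String bpid, long value) {
          synchronized (dataset) {
            Slice s = map.get(bpid);
            if (s != null) {
              s.decDfsUsed(value);
            }
          }
        }
      }
    }

Making FSDir, BlockPoolSlice, and FSVolume static also removes the hidden reference to
the enclosing FSDataset, so each class now states explicitly which monitor it relies
on; before this patch, FSVolume.getDfsUsed() iterated the map with no lock at all,
which the new synchronized(dataset) block closes.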
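The rewrite of FSVolumeSet.checkDirs() in the hunk at old line 877 also swaps a
null-out-then-rebuild scheme for in-place removal through an Iterator. A small sketch
of that idiom, under the same caveat that VolumeSet, the String volume type, and the
isHealthy check are placeholders for illustration:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    class VolumeSet {
      // Published, immutable view of the healthy volumes.
      private List<String> volumes = Collections.unmodifiableList(
          new ArrayList<String>(Arrays.asList("vol1", "vol2", "vol3")));

      /** Placeholder for the real disk check; pretend "vol2" has failed. */
      private boolean isHealthy(String vol) {
        return !"vol2".equals(vol);
      }

      void checkDirs() {
        // Work on a mutable copy so removal is legal while iterating.
        final List<String> volumeList = new ArrayList<String>(volumes);
        for (Iterator<String> i = volumeList.iterator(); i.hasNext(); ) {
          final String vol = i.next();
          if (!isHealthy(vol)) {
            i.remove(); // in-place removal; no null placeholders to compact later
          }
        }
        // Publish the pruned copy in a single assignment.
        volumes = Collections.unmodifiableList(volumeList);
      }
    }

Iterator.remove() avoids the earlier two-pass approach (set slots to null, then copy
the non-null entries into a new list) and leaves volumeList ready to be wrapped and
published directly.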