From 282be1b38e5cd141ed7e2b2194bfb67a7c2f7f15 Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Sun, 8 Sep 2013 22:53:36 +0000 Subject: [PATCH] HDFS-5134. Move blockContentsStale, heartbeatedSinceFailover and firstBlockReport from DatanodeDescriptor to DatanodeStorageInfo; and fix a synchronization problem in DatanodeStorageInfo. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1520938 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-hdfs/CHANGES_HDFS-2832.txt | 4 + .../server/blockmanagement/BlockInfo.java | 8 -- .../server/blockmanagement/BlockManager.java | 32 ++++--- .../blockmanagement/DatanodeDescriptor.java | 83 ++++++------------- .../blockmanagement/DatanodeManager.java | 10 ++- .../blockmanagement/DatanodeStorageInfo.java | 62 +++++++++++++- .../server/namenode/NameNodeRpcServer.java | 3 - .../blockmanagement/BlockManagerTestUtil.java | 1 - .../blockmanagement/TestBlockManager.java | 18 ++-- 9 files changed, 124 insertions(+), 97 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt index 5a6f2c76c4c..49379bc1f9f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt @@ -17,3 +17,7 @@ IMPROVEMENTS: (Junping Du via szetszwo) HDFS-5009. Include storage information in the LocatedBlock. (szetszwo) + + HDFS-5134. Move blockContentsStale, heartbeatedSinceFailover and + firstBlockReport from DatanodeDescriptor to DatanodeStorageInfo; and + fix a synchronization problem in DatanodeStorageInfo. 
(szetszwo) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java index 9f7aabd4668..056628888b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java @@ -324,14 +324,6 @@ public class BlockInfo extends Block implements LightWeightGSet.LinkedElement { return head; } - int listCount(DatanodeStorageInfo storage) { - int count = 0; - for(BlockInfo cur = this; cur != null; - cur = cur.getNext(cur.findStorageInfo(storage))) - count++; - return count; - } - /** * BlockInfo represents a block that is not being constructed. * In order to start modifying the block, the BlockInfo should be converted diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index f26f0e953a1..a8024f412f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -510,7 +510,7 @@ public class BlockManager { state = "(decommissioned)"; } - if (node.areBlockContentsStale()) { + if (storage.areBlockContentsStale()) { state += " (block deletions maybe out of date)"; } out.print(" " + node + state + " : "); @@ -993,7 +993,14 @@ public class BlockManager { // failover, then we may have been holding up on processing // over-replicated blocks because of it. But we can now // process those blocks. 
- if (node.areBlockContentsStale()) { + boolean stale = false; + for(DatanodeStorageInfo storage : node.getStorageInfos()) { + if (storage.areBlockContentsStale()) { + stale = true; + break; + } + } + if (stale) { rescanPostponedMisreplicatedBlocks(); } } @@ -1616,14 +1623,16 @@ public class BlockManager { // To minimize startup time, we discard any second (or later) block reports // that we receive while still in startup phase. - if (namesystem.isInStartupSafeMode() && !node.isFirstBlockReport()) { + final DatanodeStorageInfo storageInfo = node.getStorageInfo(storage.getStorageID()); + if (namesystem.isInStartupSafeMode() + && storageInfo.getBlockReportCount() > 0) { blockLog.info("BLOCK* processReport: " + "discarded non-initial block report from " + nodeID + " because namenode still in startup phase"); return; } - if (node.numBlocks() == 0) { + if (storageInfo.numBlocks() == 0) { // The first block report can be processed a lot more efficiently than // ordinary block reports. This shortens restart times. processFirstBlockReport(node, storage.getStorageID(), newReport); @@ -1633,9 +1642,9 @@ public class BlockManager { // Now that we have an up-to-date block report, we know that any // deletions from a previous NN iteration have been accounted for. - boolean staleBefore = node.areBlockContentsStale(); - node.receivedBlockReport(); - if (staleBefore && !node.areBlockContentsStale()) { + boolean staleBefore = storageInfo.areBlockContentsStale(); + storageInfo.receivedBlockReport(); + if (staleBefore && !storageInfo.areBlockContentsStale()) { LOG.info("BLOCK* processReport: Received first block report from " + node + " after starting up or becoming active. 
Its block " + "contents are no longer considered stale"); @@ -1747,7 +1756,7 @@ public class BlockManager { final BlockListAsLongs report) throws IOException { if (report == null) return; assert (namesystem.hasWriteLock()); - assert (node.numBlocks() == 0); + assert (node.getStorageInfo(storageID).numBlocks() == 0); BlockReportIterator itBR = report.getBlockReportIterator(); while(itBR.hasNext()) { @@ -2421,10 +2430,11 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block .getNodes(block); for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { final DatanodeDescriptor cur = storage.getDatanodeDescriptor(); - if (cur.areBlockContentsStale()) { + if (storage.areBlockContentsStale()) { LOG.info("BLOCK* processOverReplicatedBlock: " + "Postponing processing of over-replicated " + - block + " since datanode " + cur + " does not yet have up-to-date " + + block + " since storage " + storage + + " datanode " + cur + " does not yet have up-to-date " + "block information."); postponeBlock(block); return; @@ -2756,7 +2766,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block live++; } } - if (node.areBlockContentsStale()) { + if (storage.areBlockContentsStale()) { stale++; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index bc1107beb73..a4ddc3130e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -106,23 +106,6 @@ public class DatanodeDescriptor extends DatanodeInfo { public boolean isAlive = false; public boolean needKeyUpdate = false; - /** - * Set to false on any NN failover, and reset to true - * whenever a block report is received. 
- */ - private boolean heartbeatedSinceFailover = false; - - /** - * At startup or at any failover, the DNs in the cluster may - * have pending block deletions from a previous incarnation - * of the NameNode. Thus, we consider their block contents - * stale until we have received a block report. When a DN - * is considered stale, any replicas on it are transitively - * considered stale. If any block has at least one stale replica, - * then no invalidations will be processed for this block. - * See HDFS-1972. - */ - private boolean blockContentsStale = true; // A system administrator can tune the balancer bandwidth parameter // (dfs.balance.bandwidthPerSec) dynamically by calling @@ -151,9 +134,6 @@ public class DatanodeDescriptor extends DatanodeInfo { private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min private int volumeFailures = 0; - /** Set to false after processing first block report */ - private boolean firstBlockReport = true; - /** * When set to true, the node is not in include list and is not allowed * to communicate with the namenode @@ -234,11 +214,15 @@ public class DatanodeDescriptor extends DatanodeInfo { return false; } - DatanodeStorageInfo getStorageInfo(String storageID) { - return storageMap.get(storageID); + public DatanodeStorageInfo getStorageInfo(String storageID) { + synchronized (storageMap) { + return storageMap.get(storageID); + } } public Collection getStorageInfos() { - return storageMap.values(); + synchronized (storageMap) { + return new ArrayList(storageMap.values()); + } } /** @@ -314,9 +298,8 @@ public class DatanodeDescriptor extends DatanodeInfo { } public int numBlocks() { - // TODO: synchronization int blocks = 0; - for (DatanodeStorageInfo entry : storageMap.values()) { + for (DatanodeStorageInfo entry : getStorageInfos()) { blocks += entry.numBlocks(); } return blocks; @@ -334,7 +317,9 @@ public class DatanodeDescriptor extends DatanodeInfo { setXceiverCount(xceiverCount); setLastUpdate(Time.now()); 
this.volumeFailures = volFailures; - this.heartbeatedSinceFailover = true; + for(DatanodeStorageInfo storage : getStorageInfos()) { + storage.receivedHeartbeat(); + } rollBlocksScheduled(getLastUpdate()); } @@ -380,10 +365,10 @@ public class DatanodeDescriptor extends DatanodeInfo { } Iterator getBlockIterator() { - return new BlockIterator(storageMap.values()); + return new BlockIterator(getStorageInfos()); } Iterator getBlockIterator(final String storageID) { - return new BlockIterator(storageMap.get(storageID)); + return new BlockIterator(getStorageInfo(storageID)); } /** @@ -585,7 +570,11 @@ public class DatanodeDescriptor extends DatanodeInfo { @Override public void updateRegInfo(DatanodeID nodeReg) { super.updateRegInfo(nodeReg); - firstBlockReport = true; // must re-process IBR after re-registration + + // must re-process IBR after re-registration + for(DatanodeStorageInfo storage : getStorageInfos()) { + storage.setBlockReportCount(0); + } } /** @@ -602,26 +591,6 @@ public class DatanodeDescriptor extends DatanodeInfo { this.bandwidth = bandwidth; } - public boolean areBlockContentsStale() { - return blockContentsStale; - } - - public void markStaleAfterFailover() { - heartbeatedSinceFailover = false; - blockContentsStale = true; - } - - public void receivedBlockReport() { - if (heartbeatedSinceFailover) { - blockContentsStale = false; - } - firstBlockReport = false; - } - - boolean isFirstBlockReport() { - return firstBlockReport; - } - @Override public String dumpDatanode() { StringBuilder sb = new StringBuilder(super.dumpDatanode()); @@ -641,13 +610,15 @@ public class DatanodeDescriptor extends DatanodeInfo { } DatanodeStorageInfo updateStorage(DatanodeStorage s) { - DatanodeStorageInfo storage = getStorageInfo(s.getStorageID()); - if (storage == null) { - storage = new DatanodeStorageInfo(this, s); - storageMap.put(s.getStorageID(), storage); - } else { - storage.setState(s.getState()); + synchronized (storageMap) { + DatanodeStorageInfo storage = 
storageMap.get(s.getStorageID()); + if (storage == null) { + storage = new DatanodeStorageInfo(this, s); + storageMap.put(s.getStorageID(), storage); + } else { + storage.setState(s.getState()); + } + return storage; } - return storage; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 9c8a1f07a30..e89dd585553 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -713,8 +713,10 @@ public class DatanodeManager { /** Start decommissioning the specified datanode. */ private void startDecommission(DatanodeDescriptor node) { if (!node.isDecommissionInProgress() && !node.isDecommissioned()) { - LOG.info("Start Decommissioning " + node + " with " + - node.numBlocks() + " blocks"); + for (DatanodeStorageInfo storage : node.getStorageInfos()) { + LOG.info("Start Decommissioning " + node + " " + storage + + " with " + storage.numBlocks() + " blocks"); + } heartbeatManager.startDecommission(node); node.decommissioningStatus.setStartTime(now()); @@ -1345,7 +1347,9 @@ public class DatanodeManager { LOG.info("Marking all datandoes as stale"); synchronized (datanodeMap) { for (DatanodeDescriptor dn : datanodeMap.values()) { - dn.markStaleAfterFailover(); + for(DatanodeStorageInfo storage : dn.getStorageInfos()) { + storage.markStaleAfterFailover(); + } } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java index 53fcde1c691..ba0e142b36c 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java @@ -78,14 +78,62 @@ public class DatanodeStorageInfo { private long dfsUsed; private long remaining; private volatile BlockInfo blockList = null; - + private int numBlocks = 0; + + /** The number of block reports received */ + private int blockReportCount = 0; + + /** + * Set to false on any NN failover, and reset to true + * whenever a block report is received. + */ + private boolean heartbeatedSinceFailover = false; + + /** + * At startup or at failover, the storages in the cluster may have pending + * block deletions from a previous incarnation of the NameNode. The block + * contents are considered as stale until a block report is received. When a + * storage is considered as stale, the replicas on it are also considered as + * stale. If any block has at least one stale replica, then no invalidations + * will be processed for this block. See HDFS-1972. 
+ */ + private boolean blockContentsStale = true; + public DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) { this.dn = dn; this.storageID = s.getStorageID(); this.storageType = s.getStorageType(); this.state = s.getState(); } - + + int getBlockReportCount() { + return blockReportCount; + } + + void setBlockReportCount(int blockReportCount) { + this.blockReportCount = blockReportCount; + } + + public boolean areBlockContentsStale() { + return blockContentsStale; + } + + public void markStaleAfterFailover() { + heartbeatedSinceFailover = false; + blockContentsStale = true; + } + + public void receivedHeartbeat() { + heartbeatedSinceFailover = true; + } + + public void receivedBlockReport() { + if (heartbeatedSinceFailover) { + blockContentsStale = false; + } + blockReportCount++; + } + public void setUtilization(long capacity, long dfsUsed, long remaining) { this.capacity = capacity; this.dfsUsed = dfsUsed; @@ -127,16 +175,22 @@ public class DatanodeStorageInfo { return false; // add to the head of the data-node list blockList = b.listInsert(blockList, this); + numBlocks++; return true; } public boolean removeBlock(BlockInfo b) { blockList = b.listRemove(blockList, this); - return b.removeStorage(this); + if (b.removeStorage(this)) { + numBlocks--; + return true; + } else { + return false; + } } public int numBlocks() { - return blockList == null ? 
0 : blockList.listCount(this); + return numBlocks; } Iterator getBlockIterator() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index c63f4b56e66..6745cd7685a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -90,7 +90,6 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; @@ -979,8 +978,6 @@ class NameNodeRpcServer implements NamenodeProtocols { bm.processReport(nodeReg, r.getStorage(), poolId, blocks); } - DatanodeDescriptor datanode = bm.getDatanodeManager().getDatanode(nodeReg); - datanode.receivedBlockReport(); if (nn.getFSImage().isUpgradeFinalized() && !nn.isStandbyState()) return new FinalizeCommand(poolId); return null; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java index 2f659417479..243a2917f8a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.io.IOException; import java.util.Collection; import java.util.HashSet; -import java.util.Iterator; import java.util.Set; import org.apache.hadoop.hdfs.DFSConfigKeys; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 1ddb91cfe1d..2eaf9a0b128 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -523,33 +523,30 @@ public class TestBlockManager { bm.getDatanodeManager().registerDatanode(nodeReg); bm.getDatanodeManager().addDatanode(node); // swap in spy assertEquals(node, bm.getDatanodeManager().getDatanode(node)); - assertTrue(node.isFirstBlockReport()); + assertEquals(0, ds.getBlockReportCount()); // send block report, should be processed reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool", new BlockListAsLongs(null, null)); - verify(node).receivedBlockReport(); - assertFalse(node.isFirstBlockReport()); + assertEquals(1, ds.getBlockReportCount()); // send block report again, should NOT be processed reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool", new BlockListAsLongs(null, null)); - verify(node, never()).receivedBlockReport(); - assertFalse(node.isFirstBlockReport()); + assertEquals(1, ds.getBlockReportCount()); // re-register as if node restarted, should update existing node bm.getDatanodeManager().removeDatanode(node); reset(node); bm.getDatanodeManager().registerDatanode(nodeReg); 
verify(node).updateRegInfo(nodeReg); - assertTrue(node.isFirstBlockReport()); // ready for report again + assertEquals(0, ds.getBlockReportCount()); // ready for report again // send block report, should be processed after restart reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool", new BlockListAsLongs(null, null)); - verify(node).receivedBlockReport(); - assertFalse(node.isFirstBlockReport()); + assertEquals(1, ds.getBlockReportCount()); } @Test @@ -570,13 +567,12 @@ public class TestBlockManager { bm.getDatanodeManager().registerDatanode(nodeReg); bm.getDatanodeManager().addDatanode(node); // swap in spy assertEquals(node, bm.getDatanodeManager().getDatanode(node)); - assertTrue(node.isFirstBlockReport()); + assertEquals(0, ds.getBlockReportCount()); // send block report while pretending to already have blocks reset(node); doReturn(1).when(node).numBlocks(); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool", new BlockListAsLongs(null, null)); - verify(node).receivedBlockReport(); - assertFalse(node.isFirstBlockReport()); + assertEquals(1, ds.getBlockReportCount()); } }