diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt index 4893c54829f..d6cc04098db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt @@ -222,3 +222,5 @@ HDFS-2952. NN should not start with upgrade option or with a pending an unfinali HDFS-2974. MiniDFSCluster does not delete standby NN name dirs during format. (atm) HDFS-2929. Stress test and fixes for block synchronization (todd) + +HDFS-2972. Small optimization building incremental block report (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 95819304e88..982a5685033 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -24,7 +24,8 @@ import java.net.InetSocketAddress; import java.net.SocketTimeoutException; import java.net.URI; import java.util.Collection; -import java.util.LinkedList; +import java.util.HashMap; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.hadoop.classification.InterfaceAudience; @@ -54,6 +55,7 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; /** * A thread per active or standby namenode to perform: @@ -81,8 +83,16 @@ class BPServiceActor implements Runnable { DatanodeProtocolClientSideTranslatorPB bpNamenode; private long lastHeartbeat = 0; private volatile boolean initialized = false; - private final LinkedList receivedAndDeletedBlockList - = new LinkedList(); + + /** + * Between block reports (which happen on the order of once an hour) the + * DN reports smaller incremental changes to its block list. This map, + * keyed by block ID, contains the pending changes which have yet to be + * reported to the NN. Access should be synchronized on this object. + */ + private final Map pendingIncrementalBR + = Maps.newHashMap(); + private volatile int pendingReceivedRequests = 0; private volatile boolean shouldServiceRun = true; private final DataNode dn; @@ -242,28 +252,39 @@ class BPServiceActor implements Runnable { // check if there are newly received blocks ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null; - int currentReceivedRequestsCounter; - synchronized (receivedAndDeletedBlockList) { - currentReceivedRequestsCounter = pendingReceivedRequests; - int numBlocks = receivedAndDeletedBlockList.size(); + synchronized (pendingIncrementalBR) { + int numBlocks = pendingIncrementalBR.size(); if (numBlocks > 0) { // // Send newly-received and deleted blockids to namenode // - receivedAndDeletedBlockArray = receivedAndDeletedBlockList - .toArray(new ReceivedDeletedBlockInfo[numBlocks]); + receivedAndDeletedBlockArray = pendingIncrementalBR + .values().toArray(new ReceivedDeletedBlockInfo[numBlocks]); } + pendingIncrementalBR.clear(); } if (receivedAndDeletedBlockArray != null) { StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks( bpRegistration.getStorageID(), receivedAndDeletedBlockArray) }; - bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(), - report); - synchronized (receivedAndDeletedBlockList) { - for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) { - receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]); + boolean success = false; + try { + bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(), + report); + success = true; + } finally { + synchronized (pendingIncrementalBR) { + if (!success) { + // If we didn't succeed in sending the report, put all of the + // blocks back onto our queue, but only in the case where we didn't + // put something newer in the meantime. + for (ReceivedDeletedBlockInfo rdbi : receivedAndDeletedBlockArray) { + if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) { + pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi); + } + } + } + pendingReceivedRequests = pendingIncrementalBR.size(); } - pendingReceivedRequests -= currentReceivedRequestsCounter; } } } @@ -274,16 +295,18 @@ class BPServiceActor implements Runnable { * client? For now we don't. */ void notifyNamenodeBlockImmediately(ReceivedDeletedBlockInfo bInfo) { - synchronized (receivedAndDeletedBlockList) { - receivedAndDeletedBlockList.add(bInfo); + synchronized (pendingIncrementalBR) { + pendingIncrementalBR.put( + bInfo.getBlock().getBlockId(), bInfo); pendingReceivedRequests++; - receivedAndDeletedBlockList.notifyAll(); + pendingIncrementalBR.notifyAll(); } } void notifyNamenodeDeletedBlock(ReceivedDeletedBlockInfo bInfo) { - synchronized (receivedAndDeletedBlockList) { - receivedAndDeletedBlockList.add(bInfo); + synchronized (pendingIncrementalBR) { + pendingIncrementalBR.put( + bInfo.getBlock().getBlockId(), bInfo); } } @@ -292,13 +315,13 @@ class BPServiceActor implements Runnable { */ @VisibleForTesting void triggerBlockReportForTests() throws IOException { - synchronized (receivedAndDeletedBlockList) { + synchronized (pendingIncrementalBR) { lastBlockReport = 0; lastHeartbeat = 0; - receivedAndDeletedBlockList.notifyAll(); + pendingIncrementalBR.notifyAll(); while (lastBlockReport == 0) { try { - receivedAndDeletedBlockList.wait(100); + pendingIncrementalBR.wait(100); } catch (InterruptedException e) { return; } @@ -308,12 +331,12 @@ class BPServiceActor implements Runnable { @VisibleForTesting void triggerHeartbeatForTests() throws IOException { - synchronized (receivedAndDeletedBlockList) { + synchronized (pendingIncrementalBR) { lastHeartbeat = 0; - receivedAndDeletedBlockList.notifyAll(); + pendingIncrementalBR.notifyAll(); while (lastHeartbeat == 0) { try { - receivedAndDeletedBlockList.wait(100); + pendingIncrementalBR.wait(100); } catch (InterruptedException e) { return; } @@ -323,13 +346,13 @@ class BPServiceActor implements Runnable { @VisibleForTesting void triggerDeletionReportForTests() throws IOException { - synchronized (receivedAndDeletedBlockList) { + synchronized (pendingIncrementalBR) { lastDeletedReport = 0; - receivedAndDeletedBlockList.notifyAll(); + pendingIncrementalBR.notifyAll(); while (lastDeletedReport == 0) { try { - receivedAndDeletedBlockList.wait(100); + pendingIncrementalBR.wait(100); } catch (InterruptedException e) { return; } @@ -527,10 +550,10 @@ class BPServiceActor implements Runnable { // long waitTime = dnConf.heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat); - synchronized(receivedAndDeletedBlockList) { + synchronized(pendingIncrementalBR) { if (waitTime > 0 && pendingReceivedRequests == 0) { try { - receivedAndDeletedBlockList.wait(waitTime); + pendingIncrementalBR.wait(waitTime); } catch (InterruptedException ie) { LOG.warn("BPOfferService for " + this + " interrupted"); }