HDFS-2972. Small optimization building incremental block report. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1292497 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-02-22 20:37:28 +00:00
parent c14912785d
commit 7527e943e6
2 changed files with 56 additions and 31 deletions

View File

@ -222,3 +222,5 @@ HDFS-2952. NN should not start with upgrade option or with a pending an unfinali
HDFS-2974. MiniDFSCluster does not delete standby NN name dirs during format. (atm)
HDFS-2929. Stress test and fixes for block synchronization (todd)
HDFS-2972. Small optimization building incremental block report (todd)

View File

@ -24,7 +24,8 @@ import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.Collection;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
@ -54,6 +55,7 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
/**
* A thread per active or standby namenode to perform:
@ -81,8 +83,16 @@ class BPServiceActor implements Runnable {
DatanodeProtocolClientSideTranslatorPB bpNamenode;
private long lastHeartbeat = 0;
private volatile boolean initialized = false;
private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList
= new LinkedList<ReceivedDeletedBlockInfo>();
/**
* Between block reports (which happen on the order of once an hour) the
* DN reports smaller incremental changes to its block list. This map,
* keyed by block ID, contains the pending changes which have yet to be
* reported to the NN. Access should be synchronized on this object.
*/
private final Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR
= Maps.newHashMap();
private volatile int pendingReceivedRequests = 0;
private volatile boolean shouldServiceRun = true;
private final DataNode dn;
@ -242,28 +252,39 @@ class BPServiceActor implements Runnable {
// check if there are newly received blocks
ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
int currentReceivedRequestsCounter;
synchronized (receivedAndDeletedBlockList) {
currentReceivedRequestsCounter = pendingReceivedRequests;
int numBlocks = receivedAndDeletedBlockList.size();
synchronized (pendingIncrementalBR) {
int numBlocks = pendingIncrementalBR.size();
if (numBlocks > 0) {
//
// Send newly-received and deleted blockids to namenode
//
receivedAndDeletedBlockArray = receivedAndDeletedBlockList
.toArray(new ReceivedDeletedBlockInfo[numBlocks]);
receivedAndDeletedBlockArray = pendingIncrementalBR
.values().toArray(new ReceivedDeletedBlockInfo[numBlocks]);
}
pendingIncrementalBR.clear();
}
if (receivedAndDeletedBlockArray != null) {
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
bpRegistration.getStorageID(), receivedAndDeletedBlockArray) };
bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(),
report);
synchronized (receivedAndDeletedBlockList) {
for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) {
receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]);
boolean success = false;
try {
bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(),
report);
success = true;
} finally {
synchronized (pendingIncrementalBR) {
if (!success) {
// If we didn't succeed in sending the report, put all of the
// blocks back onto our queue, but only in the case where we didn't
// put something newer in the meantime.
for (ReceivedDeletedBlockInfo rdbi : receivedAndDeletedBlockArray) {
if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
}
}
}
pendingReceivedRequests = pendingIncrementalBR.size();
}
pendingReceivedRequests -= currentReceivedRequestsCounter;
}
}
}
@ -274,16 +295,18 @@ class BPServiceActor implements Runnable {
* client? For now we don't.
*/
void notifyNamenodeBlockImmediately(ReceivedDeletedBlockInfo bInfo) {
synchronized (receivedAndDeletedBlockList) {
receivedAndDeletedBlockList.add(bInfo);
synchronized (pendingIncrementalBR) {
pendingIncrementalBR.put(
bInfo.getBlock().getBlockId(), bInfo);
pendingReceivedRequests++;
receivedAndDeletedBlockList.notifyAll();
pendingIncrementalBR.notifyAll();
}
}
void notifyNamenodeDeletedBlock(ReceivedDeletedBlockInfo bInfo) {
synchronized (receivedAndDeletedBlockList) {
receivedAndDeletedBlockList.add(bInfo);
synchronized (pendingIncrementalBR) {
pendingIncrementalBR.put(
bInfo.getBlock().getBlockId(), bInfo);
}
}
@ -292,13 +315,13 @@ class BPServiceActor implements Runnable {
*/
@VisibleForTesting
void triggerBlockReportForTests() throws IOException {
synchronized (receivedAndDeletedBlockList) {
synchronized (pendingIncrementalBR) {
lastBlockReport = 0;
lastHeartbeat = 0;
receivedAndDeletedBlockList.notifyAll();
pendingIncrementalBR.notifyAll();
while (lastBlockReport == 0) {
try {
receivedAndDeletedBlockList.wait(100);
pendingIncrementalBR.wait(100);
} catch (InterruptedException e) {
return;
}
@ -308,12 +331,12 @@ class BPServiceActor implements Runnable {
@VisibleForTesting
void triggerHeartbeatForTests() throws IOException {
synchronized (receivedAndDeletedBlockList) {
synchronized (pendingIncrementalBR) {
lastHeartbeat = 0;
receivedAndDeletedBlockList.notifyAll();
pendingIncrementalBR.notifyAll();
while (lastHeartbeat == 0) {
try {
receivedAndDeletedBlockList.wait(100);
pendingIncrementalBR.wait(100);
} catch (InterruptedException e) {
return;
}
@ -323,13 +346,13 @@ class BPServiceActor implements Runnable {
@VisibleForTesting
void triggerDeletionReportForTests() throws IOException {
synchronized (receivedAndDeletedBlockList) {
synchronized (pendingIncrementalBR) {
lastDeletedReport = 0;
receivedAndDeletedBlockList.notifyAll();
pendingIncrementalBR.notifyAll();
while (lastDeletedReport == 0) {
try {
receivedAndDeletedBlockList.wait(100);
pendingIncrementalBR.wait(100);
} catch (InterruptedException e) {
return;
}
@ -527,10 +550,10 @@ class BPServiceActor implements Runnable {
//
long waitTime = dnConf.heartBeatInterval -
(System.currentTimeMillis() - lastHeartbeat);
synchronized(receivedAndDeletedBlockList) {
synchronized(pendingIncrementalBR) {
if (waitTime > 0 && pendingReceivedRequests == 0) {
try {
receivedAndDeletedBlockList.wait(waitTime);
pendingIncrementalBR.wait(waitTime);
} catch (InterruptedException ie) {
LOG.warn("BPOfferService for " + this + " interrupted");
}