HDFS-5406. Send incremental block reports for all storages in a single call.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1551093 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-12-16 00:58:40 +00:00
parent 44a6560b5d
commit 938565925a
6 changed files with 170 additions and 61 deletions

View File

@ -451,6 +451,9 @@ Trunk (Unreleased)
HDFS-5626. dfsadmin -report shows incorrect cache values. (cmccabe) HDFS-5626. dfsadmin -report shows incorrect cache values. (cmccabe)
HDFS-5406. Send incremental block reports for all storages in a
single call. (Arpit Agarwal)
BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS
HDFS-4985. Add storage type to the protocol and expose it in block report HDFS-4985. Add storage type to the protocol and expose it in block report

View File

@ -22,6 +22,7 @@ import static org.apache.hadoop.util.Time.now;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -273,7 +274,8 @@ class BPServiceActor implements Runnable {
private void reportReceivedDeletedBlocks() throws IOException { private void reportReceivedDeletedBlocks() throws IOException {
// Generate a list of the pending reports for each storage under the lock // Generate a list of the pending reports for each storage under the lock
Map<String, ReceivedDeletedBlockInfo[]> blockArrays = Maps.newHashMap(); ArrayList<StorageReceivedDeletedBlocks> reports =
new ArrayList<StorageReceivedDeletedBlocks>(pendingIncrementalBRperStorage.size());
synchronized (pendingIncrementalBRperStorage) { synchronized (pendingIncrementalBRperStorage) {
for (Map.Entry<String, PerStoragePendingIncrementalBR> entry : for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
pendingIncrementalBRperStorage.entrySet()) { pendingIncrementalBRperStorage.entrySet()) {
@ -286,33 +288,34 @@ class BPServiceActor implements Runnable {
pendingReceivedRequests = pendingReceivedRequests =
(pendingReceivedRequests > rdbi.length ? (pendingReceivedRequests > rdbi.length ?
(pendingReceivedRequests - rdbi.length) : 0); (pendingReceivedRequests - rdbi.length) : 0);
blockArrays.put(storageUuid, rdbi); reports.add(new StorageReceivedDeletedBlocks(storageUuid, rdbi));
} }
} }
} }
// Send incremental block reports to the Namenode outside the lock if (reports.size() == 0) {
for (Map.Entry<String, ReceivedDeletedBlockInfo[]> entry : // Nothing new to report.
blockArrays.entrySet()) { return;
final String storageUuid = entry.getKey(); }
final ReceivedDeletedBlockInfo[] rdbi = entry.getValue();
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks( // Send incremental block reports to the Namenode outside the lock
storageUuid, rdbi) }; boolean success = false;
boolean success = false; try {
try { bpNamenode.blockReceivedAndDeleted(bpRegistration,
bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(),
bpos.getBlockPoolId(), report); reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
success = true; success = true;
} finally { } finally {
if (!success) { if (!success) {
synchronized (pendingIncrementalBRperStorage) { synchronized (pendingIncrementalBRperStorage) {
for (StorageReceivedDeletedBlocks report : reports) {
// If we didn't succeed in sending the report, put all of the // If we didn't succeed in sending the report, put all of the
// blocks back onto our queue, but only in the case where we // blocks back onto our queue, but only in the case where we
// didn't put something newer in the meantime. // didn't put something newer in the meantime.
PerStoragePendingIncrementalBR perStorageMap = PerStoragePendingIncrementalBR perStorageMap =
pendingIncrementalBRperStorage.get(storageUuid); pendingIncrementalBRperStorage.get(report.getStorageID());
pendingReceivedRequests += perStorageMap.putMissingBlockInfos(rdbi); pendingReceivedRequests +=
perStorageMap.putMissingBlockInfos(report.getBlocks());
} }
} }
} }

View File

@ -1006,6 +1006,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId, public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException { StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException {
verifyRequest(nodeReg); verifyRequest(nodeReg);
metrics.incrBlockReceivedAndDeletedOps();
if(blockStateChangeLog.isDebugEnabled()) { if(blockStateChangeLog.isDebugEnabled()) {
blockStateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: " blockStateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
+"from "+nodeReg+" "+receivedAndDeletedBlocks.length +"from "+nodeReg+" "+receivedAndDeletedBlocks.length

View File

@ -71,6 +71,8 @@ public class NameNodeMetrics {
MutableCounterLong listSnapshottableDirOps; MutableCounterLong listSnapshottableDirOps;
@Metric("Number of snapshotDiffReport operations") @Metric("Number of snapshotDiffReport operations")
MutableCounterLong snapshotDiffReportOps; MutableCounterLong snapshotDiffReportOps;
@Metric("Number of blockReceivedAndDeleted calls")
MutableCounterLong blockReceivedAndDeletedOps;
@Metric("Journal transactions") MutableRate transactions; @Metric("Journal transactions") MutableRate transactions;
@Metric("Journal syncs") MutableRate syncs; @Metric("Journal syncs") MutableRate syncs;
@ -209,6 +211,10 @@ public class NameNodeMetrics {
snapshotDiffReportOps.incr(); snapshotDiffReportOps.incr();
} }
public void incrBlockReceivedAndDeletedOps() {
blockReceivedAndDeletedOps.incr();
}
public void addTransaction(long latency) { public void addTransaction(long latency) {
transactions.add(latency); transactions.add(latency);
} }

View File

@ -2143,17 +2143,14 @@ public class MiniDFSCluster {
} }
/** /**
* Get a storage directory for a datanode. There are two storage directories * Get a storage directory for a datanode.
* per datanode:
* <ol> * <ol>
* <li><base directory>/data/data<2*dnIndex + 1></li> * <li><base directory>/data/data<2*dnIndex + 1></li>
* <li><base directory>/data/data<2*dnIndex + 2></li> * <li><base directory>/data/data<2*dnIndex + 2></li>
* </ol> * </ol>
* *
* @param dnIndex datanode index (starts from 0) * @param dnIndex datanode index (starts from 0)
* @param dirIndex directory index (0 or 1). Index 0 provides access to the * @param dirIndex directory index.
* first storage directory. Index 1 provides access to the second
* storage directory.
* @return Storage directory * @return Storage directory
*/ */
public static File getStorageDir(int dnIndex, int dirIndex) { public static File getStorageDir(int dnIndex, int dirIndex) {
@ -2164,7 +2161,7 @@ public class MiniDFSCluster {
* Calculate the DN instance-specific path for appending to the base dir * Calculate the DN instance-specific path for appending to the base dir
* to determine the location of the storage of a DN instance in the mini cluster * to determine the location of the storage of a DN instance in the mini cluster
* @param dnIndex datanode index * @param dnIndex datanode index
* @param dirIndex directory index (0 or 1). * @param dirIndex directory index.
* @return * @return
*/ */
private static String getStorageDirPath(int dnIndex, int dirIndex) { private static String getStorageDirPath(int dnIndex, int dirIndex) {

View File

@ -71,7 +71,15 @@ import org.mockito.invocation.InvocationOnMock;
/** /**
* This test simulates a variety of situations when blocks are being * This test simulates a variety of situations when blocks are being
* intentionally corrupted, unexpectedly modified, and so on before a block * intentionally corrupted, unexpectedly modified, and so on before a block
* report is happening * report is happening.
*
* For each test case it runs two variations:
* #1 - For a given DN, the first variation sends block reports for all
* storages in a single call to the NN.
* #2 - For a given DN, the second variation sends block reports for each
* storage in a separate call.
*
* The behavior should be the same in either variation.
*/ */
public class TestBlockReport { public class TestBlockReport {
public static final Log LOG = LogFactory.getLog(TestBlockReport.class); public static final Log LOG = LogFactory.getLog(TestBlockReport.class);
@ -157,6 +165,113 @@ public class TestBlockReport {
return reports; return reports;
} }
/**
* Utility routine to send block reports to the NN, either in a single call
* or reporting one storage per call.
*
* @param dnR
* @param poolId
* @param reports
* @param needtoSplit
* @throws IOException
*/
private void sendBlockReports(DatanodeRegistration dnR, String poolId,
StorageBlockReport[] reports, boolean needtoSplit) throws IOException {
if (!needtoSplit) {
LOG.info("Sending combined block reports for " + dnR);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
} else {
for (StorageBlockReport report : reports) {
LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
StorageBlockReport[] singletonReport = { report };
cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport);
}
}
}
/**
* Test variations blockReport_01 through blockReport_09 with combined
* and split block reports.
*/
@Test
public void blockReportCombined_01() throws IOException {
blockReport_01(false);
}
@Test
public void blockReportSplit_01() throws IOException {
blockReport_01(true);
}
@Test
public void blockReportCombined_02() throws IOException {
blockReport_02(false);
}
@Test
public void blockReportSplit_02() throws IOException {
blockReport_02(true);
}
@Test
public void blockReportCombined_03() throws IOException {
blockReport_03(false);
}
@Test
public void blockReportSplit_03() throws IOException {
blockReport_03(true);
}
@Test
public void blockReportCombined_04() throws IOException {
blockReport_04(false);
}
@Test
public void blockReportSplit_04() throws IOException {
blockReport_04(true);
}
@Test
public void blockReportCombined_06() throws Exception {
blockReport_06(false);
}
@Test
public void blockReportSplit_06() throws Exception {
blockReport_06(true);
}
@Test
public void blockReportCombined_07() throws Exception {
blockReport_07(false);
}
@Test
public void blockReportSplit_07() throws Exception {
blockReport_07(true);
}
@Test
public void blockReportCombined_08() throws Exception {
blockReport_08(false);
}
@Test
public void blockReportSplit_08() throws Exception {
blockReport_08(true);
}
@Test
public void blockReportCombined_09() throws Exception {
blockReport_09(false);
}
@Test
public void blockReportSplit_09() throws Exception {
blockReport_09(true);
}
/** /**
* Test write a file, verifies and closes it. Then the length of the blocks * Test write a file, verifies and closes it. Then the length of the blocks
* are messed up and BlockReport is forced. * are messed up and BlockReport is forced.
@ -164,8 +279,7 @@ public class TestBlockReport {
* *
* @throws java.io.IOException on an error * @throws java.io.IOException on an error
*/ */
@Test private void blockReport_01(boolean splitBlockReports) throws IOException {
public void blockReport_01() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat"); Path filePath = new Path("/" + METHOD_NAME + ".dat");
@ -198,7 +312,7 @@ public class TestBlockReport {
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); sendBlockReports(dnR, poolId, reports, splitBlockReports);
List<LocatedBlock> blocksAfterReport = List<LocatedBlock> blocksAfterReport =
DFSTestUtil.getAllBlocks(fs.open(filePath)); DFSTestUtil.getAllBlocks(fs.open(filePath));
@ -224,8 +338,7 @@ public class TestBlockReport {
* *
* @throws IOException in case of errors * @throws IOException in case of errors
*/ */
@Test private void blockReport_02(boolean splitBlockReports) throws IOException {
public void blockReport_02() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
LOG.info("Running test " + METHOD_NAME); LOG.info("Running test " + METHOD_NAME);
@ -280,7 +393,7 @@ public class TestBlockReport {
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn0, poolId, false, false); StorageBlockReport[] reports = getBlockReports(dn0, poolId, false, false);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); sendBlockReports(dnR, poolId, reports, splitBlockReports);
BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem() BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem()
.getBlockManager()); .getBlockManager());
@ -301,8 +414,7 @@ public class TestBlockReport {
* *
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test private void blockReport_03(boolean splitBlockReports) throws IOException {
public void blockReport_03() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat"); Path filePath = new Path("/" + METHOD_NAME + ".dat");
ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath); ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath);
@ -312,11 +424,7 @@ public class TestBlockReport {
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false); StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false);
DatanodeCommand dnCmd = sendBlockReports(dnR, poolId, reports, splitBlockReports);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
if(LOG.isDebugEnabled()) {
LOG.debug("Got the command: " + dnCmd);
}
printStats(); printStats();
assertThat("Wrong number of corrupt blocks", assertThat("Wrong number of corrupt blocks",
@ -333,8 +441,7 @@ public class TestBlockReport {
* *
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test private void blockReport_04(boolean splitBlockReports) throws IOException {
public void blockReport_04() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat"); Path filePath = new Path("/" + METHOD_NAME + ".dat");
DFSTestUtil.createFile(fs, filePath, DFSTestUtil.createFile(fs, filePath,
@ -352,11 +459,7 @@ public class TestBlockReport {
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
DatanodeCommand dnCmd = sendBlockReports(dnR, poolId, reports, splitBlockReports);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
if(LOG.isDebugEnabled()) {
LOG.debug("Got the command: " + dnCmd);
}
printStats(); printStats();
assertThat("Wrong number of corrupt blocks", assertThat("Wrong number of corrupt blocks",
@ -373,8 +476,7 @@ public class TestBlockReport {
* *
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test private void blockReport_06(boolean splitBlockReports) throws Exception {
public void blockReport_06() throws Exception {
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat"); Path filePath = new Path("/" + METHOD_NAME + ".dat");
final int DN_N1 = DN_N0 + 1; final int DN_N1 = DN_N0 + 1;
@ -387,7 +489,7 @@ public class TestBlockReport {
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); sendBlockReports(dnR, poolId, reports, splitBlockReports);
printStats(); printStats();
assertEquals("Wrong number of PendingReplication Blocks", assertEquals("Wrong number of PendingReplication Blocks",
0, cluster.getNamesystem().getUnderReplicatedBlocks()); 0, cluster.getNamesystem().getUnderReplicatedBlocks());
@ -406,8 +508,7 @@ public class TestBlockReport {
* *
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test private void blockReport_07(boolean splitBlockReports) throws Exception {
public void blockReport_07() throws Exception {
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat"); Path filePath = new Path("/" + METHOD_NAME + ".dat");
final int DN_N1 = DN_N0 + 1; final int DN_N1 = DN_N0 + 1;
@ -421,7 +522,7 @@ public class TestBlockReport {
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false); StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); sendBlockReports(dnR, poolId, reports, splitBlockReports);
printStats(); printStats();
assertThat("Wrong number of corrupt blocks", assertThat("Wrong number of corrupt blocks",
@ -432,7 +533,7 @@ public class TestBlockReport {
cluster.getNamesystem().getPendingReplicationBlocks(), is(0L)); cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));
reports = getBlockReports(dn, poolId, true, true); reports = getBlockReports(dn, poolId, true, true);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); sendBlockReports(dnR, poolId, reports, splitBlockReports);
printStats(); printStats();
assertThat("Wrong number of corrupt blocks", assertThat("Wrong number of corrupt blocks",
@ -458,8 +559,7 @@ public class TestBlockReport {
* *
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test private void blockReport_08(boolean splitBlockReports) throws IOException {
public void blockReport_08() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat"); Path filePath = new Path("/" + METHOD_NAME + ".dat");
final int DN_N1 = DN_N0 + 1; final int DN_N1 = DN_N0 + 1;
@ -483,8 +583,8 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N1); DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] report = getBlockReports(dn, poolId, false, false); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
cluster.getNameNodeRpc().blockReport(dnR, poolId, report); sendBlockReports(dnR, poolId, reports, splitBlockReports);
printStats(); printStats();
assertEquals("Wrong number of PendingReplication blocks", assertEquals("Wrong number of PendingReplication blocks",
blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks()); blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks());
@ -500,8 +600,7 @@ public class TestBlockReport {
// Similar to BlockReport_08 but corrupts GS and len of the TEMPORARY's // Similar to BlockReport_08 but corrupts GS and len of the TEMPORARY's
// replica block. Expect the same behaviour: NN should simply ignore this // replica block. Expect the same behaviour: NN should simply ignore this
// block // block
@Test private void blockReport_09(boolean splitBlockReports) throws IOException {
public void blockReport_09() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat"); Path filePath = new Path("/" + METHOD_NAME + ".dat");
final int DN_N1 = DN_N0 + 1; final int DN_N1 = DN_N0 + 1;
@ -526,8 +625,8 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N1); DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] report = getBlockReports(dn, poolId, true, true); StorageBlockReport[] reports = getBlockReports(dn, poolId, true, true);
cluster.getNameNodeRpc().blockReport(dnR, poolId, report); sendBlockReports(dnR, poolId, reports, splitBlockReports);
printStats(); printStats();
assertEquals("Wrong number of PendingReplication blocks", assertEquals("Wrong number of PendingReplication blocks",
2, cluster.getNamesystem().getPendingReplicationBlocks()); 2, cluster.getNamesystem().getPendingReplicationBlocks());