diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java index 26c7ffb02ce..26340a9a77c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java @@ -63,6 +63,34 @@ public abstract class BlockListAsLongs implements Iterable { public Iterator iterator() { return Collections.emptyIterator(); } + @Override + public boolean isStorageReport() { + return false; + } + }; + + // STORAGE_REPORT is used to report all storages in the DN + public static final BlockListAsLongs STORAGE_REPORT = new BlockListAsLongs() { + @Override + public int getNumberOfBlocks() { + return -1; + } + @Override + public ByteString getBlocksBuffer() { + return ByteString.EMPTY; + } + @Override + public long[] getBlockListAsLongs() { + return EMPTY_LONGS; + } + @Override + public Iterator iterator() { + return Collections.emptyIterator(); + } + @Override + public boolean isStorageReport() { + return true; + } }; /** @@ -252,6 +280,13 @@ public abstract class BlockListAsLongs implements Iterable { */ abstract public long[] getBlockListAsLongs(); + /** + * Return true for STORAGE_REPORT BlocksListsAsLongs. + * Otherwise return false. + * @return boolean + */ + abstract public boolean isStorageReport(); + /** * Returns a singleton iterator over blocks in the block report. Do not * add the returned blocks to a collection. @@ -391,6 +426,11 @@ public abstract class BlockListAsLongs implements Iterable { return longs; } + @Override + public boolean isStorageReport() { + return false; + } + @Override public Iterator iterator() { return new Iterator() { @@ -474,6 +514,11 @@ public abstract class BlockListAsLongs implements Iterable { return longs; } + @Override + public boolean isStorageReport() { + return false; + } + @Override public Iterator iterator() { return new Iterator() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 207ac5b67c3..5b7dc763ba6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1889,8 +1889,8 @@ public class BlockManager implements BlockStatsMXBean { */ public boolean processReport(final DatanodeID nodeID, final DatanodeStorage storage, - final BlockListAsLongs newReport, BlockReportContext context, - boolean lastStorageInRpc) throws IOException { + final BlockListAsLongs newReport, + BlockReportContext context) throws IOException { namesystem.writeLock(); final long startTime = Time.monotonicNow(); //after acquiring write lock final long endTime; @@ -1940,30 +1940,14 @@ public class BlockManager implements BlockStatsMXBean { storageInfo.receivedBlockReport(); if (context != null) { - storageInfo.setLastBlockReportId(context.getReportId()); - if (lastStorageInRpc) { - int rpcsSeen = node.updateBlockReportContext(context); - if (rpcsSeen >= context.getTotalRpcs()) { - long leaseId = blockReportLeaseManager.removeLease(node); - BlockManagerFaultInjector.getInstance(). - removeBlockReportLease(node, leaseId); - List zombies = node.removeZombieStorages(); - if (zombies.isEmpty()) { - LOG.debug("processReport 0x{}: no zombie storages found.", - Long.toHexString(context.getReportId())); - } else { - for (DatanodeStorageInfo zombie : zombies) { - removeZombieReplicas(context, zombie); - } - } - node.clearBlockReportContext(); - } else { - LOG.debug("processReport 0x{}: {} more RPCs remaining in this " + - "report.", Long.toHexString(context.getReportId()), - (context.getTotalRpcs() - rpcsSeen) - ); - } + if (context.getTotalRpcs() == context.getCurRpc() + 1) { + long leaseId = this.getBlockReportLeaseManager().removeLease(node); + BlockManagerFaultInjector.getInstance(). + removeBlockReportLease(node, leaseId); } + LOG.debug("Processing RPC with index {} out of total {} RPCs in " + + "processReport 0x{}", context.getCurRpc(), + context.getTotalRpcs(), Long.toHexString(context.getReportId())); } } finally { endTime = Time.monotonicNow(); @@ -1989,6 +1973,26 @@ public class BlockManager implements BlockStatsMXBean { return !node.hasStaleStorages(); } + public void removeZombieStorages(DatanodeRegistration nodeReg, + BlockReportContext context, Set storageIDsInBlockReport) + throws UnregisteredNodeException { + namesystem.writeLock(); + DatanodeDescriptor node = this.getDatanodeManager().getDatanode(nodeReg); + if (node != null) { + List zombies = + node.removeZombieStorages(storageIDsInBlockReport); + if (zombies.isEmpty()) { + LOG.debug("processReport 0x{}: no zombie storages found.", + Long.toHexString(context.getReportId())); + } else { + for (DatanodeStorageInfo zombie : zombies) { + this.removeZombieReplicas(context, zombie); + } + } + } + namesystem.writeUnlock(); + } + private void removeZombieReplicas(BlockReportContext context, DatanodeStorageInfo zombie) { LOG.warn("processReport 0x{}: removing zombie storage {}, which no " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java index 7db05c7aaa5..34e094923fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java @@ -308,10 +308,10 @@ class BlockReportLeaseManager { return false; } if (node.leaseId == 0) { - LOG.warn("BR lease 0x{} is not valid for DN {}, because the DN " + + LOG.warn("BR lease 0x{} is not found for DN {}, because the DN " + "is not in the pending set.", Long.toHexString(id), dn.getDatanodeUuid()); - return false; + return true; } if (pruneIfExpired(monotonicNowMs, node)) { LOG.warn("BR lease 0x{} is not valid for DN {}, because the lease " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index d34b214e21d..341746e3778 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.ArrayList; -import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -41,7 +40,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; -import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -153,9 +151,6 @@ public class DatanodeDescriptor extends DatanodeInfo { public final DecommissioningStatus decommissioningStatus = new DecommissioningStatus(); - private long curBlockReportId = 0; - - private BitSet curBlockReportRpcsSeen = null; private final Map storageMap = new HashMap<>(); @@ -253,20 +248,6 @@ public class DatanodeDescriptor extends DatanodeInfo { updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null); } - public int updateBlockReportContext(BlockReportContext context) { - if (curBlockReportId != context.getReportId()) { - curBlockReportId = context.getReportId(); - curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs()); - } - curBlockReportRpcsSeen.set(context.getCurRpc()); - return curBlockReportRpcsSeen.cardinality(); - } - - public void clearBlockReportContext() { - curBlockReportId = 0; - curBlockReportRpcsSeen = null; - } - public CachedBlocksList getPendingCached() { return pendingCached; } @@ -330,7 +311,8 @@ public class DatanodeDescriptor extends DatanodeInfo { } } - List removeZombieStorages() { + List + removeZombieStorages(Set storageIDsInBlockReport) { List zombies = null; synchronized (storageMap) { Iterator> iter = @@ -338,18 +320,13 @@ public class DatanodeDescriptor extends DatanodeInfo { while (iter.hasNext()) { Map.Entry entry = iter.next(); DatanodeStorageInfo storageInfo = entry.getValue(); - if (storageInfo.getLastBlockReportId() != curBlockReportId) { - LOG.info("{} had lastBlockReportId 0x{} but curBlockReportId = 0x{}", - storageInfo.getStorageID(), - Long.toHexString(storageInfo.getLastBlockReportId()), - Long.toHexString(curBlockReportId)); + if (!storageIDsInBlockReport.contains(storageInfo.getStorageID())) { iter.remove(); if (zombies == null) { zombies = new LinkedList<>(); } zombies.add(storageInfo); } - storageInfo.setLastBlockReportId(0); } } return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java index dda04b32c66..211956fe6fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java @@ -122,9 +122,6 @@ public class DatanodeStorageInfo { private volatile BlockInfo blockList = null; private int numBlocks = 0; - // The ID of the last full block report which updated this storage. - private long lastBlockReportId = 0; - /** The number of block reports received */ private int blockReportCount = 0; @@ -189,14 +186,6 @@ public class DatanodeStorageInfo { this.blockPoolUsed = blockPoolUsed; } - long getLastBlockReportId() { - return lastBlockReportId; - } - - void setLastBlockReportId(long lastBlockReportId) { - this.lastBlockReportId = lastBlockReportId; - } - State getState() { return this.state; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 24b1378c1b5..024c581a1c1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -367,11 +367,36 @@ class BPServiceActor implements Runnable { } else { // Send one block report per message. for (int r = 0; r < reports.length; r++) { - StorageBlockReport singleReport[] = { reports[r] }; - DatanodeCommand cmd = bpNamenode.blockReport( - bpRegistration, bpos.getBlockPoolId(), singleReport, - new BlockReportContext(reports.length, r, reportId, - fullBrLeaseId)); + StorageBlockReport[] singleReport = {reports[r]}; + DatanodeCommand cmd; + if (r != reports.length - 1) { + cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), + singleReport, new BlockReportContext(reports.length, r, + reportId, fullBrLeaseId)); + } else { + StorageBlockReport[] lastSplitReport = + new StorageBlockReport[perVolumeBlockLists.size()]; + // When block reports are split, the last RPC in the block report + // has the information about all storages in the block report. + // See HDFS-10301 for more details. To achieve this, the last RPC + // has 'n' storage reports, where 'n' is the number of storages in + // a DN. The actual block replicas are reported only for the + // last/n-th storage. + i = 0; + for(Map.Entry kvPair : + perVolumeBlockLists.entrySet()) { + lastSplitReport[i++] = new StorageBlockReport( + kvPair.getKey(), BlockListAsLongs.STORAGE_REPORT); + if (i == r) { + lastSplitReport[i] = reports[r]; + break; + } + } + cmd = bpNamenode.blockReport( + bpRegistration, bpos.getBlockPoolId(), lastSplitReport, + new BlockReportContext(reports.length, r, reportId, + fullBrLeaseId)); + } numReportsSent++; numRPCs++; if (cmd != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 2996fcf9e19..f56c5680d0e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -1415,24 +1415,36 @@ public class NameNodeRpcServer implements NamenodeProtocols { boolean noStaleStorages = false; for (int r = 0; r < reports.length; r++) { final BlockListAsLongs blocks = reports[r].getBlocks(); - // - // BlockManager.processReport accumulates information of prior calls - // for the same node and storage, so the value returned by the last - // call of this loop is the final updated value for noStaleStorage. - // - final int index = r; - noStaleStorages = bm.runBlockOp(new Callable() { - @Override - public Boolean call() throws IOException { - return bm.processReport(nodeReg, reports[index].getStorage(), - blocks, context, (index == reports.length - 1)); - } - }); - metrics.incrStorageBlockReportOps(); + if (!blocks.isStorageReport()) { + // + // BlockManager.processReport accumulates information of prior calls + // for the same node and storage, so the value returned by the last + // call of this loop is the final updated value for noStaleStorage. + // + final int index = r; + noStaleStorages = bm.runBlockOp(new Callable() { + @Override + public Boolean call() + throws IOException { + return bm.processReport(nodeReg, reports[index].getStorage(), + blocks, context); + } + }); + metrics.incrStorageBlockReportOps(); + } } BlockManagerFaultInjector.getInstance(). incomingBlockReportRpc(nodeReg, context); + if (nn.getFSImage().isUpgradeFinalized() && + context.getTotalRpcs() == context.getCurRpc() + 1) { + Set storageIDsInBlockReport = new HashSet<>(); + for (StorageBlockReport report : reports) { + storageIDsInBlockReport.add(report.getStorage().getStorageID()); + } + bm.removeZombieStorages(nodeReg, context, storageIDsInBlockReport); + } + if (nn.getFSImage().isUpgradeFinalized() && !namesystem.isRollingUpgrade() && !nn.isStandbyState() && diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index ada75c884f2..c4b75024481 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -710,12 +710,12 @@ public class TestBlockManager { reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY, null, false); + BlockListAsLongs.EMPTY, null); assertEquals(1, ds.getBlockReportCount()); // send block report again, should NOT be processed reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY, null, false); + BlockListAsLongs.EMPTY, null); assertEquals(1, ds.getBlockReportCount()); // re-register as if node restarted, should update existing node @@ -726,7 +726,7 @@ public class TestBlockManager { // send block report, should be processed after restart reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY, null, false); + BlockListAsLongs.EMPTY, null); // Reinitialize as registration with empty storage list pruned // node.storageMap. ds = node.getStorageInfos()[0]; @@ -755,7 +755,7 @@ public class TestBlockManager { reset(node); doReturn(1).when(node).numBlocks(); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY, null, false); + BlockListAsLongs.EMPTY, null); assertEquals(1, ds.getBlockReportCount()); } @@ -828,7 +828,7 @@ public class TestBlockManager { // Make sure it's the first full report assertEquals(0, ds.getBlockReportCount()); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - builder.build(), null, false); + builder.build(), null); assertEquals(1, ds.getBlockReportCount()); // verify the storage info is correct diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java index cea686596e5..e54b736538f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java @@ -19,34 +19,40 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import com.google.common.base.Supplier; +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.test.GenericTestUtils; import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Test; -import java.io.BufferedOutputStream; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; @@ -55,8 +61,6 @@ import java.io.FileReader; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.Writer; -import java.util.Arrays; -import java.util.EnumSet; import java.util.Iterator; import java.util.List; import java.util.UUID; @@ -365,4 +369,67 @@ public class TestNameNodePrunesMissingStorages { cluster.shutdown(); } } + + @Test(timeout=300000) + public void testInterleavedFullBlockReports() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, + 36000000L); + int numStoragesPerDatanode = 6; + final MiniDFSCluster cluster = new MiniDFSCluster + .Builder(conf).numDataNodes(1) + .storagesPerDatanode(numStoragesPerDatanode) + .build(); + try { + LOG.info("waiting for cluster to become active..."); + cluster.waitActive(); + // Get the datanode registration and the block reports + DataNode dn = cluster.getDataNodes().get(0); + final String blockPoolId = cluster.getNamesystem().getBlockPoolId(); + LOG.info("Block pool id: " + blockPoolId); + final DatanodeRegistration dnR = dn.getDNRegistrationForBP(blockPoolId); + Map perVolumeBlockLists = + dn.getFSDataset().getBlockReports(blockPoolId); + final StorageBlockReport[] reports = + new StorageBlockReport[perVolumeBlockLists.size()]; + int reportIndex = 0; + for(Map.Entry kvPair : + perVolumeBlockLists.entrySet()) { + DatanodeStorage dnStorage = kvPair.getKey(); + BlockListAsLongs blockList = kvPair.getValue(); + reports[reportIndex++] = + new StorageBlockReport(dnStorage, blockList); + } + // Get the list of storage ids associated with the datanode + // before the test + BlockManager bm = + cluster.getNameNode().getNamesystem().getBlockManager(); + final DatanodeDescriptor dnDescriptor = bm.getDatanodeManager(). + getDatanode(cluster.getDataNodes().get(0).getDatanodeUuid()); + DatanodeStorageInfo[] storageInfos = dnDescriptor.getStorageInfos(); + // Send the full block report concurrently using + // numThreads=numStoragesPerDatanode + ExecutorService executorService = Executors. + newFixedThreadPool(numStoragesPerDatanode); + List> futureList = + new ArrayList<>(numStoragesPerDatanode); + for (int i = 0; i < numStoragesPerDatanode; i++) { + futureList.add(executorService.submit(new Callable() { + @Override + public DatanodeCommand call() throws IOException { + return cluster.getNameNodeRpc().blockReport(dnR, blockPoolId, + reports, new BlockReportContext(1, 0, System.nanoTime(), 0L)); + } + })); + } + for (Future future: futureList) { + future.get(); + } + executorService.shutdown(); + // Verify that the storages match before and after the test + Assert.assertArrayEquals(storageInfos, dnDescriptor.getStorageInfos()); + } finally { + cluster.shutdown(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java index ad5862a353c..2eef66f4bcb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java @@ -41,6 +41,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.*; import static org.mockito.Mockito.times; @@ -88,6 +89,34 @@ public class TestDnRespectsBlockReportSplitThreshold { blockCount * BLOCK_SIZE, BLOCK_SIZE, REPL_FACTOR, seed); } + private void verifyCapturedArgumentsSplit( + ArgumentCaptor captor, + int expectedReportsPerCall, + int expectedTotalBlockCount) { + List listOfReports = captor.getAllValues(); + int numBlocksReported = 0; + int storageIndex = 0; + int listOfReportsSize = listOfReports.size(); + for (StorageBlockReport[] reports : listOfReports) { + if (storageIndex < (listOfReportsSize - 1)) { + assertThat(reports.length, is(expectedReportsPerCall)); + } else { + assertThat(reports.length, is(listOfReportsSize)); + } + for (StorageBlockReport report : reports) { + BlockListAsLongs blockList = report.getBlocks(); + if (!blockList.isStorageReport()) { + numBlocksReported += blockList.getNumberOfBlocks(); + } else { + assertEquals(blockList.getNumberOfBlocks(), -1); + } + } + storageIndex++; + } + + assert(numBlocksReported >= expectedTotalBlockCount); + } + private void verifyCapturedArguments( ArgumentCaptor captor, int expectedReportsPerCall, @@ -136,7 +165,7 @@ public class TestDnRespectsBlockReportSplitThreshold { anyString(), captor.capture(), Mockito.anyObject()); - verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE); + verifyCapturedArgumentsSplit(captor, 1, BLOCKS_IN_FILE); } /** @@ -200,7 +229,7 @@ public class TestDnRespectsBlockReportSplitThreshold { anyString(), captor.capture(), Mockito.anyObject()); - verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE); + verifyCapturedArgumentsSplit(captor, 1, BLOCKS_IN_FILE); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java index 67bbefe3428..4466393b1bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.IOException; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; @@ -35,13 +36,32 @@ public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase { @Override protected void sendBlockReports(DatanodeRegistration dnR, String poolId, StorageBlockReport[] reports) throws IOException { - int i = 0; - for (StorageBlockReport report : reports) { - LOG.info("Sending block report for storage " + report.getStorage().getStorageID()); - StorageBlockReport[] singletonReport = { report }; - cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport, - new BlockReportContext(reports.length, i, System.nanoTime(), 0L)); - i++; + for (int r = 0; r < reports.length; r++) { + LOG.info("Sending block report for storage " + + reports[r].getStorage().getStorageID()); + StorageBlockReport[] singletonReport = {reports[r]}; + if (r != reports.length - 1) { + cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport, + new BlockReportContext(reports.length, r, System.nanoTime(), + 0L)); + } else { + StorageBlockReport[] lastSplitReport = + new StorageBlockReport[reports.length]; + // When block reports are split, send a dummy storage report for all + // other storages in the blockreport along with the last storage report + for (int i = 0; i <= r; i++) { + if (i == r) { + lastSplitReport[i] = reports[r]; + break; + } + lastSplitReport[i] = + new StorageBlockReport(reports[i].getStorage(), + BlockListAsLongs.STORAGE_REPORT); + } + cluster.getNameNodeRpc().blockReport(dnR, poolId, lastSplitReport, + new BlockReportContext(reports.length, r, System.nanoTime(), + 0L)); + } } } }