diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java index 26340a9a77c..26c7ffb02ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java @@ -63,34 +63,6 @@ public abstract class BlockListAsLongs implements Iterable { public Iterator iterator() { return Collections.emptyIterator(); } - @Override - public boolean isStorageReport() { - return false; - } - }; - - // STORAGE_REPORT is used to report all storages in the DN - public static final BlockListAsLongs STORAGE_REPORT = new BlockListAsLongs() { - @Override - public int getNumberOfBlocks() { - return -1; - } - @Override - public ByteString getBlocksBuffer() { - return ByteString.EMPTY; - } - @Override - public long[] getBlockListAsLongs() { - return EMPTY_LONGS; - } - @Override - public Iterator iterator() { - return Collections.emptyIterator(); - } - @Override - public boolean isStorageReport() { - return true; - } }; /** @@ -280,13 +252,6 @@ public abstract class BlockListAsLongs implements Iterable { */ abstract public long[] getBlockListAsLongs(); - /** - * Return true for STORAGE_REPORT BlocksListsAsLongs. - * Otherwise return false. - * @return boolean - */ - abstract public boolean isStorageReport(); - /** * Returns a singleton iterator over blocks in the block report. Do not * add the returned blocks to a collection. @@ -426,11 +391,6 @@ public abstract class BlockListAsLongs implements Iterable { return longs; } - @Override - public boolean isStorageReport() { - return false; - } - @Override public Iterator iterator() { return new Iterator() { @@ -514,11 +474,6 @@ public abstract class BlockListAsLongs implements Iterable { return longs; } - @Override - public boolean isStorageReport() { - return false; - } - @Override public Iterator iterator() { return new Iterator() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index d927b2a8a66..349b018505f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -2138,7 +2138,7 @@ public class BlockManager implements BlockStatsMXBean { public boolean processReport(final DatanodeID nodeID, final DatanodeStorage storage, final BlockListAsLongs newReport, - BlockReportContext context) throws IOException { + BlockReportContext context, boolean lastStorageInRpc) throws IOException { namesystem.writeLock(); final long startTime = Time.monotonicNow(); //after acquiring write lock final long endTime; @@ -2189,14 +2189,30 @@ public class BlockManager implements BlockStatsMXBean { storageInfo.receivedBlockReport(); if (context != null) { - if (context.getTotalRpcs() == context.getCurRpc() + 1) { - long leaseId = this.getBlockReportLeaseManager().removeLease(node); - BlockManagerFaultInjector.getInstance(). - removeBlockReportLease(node, leaseId); + storageInfo.setLastBlockReportId(context.getReportId()); + if (lastStorageInRpc) { + int rpcsSeen = node.updateBlockReportContext(context); + if (rpcsSeen >= context.getTotalRpcs()) { + long leaseId = blockReportLeaseManager.removeLease(node); + BlockManagerFaultInjector.getInstance(). + removeBlockReportLease(node, leaseId); + List zombies = node.removeZombieStorages(); + if (zombies.isEmpty()) { + LOG.debug("processReport 0x{}: no zombie storages found.", + Long.toHexString(context.getReportId())); + } else { + for (DatanodeStorageInfo zombie : zombies) { + removeZombieReplicas(context, zombie); + } + } + node.clearBlockReportContext(); + } else { + LOG.debug("processReport 0x{}: {} more RPCs remaining in this " + + "report.", Long.toHexString(context.getReportId()), + (context.getTotalRpcs() - rpcsSeen) + ); + } } - LOG.debug("Processing RPC with index {} out of total {} RPCs in " - + "processReport 0x{}", context.getCurRpc(), - context.getTotalRpcs(), Long.toHexString(context.getReportId())); } } finally { endTime = Time.monotonicNow(); @@ -2222,26 +2238,6 @@ public class BlockManager implements BlockStatsMXBean { return !node.hasStaleStorages(); } - public void removeZombieStorages(DatanodeRegistration nodeReg, - BlockReportContext context, Set storageIDsInBlockReport) - throws UnregisteredNodeException { - namesystem.writeLock(); - DatanodeDescriptor node = this.getDatanodeManager().getDatanode(nodeReg); - if (node != null) { - List zombies = - node.removeZombieStorages(storageIDsInBlockReport); - if (zombies.isEmpty()) { - LOG.debug("processReport 0x{}: no zombie storages found.", - Long.toHexString(context.getReportId())); - } else { - for (DatanodeStorageInfo zombie : zombies) { - this.removeZombieReplicas(context, zombie); - } - } - } - namesystem.writeUnlock(); - } - private void removeZombieReplicas(BlockReportContext context, DatanodeStorageInfo zombie) { LOG.warn("processReport 0x{}: removing zombie storage {}, which no " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java index 34e094923fc..7db05c7aaa5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java @@ -308,10 +308,10 @@ class BlockReportLeaseManager { return false; } if (node.leaseId == 0) { - LOG.warn("BR lease 0x{} is not found for DN {}, because the DN " + + LOG.warn("BR lease 0x{} is not valid for DN {}, because the DN " + "is not in the pending set.", Long.toHexString(id), dn.getDatanodeUuid()); - return true; + return false; } if (pruneIfExpired(monotonicNowMs, node)) { LOG.warn("BR lease 0x{} is not valid for DN {}, because the lease " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index d807ab61364..1646129680c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -42,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -152,6 +154,9 @@ public class DatanodeDescriptor extends DatanodeInfo { public final DecommissioningStatus decommissioningStatus = new DecommissioningStatus(); + private long curBlockReportId = 0; + + private BitSet curBlockReportRpcsSeen = null; private final Map storageMap = new HashMap<>(); @@ -252,6 +257,20 @@ public class DatanodeDescriptor extends DatanodeInfo { updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null); } + public int updateBlockReportContext(BlockReportContext context) { + if (curBlockReportId != context.getReportId()) { + curBlockReportId = context.getReportId(); + curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs()); + } + curBlockReportRpcsSeen.set(context.getCurRpc()); + return curBlockReportRpcsSeen.cardinality(); + } + + public void clearBlockReportContext() { + curBlockReportId = 0; + curBlockReportRpcsSeen = null; + } + public CachedBlocksList getPendingCached() { return pendingCached; } @@ -315,8 +334,7 @@ public class DatanodeDescriptor extends DatanodeInfo { } } - List - removeZombieStorages(Set storageIDsInBlockReport) { + List removeZombieStorages() { List zombies = null; synchronized (storageMap) { Iterator> iter = @@ -324,13 +342,18 @@ public class DatanodeDescriptor extends DatanodeInfo { while (iter.hasNext()) { Map.Entry entry = iter.next(); DatanodeStorageInfo storageInfo = entry.getValue(); - if (!storageIDsInBlockReport.contains(storageInfo.getStorageID())) { + if (storageInfo.getLastBlockReportId() != curBlockReportId) { + LOG.info("{} had lastBlockReportId 0x{} but curBlockReportId = 0x{}", + storageInfo.getStorageID(), + Long.toHexString(storageInfo.getLastBlockReportId()), + Long.toHexString(curBlockReportId)); iter.remove(); if (zombies == null) { zombies = new LinkedList<>(); } zombies.add(storageInfo); } + storageInfo.setLastBlockReportId(0); } } return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java index 1b7cd7c6fcf..843a8d514ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java @@ -98,6 +98,9 @@ public class DatanodeStorageInfo { private final FoldedTreeSet blocks = new FoldedTreeSet<>(); + // The ID of the last full block report which updated this storage. + private long lastBlockReportId = 0; + /** The number of block reports received */ private int blockReportCount = 0; @@ -162,6 +165,14 @@ public class DatanodeStorageInfo { this.blockPoolUsed = blockPoolUsed; } + long getLastBlockReportId() { + return lastBlockReportId; + } + + void setLastBlockReportId(long lastBlockReportId) { + this.lastBlockReportId = lastBlockReportId; + } + State getState() { return this.state; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index f18cf0bf65c..69989fbbc66 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -367,36 +367,11 @@ class BPServiceActor implements Runnable { } else { // Send one block report per message. for (int r = 0; r < reports.length; r++) { - StorageBlockReport[] singleReport = {reports[r]}; - DatanodeCommand cmd; - if (r != reports.length - 1) { - cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), - singleReport, new BlockReportContext(reports.length, r, - reportId, fullBrLeaseId, true)); - } else { - StorageBlockReport[] lastSplitReport = - new StorageBlockReport[perVolumeBlockLists.size()]; - // When block reports are split, the last RPC in the block report - // has the information about all storages in the block report. - // See HDFS-10301 for more details. To achieve this, the last RPC - // has 'n' storage reports, where 'n' is the number of storages in - // a DN. The actual block replicas are reported only for the - // last/n-th storage. - i = 0; - for(Map.Entry kvPair : - perVolumeBlockLists.entrySet()) { - lastSplitReport[i++] = new StorageBlockReport( - kvPair.getKey(), BlockListAsLongs.STORAGE_REPORT); - if (i == r) { - lastSplitReport[i] = reports[r]; - break; - } - } - cmd = bpNamenode.blockReport( - bpRegistration, bpos.getBlockPoolId(), lastSplitReport, - new BlockReportContext(reports.length, r, reportId, - fullBrLeaseId, true)); - } + StorageBlockReport singleReport[] = { reports[r] }; + DatanodeCommand cmd = bpNamenode.blockReport( + bpRegistration, bpos.getBlockPoolId(), singleReport, + new BlockReportContext(reports.length, r, reportId, + fullBrLeaseId, true)); numReportsSent++; numRPCs++; if (cmd != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 3f36fcceef5..6b529498686 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -1435,36 +1435,24 @@ public class NameNodeRpcServer implements NamenodeProtocols { boolean noStaleStorages = false; for (int r = 0; r < reports.length; r++) { final BlockListAsLongs blocks = reports[r].getBlocks(); - if (!blocks.isStorageReport()) { - // - // BlockManager.processReport accumulates information of prior calls - // for the same node and storage, so the value returned by the last - // call of this loop is the final updated value for noStaleStorage. - // - final int index = r; - noStaleStorages = bm.runBlockOp(new Callable() { - @Override - public Boolean call() - throws IOException { - return bm.processReport(nodeReg, reports[index].getStorage(), - blocks, context); - } - }); - metrics.incrStorageBlockReportOps(); - } + // + // BlockManager.processReport accumulates information of prior calls + // for the same node and storage, so the value returned by the last + // call of this loop is the final updated value for noStaleStorage. + // + final int index = r; + noStaleStorages = bm.runBlockOp(new Callable() { + @Override + public Boolean call() throws IOException { + return bm.processReport(nodeReg, reports[index].getStorage(), + blocks, context, (index == reports.length - 1)); + } + }); + metrics.incrStorageBlockReportOps(); } BlockManagerFaultInjector.getInstance(). incomingBlockReportRpc(nodeReg, context); - if (nn.getFSImage().isUpgradeFinalized() && - context.getTotalRpcs() == context.getCurRpc() + 1) { - Set storageIDsInBlockReport = new HashSet<>(); - for (StorageBlockReport report : reports) { - storageIDsInBlockReport.add(report.getStorage().getStorageID()); - } - bm.removeZombieStorages(nodeReg, context, storageIDsInBlockReport); - } - if (nn.getFSImage().isUpgradeFinalized() && !namesystem.isRollingUpgrade() && !nn.isStandbyState() && diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 8c231d1e2e7..394fae96558 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -713,12 +713,12 @@ public class TestBlockManager { reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY, null); + BlockListAsLongs.EMPTY, null, false); assertEquals(1, ds.getBlockReportCount()); // send block report again, should NOT be processed reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY, null); + BlockListAsLongs.EMPTY, null, false); assertEquals(1, ds.getBlockReportCount()); // re-register as if node restarted, should update existing node @@ -729,7 +729,7 @@ public class TestBlockManager { // send block report, should be processed after restart reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY, null); + BlockListAsLongs.EMPTY, null, false); // Reinitialize as registration with empty storage list pruned // node.storageMap. ds = node.getStorageInfos()[0]; @@ -758,7 +758,7 @@ public class TestBlockManager { reset(node); doReturn(1).when(node).numBlocks(); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY, null); + BlockListAsLongs.EMPTY, null, false); assertEquals(1, ds.getBlockReportCount()); } @@ -832,7 +832,7 @@ public class TestBlockManager { assertEquals(0, ds.getBlockReportCount()); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), builder.build(), - new BlockReportContext(1, 0, System.nanoTime(), 0, true)); + new BlockReportContext(1, 0, System.nanoTime(), 0, true), false); assertEquals(1, ds.getBlockReportCount()); // verify the storage info is correct @@ -871,7 +871,8 @@ public class TestBlockManager { assertEquals(0, ds.getBlockReportCount()); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), generateReport(blocks), - new BlockReportContext(1, 0, System.nanoTime(), 0, false)); + new BlockReportContext(1, 0, System.nanoTime(), 0, false), + false); assertEquals(1, ds.getBlockReportCount()); // verify the storage info is correct for (BlockInfo block : blocks) { @@ -881,7 +882,8 @@ public class TestBlockManager { // Send unsorted report bm.processReport(node, new DatanodeStorage(ds.getStorageID()), generateReport(blocks), - new BlockReportContext(1, 0, System.nanoTime(), 0, false)); + new BlockReportContext(1, 0, System.nanoTime(), 0, false), + false); assertEquals(2, ds.getBlockReportCount()); // verify the storage info is correct for (BlockInfo block : blocks) { @@ -892,7 +894,8 @@ public class TestBlockManager { Collections.sort(blocks); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), generateReport(blocks), - new BlockReportContext(1, 0, System.nanoTime(), 0, true)); + new BlockReportContext(1, 0, System.nanoTime(), 0, true), + false); assertEquals(3, ds.getBlockReportCount()); // verify the storage info is correct for (BlockInfo block : blocks) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java index be38afeadce..b11b48aed79 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java @@ -19,40 +19,34 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import com.google.common.base.Supplier; -import java.util.ArrayList; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; -import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; -import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; -import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.test.GenericTestUtils; import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Test; +import java.io.BufferedOutputStream; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; @@ -61,6 +55,8 @@ import java.io.FileReader; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.Writer; +import java.util.Arrays; +import java.util.EnumSet; import java.util.Iterator; import java.util.List; import java.util.UUID; @@ -372,68 +368,4 @@ public class TestNameNodePrunesMissingStorages { cluster.shutdown(); } } - - @Test(timeout=300000) - public void testInterleavedFullBlockReports() throws Exception { - Configuration conf = new HdfsConfiguration(); - conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, - 36000000L); - int numStoragesPerDatanode = 6; - final MiniDFSCluster cluster = new MiniDFSCluster - .Builder(conf).numDataNodes(1) - .storagesPerDatanode(numStoragesPerDatanode) - .build(); - try { - LOG.info("waiting for cluster to become active..."); - cluster.waitActive(); - // Get the datanode registration and the block reports - DataNode dn = cluster.getDataNodes().get(0); - final String blockPoolId = cluster.getNamesystem().getBlockPoolId(); - LOG.info("Block pool id: " + blockPoolId); - final DatanodeRegistration dnR = dn.getDNRegistrationForBP(blockPoolId); - Map perVolumeBlockLists = - dn.getFSDataset().getBlockReports(blockPoolId); - final StorageBlockReport[] reports = - new StorageBlockReport[perVolumeBlockLists.size()]; - int reportIndex = 0; - for(Map.Entry kvPair : - perVolumeBlockLists.entrySet()) { - DatanodeStorage dnStorage = kvPair.getKey(); - BlockListAsLongs blockList = kvPair.getValue(); - reports[reportIndex++] = - new StorageBlockReport(dnStorage, blockList); - } - // Get the list of storage ids associated with the datanode - // before the test - BlockManager bm = - cluster.getNameNode().getNamesystem().getBlockManager(); - final DatanodeDescriptor dnDescriptor = bm.getDatanodeManager(). - getDatanode(cluster.getDataNodes().get(0).getDatanodeUuid()); - DatanodeStorageInfo[] storageInfos = dnDescriptor.getStorageInfos(); - // Send the full block report concurrently using - // numThreads=numStoragesPerDatanode - ExecutorService executorService = Executors. - newFixedThreadPool(numStoragesPerDatanode); - List> futureList = - new ArrayList<>(numStoragesPerDatanode); - for (int i = 0; i < numStoragesPerDatanode; i++) { - futureList.add(executorService.submit(new Callable() { - @Override - public DatanodeCommand call() throws IOException { - return cluster.getNameNodeRpc().blockReport(dnR, blockPoolId, - reports, new BlockReportContext(1, 0, System.nanoTime(), - 0L, true)); - } - })); - } - for (Future future: futureList) { - future.get(); - } - executorService.shutdown(); - // Verify that the storages match before and after the test - Assert.assertArrayEquals(storageInfos, dnDescriptor.getStorageInfos()); - } finally { - cluster.shutdown(); - } - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java index f41c5466741..bf0e3c11bdd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java @@ -41,7 +41,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.*; import static org.mockito.Mockito.times; @@ -89,34 +88,6 @@ public class TestDnRespectsBlockReportSplitThreshold { blockCount * BLOCK_SIZE, BLOCK_SIZE, REPL_FACTOR, seed); } - private void verifyCapturedArgumentsSplit( - ArgumentCaptor captor, - int expectedReportsPerCall, - int expectedTotalBlockCount) { - List listOfReports = captor.getAllValues(); - int numBlocksReported = 0; - int storageIndex = 0; - int listOfReportsSize = listOfReports.size(); - for (StorageBlockReport[] reports : listOfReports) { - if (storageIndex < (listOfReportsSize - 1)) { - assertThat(reports.length, is(expectedReportsPerCall)); - } else { - assertThat(reports.length, is(listOfReportsSize)); - } - for (StorageBlockReport report : reports) { - BlockListAsLongs blockList = report.getBlocks(); - if (!blockList.isStorageReport()) { - numBlocksReported += blockList.getNumberOfBlocks(); - } else { - assertEquals(blockList.getNumberOfBlocks(), -1); - } - } - storageIndex++; - } - - assert(numBlocksReported >= expectedTotalBlockCount); - } - private void verifyCapturedArguments( ArgumentCaptor captor, int expectedReportsPerCall, @@ -165,7 +136,7 @@ public class TestDnRespectsBlockReportSplitThreshold { anyString(), captor.capture(), Mockito.anyObject()); - verifyCapturedArgumentsSplit(captor, 1, BLOCKS_IN_FILE); + verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE); } /** @@ -229,7 +200,7 @@ public class TestDnRespectsBlockReportSplitThreshold { anyString(), captor.capture(), Mockito.anyObject()); - verifyCapturedArgumentsSplit(captor, 1, BLOCKS_IN_FILE); + verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java index 524243bbd8b..791ee20190f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.IOException; -import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; @@ -35,32 +34,13 @@ public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase { @Override protected void sendBlockReports(DatanodeRegistration dnR, String poolId, StorageBlockReport[] reports) throws IOException { - for (int r = 0; r < reports.length; r++) { - LOG.info("Sending block report for storage " + - reports[r].getStorage().getStorageID()); - StorageBlockReport[] singletonReport = {reports[r]}; - if (r != reports.length - 1) { - cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport, - new BlockReportContext(reports.length, r, System.nanoTime(), - 0L, true)); - } else { - StorageBlockReport[] lastSplitReport = - new StorageBlockReport[reports.length]; - // When block reports are split, send a dummy storage report for all - // other storages in the blockreport along with the last storage report - for (int i = 0; i <= r; i++) { - if (i == r) { - lastSplitReport[i] = reports[r]; - break; - } - lastSplitReport[i] = - new StorageBlockReport(reports[i].getStorage(), - BlockListAsLongs.STORAGE_REPORT); - } - cluster.getNameNodeRpc().blockReport(dnR, poolId, lastSplitReport, - new BlockReportContext(reports.length, r, System.nanoTime(), - 0L, true)); - } + int i = 0; + for (StorageBlockReport report : reports) { + LOG.info("Sending block report for storage " + report.getStorage().getStorageID()); + StorageBlockReport[] singletonReport = { report }; + cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport, + new BlockReportContext(reports.length, i, System.nanoTime(), 0L, true)); + i++; } } }