HDFS-10301. Interleaving processing of storages from repeated block reports causes false zombie storage detection, removes valid blocks. Contributed by Vinitha Gankidi.

This commit is contained in:
Vinitha Reddy Gankidi 2016-07-25 18:33:02 -07:00 committed by Konstantin V Shvachko
parent 62800dc8b6
commit 9ab153d7ae
11 changed files with 271 additions and 103 deletions

View File

@ -63,6 +63,34 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
public Iterator<BlockReportReplica> iterator() { public Iterator<BlockReportReplica> iterator() {
return Collections.emptyIterator(); return Collections.emptyIterator();
} }
@Override
public boolean isStorageReport() {
return false;
}
};
// STORAGE_REPORT is used to report all storages in the DN
public static final BlockListAsLongs STORAGE_REPORT = new BlockListAsLongs() {
@Override
public int getNumberOfBlocks() {
return -1;
}
@Override
public ByteString getBlocksBuffer() {
return ByteString.EMPTY;
}
@Override
public long[] getBlockListAsLongs() {
return EMPTY_LONGS;
}
@Override
public Iterator<BlockReportReplica> iterator() {
return Collections.emptyIterator();
}
@Override
public boolean isStorageReport() {
return true;
}
}; };
/** /**
@ -252,6 +280,13 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
*/ */
abstract public long[] getBlockListAsLongs(); abstract public long[] getBlockListAsLongs();
/**
* Return true for STORAGE_REPORT BlocksListsAsLongs.
* Otherwise return false.
* @return boolean
*/
abstract public boolean isStorageReport();
/** /**
* Returns a singleton iterator over blocks in the block report. Do not * Returns a singleton iterator over blocks in the block report. Do not
* add the returned blocks to a collection. * add the returned blocks to a collection.
@ -391,6 +426,11 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
return longs; return longs;
} }
@Override
public boolean isStorageReport() {
return false;
}
@Override @Override
public Iterator<BlockReportReplica> iterator() { public Iterator<BlockReportReplica> iterator() {
return new Iterator<BlockReportReplica>() { return new Iterator<BlockReportReplica>() {
@ -474,6 +514,11 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
return longs; return longs;
} }
@Override
public boolean isStorageReport() {
return false;
}
@Override @Override
public Iterator<BlockReportReplica> iterator() { public Iterator<BlockReportReplica> iterator() {
return new Iterator<BlockReportReplica>() { return new Iterator<BlockReportReplica>() {

View File

@ -1889,8 +1889,8 @@ public class BlockManager implements BlockStatsMXBean {
*/ */
public boolean processReport(final DatanodeID nodeID, public boolean processReport(final DatanodeID nodeID,
final DatanodeStorage storage, final DatanodeStorage storage,
final BlockListAsLongs newReport, BlockReportContext context, final BlockListAsLongs newReport,
boolean lastStorageInRpc) throws IOException { BlockReportContext context) throws IOException {
namesystem.writeLock(); namesystem.writeLock();
final long startTime = Time.monotonicNow(); //after acquiring write lock final long startTime = Time.monotonicNow(); //after acquiring write lock
final long endTime; final long endTime;
@ -1940,30 +1940,14 @@ public class BlockManager implements BlockStatsMXBean {
storageInfo.receivedBlockReport(); storageInfo.receivedBlockReport();
if (context != null) { if (context != null) {
storageInfo.setLastBlockReportId(context.getReportId()); if (context.getTotalRpcs() == context.getCurRpc() + 1) {
if (lastStorageInRpc) { long leaseId = this.getBlockReportLeaseManager().removeLease(node);
int rpcsSeen = node.updateBlockReportContext(context); BlockManagerFaultInjector.getInstance().
if (rpcsSeen >= context.getTotalRpcs()) { removeBlockReportLease(node, leaseId);
long leaseId = blockReportLeaseManager.removeLease(node);
BlockManagerFaultInjector.getInstance().
removeBlockReportLease(node, leaseId);
List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
if (zombies.isEmpty()) {
LOG.debug("processReport 0x{}: no zombie storages found.",
Long.toHexString(context.getReportId()));
} else {
for (DatanodeStorageInfo zombie : zombies) {
removeZombieReplicas(context, zombie);
}
}
node.clearBlockReportContext();
} else {
LOG.debug("processReport 0x{}: {} more RPCs remaining in this " +
"report.", Long.toHexString(context.getReportId()),
(context.getTotalRpcs() - rpcsSeen)
);
}
} }
LOG.debug("Processing RPC with index {} out of total {} RPCs in "
+ "processReport 0x{}", context.getCurRpc(),
context.getTotalRpcs(), Long.toHexString(context.getReportId()));
} }
} finally { } finally {
endTime = Time.monotonicNow(); endTime = Time.monotonicNow();
@ -1989,6 +1973,26 @@ public class BlockManager implements BlockStatsMXBean {
return !node.hasStaleStorages(); return !node.hasStaleStorages();
} }
public void removeZombieStorages(DatanodeRegistration nodeReg,
BlockReportContext context, Set<String> storageIDsInBlockReport)
throws UnregisteredNodeException {
namesystem.writeLock();
DatanodeDescriptor node = this.getDatanodeManager().getDatanode(nodeReg);
if (node != null) {
List<DatanodeStorageInfo> zombies =
node.removeZombieStorages(storageIDsInBlockReport);
if (zombies.isEmpty()) {
LOG.debug("processReport 0x{}: no zombie storages found.",
Long.toHexString(context.getReportId()));
} else {
for (DatanodeStorageInfo zombie : zombies) {
this.removeZombieReplicas(context, zombie);
}
}
}
namesystem.writeUnlock();
}
private void removeZombieReplicas(BlockReportContext context, private void removeZombieReplicas(BlockReportContext context,
DatanodeStorageInfo zombie) { DatanodeStorageInfo zombie) {
LOG.warn("processReport 0x{}: removing zombie storage {}, which no " + LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +

View File

@ -308,10 +308,10 @@ class BlockReportLeaseManager {
return false; return false;
} }
if (node.leaseId == 0) { if (node.leaseId == 0) {
LOG.warn("BR lease 0x{} is not valid for DN {}, because the DN " + LOG.warn("BR lease 0x{} is not found for DN {}, because the DN " +
"is not in the pending set.", "is not in the pending set.",
Long.toHexString(id), dn.getDatanodeUuid()); Long.toHexString(id), dn.getDatanodeUuid());
return false; return true;
} }
if (pruneIfExpired(monotonicNowMs, node)) { if (pruneIfExpired(monotonicNowMs, node)) {
LOG.warn("BR lease 0x{} is not valid for DN {}, because the lease " + LOG.warn("BR lease 0x{} is not valid for DN {}, because the lease " +

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.blockmanagement; package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -41,7 +40,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@ -153,9 +151,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
public final DecommissioningStatus decommissioningStatus = public final DecommissioningStatus decommissioningStatus =
new DecommissioningStatus(); new DecommissioningStatus();
private long curBlockReportId = 0;
private BitSet curBlockReportRpcsSeen = null;
private final Map<String, DatanodeStorageInfo> storageMap = private final Map<String, DatanodeStorageInfo> storageMap =
new HashMap<>(); new HashMap<>();
@ -253,20 +248,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null); updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
} }
public int updateBlockReportContext(BlockReportContext context) {
if (curBlockReportId != context.getReportId()) {
curBlockReportId = context.getReportId();
curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
}
curBlockReportRpcsSeen.set(context.getCurRpc());
return curBlockReportRpcsSeen.cardinality();
}
public void clearBlockReportContext() {
curBlockReportId = 0;
curBlockReportRpcsSeen = null;
}
public CachedBlocksList getPendingCached() { public CachedBlocksList getPendingCached() {
return pendingCached; return pendingCached;
} }
@ -330,7 +311,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
} }
} }
List<DatanodeStorageInfo> removeZombieStorages() { List<DatanodeStorageInfo>
removeZombieStorages(Set<String> storageIDsInBlockReport) {
List<DatanodeStorageInfo> zombies = null; List<DatanodeStorageInfo> zombies = null;
synchronized (storageMap) { synchronized (storageMap) {
Iterator<Map.Entry<String, DatanodeStorageInfo>> iter = Iterator<Map.Entry<String, DatanodeStorageInfo>> iter =
@ -338,18 +320,13 @@ public class DatanodeDescriptor extends DatanodeInfo {
while (iter.hasNext()) { while (iter.hasNext()) {
Map.Entry<String, DatanodeStorageInfo> entry = iter.next(); Map.Entry<String, DatanodeStorageInfo> entry = iter.next();
DatanodeStorageInfo storageInfo = entry.getValue(); DatanodeStorageInfo storageInfo = entry.getValue();
if (storageInfo.getLastBlockReportId() != curBlockReportId) { if (!storageIDsInBlockReport.contains(storageInfo.getStorageID())) {
LOG.info("{} had lastBlockReportId 0x{} but curBlockReportId = 0x{}",
storageInfo.getStorageID(),
Long.toHexString(storageInfo.getLastBlockReportId()),
Long.toHexString(curBlockReportId));
iter.remove(); iter.remove();
if (zombies == null) { if (zombies == null) {
zombies = new LinkedList<>(); zombies = new LinkedList<>();
} }
zombies.add(storageInfo); zombies.add(storageInfo);
} }
storageInfo.setLastBlockReportId(0);
} }
} }
return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies; return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies;

View File

@ -122,9 +122,6 @@ public class DatanodeStorageInfo {
private volatile BlockInfo blockList = null; private volatile BlockInfo blockList = null;
private int numBlocks = 0; private int numBlocks = 0;
// The ID of the last full block report which updated this storage.
private long lastBlockReportId = 0;
/** The number of block reports received */ /** The number of block reports received */
private int blockReportCount = 0; private int blockReportCount = 0;
@ -189,14 +186,6 @@ public class DatanodeStorageInfo {
this.blockPoolUsed = blockPoolUsed; this.blockPoolUsed = blockPoolUsed;
} }
long getLastBlockReportId() {
return lastBlockReportId;
}
void setLastBlockReportId(long lastBlockReportId) {
this.lastBlockReportId = lastBlockReportId;
}
State getState() { State getState() {
return this.state; return this.state;
} }

View File

@ -367,11 +367,36 @@ class BPServiceActor implements Runnable {
} else { } else {
// Send one block report per message. // Send one block report per message.
for (int r = 0; r < reports.length; r++) { for (int r = 0; r < reports.length; r++) {
StorageBlockReport singleReport[] = { reports[r] }; StorageBlockReport[] singleReport = {reports[r]};
DatanodeCommand cmd = bpNamenode.blockReport( DatanodeCommand cmd;
bpRegistration, bpos.getBlockPoolId(), singleReport, if (r != reports.length - 1) {
new BlockReportContext(reports.length, r, reportId, cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(),
fullBrLeaseId)); singleReport, new BlockReportContext(reports.length, r,
reportId, fullBrLeaseId));
} else {
StorageBlockReport[] lastSplitReport =
new StorageBlockReport[perVolumeBlockLists.size()];
// When block reports are split, the last RPC in the block report
// has the information about all storages in the block report.
// See HDFS-10301 for more details. To achieve this, the last RPC
// has 'n' storage reports, where 'n' is the number of storages in
// a DN. The actual block replicas are reported only for the
// last/n-th storage.
i = 0;
for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair :
perVolumeBlockLists.entrySet()) {
lastSplitReport[i++] = new StorageBlockReport(
kvPair.getKey(), BlockListAsLongs.STORAGE_REPORT);
if (i == r) {
lastSplitReport[i] = reports[r];
break;
}
}
cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), lastSplitReport,
new BlockReportContext(reports.length, r, reportId,
fullBrLeaseId));
}
numReportsSent++; numReportsSent++;
numRPCs++; numRPCs++;
if (cmd != null) { if (cmd != null) {

View File

@ -1415,24 +1415,36 @@ public class NameNodeRpcServer implements NamenodeProtocols {
boolean noStaleStorages = false; boolean noStaleStorages = false;
for (int r = 0; r < reports.length; r++) { for (int r = 0; r < reports.length; r++) {
final BlockListAsLongs blocks = reports[r].getBlocks(); final BlockListAsLongs blocks = reports[r].getBlocks();
// if (!blocks.isStorageReport()) {
// BlockManager.processReport accumulates information of prior calls //
// for the same node and storage, so the value returned by the last // BlockManager.processReport accumulates information of prior calls
// call of this loop is the final updated value for noStaleStorage. // for the same node and storage, so the value returned by the last
// // call of this loop is the final updated value for noStaleStorage.
final int index = r; //
noStaleStorages = bm.runBlockOp(new Callable<Boolean>() { final int index = r;
@Override noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
public Boolean call() throws IOException { @Override
return bm.processReport(nodeReg, reports[index].getStorage(), public Boolean call()
blocks, context, (index == reports.length - 1)); throws IOException {
} return bm.processReport(nodeReg, reports[index].getStorage(),
}); blocks, context);
metrics.incrStorageBlockReportOps(); }
});
metrics.incrStorageBlockReportOps();
}
} }
BlockManagerFaultInjector.getInstance(). BlockManagerFaultInjector.getInstance().
incomingBlockReportRpc(nodeReg, context); incomingBlockReportRpc(nodeReg, context);
if (nn.getFSImage().isUpgradeFinalized() &&
context.getTotalRpcs() == context.getCurRpc() + 1) {
Set<String> storageIDsInBlockReport = new HashSet<>();
for (StorageBlockReport report : reports) {
storageIDsInBlockReport.add(report.getStorage().getStorageID());
}
bm.removeZombieStorages(nodeReg, context, storageIDsInBlockReport);
}
if (nn.getFSImage().isUpgradeFinalized() && if (nn.getFSImage().isUpgradeFinalized() &&
!namesystem.isRollingUpgrade() && !namesystem.isRollingUpgrade() &&
!nn.isStandbyState() && !nn.isStandbyState() &&

View File

@ -710,12 +710,12 @@ public class TestBlockManager {
reset(node); reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()), bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY, null, false); BlockListAsLongs.EMPTY, null);
assertEquals(1, ds.getBlockReportCount()); assertEquals(1, ds.getBlockReportCount());
// send block report again, should NOT be processed // send block report again, should NOT be processed
reset(node); reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()), bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY, null, false); BlockListAsLongs.EMPTY, null);
assertEquals(1, ds.getBlockReportCount()); assertEquals(1, ds.getBlockReportCount());
// re-register as if node restarted, should update existing node // re-register as if node restarted, should update existing node
@ -726,7 +726,7 @@ public class TestBlockManager {
// send block report, should be processed after restart // send block report, should be processed after restart
reset(node); reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()), bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY, null, false); BlockListAsLongs.EMPTY, null);
// Reinitialize as registration with empty storage list pruned // Reinitialize as registration with empty storage list pruned
// node.storageMap. // node.storageMap.
ds = node.getStorageInfos()[0]; ds = node.getStorageInfos()[0];
@ -755,7 +755,7 @@ public class TestBlockManager {
reset(node); reset(node);
doReturn(1).when(node).numBlocks(); doReturn(1).when(node).numBlocks();
bm.processReport(node, new DatanodeStorage(ds.getStorageID()), bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY, null, false); BlockListAsLongs.EMPTY, null);
assertEquals(1, ds.getBlockReportCount()); assertEquals(1, ds.getBlockReportCount());
} }
@ -828,7 +828,7 @@ public class TestBlockManager {
// Make sure it's the first full report // Make sure it's the first full report
assertEquals(0, ds.getBlockReportCount()); assertEquals(0, ds.getBlockReportCount());
bm.processReport(node, new DatanodeStorage(ds.getStorageID()), bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
builder.build(), null, false); builder.build(), null);
assertEquals(1, ds.getBlockReportCount()); assertEquals(1, ds.getBlockReportCount());
// verify the storage info is correct // verify the storage info is correct

View File

@ -19,34 +19,40 @@
package org.apache.hadoop.hdfs.server.blockmanagement; package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.BufferedOutputStream;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.BufferedWriter; import java.io.BufferedWriter;
import java.io.File; import java.io.File;
@ -55,8 +61,6 @@ import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.io.Writer; import java.io.Writer;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
@ -365,4 +369,67 @@ public class TestNameNodePrunesMissingStorages {
cluster.shutdown(); cluster.shutdown();
} }
} }
@Test(timeout=300000)
public void testInterleavedFullBlockReports() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
36000000L);
int numStoragesPerDatanode = 6;
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(conf).numDataNodes(1)
.storagesPerDatanode(numStoragesPerDatanode)
.build();
try {
LOG.info("waiting for cluster to become active...");
cluster.waitActive();
// Get the datanode registration and the block reports
DataNode dn = cluster.getDataNodes().get(0);
final String blockPoolId = cluster.getNamesystem().getBlockPoolId();
LOG.info("Block pool id: " + blockPoolId);
final DatanodeRegistration dnR = dn.getDNRegistrationForBP(blockPoolId);
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
dn.getFSDataset().getBlockReports(blockPoolId);
final StorageBlockReport[] reports =
new StorageBlockReport[perVolumeBlockLists.size()];
int reportIndex = 0;
for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair :
perVolumeBlockLists.entrySet()) {
DatanodeStorage dnStorage = kvPair.getKey();
BlockListAsLongs blockList = kvPair.getValue();
reports[reportIndex++] =
new StorageBlockReport(dnStorage, blockList);
}
// Get the list of storage ids associated with the datanode
// before the test
BlockManager bm =
cluster.getNameNode().getNamesystem().getBlockManager();
final DatanodeDescriptor dnDescriptor = bm.getDatanodeManager().
getDatanode(cluster.getDataNodes().get(0).getDatanodeUuid());
DatanodeStorageInfo[] storageInfos = dnDescriptor.getStorageInfos();
// Send the full block report concurrently using
// numThreads=numStoragesPerDatanode
ExecutorService executorService = Executors.
newFixedThreadPool(numStoragesPerDatanode);
List<Future<DatanodeCommand>> futureList =
new ArrayList<>(numStoragesPerDatanode);
for (int i = 0; i < numStoragesPerDatanode; i++) {
futureList.add(executorService.submit(new Callable<DatanodeCommand>() {
@Override
public DatanodeCommand call() throws IOException {
return cluster.getNameNodeRpc().blockReport(dnR, blockPoolId,
reports, new BlockReportContext(1, 0, System.nanoTime(), 0L));
}
}));
}
for (Future<DatanodeCommand> future: futureList) {
future.get();
}
executorService.shutdown();
// Verify that the storages match before and after the test
Assert.assertArrayEquals(storageInfos, dnDescriptor.getStorageInfos());
} finally {
cluster.shutdown();
}
}
} }

View File

@ -41,6 +41,7 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mockito; import org.mockito.Mockito;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.*; import static org.mockito.Matchers.*;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -88,6 +89,34 @@ public class TestDnRespectsBlockReportSplitThreshold {
blockCount * BLOCK_SIZE, BLOCK_SIZE, REPL_FACTOR, seed); blockCount * BLOCK_SIZE, BLOCK_SIZE, REPL_FACTOR, seed);
} }
private void verifyCapturedArgumentsSplit(
ArgumentCaptor<StorageBlockReport[]> captor,
int expectedReportsPerCall,
int expectedTotalBlockCount) {
List<StorageBlockReport[]> listOfReports = captor.getAllValues();
int numBlocksReported = 0;
int storageIndex = 0;
int listOfReportsSize = listOfReports.size();
for (StorageBlockReport[] reports : listOfReports) {
if (storageIndex < (listOfReportsSize - 1)) {
assertThat(reports.length, is(expectedReportsPerCall));
} else {
assertThat(reports.length, is(listOfReportsSize));
}
for (StorageBlockReport report : reports) {
BlockListAsLongs blockList = report.getBlocks();
if (!blockList.isStorageReport()) {
numBlocksReported += blockList.getNumberOfBlocks();
} else {
assertEquals(blockList.getNumberOfBlocks(), -1);
}
}
storageIndex++;
}
assert(numBlocksReported >= expectedTotalBlockCount);
}
private void verifyCapturedArguments( private void verifyCapturedArguments(
ArgumentCaptor<StorageBlockReport[]> captor, ArgumentCaptor<StorageBlockReport[]> captor,
int expectedReportsPerCall, int expectedReportsPerCall,
@ -136,7 +165,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
anyString(), anyString(),
captor.capture(), Mockito.<BlockReportContext>anyObject()); captor.capture(), Mockito.<BlockReportContext>anyObject());
verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE); verifyCapturedArgumentsSplit(captor, 1, BLOCKS_IN_FILE);
} }
/** /**
@ -200,7 +229,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
anyString(), anyString(),
captor.capture(), Mockito.<BlockReportContext>anyObject()); captor.capture(), Mockito.<BlockReportContext>anyObject());
verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE); verifyCapturedArgumentsSplit(captor, 1, BLOCKS_IN_FILE);
} }
} }

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@ -35,13 +36,32 @@ public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase {
@Override @Override
protected void sendBlockReports(DatanodeRegistration dnR, String poolId, protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
StorageBlockReport[] reports) throws IOException { StorageBlockReport[] reports) throws IOException {
int i = 0; for (int r = 0; r < reports.length; r++) {
for (StorageBlockReport report : reports) { LOG.info("Sending block report for storage " +
LOG.info("Sending block report for storage " + report.getStorage().getStorageID()); reports[r].getStorage().getStorageID());
StorageBlockReport[] singletonReport = { report }; StorageBlockReport[] singletonReport = {reports[r]};
cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport, if (r != reports.length - 1) {
new BlockReportContext(reports.length, i, System.nanoTime(), 0L)); cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
i++; new BlockReportContext(reports.length, r, System.nanoTime(),
0L));
} else {
StorageBlockReport[] lastSplitReport =
new StorageBlockReport[reports.length];
// When block reports are split, send a dummy storage report for all
// other storages in the blockreport along with the last storage report
for (int i = 0; i <= r; i++) {
if (i == r) {
lastSplitReport[i] = reports[r];
break;
}
lastSplitReport[i] =
new StorageBlockReport(reports[i].getStorage(),
BlockListAsLongs.STORAGE_REPORT);
}
cluster.getNameNodeRpc().blockReport(dnR, poolId, lastSplitReport,
new BlockReportContext(reports.length, r, System.nanoTime(),
0L));
}
} }
} }
} }