HDFS-6094. Merging r1578478 from trunk to branch-2.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1578480 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
92db0d3558
commit
4ea49506cf
|
@ -382,6 +382,9 @@ Release 2.4.0 - UNRELEASED
|
|||
HDFS-6102. Lower the default maximum items per directory to fix PB fsimage
|
||||
loading. (wang)
|
||||
|
||||
HDFS-6094. The same block can be counted twice towards safe mode
|
||||
threshold. (Arpit Agarwal)
|
||||
|
||||
BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)
|
||||
|
|
|
@ -252,7 +252,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
|
|||
for (StorageReceivedDeletedBlocks storageBlock : receivedAndDeletedBlocks) {
|
||||
StorageReceivedDeletedBlocksProto.Builder repBuilder =
|
||||
StorageReceivedDeletedBlocksProto.newBuilder();
|
||||
repBuilder.setStorageUuid(storageBlock.getStorageID());
|
||||
repBuilder.setStorageUuid(storageBlock.getStorage().getStorageID()); // Set for wire compatibility.
|
||||
repBuilder.setStorage(PBHelper.convert(storageBlock.getStorage()));
|
||||
for (ReceivedDeletedBlockInfo rdBlock : storageBlock.getBlocks()) {
|
||||
repBuilder.addBlocks(PBHelper.convert(rdBlock));
|
||||
}
|
||||
|
|
|
@ -198,7 +198,12 @@ public class DatanodeProtocolServerSideTranslatorPB implements
|
|||
for (int j = 0; j < list.size(); j++) {
|
||||
rdBlocks[j] = PBHelper.convert(list.get(j));
|
||||
}
|
||||
info[i] = new StorageReceivedDeletedBlocks(sBlock.getStorageUuid(), rdBlocks);
|
||||
if (sBlock.hasStorage()) {
|
||||
info[i] = new StorageReceivedDeletedBlocks(
|
||||
PBHelper.convert(sBlock.getStorage()), rdBlocks);
|
||||
} else {
|
||||
info[i] = new StorageReceivedDeletedBlocks(sBlock.getStorageUuid(), rdBlocks);
|
||||
}
|
||||
}
|
||||
try {
|
||||
impl.blockReceivedAndDeleted(PBHelper.convert(request.getRegistration()),
|
||||
|
|
|
@ -1671,9 +1671,6 @@ public class BlockManager {
|
|||
if (storageInfo == null) {
|
||||
// We handle this for backwards compatibility.
|
||||
storageInfo = node.updateStorage(storage);
|
||||
LOG.warn("Unknown storageId " + storage.getStorageID() +
|
||||
", updating storageMap. This indicates a buggy " +
|
||||
"DataNode that isn't heartbeating correctly.");
|
||||
}
|
||||
if (namesystem.isInStartupSafeMode()
|
||||
&& storageInfo.getBlockReportCount() > 0) {
|
||||
|
@ -2280,7 +2277,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|||
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
|
||||
numLiveReplicas >= minReplication) {
|
||||
storedBlock = completeBlock(bc, storedBlock, false);
|
||||
} else if (storedBlock.isComplete()) {
|
||||
} else if (storedBlock.isComplete() && added) {
|
||||
// check whether safe replication is reached for the block
|
||||
// only complete blocks are counted towards that
|
||||
// Is no-op if not in safe mode.
|
||||
|
@ -2861,8 +2858,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|||
* This method must be called with FSNamesystem lock held.
|
||||
*/
|
||||
public void processIncrementalBlockReport(final DatanodeID nodeID,
|
||||
final String poolId, final StorageReceivedDeletedBlocks srdb)
|
||||
throws IOException {
|
||||
final StorageReceivedDeletedBlocks srdb) throws IOException {
|
||||
assert namesystem.hasWriteLock();
|
||||
int received = 0;
|
||||
int deleted = 0;
|
||||
|
@ -2877,6 +2873,15 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|||
"Got incremental block report from unregistered or dead node");
|
||||
}
|
||||
|
||||
if (node.getStorageInfo(srdb.getStorage().getStorageID()) == null) {
|
||||
// The DataNode is reporting an unknown storage. Usually the NN learns
|
||||
// about new storages from heartbeats but during NN restart we may
|
||||
// receive a block report or incremental report before the heartbeat.
|
||||
// We must handle this for protocol compatibility. This issue was
|
||||
// uncovered by HDFS-6904.
|
||||
node.updateStorage(srdb.getStorage());
|
||||
}
|
||||
|
||||
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
|
||||
switch (rdbi.getStatus()) {
|
||||
case DELETED_BLOCK:
|
||||
|
@ -2884,13 +2889,14 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|||
deleted++;
|
||||
break;
|
||||
case RECEIVED_BLOCK:
|
||||
addBlock(node, srdb.getStorageID(), rdbi.getBlock(), rdbi.getDelHints());
|
||||
addBlock(node, srdb.getStorage().getStorageID(),
|
||||
rdbi.getBlock(), rdbi.getDelHints());
|
||||
received++;
|
||||
break;
|
||||
case RECEIVING_BLOCK:
|
||||
receiving++;
|
||||
processAndHandleReportedBlock(node, srdb.getStorageID(), rdbi.getBlock(),
|
||||
ReplicaState.RBW, null);
|
||||
processAndHandleReportedBlock(node, srdb.getStorage().getStorageID(),
|
||||
rdbi.getBlock(), ReplicaState.RBW, null);
|
||||
break;
|
||||
default:
|
||||
String msg =
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -247,7 +248,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
return false;
|
||||
}
|
||||
|
||||
DatanodeStorageInfo getStorageInfo(String storageID) {
|
||||
@VisibleForTesting
|
||||
public DatanodeStorageInfo getStorageInfo(String storageID) {
|
||||
synchronized (storageMap) {
|
||||
return storageMap.get(storageID);
|
||||
}
|
||||
|
@ -368,12 +370,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
setLastUpdate(Time.now());
|
||||
this.volumeFailures = volFailures;
|
||||
for (StorageReport report : reports) {
|
||||
DatanodeStorageInfo storage = storageMap.get(report.getStorage().getStorageID());
|
||||
if (storage == null) {
|
||||
// This is seen during cluster initialization when the heartbeat
|
||||
// is received before the initial block reports from each storage.
|
||||
storage = updateStorage(report.getStorage());
|
||||
}
|
||||
DatanodeStorageInfo storage = updateStorage(report.getStorage());
|
||||
storage.receivedHeartbeat(report);
|
||||
totalCapacity += report.getCapacity();
|
||||
totalRemaining += report.getRemaining();
|
||||
|
@ -671,6 +668,15 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
" for DN " + getXferAddr());
|
||||
storage = new DatanodeStorageInfo(this, s);
|
||||
storageMap.put(s.getStorageID(), storage);
|
||||
} else if (storage.getState() != s.getState() ||
|
||||
storage.getStorageType() != s.getStorageType()) {
|
||||
// For backwards compatibility, make sure that the type and
|
||||
// state are updated. Some reports from older datanodes do
|
||||
// not include these fields so we may have assumed defaults.
|
||||
// This check can be removed in the next major release after
|
||||
// 2.4.
|
||||
storage.updateFromStorage(s);
|
||||
storageMap.put(storage.getStorageID(), storage);
|
||||
}
|
||||
return storage;
|
||||
}
|
||||
|
|
|
@ -71,6 +71,11 @@ public class DatanodeStorageInfo {
|
|||
return storageTypes;
|
||||
}
|
||||
|
||||
public void updateFromStorage(DatanodeStorage storage) {
|
||||
state = storage.getState();
|
||||
storageType = storage.getStorageType();
|
||||
}
|
||||
|
||||
/**
|
||||
* Iterates over the list of blocks belonging to the data-node.
|
||||
*/
|
||||
|
@ -98,8 +103,8 @@ public class DatanodeStorageInfo {
|
|||
|
||||
private final DatanodeDescriptor dn;
|
||||
private final String storageID;
|
||||
private final StorageType storageType;
|
||||
private final State state;
|
||||
private StorageType storageType;
|
||||
private State state;
|
||||
|
||||
private long capacity;
|
||||
private long dfsUsed;
|
||||
|
|
|
@ -98,7 +98,7 @@ class BPServiceActor implements Runnable {
|
|||
* keyed by block ID, contains the pending changes which have yet to be
|
||||
* reported to the NN. Access should be synchronized on this object.
|
||||
*/
|
||||
private final Map<String, PerStoragePendingIncrementalBR>
|
||||
private final Map<DatanodeStorage, PerStoragePendingIncrementalBR>
|
||||
pendingIncrementalBRperStorage = Maps.newHashMap();
|
||||
|
||||
// IBR = Incremental Block Report. If this flag is set then an IBR will be
|
||||
|
@ -270,15 +270,15 @@ class BPServiceActor implements Runnable {
|
|||
ArrayList<StorageReceivedDeletedBlocks> reports =
|
||||
new ArrayList<StorageReceivedDeletedBlocks>(pendingIncrementalBRperStorage.size());
|
||||
synchronized (pendingIncrementalBRperStorage) {
|
||||
for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
|
||||
for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
|
||||
pendingIncrementalBRperStorage.entrySet()) {
|
||||
final String storageUuid = entry.getKey();
|
||||
final DatanodeStorage storage = entry.getKey();
|
||||
final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
|
||||
|
||||
if (perStorageMap.getBlockInfoCount() > 0) {
|
||||
// Send newly-received and deleted blockids to namenode
|
||||
ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
|
||||
reports.add(new StorageReceivedDeletedBlocks(storageUuid, rdbi));
|
||||
reports.add(new StorageReceivedDeletedBlocks(storage, rdbi));
|
||||
}
|
||||
}
|
||||
sendImmediateIBR = false;
|
||||
|
@ -304,7 +304,7 @@ class BPServiceActor implements Runnable {
|
|||
// blocks back onto our queue, but only in the case where we
|
||||
// didn't put something newer in the meantime.
|
||||
PerStoragePendingIncrementalBR perStorageMap =
|
||||
pendingIncrementalBRperStorage.get(report.getStorageID());
|
||||
pendingIncrementalBRperStorage.get(report.getStorage());
|
||||
perStorageMap.putMissingBlockInfos(report.getBlocks());
|
||||
sendImmediateIBR = true;
|
||||
}
|
||||
|
@ -319,16 +319,16 @@ class BPServiceActor implements Runnable {
|
|||
* @return
|
||||
*/
|
||||
private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(
|
||||
String storageUuid) {
|
||||
DatanodeStorage storage) {
|
||||
PerStoragePendingIncrementalBR mapForStorage =
|
||||
pendingIncrementalBRperStorage.get(storageUuid);
|
||||
pendingIncrementalBRperStorage.get(storage);
|
||||
|
||||
if (mapForStorage == null) {
|
||||
// This is the first time we are adding incremental BR state for
|
||||
// this storage so create a new map. This is required once per
|
||||
// storage, per service actor.
|
||||
mapForStorage = new PerStoragePendingIncrementalBR();
|
||||
pendingIncrementalBRperStorage.put(storageUuid, mapForStorage);
|
||||
pendingIncrementalBRperStorage.put(storage, mapForStorage);
|
||||
}
|
||||
|
||||
return mapForStorage;
|
||||
|
@ -343,16 +343,16 @@ class BPServiceActor implements Runnable {
|
|||
* @param storageUuid
|
||||
*/
|
||||
void addPendingReplicationBlockInfo(ReceivedDeletedBlockInfo bInfo,
|
||||
String storageUuid) {
|
||||
DatanodeStorage storage) {
|
||||
// Make sure another entry for the same block is first removed.
|
||||
// There may only be one such entry.
|
||||
for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
|
||||
for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
|
||||
pendingIncrementalBRperStorage.entrySet()) {
|
||||
if (entry.getValue().removeBlockInfo(bInfo)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
getIncrementalBRMapForStorage(storageUuid).putBlockInfo(bInfo);
|
||||
getIncrementalBRMapForStorage(storage).putBlockInfo(bInfo);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -363,7 +363,8 @@ class BPServiceActor implements Runnable {
|
|||
void notifyNamenodeBlockImmediately(
|
||||
ReceivedDeletedBlockInfo bInfo, String storageUuid) {
|
||||
synchronized (pendingIncrementalBRperStorage) {
|
||||
addPendingReplicationBlockInfo(bInfo, storageUuid);
|
||||
addPendingReplicationBlockInfo(
|
||||
bInfo, dn.getFSDataset().getStorage(storageUuid));
|
||||
sendImmediateIBR = true;
|
||||
pendingIncrementalBRperStorage.notifyAll();
|
||||
}
|
||||
|
@ -372,7 +373,8 @@ class BPServiceActor implements Runnable {
|
|||
void notifyNamenodeDeletedBlock(
|
||||
ReceivedDeletedBlockInfo bInfo, String storageUuid) {
|
||||
synchronized (pendingIncrementalBRperStorage) {
|
||||
addPendingReplicationBlockInfo(bInfo, storageUuid);
|
||||
addPendingReplicationBlockInfo(
|
||||
bInfo, dn.getFSDataset().getStorage(storageUuid));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -79,7 +79,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
|||
|
||||
/**
|
||||
* Create rolling logs.
|
||||
*
|
||||
*
|
||||
* @param prefix the prefix of the log names.
|
||||
* @return rolling logs
|
||||
*/
|
||||
|
@ -89,6 +89,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
|||
/** @return a list of volumes. */
|
||||
public List<V> getVolumes();
|
||||
|
||||
/** @return a storage with the given storage ID */
|
||||
public DatanodeStorage getStorage(final String storageUuid);
|
||||
|
||||
/** @return one or more storage reports for attached volumes. */
|
||||
public StorageReport[] getStorageReports(String bpid)
|
||||
throws IOException;
|
||||
|
|
|
@ -75,6 +75,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
return volumes.volumes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatanodeStorage getStorage(final String storageUuid) {
|
||||
return storageMap.get(storageUuid);
|
||||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public StorageReport[] getStorageReports(String bpid)
|
||||
throws IOException {
|
||||
|
@ -157,6 +162,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
final DataNode datanode;
|
||||
final DataStorage dataStorage;
|
||||
final FsVolumeList volumes;
|
||||
final Map<String, DatanodeStorage> storageMap;
|
||||
final FsDatasetAsyncDiskService asyncDiskService;
|
||||
final FsDatasetCache cacheManager;
|
||||
private final int validVolsRequired;
|
||||
|
@ -198,6 +204,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
+ ", volume failures tolerated: " + volFailuresTolerated);
|
||||
}
|
||||
|
||||
storageMap = new HashMap<String, DatanodeStorage>();
|
||||
final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
|
||||
storage.getNumStorageDirs());
|
||||
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
|
||||
|
@ -207,6 +214,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
volArray.add(new FsVolumeImpl(this, sd.getStorageUuid(), dir, conf,
|
||||
storageType));
|
||||
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
|
||||
storageMap.put(sd.getStorageUuid(),
|
||||
new DatanodeStorage(sd.getStorageUuid(), DatanodeStorage.State.NORMAL, storageType));
|
||||
}
|
||||
volumeMap = new ReplicaMap(this);
|
||||
|
||||
|
|
|
@ -5382,7 +5382,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
throws IOException {
|
||||
writeLock();
|
||||
try {
|
||||
blockManager.processIncrementalBlockReport(nodeID, poolId, srdb);
|
||||
blockManager.processIncrementalBlockReport(nodeID, srdb);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
|
|
|
@ -23,20 +23,32 @@ package org.apache.hadoop.hdfs.server.protocol;
|
|||
* storage.
|
||||
*/
|
||||
public class StorageReceivedDeletedBlocks {
|
||||
private final String storageID;
|
||||
DatanodeStorage storage;
|
||||
private final ReceivedDeletedBlockInfo[] blocks;
|
||||
|
||||
|
||||
@Deprecated
|
||||
public String getStorageID() {
|
||||
return storageID;
|
||||
return storage.getStorageID();
|
||||
}
|
||||
|
||||
public DatanodeStorage getStorage() {
|
||||
return storage;
|
||||
}
|
||||
|
||||
public ReceivedDeletedBlockInfo[] getBlocks() {
|
||||
return blocks;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public StorageReceivedDeletedBlocks(final String storageID,
|
||||
final ReceivedDeletedBlockInfo[] blocks) {
|
||||
this.storageID = storageID;
|
||||
this.storage = new DatanodeStorage(storageID);
|
||||
this.blocks = blocks;
|
||||
}
|
||||
|
||||
public StorageReceivedDeletedBlocks(final DatanodeStorage storage,
|
||||
final ReceivedDeletedBlockInfo[] blocks) {
|
||||
this.storage = storage;
|
||||
this.blocks = blocks;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -292,8 +292,9 @@ message ReceivedDeletedBlockInfoProto {
|
|||
* List of blocks received and deleted for a storage.
|
||||
*/
|
||||
message StorageReceivedDeletedBlocksProto {
|
||||
required string storageUuid = 1;
|
||||
required string storageUuid = 1 [ deprecated = true ];
|
||||
repeated ReceivedDeletedBlockInfoProto blocks = 2;
|
||||
optional DatanodeStorageProto storage = 3; // supersedes storageUuid.
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1084,6 +1084,13 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatanodeStorage getStorage(final String storageUuid) {
|
||||
return storageUuid.equals(storage.getStorageUuid()) ?
|
||||
storage.dnStorage :
|
||||
null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StorageReport[] getStorageReports(String bpid) {
|
||||
return new StorageReport[] {storage.getStorageReport(bpid)};
|
||||
|
|
|
@ -21,11 +21,11 @@ import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
|||
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -38,10 +38,12 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.*;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.protocol.*;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.After;
|
||||
|
@ -53,6 +55,7 @@ import org.junit.Test;
|
|||
* correctly handled by NN. Tests the following variations:
|
||||
* #1 - Incremental BRs from all storages combined in a single call.
|
||||
* #2 - Incremental BRs from separate storages sent in separate calls.
|
||||
* #3 - Incremental BR from an unknown storage should be rejected.
|
||||
*
|
||||
* We also verify that the DataNode is not splitting the reports (it may do so
|
||||
* in the future).
|
||||
|
@ -71,6 +74,9 @@ public class TestIncrementalBrVariations {
|
|||
private DistributedFileSystem fs;
|
||||
private DFSClient client;
|
||||
private static Configuration conf;
|
||||
private String poolId;
|
||||
private DataNode dn0; // DataNode at index0 in the MiniDFSCluster
|
||||
private DatanodeRegistration dn0Reg; // DataNodeRegistration for dn0
|
||||
|
||||
static {
|
||||
((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
|
||||
|
@ -88,6 +94,9 @@ public class TestIncrementalBrVariations {
|
|||
fs = cluster.getFileSystem();
|
||||
client = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()),
|
||||
cluster.getConfiguration(0));
|
||||
dn0 = cluster.getDataNodes().get(0);
|
||||
poolId = cluster.getNamesystem().getBlockPoolId();
|
||||
dn0Reg = dn0.getDNRegistrationForBP(poolId);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -132,19 +141,14 @@ public class TestIncrementalBrVariations {
|
|||
// Get the block list for the file with the block locations.
|
||||
LocatedBlocks blocks = createFileGetBlocks(GenericTestUtils.getMethodName());
|
||||
|
||||
// A blocks belong to the same file, hence same BP
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
String poolId = cluster.getNamesystem().getBlockPoolId();
|
||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
||||
|
||||
// We will send 'fake' incremental block reports to the NN that look
|
||||
// like they originated from DN 0.
|
||||
StorageReceivedDeletedBlocks reports[] =
|
||||
new StorageReceivedDeletedBlocks[dn.getFSDataset().getVolumes().size()];
|
||||
new StorageReceivedDeletedBlocks[dn0.getFSDataset().getVolumes().size()];
|
||||
|
||||
// Lie to the NN that one block on each storage has been deleted.
|
||||
for (int i = 0; i < reports.length; ++i) {
|
||||
FsVolumeSpi volume = dn.getFSDataset().getVolumes().get(i);
|
||||
FsVolumeSpi volume = dn0.getFSDataset().getVolumes().get(i);
|
||||
|
||||
boolean foundBlockOnStorage = false;
|
||||
ReceivedDeletedBlockInfo rdbi[] = new ReceivedDeletedBlockInfo[1];
|
||||
|
@ -166,13 +170,14 @@ public class TestIncrementalBrVariations {
|
|||
if (splitReports) {
|
||||
// If we are splitting reports then send the report for this storage now.
|
||||
StorageReceivedDeletedBlocks singletonReport[] = { reports[i] };
|
||||
cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, singletonReport);
|
||||
cluster.getNameNodeRpc().blockReceivedAndDeleted(
|
||||
dn0Reg, poolId, singletonReport);
|
||||
}
|
||||
}
|
||||
|
||||
if (!splitReports) {
|
||||
// Send a combined report.
|
||||
cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, reports);
|
||||
cluster.getNameNodeRpc().blockReceivedAndDeleted(dn0Reg, poolId, reports);
|
||||
}
|
||||
|
||||
// Make sure that the deleted block from each storage was picked up
|
||||
|
@ -191,11 +196,10 @@ public class TestIncrementalBrVariations {
|
|||
throws IOException, InterruptedException {
|
||||
LocatedBlocks blocks = createFileGetBlocks(GenericTestUtils.getMethodName());
|
||||
assertThat(cluster.getDataNodes().size(), is(1));
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
|
||||
// Remove all blocks from the DataNode.
|
||||
for (LocatedBlock block : blocks.getLocatedBlocks()) {
|
||||
dn.notifyNamenodeDeletedBlock(
|
||||
dn0.notifyNamenodeDeletedBlock(
|
||||
block.getBlock(), block.getStorageIDs()[0]);
|
||||
}
|
||||
|
||||
|
@ -203,11 +207,55 @@ public class TestIncrementalBrVariations {
|
|||
long ops = getLongCounter("BlockReceivedAndDeletedOps", getMetrics(NN_METRICS));
|
||||
|
||||
// Trigger a report to the NameNode and give it a few seconds.
|
||||
DataNodeTestUtils.triggerBlockReport(dn);
|
||||
DataNodeTestUtils.triggerBlockReport(dn0);
|
||||
Thread.sleep(5000);
|
||||
|
||||
// Ensure that NameNodeRpcServer.blockReceivedAndDeletes is invoked
|
||||
// exactly once after we triggered the report.
|
||||
assertCounter("BlockReceivedAndDeletedOps", ops+1, getMetrics(NN_METRICS));
|
||||
}
|
||||
|
||||
private static Block getDummyBlock() {
|
||||
return new Block(10000000L, 100L, 1048576L);
|
||||
}
|
||||
|
||||
private static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
|
||||
Block block, DatanodeStorage storage) {
|
||||
ReceivedDeletedBlockInfo[] receivedBlocks = new ReceivedDeletedBlockInfo[1];
|
||||
receivedBlocks[0] = new ReceivedDeletedBlockInfo(block, BlockStatus.RECEIVED_BLOCK, null);
|
||||
StorageReceivedDeletedBlocks[] reports = new StorageReceivedDeletedBlocks[1];
|
||||
reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
|
||||
return reports;
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the NameNode can learn about new storages from incremental
|
||||
* block reports.
|
||||
* This tests the fix for the error condition seen in HDFS-6904.
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test (timeout=60000)
|
||||
public void testNnLearnsNewStorages()
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
// Generate a report for a fake block on a fake storage.
|
||||
final String newStorageUuid = UUID.randomUUID().toString();
|
||||
final DatanodeStorage newStorage = new DatanodeStorage(newStorageUuid);
|
||||
StorageReceivedDeletedBlocks[] reports = makeReportForReceivedBlock(
|
||||
getDummyBlock(), newStorage);
|
||||
|
||||
// Send the report to the NN.
|
||||
cluster.getNameNodeRpc().blockReceivedAndDeleted(dn0Reg, poolId, reports);
|
||||
|
||||
// Make sure that the NN has learned of the new storage.
|
||||
DatanodeStorageInfo storageInfo = cluster.getNameNode()
|
||||
.getNamesystem()
|
||||
.getBlockManager()
|
||||
.getDatanodeManager()
|
||||
.getDatanode(dn0.getDatanodeId())
|
||||
.getStorageInfo(newStorageUuid);
|
||||
assertNotNull(storageInfo);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue