HDFS-5667. Include DatanodeStorage in StorageReport. (Arpit Agarwal)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1555929 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2014-01-06 17:28:23 +00:00
parent b80343a55f
commit f8a9329f2b
14 changed files with 91 additions and 71 deletions

View File

@ -449,12 +449,6 @@ Trunk (Unreleased)
HDFS-5626. dfsadmin -report shows incorrect cache values. (cmccabe) HDFS-5626. dfsadmin -report shows incorrect cache values. (cmccabe)
HDFS-5406. Send incremental block reports for all storages in a
single call. (Arpit Agarwal)
HDFS-5454. DataNode UUID should be assigned prior to FsDataset
initialization. (Arpit Agarwal)
HDFS-5679. TestCacheDirectives should handle the case where native code HDFS-5679. TestCacheDirectives should handle the case where native code
is not available. (wang) is not available. (wang)
@ -596,6 +590,14 @@ Trunk (Unreleased)
HDFS-5648. Get rid of FsDatasetImpl#perVolumeReplicaMap. (Arpit Agarwal) HDFS-5648. Get rid of FsDatasetImpl#perVolumeReplicaMap. (Arpit Agarwal)
HDFS-5406. Send incremental block reports for all storages in a
single call. (Arpit Agarwal)
HDFS-5454. DataNode UUID should be assigned prior to FsDataset
initialization. (Arpit Agarwal)
HDFS-5667. Include DatanodeStorage in StorageReport. (Arpit Agarwal)
Release 2.4.0 - UNRELEASED Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -1559,13 +1559,17 @@ public static StorageReportProto convert(StorageReport r) {
StorageReportProto.Builder builder = StorageReportProto.newBuilder() StorageReportProto.Builder builder = StorageReportProto.newBuilder()
.setBlockPoolUsed(r.getBlockPoolUsed()).setCapacity(r.getCapacity()) .setBlockPoolUsed(r.getBlockPoolUsed()).setCapacity(r.getCapacity())
.setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining()) .setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining())
.setStorageUuid(r.getStorageID()); .setStorageUuid(r.getStorage().getStorageID())
.setStorage(convert(r.getStorage()));
return builder.build(); return builder.build();
} }
public static StorageReport convert(StorageReportProto p) { public static StorageReport convert(StorageReportProto p) {
return new StorageReport(p.getStorageUuid(), p.getFailed(), return new StorageReport(
p.getCapacity(), p.getDfsUsed(), p.getRemaining(), p.hasStorage() ?
convert(p.getStorage()) :
new DatanodeStorage(p.getStorageUuid()),
p.getFailed(), p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
p.getBlockPoolUsed()); p.getBlockPoolUsed());
} }

View File

@ -355,11 +355,11 @@ public void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
setLastUpdate(Time.now()); setLastUpdate(Time.now());
this.volumeFailures = volFailures; this.volumeFailures = volFailures;
for (StorageReport report : reports) { for (StorageReport report : reports) {
DatanodeStorageInfo storage = storageMap.get(report.getStorageID()); DatanodeStorageInfo storage = storageMap.get(report.getStorage().getStorageID());
if (storage == null) { if (storage == null) {
// This is seen during cluster initialization when the heartbeat // This is seen during cluster initialization when the heartbeat
// is received before the initial block reports from each storage. // is received before the initial block reports from each storage.
storage = updateStorage(new DatanodeStorage(report.getStorageID())); storage = updateStorage(report.getStorage());
} }
storage.receivedHeartbeat(report); storage.receivedHeartbeat(report);
totalCapacity += report.getCapacity(); totalCapacity += report.getCapacity();

View File

@ -121,7 +121,7 @@ public StorageReport[] getStorageReports(String bpid)
reports = new StorageReport[volumes.volumes.size()]; reports = new StorageReport[volumes.volumes.size()];
int i = 0; int i = 0;
for (FsVolumeImpl volume : volumes.volumes) { for (FsVolumeImpl volume : volumes.volumes) {
reports[i++] = new StorageReport(volume.getStorageID(), reports[i++] = new StorageReport(volume.toDatanodeStorage(),
false, false,
volume.getCapacity(), volume.getCapacity(),
volume.getDfsUsed(), volume.getDfsUsed(),
@ -237,12 +237,9 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>( final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
storage.getNumStorageDirs()); storage.getNumStorageDirs());
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
// TODO: getStorageTypeFromLocations() is only a temporary workaround and
// should be replaced with getting storage type from DataStorage (missing
// storage type now) directly.
Storage.StorageDirectory sd = storage.getStorageDir(idx); Storage.StorageDirectory sd = storage.getStorageDir(idx);
final File dir = sd.getCurrentDir(); final File dir = sd.getCurrentDir();
final StorageType storageType = getStorageTypeFromLocations(dataLocations, dir); final StorageType storageType = getStorageTypeFromLocations(dataLocations, sd.getRoot());
volArray.add(new FsVolumeImpl(this, sd.getStorageUuid(), dir, conf, volArray.add(new FsVolumeImpl(this, sd.getStorageUuid(), dir, conf,
storageType)); storageType));
LOG.info("Added volume - " + dir + ", StorageType: " + storageType); LOG.info("Added volume - " + dir + ", StorageType: " + storageType);

View File

@ -21,7 +21,7 @@
* Utilization report for a Datanode storage * Utilization report for a Datanode storage
*/ */
public class StorageReport { public class StorageReport {
private final String storageID; private final DatanodeStorage storage;
private final boolean failed; private final boolean failed;
private final long capacity; private final long capacity;
private final long dfsUsed; private final long dfsUsed;
@ -30,9 +30,9 @@ public class StorageReport {
public static final StorageReport[] EMPTY_ARRAY = {}; public static final StorageReport[] EMPTY_ARRAY = {};
public StorageReport(String sid, boolean failed, long capacity, long dfsUsed, public StorageReport(DatanodeStorage storage, boolean failed,
long remaining, long bpUsed) { long capacity, long dfsUsed, long remaining, long bpUsed) {
this.storageID = sid; this.storage = storage;
this.failed = failed; this.failed = failed;
this.capacity = capacity; this.capacity = capacity;
this.dfsUsed = dfsUsed; this.dfsUsed = dfsUsed;
@ -40,8 +40,8 @@ public StorageReport(String sid, boolean failed, long capacity, long dfsUsed,
this.blockPoolUsed = bpUsed; this.blockPoolUsed = bpUsed;
} }
public String getStorageID() { public DatanodeStorage getStorage() {
return storageID; return storage;
} }
public boolean isFailed() { public boolean isFailed() {

View File

@ -196,12 +196,13 @@ message HeartbeatRequestProto {
} }
message StorageReportProto { message StorageReportProto {
required string storageUuid = 1; required string storageUuid = 1 [ deprecated = true ];
optional bool failed = 2 [ default = false ]; optional bool failed = 2 [ default = false ];
optional uint64 capacity = 3 [ default = 0 ]; optional uint64 capacity = 3 [ default = 0 ];
optional uint64 dfsUsed = 4 [ default = 0 ]; optional uint64 dfsUsed = 4 [ default = 0 ];
optional uint64 remaining = 5 [ default = 0 ]; optional uint64 remaining = 5 [ default = 0 ];
optional uint64 blockPoolUsed = 6 [ default = 0 ]; optional uint64 blockPoolUsed = 6 [ default = 0 ];
optional DatanodeStorageProto storage = 7; // supersedes StorageUuid
} }
/** /**

View File

@ -140,6 +140,7 @@ public static class Builder {
private int nameNodeHttpPort = 0; private int nameNodeHttpPort = 0;
private final Configuration conf; private final Configuration conf;
private int numDataNodes = 1; private int numDataNodes = 1;
private StorageType storageType = StorageType.DEFAULT;
private boolean format = true; private boolean format = true;
private boolean manageNameDfsDirs = true; private boolean manageNameDfsDirs = true;
private boolean manageNameDfsSharedDirs = true; private boolean manageNameDfsSharedDirs = true;
@ -185,6 +186,14 @@ public Builder numDataNodes(int val) {
return this; return this;
} }
/**
* Default: StorageType.DEFAULT
*/
public Builder storageType(StorageType type) {
this.storageType = type;
return this;
}
/** /**
* Default: true * Default: true
*/ */
@ -341,6 +350,7 @@ protected MiniDFSCluster(Builder builder) throws IOException {
initMiniDFSCluster(builder.conf, initMiniDFSCluster(builder.conf,
builder.numDataNodes, builder.numDataNodes,
builder.storageType,
builder.format, builder.format,
builder.manageNameDfsDirs, builder.manageNameDfsDirs,
builder.manageNameDfsSharedDirs, builder.manageNameDfsSharedDirs,
@ -592,7 +602,7 @@ public MiniDFSCluster(int nameNodePort,
String[] racks, String hosts[], String[] racks, String hosts[],
long[] simulatedCapacities) throws IOException { long[] simulatedCapacities) throws IOException {
this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
initMiniDFSCluster(conf, numDataNodes, format, initMiniDFSCluster(conf, numDataNodes, StorageType.DEFAULT, format,
manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs, manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
operation, racks, hosts, operation, racks, hosts,
simulatedCapacities, null, true, false, simulatedCapacities, null, true, false,
@ -601,7 +611,7 @@ public MiniDFSCluster(int nameNodePort,
private void initMiniDFSCluster( private void initMiniDFSCluster(
Configuration conf, Configuration conf,
int numDataNodes, boolean format, boolean manageNameDfsDirs, int numDataNodes, StorageType storageType, boolean format, boolean manageNameDfsDirs,
boolean manageNameDfsSharedDirs, boolean enableManagedDfsDirsRedundancy, boolean manageNameDfsSharedDirs, boolean enableManagedDfsDirsRedundancy,
boolean manageDataDfsDirs, StartupOption operation, String[] racks, boolean manageDataDfsDirs, StartupOption operation, String[] racks,
String[] hosts, long[] simulatedCapacities, String clusterId, String[] hosts, long[] simulatedCapacities, String clusterId,
@ -670,7 +680,7 @@ private void initMiniDFSCluster(
} }
// Start the DataNodes // Start the DataNodes
startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks, startDataNodes(conf, numDataNodes, storageType, manageDataDfsDirs, operation, racks,
hosts, simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, checkDataNodeHostConfig); hosts, simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, checkDataNodeHostConfig);
waitClusterUp(); waitClusterUp();
//make sure ProxyUsers uses the latest conf //make sure ProxyUsers uses the latest conf
@ -990,6 +1000,19 @@ public void waitClusterUp() throws IOException {
} }
} }
String makeDataNodeDirs(int dnIndex, StorageType storageType) throws IOException {
StringBuilder sb = new StringBuilder();
for (int j = 0; j < DIRS_PER_DATANODE; ++j) {
File dir = getInstanceStorageDir(dnIndex, j);
dir.mkdirs();
if (!dir.isDirectory()) {
throw new IOException("Mkdirs failed to create directory for DataNode " + dir);
}
sb.append((j > 0 ? "," : "") + "[" + storageType + "]" + fileAsURI(dir));
}
return sb.toString();
}
/** /**
* Modify the config and start up additional DataNodes. The info port for * Modify the config and start up additional DataNodes. The info port for
* DataNodes is guaranteed to use a free port. * DataNodes is guaranteed to use a free port.
@ -1052,7 +1075,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
String[] racks, String[] hosts, String[] racks, String[] hosts,
long[] simulatedCapacities, long[] simulatedCapacities,
boolean setupHostsFile) throws IOException { boolean setupHostsFile) throws IOException {
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, hosts, startDataNodes(conf, numDataNodes, StorageType.DEFAULT, manageDfsDirs, operation, racks, hosts,
simulatedCapacities, setupHostsFile, false, false); simulatedCapacities, setupHostsFile, false, false);
} }
@ -1066,7 +1089,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
long[] simulatedCapacities, long[] simulatedCapacities,
boolean setupHostsFile, boolean setupHostsFile,
boolean checkDataNodeAddrConfig) throws IOException { boolean checkDataNodeAddrConfig) throws IOException {
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, hosts, startDataNodes(conf, numDataNodes, StorageType.DEFAULT, manageDfsDirs, operation, racks, hosts,
simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false); simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false);
} }
@ -1098,7 +1121,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
* @throws IllegalStateException if NameNode has been shutdown * @throws IllegalStateException if NameNode has been shutdown
*/ */
public synchronized void startDataNodes(Configuration conf, int numDataNodes, public synchronized void startDataNodes(Configuration conf, int numDataNodes,
boolean manageDfsDirs, StartupOption operation, StorageType storageType, boolean manageDfsDirs, StartupOption operation,
String[] racks, String[] hosts, String[] racks, String[] hosts,
long[] simulatedCapacities, long[] simulatedCapacities,
boolean setupHostsFile, boolean setupHostsFile,
@ -1154,16 +1177,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
// Set up datanode address // Set up datanode address
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig); setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
if (manageDfsDirs) { if (manageDfsDirs) {
StringBuilder sb = new StringBuilder(); String dirs = makeDataNodeDirs(i, storageType);
for (int j = 0; j < DIRS_PER_DATANODE; ++j) {
File dir = getInstanceStorageDir(i, j);
dir.mkdirs();
if (!dir.isDirectory()) {
throw new IOException("Mkdirs failed to create directory for DataNode " + dir);
}
sb.append((j > 0 ? "," : "") + fileAsURI(dir));
}
String dirs = sb.toString();
dnConf.set(DFS_DATANODE_DATA_DIR_KEY, dirs); dnConf.set(DFS_DATANODE_DATA_DIR_KEY, dirs);
conf.set(DFS_DATANODE_DATA_DIR_KEY, dirs); conf.set(DFS_DATANODE_DATA_DIR_KEY, dirs);
} }

View File

@ -50,7 +50,7 @@ public static void setNodeGroups (String[] nodeGroups) {
} }
public synchronized void startDataNodes(Configuration conf, int numDataNodes, public synchronized void startDataNodes(Configuration conf, int numDataNodes,
boolean manageDfsDirs, StartupOption operation, StorageType storageType, boolean manageDfsDirs, StartupOption operation,
String[] racks, String[] nodeGroups, String[] hosts, String[] racks, String[] nodeGroups, String[] hosts,
long[] simulatedCapacities, long[] simulatedCapacities,
boolean setupHostsFile, boolean setupHostsFile,
@ -112,15 +112,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
// Set up datanode address // Set up datanode address
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig); setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
if (manageDfsDirs) { if (manageDfsDirs) {
File dir1 = getInstanceStorageDir(i, 0); String dirs = makeDataNodeDirs(i, storageType);
File dir2 = getInstanceStorageDir(i, 1);
dir1.mkdirs();
dir2.mkdirs();
if (!dir1.isDirectory() || !dir2.isDirectory()) {
throw new IOException("Mkdirs failed to create directory for DataNode "
+ i + ": " + dir1 + " or " + dir2);
}
String dirs = fileAsURI(dir1) + "," + fileAsURI(dir2);
dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs); dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs); conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
} }
@ -198,7 +190,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
String[] racks, String[] nodeGroups, String[] hosts, String[] racks, String[] nodeGroups, String[] hosts,
long[] simulatedCapacities, long[] simulatedCapacities,
boolean setupHostsFile) throws IOException { boolean setupHostsFile) throws IOException {
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, nodeGroups, startDataNodes(conf, numDataNodes, StorageType.DEFAULT, manageDfsDirs, operation, racks, nodeGroups,
hosts, simulatedCapacities, setupHostsFile, false, false); hosts, simulatedCapacities, setupHostsFile, false, false);
} }
@ -213,13 +205,13 @@ public void startDataNodes(Configuration conf, int numDataNodes,
// This is for initialize from parent class. // This is for initialize from parent class.
@Override @Override
public synchronized void startDataNodes(Configuration conf, int numDataNodes, public synchronized void startDataNodes(Configuration conf, int numDataNodes,
boolean manageDfsDirs, StartupOption operation, StorageType storageType, boolean manageDfsDirs, StartupOption operation,
String[] racks, String[] hosts, String[] racks, String[] hosts,
long[] simulatedCapacities, long[] simulatedCapacities,
boolean setupHostsFile, boolean setupHostsFile,
boolean checkDataNodeAddrConfig, boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig) throws IOException { boolean checkDataNodeHostConfig) throws IOException {
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, startDataNodes(conf, numDataNodes, storageType, manageDfsDirs, operation, racks,
NODE_GROUPS, hosts, simulatedCapacities, setupHostsFile, NODE_GROUPS, hosts, simulatedCapacities, setupHostsFile,
checkDataNodeAddrConfig, checkDataNodeHostConfig); checkDataNodeAddrConfig, checkDataNodeHostConfig);
} }

View File

@ -257,8 +257,10 @@ public static StorageReport[] getStorageReportsForDatanode(
DatanodeDescriptor dnd) { DatanodeDescriptor dnd) {
ArrayList<StorageReport> reports = new ArrayList<StorageReport>(); ArrayList<StorageReport> reports = new ArrayList<StorageReport>();
for (DatanodeStorageInfo storage : dnd.getStorageInfos()) { for (DatanodeStorageInfo storage : dnd.getStorageInfos()) {
DatanodeStorage dns = new DatanodeStorage(
storage.getStorageID(), storage.getState(), storage.getStorageType());
StorageReport report = new StorageReport( StorageReport report = new StorageReport(
storage.getStorageID(), false, storage.getCapacity(), dns ,false, storage.getCapacity(),
storage.getDfsUsed(), storage.getRemaining(), storage.getDfsUsed(), storage.getRemaining(),
storage.getBlockPoolUsed()); storage.getBlockPoolUsed());
reports.add(report); reports.add(report);

View File

@ -470,11 +470,14 @@ public void testSortNodeByFields() throws Exception {
BlockManagerTestUtil.updateStorage(dnDesc1, new DatanodeStorage("dnStorage1")); BlockManagerTestUtil.updateStorage(dnDesc1, new DatanodeStorage("dnStorage1"));
BlockManagerTestUtil.updateStorage(dnDesc2, new DatanodeStorage("dnStorage2")); BlockManagerTestUtil.updateStorage(dnDesc2, new DatanodeStorage("dnStorage2"));
DatanodeStorage dns1 = new DatanodeStorage("dnStorage1");
DatanodeStorage dns2 = new DatanodeStorage("dnStorage2");
StorageReport[] report1 = new StorageReport[] { StorageReport[] report1 = new StorageReport[] {
new StorageReport("dnStorage1", false, 1024, 100, 924, 100) new StorageReport(dns1, false, 1024, 100, 924, 100)
}; };
StorageReport[] report2 = new StorageReport[] { StorageReport[] report2 = new StorageReport[] {
new StorageReport("dnStorage2", false, 2500, 200, 1848, 200) new StorageReport(dns2, false, 2500, 200, 1848, 200)
}; };
dnDesc1.updateHeartbeat(report1, 5l, 3l, 10, 2); dnDesc1.updateHeartbeat(report1, 5l, 3l, 10, 2);
dnDesc2.updateHeartbeat(report2, 10l, 2l, 20, 1); dnDesc2.updateHeartbeat(report2, 10l, 2l, 20, 1);

View File

@ -394,8 +394,9 @@ String getStorageUuid() {
} }
synchronized StorageReport getStorageReport(String bpid) { synchronized StorageReport getStorageReport(String bpid) {
return new StorageReport(getStorageUuid(), false, getCapacity(), return new StorageReport(new DatanodeStorage(getStorageUuid()),
getUsed(), getFree(), map.get(bpid).getUsed()); false, getCapacity(), getUsed(), getFree(),
map.get(bpid).getUsed());
} }
} }

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.junit.After; import org.junit.After;
@ -186,9 +187,8 @@ public void testLocalDirs() throws Exception {
// Check permissions on directories in 'dfs.datanode.data.dir' // Check permissions on directories in 'dfs.datanode.data.dir'
FileSystem localFS = FileSystem.getLocal(conf); FileSystem localFS = FileSystem.getLocal(conf);
for (DataNode dn : cluster.getDataNodes()) { for (DataNode dn : cluster.getDataNodes()) {
String[] dataDirs = for (FsVolumeSpi v : dn.getFSDataset().getVolumes()) {
dn.getConf().getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY); String dir = v.getBasePath();
for (String dir : dataDirs) {
Path dataDir = new Path(dir); Path dataDir = new Path(dir);
FsPermission actual = localFS.getFileStatus(dataDir).getPermission(); FsPermission actual = localFS.getFileStatus(dataDir).getPermission();
assertEquals("Permission for dir: " + dataDir + ", is " + actual + assertEquals("Permission for dir: " + dataDir + ", is " + actual +

View File

@ -938,7 +938,7 @@ void register() throws IOException {
// register datanode // register datanode
dnRegistration = nameNodeProto.registerDatanode(dnRegistration); dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
//first block reports //first block reports
storage = new DatanodeStorage(dnRegistration.getDatanodeUuid()); storage = new DatanodeStorage(DatanodeStorage.generateUuid());
final StorageBlockReport[] reports = { final StorageBlockReport[] reports = {
new StorageBlockReport(storage, new StorageBlockReport(storage,
new BlockListAsLongs(null, null).getBlockListAsLongs()) new BlockListAsLongs(null, null).getBlockListAsLongs())
@ -954,8 +954,8 @@ void register() throws IOException {
void sendHeartbeat() throws IOException { void sendHeartbeat() throws IOException {
// register datanode // register datanode
// TODO:FEDERATION currently a single block pool is supported // TODO:FEDERATION currently a single block pool is supported
StorageReport[] rep = { new StorageReport(dnRegistration.getDatanodeUuid(), StorageReport[] rep = { new StorageReport(storage, false,
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) }; DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, rep, DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, rep,
0L, 0L, 0, 0, 0).getCommands(); 0L, 0L, 0, 0, 0).getCommands();
if(cmds != null) { if(cmds != null) {
@ -1001,7 +1001,7 @@ public int compareTo(String xferAddr) {
@SuppressWarnings("unused") // keep it for future blockReceived benchmark @SuppressWarnings("unused") // keep it for future blockReceived benchmark
int replicateBlocks() throws IOException { int replicateBlocks() throws IOException {
// register datanode // register datanode
StorageReport[] rep = { new StorageReport(dnRegistration.getDatanodeUuid(), StorageReport[] rep = { new StorageReport(storage,
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) }; false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
rep, 0L, 0L, 0, 0, 0).getCommands(); rep, 0L, 0L, 0, 0, 0).getCommands();
@ -1010,7 +1010,8 @@ int replicateBlocks() throws IOException {
if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) { if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
// Send a copy of a block to another datanode // Send a copy of a block to another datanode
BlockCommand bcmd = (BlockCommand)cmd; BlockCommand bcmd = (BlockCommand)cmd;
return transferBlocks(bcmd.getBlocks(), bcmd.getTargets()); return transferBlocks(bcmd.getBlocks(), bcmd.getTargets(),
bcmd.getTargetStorageIDs());
} }
} }
} }
@ -1023,12 +1024,14 @@ int replicateBlocks() throws IOException {
* that the blocks have been received. * that the blocks have been received.
*/ */
private int transferBlocks( Block blocks[], private int transferBlocks( Block blocks[],
DatanodeInfo xferTargets[][] DatanodeInfo xferTargets[][],
String targetStorageIDs[][]
) throws IOException { ) throws IOException {
for(int i = 0; i < blocks.length; i++) { for(int i = 0; i < blocks.length; i++) {
DatanodeInfo blockTargets[] = xferTargets[i]; DatanodeInfo blockTargets[] = xferTargets[i];
for(int t = 0; t < blockTargets.length; t++) { for(int t = 0; t < blockTargets.length; t++) {
DatanodeInfo dnInfo = blockTargets[t]; DatanodeInfo dnInfo = blockTargets[t];
String targetStorageID = targetStorageIDs[i][t];
DatanodeRegistration receivedDNReg; DatanodeRegistration receivedDNReg;
receivedDNReg = new DatanodeRegistration(dnInfo, receivedDNReg = new DatanodeRegistration(dnInfo,
new DataStorage(nsInfo), new DataStorage(nsInfo),
@ -1038,7 +1041,7 @@ private int transferBlocks( Block blocks[],
blocks[i], ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, blocks[i], ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
null) }; null) };
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks( StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
receivedDNReg.getDatanodeUuid(), rdBlocks) }; targetStorageID, rdBlocks) };
nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode
.getNamesystem().getBlockPoolId(), report); .getNamesystem().getBlockPoolId(), report);
} }
@ -1127,7 +1130,7 @@ void generateInputs(int[] ignore) throws IOException {
} }
// create files // create files
LOG.info("Creating " + nrFiles + " with " + blocksPerFile + " blocks each."); LOG.info("Creating " + nrFiles + " files with " + blocksPerFile + " blocks each.");
FileNameGenerator nameGenerator; FileNameGenerator nameGenerator;
nameGenerator = new FileNameGenerator(getBaseDir(), 100); nameGenerator = new FileNameGenerator(getBaseDir(), 100);
String clientName = getClientName(007); String clientName = getClientName(007);
@ -1161,7 +1164,7 @@ private ExtendedBlock addBlocks(String fileName, String clientName)
loc.getBlock().getLocalBlock(), loc.getBlock().getLocalBlock(),
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null) }; ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null) };
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks( StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
datanodes[dnIdx].dnRegistration.getDatanodeUuid(), rdBlocks) }; datanodes[dnIdx].storage.getStorageID(), rdBlocks) };
nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
.getBlock().getBlockPoolId(), report); .getBlock().getBlockPoolId(), report);
} }

View File

@ -140,8 +140,9 @@ public void testDeadDatanode() throws Exception {
// Ensure heartbeat from dead datanode is rejected with a command // Ensure heartbeat from dead datanode is rejected with a command
// that asks datanode to register again // that asks datanode to register again
StorageReport[] rep = { new StorageReport(reg.getDatanodeUuid(), false, 0, 0, StorageReport[] rep = { new StorageReport(
0, 0) }; new DatanodeStorage(reg.getDatanodeUuid()),
false, 0, 0, 0, 0) };
DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0) DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0)
.getCommands(); .getCommands();
assertEquals(1, cmd.length); assertEquals(1, cmd.length);