HDFS-5667. Merge r1555929 and r1555956 from trunk to branch-2.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1556087 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1ec54d9532
commit
70ee0a35fb
|
@ -386,6 +386,8 @@ BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS
|
|||
HDFS-5406. Send incremental block reports for all storages in a
|
||||
single call. (Arpit Agarwal)
|
||||
|
||||
HDFS-5667. Include DatanodeStorage in StorageReport. (Arpit Agarwal)
|
||||
|
||||
Release 2.3.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -1458,12 +1458,16 @@ public class PBHelper {
|
|||
return StorageReportProto.newBuilder()
|
||||
.setBlockPoolUsed(r.getBlockPoolUsed()).setCapacity(r.getCapacity())
|
||||
.setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining())
|
||||
.setStorageUuid(r.getStorageID()).build();
|
||||
.setStorageUuid(r.getStorage().getStorageID())
|
||||
.setStorage(convert(r.getStorage())).build();
|
||||
}
|
||||
|
||||
public static StorageReport convert(StorageReportProto p) {
|
||||
return new StorageReport(p.getStorageUuid(), p.getFailed(),
|
||||
p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
|
||||
return new StorageReport(
|
||||
p.hasStorage() ?
|
||||
convert(p.getStorage()) :
|
||||
new DatanodeStorage(p.getStorageUuid()),
|
||||
p.getFailed(), p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
|
||||
p.getBlockPoolUsed());
|
||||
}
|
||||
|
||||
|
|
|
@ -268,11 +268,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
setLastUpdate(Time.now());
|
||||
this.volumeFailures = volFailures;
|
||||
for (StorageReport report : reports) {
|
||||
DatanodeStorageInfo storage = storageMap.get(report.getStorageID());
|
||||
DatanodeStorageInfo storage = storageMap.get(report.getStorage().getStorageID());
|
||||
if (storage == null) {
|
||||
// This is seen during cluster initialization when the heartbeat
|
||||
// is received before the initial block reports from each storage.
|
||||
storage = updateStorage(new DatanodeStorage(report.getStorageID()));
|
||||
storage = updateStorage(report.getStorage());
|
||||
}
|
||||
storage.receivedHeartbeat(report);
|
||||
totalCapacity += report.getCapacity();
|
||||
|
|
|
@ -120,7 +120,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
reports = new StorageReport[volumes.volumes.size()];
|
||||
int i = 0;
|
||||
for (FsVolumeImpl volume : volumes.volumes) {
|
||||
reports[i++] = new StorageReport(volume.getStorageID(),
|
||||
reports[i++] = new StorageReport(volume.toDatanodeStorage(),
|
||||
false,
|
||||
volume.getCapacity(),
|
||||
volume.getDfsUsed(),
|
||||
|
@ -235,12 +235,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
|
||||
storage.getNumStorageDirs());
|
||||
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
|
||||
// TODO: getStorageTypeFromLocations() is only a temporary workaround and
|
||||
// should be replaced with getting storage type from DataStorage (missing
|
||||
// storage type now) directly.
|
||||
Storage.StorageDirectory sd = storage.getStorageDir(idx);
|
||||
final File dir = sd.getCurrentDir();
|
||||
final StorageType storageType = getStorageTypeFromLocations(dataLocations, dir);
|
||||
final StorageType storageType = getStorageTypeFromLocations(dataLocations, sd.getRoot());
|
||||
volArray.add(new FsVolumeImpl(this, sd.getStorageUuid(), dir, conf,
|
||||
storageType));
|
||||
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.apache.hadoop.hdfs.server.protocol;
|
|||
* Utilization report for a Datanode storage
|
||||
*/
|
||||
public class StorageReport {
|
||||
private final String storageID;
|
||||
private final DatanodeStorage storage;
|
||||
private final boolean failed;
|
||||
private final long capacity;
|
||||
private final long dfsUsed;
|
||||
|
@ -30,9 +30,9 @@ public class StorageReport {
|
|||
|
||||
public static final StorageReport[] EMPTY_ARRAY = {};
|
||||
|
||||
public StorageReport(String sid, boolean failed, long capacity, long dfsUsed,
|
||||
long remaining, long bpUsed) {
|
||||
this.storageID = sid;
|
||||
public StorageReport(DatanodeStorage storage, boolean failed,
|
||||
long capacity, long dfsUsed, long remaining, long bpUsed) {
|
||||
this.storage = storage;
|
||||
this.failed = failed;
|
||||
this.capacity = capacity;
|
||||
this.dfsUsed = dfsUsed;
|
||||
|
@ -40,8 +40,8 @@ public class StorageReport {
|
|||
this.blockPoolUsed = bpUsed;
|
||||
}
|
||||
|
||||
public String getStorageID() {
|
||||
return storageID;
|
||||
public DatanodeStorage getStorage() {
|
||||
return storage;
|
||||
}
|
||||
|
||||
public boolean isFailed() {
|
||||
|
|
|
@ -175,12 +175,13 @@ message HeartbeatRequestProto {
|
|||
}
|
||||
|
||||
message StorageReportProto {
|
||||
required string storageUuid = 1;
|
||||
required string storageUuid = 1 [ deprecated = true ];
|
||||
optional bool failed = 2 [ default = false ];
|
||||
optional uint64 capacity = 3 [ default = 0 ];
|
||||
optional uint64 dfsUsed = 4 [ default = 0 ];
|
||||
optional uint64 remaining = 5 [ default = 0 ];
|
||||
optional uint64 blockPoolUsed = 6 [ default = 0 ];
|
||||
optional DatanodeStorageProto storage = 7; // supersedes StorageUuid
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -140,6 +140,7 @@ public class MiniDFSCluster {
|
|||
private int nameNodeHttpPort = 0;
|
||||
private final Configuration conf;
|
||||
private int numDataNodes = 1;
|
||||
private StorageType storageType = StorageType.DEFAULT;
|
||||
private boolean format = true;
|
||||
private boolean manageNameDfsDirs = true;
|
||||
private boolean manageNameDfsSharedDirs = true;
|
||||
|
@ -185,6 +186,14 @@ public class MiniDFSCluster {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Default: StorageType.DEFAULT
|
||||
*/
|
||||
public Builder storageType(StorageType type) {
|
||||
this.storageType = type;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Default: true
|
||||
*/
|
||||
|
@ -341,6 +350,7 @@ public class MiniDFSCluster {
|
|||
|
||||
initMiniDFSCluster(builder.conf,
|
||||
builder.numDataNodes,
|
||||
builder.storageType,
|
||||
builder.format,
|
||||
builder.manageNameDfsDirs,
|
||||
builder.manageNameDfsSharedDirs,
|
||||
|
@ -590,7 +600,7 @@ public class MiniDFSCluster {
|
|||
String[] racks, String hosts[],
|
||||
long[] simulatedCapacities) throws IOException {
|
||||
this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
|
||||
initMiniDFSCluster(conf, numDataNodes, format,
|
||||
initMiniDFSCluster(conf, numDataNodes, StorageType.DEFAULT, format,
|
||||
manageNameDfsDirs, true, true, manageDataDfsDirs,
|
||||
operation, racks, hosts,
|
||||
simulatedCapacities, null, true, false,
|
||||
|
@ -599,7 +609,7 @@ public class MiniDFSCluster {
|
|||
|
||||
private void initMiniDFSCluster(
|
||||
Configuration conf,
|
||||
int numDataNodes, boolean format, boolean manageNameDfsDirs,
|
||||
int numDataNodes, StorageType storageType, boolean format, boolean manageNameDfsDirs,
|
||||
boolean manageNameDfsSharedDirs, boolean enableManagedDfsDirsRedundancy,
|
||||
boolean manageDataDfsDirs, StartupOption operation, String[] racks,
|
||||
String[] hosts, long[] simulatedCapacities, String clusterId,
|
||||
|
@ -671,7 +681,7 @@ public class MiniDFSCluster {
|
|||
}
|
||||
|
||||
// Start the DataNodes
|
||||
startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks,
|
||||
startDataNodes(conf, numDataNodes, storageType, manageDataDfsDirs, operation, racks,
|
||||
hosts, simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, checkDataNodeHostConfig);
|
||||
waitClusterUp();
|
||||
//make sure ProxyUsers uses the latest conf
|
||||
|
@ -991,6 +1001,19 @@ public class MiniDFSCluster {
|
|||
}
|
||||
}
|
||||
|
||||
String makeDataNodeDirs(int dnIndex, StorageType storageType) throws IOException {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (int j = 0; j < DIRS_PER_DATANODE; ++j) {
|
||||
File dir = getInstanceStorageDir(dnIndex, j);
|
||||
dir.mkdirs();
|
||||
if (!dir.isDirectory()) {
|
||||
throw new IOException("Mkdirs failed to create directory for DataNode " + dir);
|
||||
}
|
||||
sb.append((j > 0 ? "," : "") + "[" + storageType + "]" + fileAsURI(dir));
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Modify the config and start up additional DataNodes. The info port for
|
||||
* DataNodes is guaranteed to use a free port.
|
||||
|
@ -1053,7 +1076,7 @@ public class MiniDFSCluster {
|
|||
String[] racks, String[] hosts,
|
||||
long[] simulatedCapacities,
|
||||
boolean setupHostsFile) throws IOException {
|
||||
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, hosts,
|
||||
startDataNodes(conf, numDataNodes, StorageType.DEFAULT, manageDfsDirs, operation, racks, hosts,
|
||||
simulatedCapacities, setupHostsFile, false, false);
|
||||
}
|
||||
|
||||
|
@ -1067,7 +1090,7 @@ public class MiniDFSCluster {
|
|||
long[] simulatedCapacities,
|
||||
boolean setupHostsFile,
|
||||
boolean checkDataNodeAddrConfig) throws IOException {
|
||||
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, hosts,
|
||||
startDataNodes(conf, numDataNodes, StorageType.DEFAULT, manageDfsDirs, operation, racks, hosts,
|
||||
simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false);
|
||||
}
|
||||
|
||||
|
@ -1099,7 +1122,7 @@ public class MiniDFSCluster {
|
|||
* @throws IllegalStateException if NameNode has been shutdown
|
||||
*/
|
||||
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
||||
boolean manageDfsDirs, StartupOption operation,
|
||||
StorageType storageType, boolean manageDfsDirs, StartupOption operation,
|
||||
String[] racks, String[] hosts,
|
||||
long[] simulatedCapacities,
|
||||
boolean setupHostsFile,
|
||||
|
@ -1155,16 +1178,7 @@ public class MiniDFSCluster {
|
|||
// Set up datanode address
|
||||
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
|
||||
if (manageDfsDirs) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (int j = 0; j < DIRS_PER_DATANODE; ++j) {
|
||||
File dir = getInstanceStorageDir(i, j);
|
||||
dir.mkdirs();
|
||||
if (!dir.isDirectory()) {
|
||||
throw new IOException("Mkdirs failed to create directory for DataNode " + dir);
|
||||
}
|
||||
sb.append((j > 0 ? "," : "") + fileAsURI(dir));
|
||||
}
|
||||
String dirs = sb.toString();
|
||||
String dirs = makeDataNodeDirs(i, storageType);
|
||||
dnConf.set(DFS_DATANODE_DATA_DIR_KEY, dirs);
|
||||
conf.set(DFS_DATANODE_DATA_DIR_KEY, dirs);
|
||||
}
|
||||
|
|
|
@ -50,7 +50,7 @@ public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster {
|
|||
}
|
||||
|
||||
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
||||
boolean manageDfsDirs, StartupOption operation,
|
||||
StorageType storageType, boolean manageDfsDirs, StartupOption operation,
|
||||
String[] racks, String[] nodeGroups, String[] hosts,
|
||||
long[] simulatedCapacities,
|
||||
boolean setupHostsFile,
|
||||
|
@ -112,15 +112,7 @@ public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster {
|
|||
// Set up datanode address
|
||||
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
|
||||
if (manageDfsDirs) {
|
||||
File dir1 = getInstanceStorageDir(i, 0);
|
||||
File dir2 = getInstanceStorageDir(i, 1);
|
||||
dir1.mkdirs();
|
||||
dir2.mkdirs();
|
||||
if (!dir1.isDirectory() || !dir2.isDirectory()) {
|
||||
throw new IOException("Mkdirs failed to create directory for DataNode "
|
||||
+ i + ": " + dir1 + " or " + dir2);
|
||||
}
|
||||
String dirs = fileAsURI(dir1) + "," + fileAsURI(dir2);
|
||||
String dirs = makeDataNodeDirs(i, storageType);
|
||||
dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
|
||||
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
|
||||
}
|
||||
|
@ -198,7 +190,7 @@ public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster {
|
|||
String[] racks, String[] nodeGroups, String[] hosts,
|
||||
long[] simulatedCapacities,
|
||||
boolean setupHostsFile) throws IOException {
|
||||
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, nodeGroups,
|
||||
startDataNodes(conf, numDataNodes, StorageType.DEFAULT, manageDfsDirs, operation, racks, nodeGroups,
|
||||
hosts, simulatedCapacities, setupHostsFile, false, false);
|
||||
}
|
||||
|
||||
|
@ -213,13 +205,13 @@ public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster {
|
|||
// This is for initialize from parent class.
|
||||
@Override
|
||||
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
||||
boolean manageDfsDirs, StartupOption operation,
|
||||
StorageType storageType, boolean manageDfsDirs, StartupOption operation,
|
||||
String[] racks, String[] hosts,
|
||||
long[] simulatedCapacities,
|
||||
boolean setupHostsFile,
|
||||
boolean checkDataNodeAddrConfig,
|
||||
boolean checkDataNodeHostConfig) throws IOException {
|
||||
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks,
|
||||
startDataNodes(conf, numDataNodes, storageType, manageDfsDirs, operation, racks,
|
||||
NODE_GROUPS, hosts, simulatedCapacities, setupHostsFile,
|
||||
checkDataNodeAddrConfig, checkDataNodeHostConfig);
|
||||
}
|
||||
|
|
|
@ -257,8 +257,10 @@ public class BlockManagerTestUtil {
|
|||
DatanodeDescriptor dnd) {
|
||||
ArrayList<StorageReport> reports = new ArrayList<StorageReport>();
|
||||
for (DatanodeStorageInfo storage : dnd.getStorageInfos()) {
|
||||
DatanodeStorage dns = new DatanodeStorage(
|
||||
storage.getStorageID(), storage.getState(), storage.getStorageType());
|
||||
StorageReport report = new StorageReport(
|
||||
storage.getStorageID(), false, storage.getCapacity(),
|
||||
dns ,false, storage.getCapacity(),
|
||||
storage.getDfsUsed(), storage.getRemaining(),
|
||||
storage.getBlockPoolUsed());
|
||||
reports.add(report);
|
||||
|
|
|
@ -468,11 +468,14 @@ public class TestJspHelper {
|
|||
BlockManagerTestUtil.updateStorage(dnDesc1, new DatanodeStorage("dnStorage1"));
|
||||
BlockManagerTestUtil.updateStorage(dnDesc2, new DatanodeStorage("dnStorage2"));
|
||||
|
||||
DatanodeStorage dns1 = new DatanodeStorage("dnStorage1");
|
||||
DatanodeStorage dns2 = new DatanodeStorage("dnStorage2");
|
||||
|
||||
StorageReport[] report1 = new StorageReport[] {
|
||||
new StorageReport("dnStorage1", false, 1024, 100, 924, 100)
|
||||
new StorageReport(dns1, false, 1024, 100, 924, 100)
|
||||
};
|
||||
StorageReport[] report2 = new StorageReport[] {
|
||||
new StorageReport("dnStorage2", false, 2500, 200, 1848, 200)
|
||||
new StorageReport(dns2, false, 2500, 200, 1848, 200)
|
||||
};
|
||||
dnDesc1.updateHeartbeat(report1, 10, 2);
|
||||
dnDesc2.updateHeartbeat(report2, 20, 1);
|
||||
|
|
|
@ -394,8 +394,9 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
}
|
||||
|
||||
synchronized StorageReport getStorageReport(String bpid) {
|
||||
return new StorageReport(getStorageUuid(), false, getCapacity(),
|
||||
getUsed(), getFree(), map.get(bpid).getUsed());
|
||||
return new StorageReport(new DatanodeStorage(getStorageUuid()),
|
||||
false, getCapacity(), getUsed(), getFree(),
|
||||
map.get(bpid).getUsed());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|||
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.junit.After;
|
||||
|
@ -186,9 +187,8 @@ public class TestDiskError {
|
|||
// Check permissions on directories in 'dfs.datanode.data.dir'
|
||||
FileSystem localFS = FileSystem.getLocal(conf);
|
||||
for (DataNode dn : cluster.getDataNodes()) {
|
||||
String[] dataDirs =
|
||||
dn.getConf().getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
|
||||
for (String dir : dataDirs) {
|
||||
for (FsVolumeSpi v : dn.getFSDataset().getVolumes()) {
|
||||
String dir = v.getBasePath();
|
||||
Path dataDir = new Path(dir);
|
||||
FsPermission actual = localFS.getFileStatus(dataDir).getPermission();
|
||||
assertEquals("Permission for dir: " + dataDir + ", is " + actual +
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.*;
|
||||
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.junit.Assert.assertNotSame;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
|
||||
public class TestStorageReport {
|
||||
public static final Log LOG = LogFactory.getLog(TestStorageReport.class);
|
||||
|
||||
private static short REPL_FACTOR = 1;
|
||||
private static final StorageType storageType = StorageType.SSD; // pick non-default.
|
||||
|
||||
private static Configuration conf;
|
||||
private MiniDFSCluster cluster;
|
||||
private DistributedFileSystem fs;
|
||||
static String bpid;
|
||||
|
||||
@Before
|
||||
public void startUpCluster() throws IOException {
|
||||
conf = new HdfsConfiguration();
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(REPL_FACTOR)
|
||||
.storageType(storageType)
|
||||
.build();
|
||||
fs = cluster.getFileSystem();
|
||||
bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
}
|
||||
|
||||
@After
|
||||
public void shutDownCluster() throws IOException {
|
||||
if (cluster != null) {
|
||||
fs.close();
|
||||
cluster.shutdown();
|
||||
cluster = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure that storage type and storage state are propagated
|
||||
* in Storage Reports.
|
||||
*/
|
||||
@Test
|
||||
public void testStorageReportHasStorageTypeAndState() throws IOException {
|
||||
|
||||
// Make sure we are not testing with the default type, that would not
|
||||
// be a very good test.
|
||||
assertNotSame(storageType, StorageType.DEFAULT);
|
||||
NameNode nn = cluster.getNameNode();
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
|
||||
// Insert a spy object for the NN RPC.
|
||||
DatanodeProtocolClientSideTranslatorPB nnSpy =
|
||||
DataNodeTestUtils.spyOnBposToNN(dn, nn);
|
||||
|
||||
// Trigger a heartbeat so there is an interaction with the spy
|
||||
// object.
|
||||
DataNodeTestUtils.triggerHeartbeat(dn);
|
||||
|
||||
// Verify that the callback passed in the expected parameters.
|
||||
ArgumentCaptor<StorageReport[]> captor =
|
||||
ArgumentCaptor.forClass(StorageReport[].class);
|
||||
|
||||
Mockito.verify(nnSpy).sendHeartbeat(
|
||||
any(DatanodeRegistration.class),
|
||||
captor.capture(),
|
||||
anyInt(), anyInt(), anyInt());
|
||||
|
||||
StorageReport[] reports = captor.getValue();
|
||||
|
||||
for (StorageReport report: reports) {
|
||||
assertThat(report.getStorage().getStorageType(), is(storageType));
|
||||
assertThat(report.getStorage().getState(), is(DatanodeStorage.State.NORMAL));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -938,7 +938,7 @@ public class NNThroughputBenchmark implements Tool {
|
|||
// register datanode
|
||||
dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
|
||||
//first block reports
|
||||
storage = new DatanodeStorage(dnRegistration.getDatanodeUuid());
|
||||
storage = new DatanodeStorage(DatanodeStorage.generateUuid());
|
||||
final StorageBlockReport[] reports = {
|
||||
new StorageBlockReport(storage,
|
||||
new BlockListAsLongs(null, null).getBlockListAsLongs())
|
||||
|
@ -954,8 +954,8 @@ public class NNThroughputBenchmark implements Tool {
|
|||
void sendHeartbeat() throws IOException {
|
||||
// register datanode
|
||||
// TODO:FEDERATION currently a single block pool is supported
|
||||
StorageReport[] rep = { new StorageReport(dnRegistration.getDatanodeUuid(),
|
||||
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
|
||||
StorageReport[] rep = { new StorageReport(storage, false,
|
||||
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
|
||||
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
|
||||
rep, 0, 0, 0).getCommands();
|
||||
if(cmds != null) {
|
||||
|
@ -1001,7 +1001,7 @@ public class NNThroughputBenchmark implements Tool {
|
|||
@SuppressWarnings("unused") // keep it for future blockReceived benchmark
|
||||
int replicateBlocks() throws IOException {
|
||||
// register datanode
|
||||
StorageReport[] rep = { new StorageReport(dnRegistration.getDatanodeUuid(),
|
||||
StorageReport[] rep = { new StorageReport(storage,
|
||||
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
|
||||
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
|
||||
rep, 0, 0, 0).getCommands();
|
||||
|
@ -1010,7 +1010,8 @@ public class NNThroughputBenchmark implements Tool {
|
|||
if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
|
||||
// Send a copy of a block to another datanode
|
||||
BlockCommand bcmd = (BlockCommand)cmd;
|
||||
return transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
|
||||
return transferBlocks(bcmd.getBlocks(), bcmd.getTargets(),
|
||||
bcmd.getTargetStorageIDs());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1023,12 +1024,14 @@ public class NNThroughputBenchmark implements Tool {
|
|||
* that the blocks have been received.
|
||||
*/
|
||||
private int transferBlocks( Block blocks[],
|
||||
DatanodeInfo xferTargets[][]
|
||||
DatanodeInfo xferTargets[][],
|
||||
String targetStorageIDs[][]
|
||||
) throws IOException {
|
||||
for(int i = 0; i < blocks.length; i++) {
|
||||
DatanodeInfo blockTargets[] = xferTargets[i];
|
||||
for(int t = 0; t < blockTargets.length; t++) {
|
||||
DatanodeInfo dnInfo = blockTargets[t];
|
||||
String targetStorageID = targetStorageIDs[i][t];
|
||||
DatanodeRegistration receivedDNReg;
|
||||
receivedDNReg = new DatanodeRegistration(dnInfo,
|
||||
new DataStorage(nsInfo),
|
||||
|
@ -1038,7 +1041,7 @@ public class NNThroughputBenchmark implements Tool {
|
|||
blocks[i], ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
|
||||
null) };
|
||||
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
|
||||
receivedDNReg.getDatanodeUuid(), rdBlocks) };
|
||||
targetStorageID, rdBlocks) };
|
||||
nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode
|
||||
.getNamesystem().getBlockPoolId(), report);
|
||||
}
|
||||
|
@ -1127,7 +1130,7 @@ public class NNThroughputBenchmark implements Tool {
|
|||
}
|
||||
|
||||
// create files
|
||||
LOG.info("Creating " + nrFiles + " with " + blocksPerFile + " blocks each.");
|
||||
LOG.info("Creating " + nrFiles + " files with " + blocksPerFile + " blocks each.");
|
||||
FileNameGenerator nameGenerator;
|
||||
nameGenerator = new FileNameGenerator(getBaseDir(), 100);
|
||||
String clientName = getClientName(007);
|
||||
|
@ -1161,7 +1164,7 @@ public class NNThroughputBenchmark implements Tool {
|
|||
loc.getBlock().getLocalBlock(),
|
||||
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null) };
|
||||
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
|
||||
datanodes[dnIdx].dnRegistration.getDatanodeUuid(), rdBlocks) };
|
||||
datanodes[dnIdx].storage.getStorageID(), rdBlocks) };
|
||||
nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
|
||||
.getBlock().getBlockPoolId(), report);
|
||||
}
|
||||
|
|
|
@ -140,8 +140,9 @@ public class TestDeadDatanode {
|
|||
|
||||
// Ensure heartbeat from dead datanode is rejected with a command
|
||||
// that asks datanode to register again
|
||||
StorageReport[] rep = { new StorageReport(reg.getDatanodeUuid(), false, 0, 0,
|
||||
0, 0) };
|
||||
StorageReport[] rep = { new StorageReport(
|
||||
new DatanodeStorage(reg.getDatanodeUuid()),
|
||||
false, 0, 0, 0, 0) };
|
||||
DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0, 0, 0).getCommands();
|
||||
assertEquals(1, cmd.length);
|
||||
assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
|
||||
|
|
Loading…
Reference in New Issue