HDFS-7847. Modify NNThroughputBenchmark to be able to operate on a remote NameNode (Charles Lamb via Colin P. McCabe)

commit ffce9a3413
parent e4c3b52c89
Author: Colin Patrick Mccabe
Date:   2015-05-05 11:27:36 -07:00

3 changed files with 137 additions and 42 deletions
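For context before the diffs: the patch adds a -namenode option so NNThroughputBenchmark can drive a NameNode running in another process or on another host, instead of always spawning an in-process one. A minimal driver sketch (not part of this commit) is below; the URI and workload sizes are placeholder values, and the remote NameNode must have dfs.namenode.fs-limits.min-block-size set to 16, since the benchmark uses 16-byte blocks.

    import org.apache.hadoop.hdfs.server.namenode.NNThroughputBenchmark;

    public class RemoteNNBenchDriver {
      public static void main(String[] args) throws Exception {
        // Run the file-create workload against a remote NameNode via the
        // new -namenode flag; all values here are examples only.
        NNThroughputBenchmark.main(new String[] {
            "-op", "create", "-threads", "3", "-files", "10",
            "-namenode", "hdfs://remote-nn.example.com:8020"});
      }
    }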

--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -510,6 +510,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-7758. Retire FsDatasetSpi#getVolumes() and use
     FsDatasetSpi#getVolumeRefs() instead (Lei (Eddy) Xu via Colin P. McCabe)
 
+    HDFS-7847. Modify NNThroughputBenchmark to be able to operate on a remote
+    NameNode (Charles Lamb via Colin P. McCabe)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -48,6 +48,7 @@ import java.lang.reflect.Modifier;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.URI;
 import java.net.URL;
 import java.net.URLConnection;
 import java.nio.ByteBuffer;
@@ -64,6 +65,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
@@ -129,12 +131,14 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -147,6 +151,7 @@ import org.apache.log4j.Level;
 import org.junit.Assume;
 import org.mockito.internal.util.reflection.Whitebox;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -1755,6 +1760,41 @@ public class DFSTestUtil {
     GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
   }
 
+  /**
+   * Get the NamenodeProtocol RPC proxy for the NN associated with this
+   * DFSClient object
+   *
+   * @param nameNodeUri the URI of the NN to get a proxy for.
+   *
+   * @return the Namenode RPC proxy associated with this DFSClient object
+   */
+  @VisibleForTesting
+  public static NamenodeProtocol getNamenodeProtocolProxy(Configuration conf,
+      URI nameNodeUri, UserGroupInformation ugi)
+      throws IOException {
+    return NameNodeProxies.createNonHAProxy(conf,
+        NameNode.getAddress(nameNodeUri), NamenodeProtocol.class, ugi, false).
+        getProxy();
+  }
+
+  /**
+   * Get the RefreshUserMappingsProtocol RPC proxy for the NN associated with
+   * this DFSClient object
+   *
+   * @param nameNodeUri the URI of the NN to get a proxy for.
+   *
+   * @return the RefreshUserMappingsProtocol RPC proxy associated with this
+   *         DFSClient object
+   */
+  @VisibleForTesting
+  public static RefreshUserMappingsProtocol getRefreshUserMappingsProtocolProxy(
+      Configuration conf, URI nameNodeUri) throws IOException {
+    final AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
+    return NameNodeProxies.createProxy(conf,
+        nameNodeUri, RefreshUserMappingsProtocol.class,
+        nnFallbackToSimpleAuth).getProxy();
+  }
+
   /**
    * Set the datanode dead
    */
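A hedged usage sketch for the two helpers added above (not part of the commit); the NameNode URI is a placeholder, and the versionRequest()/getBlockPoolID() calls mirror the getBlockPoolId() helper added to NNThroughputBenchmark below.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
    import org.apache.hadoop.security.RefreshUserMappingsProtocol;
    import org.apache.hadoop.security.UserGroupInformation;

    public class ProxyHelpersSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        URI nnUri = URI.create("hdfs://remote-nn.example.com:8020"); // placeholder
        // Non-HA NamenodeProtocol proxy for the current user.
        NamenodeProtocol nnProto = DFSTestUtil.getNamenodeProtocolProxy(
            conf, nnUri, UserGroupInformation.getCurrentUser());
        // RefreshUserMappingsProtocol proxy against the same NameNode.
        RefreshUserMappingsProtocol refreshProto =
            DFSTestUtil.getRefreshUserMappingsProtocolProxy(conf, nnUri);
        // Discover the block pool id over RPC, as the benchmark now does.
        System.out.println("block pool: "
            + nnProto.versionRequest().getBlockPoolID());
        refreshProto.refreshUserToGroupsMappings();
      }
    }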

--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -30,19 +31,24 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -53,6 +59,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
@@ -63,6 +70,8 @@ import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.RefreshUserMappingsProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
@@ -96,6 +105,9 @@ import org.apache.log4j.LogManager;
  *    By default the refresh is never called.</li>
  * <li>-keepResults do not clean up the name-space after execution.</li>
  * <li>-useExisting do not recreate the name-space, use existing data.</li>
+ * <li>-namenode will run the test against a namenode in another
+ *    process or on another host. If you use this option, the namenode
+ *    must have dfs.namenode.fs-limits.min-block-size set to 16.</li>
  * </ol>
  *
  * The benchmark first generates inputs for each thread so that the
@@ -111,11 +123,20 @@ public class NNThroughputBenchmark implements Tool {
   private static final Log LOG = LogFactory.getLog(NNThroughputBenchmark.class);
   private static final int BLOCK_SIZE = 16;
   private static final String GENERAL_OPTIONS_USAGE =
-    " [-keepResults] | [-logLevel L] | [-UGCacheRefreshCount G]";
+    " [-keepResults] | [-logLevel L] | [-UGCacheRefreshCount G] |" +
+    " [-namenode <namenode URI>]\n" +
+    "     If using -namenode, set the namenode's" +
+    "         dfs.namenode.fs-limits.min-block-size to 16.";
 
   static Configuration config;
   static NameNode nameNode;
-  static NamenodeProtocols nameNodeProto;
+  static NamenodeProtocol nameNodeProto;
+  static ClientProtocol clientProto;
+  static DatanodeProtocol dataNodeProto;
+  static RefreshUserMappingsProtocol refreshUserMappingsProto;
+  static String bpid = null;
+
+  private String namenodeUri = null; // NN URI to use, if specified
 
   NNThroughputBenchmark(Configuration conf) throws IOException {
     config = conf;
@@ -264,7 +285,7 @@ public class NNThroughputBenchmark implements Tool {
       for(StatsDaemon d : daemons)
         d.start();
     } finally {
-      while(isInPorgress()) {
+      while(isInProgress()) {
         // try {Thread.sleep(500);} catch (InterruptedException e) {}
       }
       elapsedTime = Time.now() - start;
@@ -275,7 +296,7 @@ public class NNThroughputBenchmark implements Tool {
     }
   }
 
-  private boolean isInPorgress() {
+  private boolean isInProgress() {
     for(StatsDaemon d : daemons)
       if(d.isInProgress())
         return true;
@@ -283,10 +304,10 @@ public class NNThroughputBenchmark implements Tool {
   }
 
   void cleanUp() throws IOException {
-    nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+    clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
         false);
     if(!keepResults)
-      nameNodeProto.delete(getBaseDir(), true);
+      clientProto.delete(getBaseDir(), true);
   }
 
   int getNumOpsExecuted() {
@@ -360,6 +381,12 @@ public class NNThroughputBenchmark implements Tool {
         args.remove(ugrcIndex);
       }
 
+      try {
+        namenodeUri = StringUtils.popOptionWithArgument("-namenode", args);
+      } catch (IllegalArgumentException iae) {
+        printUsage();
+      }
+
       String type = args.get(1);
       if(OP_ALL_NAME.equals(type)) {
         type = getOpName();
@@ -418,7 +445,7 @@ public class NNThroughputBenchmark implements Tool {
     void benchmarkOne() throws IOException {
       for(int idx = 0; idx < opsPerThread; idx++) {
         if((localNumOpsExecuted+1) % statsOp.ugcRefreshCount == 0)
-          nameNodeProto.refreshUserToGroupsMappings();
+          refreshUserMappingsProto.refreshUserToGroupsMappings();
         long stat = statsOp.executeOp(daemonId, idx, arg1);
         localNumOpsExecuted++;
         localCumulativeTime += stat;
@@ -484,10 +511,10 @@ public class NNThroughputBenchmark implements Tool {
     @Override
     long executeOp(int daemonId, int inputIdx, String ignore)
         throws IOException {
-      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+      clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
          false);
       long start = Time.now();
-      nameNodeProto.delete(BASE_DIR_NAME, true);
+      clientProto.delete(BASE_DIR_NAME, true);
       long end = Time.now();
       return end-start;
     }
@@ -553,7 +580,7 @@ public class NNThroughputBenchmark implements Tool {
     @Override
     void generateInputs(int[] opsPerThread) throws IOException {
       assert opsPerThread.length == numThreads : "Error opsPerThread.length";
-      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+      clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
          false);
       // int generatedFileIdx = 0;
       LOG.info("Generate " + numOpsRequired + " intputs for " + getOpName());
@@ -588,13 +615,13 @@ public class NNThroughputBenchmark implements Tool {
         throws IOException {
       long start = Time.now();
       // dummyActionNoSynch(fileIdx);
-      nameNodeProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
+      clientProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
           clientName, new EnumSetWritable<CreateFlag>(EnumSet
               .of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true,
-          replication, BLOCK_SIZE, null);
+          replication, BLOCK_SIZE, CryptoProtocolVersion.supported());
       long end = Time.now();
       for(boolean written = !closeUponCreate; !written;
-        written = nameNodeProto.complete(fileNames[daemonId][inputIdx],
+        written = clientProto.complete(fileNames[daemonId][inputIdx],
             clientName, null, HdfsConstants.GRANDFATHER_INODE_ID));
       return end-start;
     }
@@ -657,7 +684,7 @@ public class NNThroughputBenchmark implements Tool {
     @Override
     void generateInputs(int[] opsPerThread) throws IOException {
       assert opsPerThread.length == numThreads : "Error opsPerThread.length";
-      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+      clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
          false);
       LOG.info("Generate " + numOpsRequired + " inputs for " + getOpName());
       dirPaths = new String[numThreads][];
@@ -685,7 +712,7 @@ public class NNThroughputBenchmark implements Tool {
     long executeOp(int daemonId, int inputIdx, String clientName)
         throws IOException {
       long start = Time.now();
-      nameNodeProto.mkdirs(dirPaths[daemonId][inputIdx],
+      clientProto.mkdirs(dirPaths[daemonId][inputIdx],
           FsPermission.getDefault(), true);
       long end = Time.now();
       return end-start;
@@ -757,11 +784,11 @@ public class NNThroughputBenchmark implements Tool {
       }
       // use the same files for open
       super.generateInputs(opsPerThread);
-      if(nameNodeProto.getFileInfo(opCreate.getBaseDir()) != null
-          && nameNodeProto.getFileInfo(getBaseDir()) == null) {
-        nameNodeProto.rename(opCreate.getBaseDir(), getBaseDir());
+      if(clientProto.getFileInfo(opCreate.getBaseDir()) != null
+          && clientProto.getFileInfo(getBaseDir()) == null) {
+        clientProto.rename(opCreate.getBaseDir(), getBaseDir());
       }
-      if(nameNodeProto.getFileInfo(getBaseDir()) == null) {
+      if(clientProto.getFileInfo(getBaseDir()) == null) {
         throw new IOException(getBaseDir() + " does not exist.");
       }
     }
@@ -773,7 +800,7 @@ public class NNThroughputBenchmark implements Tool {
     long executeOp(int daemonId, int inputIdx, String ignore)
         throws IOException {
       long start = Time.now();
-      nameNodeProto.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
+      clientProto.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
       long end = Time.now();
       return end-start;
     }
@@ -803,7 +830,7 @@ public class NNThroughputBenchmark implements Tool {
     long executeOp(int daemonId, int inputIdx, String ignore)
         throws IOException {
       long start = Time.now();
-      nameNodeProto.delete(fileNames[daemonId][inputIdx], false);
+      clientProto.delete(fileNames[daemonId][inputIdx], false);
       long end = Time.now();
       return end-start;
     }
@@ -833,7 +860,7 @@ public class NNThroughputBenchmark implements Tool {
     long executeOp(int daemonId, int inputIdx, String ignore)
         throws IOException {
       long start = Time.now();
-      nameNodeProto.getFileInfo(fileNames[daemonId][inputIdx]);
+      clientProto.getFileInfo(fileNames[daemonId][inputIdx]);
       long end = Time.now();
       return end-start;
     }
@@ -877,7 +904,7 @@ public class NNThroughputBenchmark implements Tool {
     long executeOp(int daemonId, int inputIdx, String ignore)
         throws IOException {
       long start = Time.now();
-      nameNodeProto.rename(fileNames[daemonId][inputIdx],
+      clientProto.rename(fileNames[daemonId][inputIdx],
           destNames[daemonId][inputIdx]);
       long end = Time.now();
       return end-start;
@@ -933,14 +960,14 @@ public class NNThroughputBenchmark implements Tool {
           new DataStorage(nsInfo),
           new ExportedBlockKeys(), VersionInfo.getVersion());
       // register datanode
-      dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
+      dnRegistration = dataNodeProto.registerDatanode(dnRegistration);
+      dnRegistration.setNamespaceInfo(nsInfo);
       //first block reports
       storage = new DatanodeStorage(DatanodeStorage.generateUuid());
       final StorageBlockReport[] reports = {
           new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
       };
-      nameNodeProto.blockReport(dnRegistration,
-          nameNode.getNamesystem().getBlockPoolId(), reports,
+      dataNodeProto.blockReport(dnRegistration, bpid, reports,
               new BlockReportContext(1, 0, System.nanoTime()));
     }
@@ -953,7 +980,7 @@ public class NNThroughputBenchmark implements Tool {
       // TODO:FEDERATION currently a single block pool is supported
       StorageReport[] rep = { new StorageReport(storage, false,
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
-      DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, rep,
+      DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
           0L, 0L, 0, 0, 0, null).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
@@ -1002,7 +1029,7 @@ public class NNThroughputBenchmark implements Tool {
       // register datanode
       StorageReport[] rep = { new StorageReport(storage,
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
-      DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
+      DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
          rep, 0L, 0L, 0, 0, 0, null).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
@@ -1041,8 +1068,7 @@ public class NNThroughputBenchmark implements Tool {
               null) };
           StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
               targetStorageID, rdBlocks) };
-          nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode
-              .getNamesystem().getBlockPoolId(), report);
+          dataNodeProto.blockReceivedAndDeleted(receivedDNReg, bpid, report);
         }
       }
       return blocks.length;
@@ -1133,15 +1159,15 @@ public class NNThroughputBenchmark implements Tool {
       FileNameGenerator nameGenerator;
       nameGenerator = new FileNameGenerator(getBaseDir(), 100);
       String clientName = getClientName(007);
-      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+      clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
          false);
       for(int idx=0; idx < nrFiles; idx++) {
         String fileName = nameGenerator.getNextFileName("ThroughputBench");
-        nameNodeProto.create(fileName, FsPermission.getDefault(), clientName,
+        clientProto.create(fileName, FsPermission.getDefault(), clientName,
             new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication,
-            BLOCK_SIZE, null);
+            BLOCK_SIZE, CryptoProtocolVersion.supported());
         ExtendedBlock lastBlock = addBlocks(fileName, clientName);
-        nameNodeProto.complete(fileName, clientName, lastBlock, HdfsConstants.GRANDFATHER_INODE_ID);
+        clientProto.complete(fileName, clientName, lastBlock, HdfsConstants.GRANDFATHER_INODE_ID);
       }
       // prepare block reports
       for(int idx=0; idx < nrDatanodes; idx++) {
@@ -1153,7 +1179,7 @@ public class NNThroughputBenchmark implements Tool {
         throws IOException {
       ExtendedBlock prevBlock = null;
       for(int jdx = 0; jdx < blocksPerFile; jdx++) {
-        LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName,
+        LocatedBlock loc = clientProto.addBlock(fileName, clientName,
             prevBlock, null, HdfsConstants.GRANDFATHER_INODE_ID, null);
         prevBlock = loc.getBlock();
         for(DatanodeInfo dnInfo : loc.getLocations()) {
@@ -1164,8 +1190,8 @@ public class NNThroughputBenchmark implements Tool {
               ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null) };
           StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
               datanodes[dnIdx].storage.getStorageID(), rdBlocks) };
-          nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
-              .getBlock().getBlockPoolId(), report);
+          dataNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration,
+              bpid, report);
         }
       }
       return prevBlock;
@@ -1186,8 +1212,7 @@ public class NNThroughputBenchmark implements Tool {
       long start = Time.now();
       StorageBlockReport[] report = { new StorageBlockReport(
           dn.storage, dn.getBlockReportList()) };
-      nameNodeProto.blockReport(dn.dnRegistration,
-          nameNode.getNamesystem().getBlockPoolId(), report,
+      dataNodeProto.blockReport(dn.dnRegistration, bpid, report,
           new BlockReportContext(1, 0, System.nanoTime()));
       long end = Time.now();
       return end-start;
@@ -1318,7 +1343,7 @@ public class NNThroughputBenchmark implements Tool {
         LOG.info("Datanode " + dn + " is decommissioned.");
       }
       excludeFile.close();
-      nameNodeProto.refreshNodes();
+      clientProto.refreshNodes();
     }
 
   /**
@@ -1414,8 +1439,6 @@ public class NNThroughputBenchmark implements Tool {
 
     // Start the NameNode
     String[] argv = new String[] {};
-    nameNode = NameNode.createNameNode(argv, config);
-    nameNodeProto = nameNode.getRpcServer();
 
     List<OperationStatsBase> ops = new ArrayList<OperationStatsBase>();
     OperationStatsBase opStat = null;
@@ -1456,6 +1479,29 @@ public class NNThroughputBenchmark implements Tool {
       opStat = new CleanAllStats(args);
       ops.add(opStat);
     }
+    if (namenodeUri == null) {
+      nameNode = NameNode.createNameNode(argv, config);
+      NamenodeProtocols nnProtos = nameNode.getRpcServer();
+      nameNodeProto = nnProtos;
+      clientProto = nnProtos;
+      dataNodeProto = nnProtos;
+      refreshUserMappingsProto = nnProtos;
+      bpid = nameNode.getNamesystem().getBlockPoolId();
+    } else {
+      FileSystem.setDefaultUri(getConf(), namenodeUri);
+      DistributedFileSystem dfs = (DistributedFileSystem)
+          FileSystem.get(getConf());
+      final URI nnUri = new URI(namenodeUri);
+      nameNodeProto = DFSTestUtil.getNamenodeProtocolProxy(config, nnUri,
+          UserGroupInformation.getCurrentUser());
+      clientProto = dfs.getClient().getNamenode();
+      dataNodeProto = new DatanodeProtocolClientSideTranslatorPB(
+          NameNode.getAddress(nnUri), config);
+      refreshUserMappingsProto =
+          DFSTestUtil.getRefreshUserMappingsProtocolProxy(config, nnUri);
+      getBlockPoolId(dfs);
+    }
+
     if(ops.size() == 0)
       printUsage();
     // run each benchmark
@@ -1476,6 +1522,12 @@ public class NNThroughputBenchmark implements Tool {
     return 0;
   }
 
+  private void getBlockPoolId(DistributedFileSystem unused)
+    throws IOException {
+    final NamespaceInfo nsInfo = nameNodeProto.versionRequest();
+    bpid = nsInfo.getBlockPoolID();
+  }
+
   public static void main(String[] args) throws Exception {
     NNThroughputBenchmark bench = null;
     try {
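One operational note implied by the usage text above: the benchmark creates files with 16-byte blocks, far below the NameNode's default 1 MB minimum, so a remote target must lower that floor. A hedged sketch of the required setting, using the real config key (DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, i.e. dfs.namenode.fs-limits.min-block-size); in practice this goes in the remote NameNode's hdfs-site.xml before it starts:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class MinBlockSizeSetup {
      public static void main(String[] args) {
        // Allow 16-byte blocks so NNThroughputBenchmark create() calls succeed.
        Configuration nnConf = new HdfsConfiguration();
        nnConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 16);
        // Equivalent hdfs-site.xml entry:
        //   <property>
        //     <name>dfs.namenode.fs-limits.min-block-size</name>
        //     <value>16</value>
        //   </property>
      }
    }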