HDFS-7847. Modify NNThroughputBenchmark to be able to operate on a remote NameNode (Charles Lamb via Colin P. McCabe)
(cherry picked from commit ffce9a3413
)
This commit is contained in:
parent
da410ea02a
commit
f79b1f0190
|
@ -186,6 +186,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-8314. Move HdfsServerConstants#IO_FILE_BUFFER_SIZE and
|
HDFS-8314. Move HdfsServerConstants#IO_FILE_BUFFER_SIZE and
|
||||||
SMALL_BUFFER_SIZE to the users. (Li Lu via wheat9)
|
SMALL_BUFFER_SIZE to the users. (Li Lu via wheat9)
|
||||||
|
|
||||||
|
HDFS-7847. Modify NNThroughputBenchmark to be able to operate on a remote
|
||||||
|
NameNode (Charles Lamb via Colin P. McCabe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -48,6 +48,7 @@ import java.lang.reflect.Modifier;
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
|
import java.net.URI;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.net.URLConnection;
|
import java.net.URLConnection;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
@ -64,6 +65,7 @@ import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -129,12 +131,14 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
|
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.net.unix.DomainSocket;
|
import org.apache.hadoop.net.unix.DomainSocket;
|
||||||
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
||||||
|
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
|
||||||
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
|
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
@ -147,6 +151,7 @@ import org.apache.log4j.Level;
|
||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
import org.mockito.internal.util.reflection.Whitebox;
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Charsets;
|
import com.google.common.base.Charsets;
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
@ -1763,6 +1768,41 @@ public class DFSTestUtil {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the NamenodeProtocol RPC proxy for the NN associated with this
|
||||||
|
* DFSClient object
|
||||||
|
*
|
||||||
|
* @param nameNodeUri the URI of the NN to get a proxy for.
|
||||||
|
*
|
||||||
|
* @return the Namenode RPC proxy associated with this DFSClient object
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public static NamenodeProtocol getNamenodeProtocolProxy(Configuration conf,
|
||||||
|
URI nameNodeUri, UserGroupInformation ugi)
|
||||||
|
throws IOException {
|
||||||
|
return NameNodeProxies.createNonHAProxy(conf,
|
||||||
|
NameNode.getAddress(nameNodeUri), NamenodeProtocol.class, ugi, false).
|
||||||
|
getProxy();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the RefreshUserMappingsProtocol RPC proxy for the NN associated with
|
||||||
|
* this DFSClient object
|
||||||
|
*
|
||||||
|
* @param nameNodeUri the URI of the NN to get a proxy for.
|
||||||
|
*
|
||||||
|
* @return the RefreshUserMappingsProtocol RPC proxy associated with this
|
||||||
|
* DFSClient object
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public static RefreshUserMappingsProtocol getRefreshUserMappingsProtocolProxy(
|
||||||
|
Configuration conf, URI nameNodeUri) throws IOException {
|
||||||
|
final AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
|
||||||
|
return NameNodeProxies.createProxy(conf,
|
||||||
|
nameNodeUri, RefreshUserMappingsProtocol.class,
|
||||||
|
nnFallbackToSimpleAuth).getProxy();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the datanode dead
|
* Set the datanode dead
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
@ -30,19 +31,24 @@ import com.google.common.base.Preconditions;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
@ -53,6 +59,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||||
|
@ -63,6 +70,8 @@ import org.apache.hadoop.io.EnumSetWritable;
|
||||||
import org.apache.hadoop.net.DNS;
|
import org.apache.hadoop.net.DNS;
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
import org.apache.hadoop.security.Groups;
|
import org.apache.hadoop.security.Groups;
|
||||||
|
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
@ -96,6 +105,9 @@ import org.apache.log4j.LogManager;
|
||||||
* By default the refresh is never called.</li>
|
* By default the refresh is never called.</li>
|
||||||
* <li>-keepResults do not clean up the name-space after execution.</li>
|
* <li>-keepResults do not clean up the name-space after execution.</li>
|
||||||
* <li>-useExisting do not recreate the name-space, use existing data.</li>
|
* <li>-useExisting do not recreate the name-space, use existing data.</li>
|
||||||
|
* <li>-namenode will run the test against a namenode in another
|
||||||
|
* process or on another host. If you use this option, the namenode
|
||||||
|
* must have dfs.namenode.fs-limits.min-block-size set to 16.</li>
|
||||||
* </ol>
|
* </ol>
|
||||||
*
|
*
|
||||||
* The benchmark first generates inputs for each thread so that the
|
* The benchmark first generates inputs for each thread so that the
|
||||||
|
@ -111,11 +123,20 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
private static final Log LOG = LogFactory.getLog(NNThroughputBenchmark.class);
|
private static final Log LOG = LogFactory.getLog(NNThroughputBenchmark.class);
|
||||||
private static final int BLOCK_SIZE = 16;
|
private static final int BLOCK_SIZE = 16;
|
||||||
private static final String GENERAL_OPTIONS_USAGE =
|
private static final String GENERAL_OPTIONS_USAGE =
|
||||||
" [-keepResults] | [-logLevel L] | [-UGCacheRefreshCount G]";
|
" [-keepResults] | [-logLevel L] | [-UGCacheRefreshCount G] |" +
|
||||||
|
" [-namenode <namenode URI>]\n" +
|
||||||
|
" If using -namenode, set the namenode's" +
|
||||||
|
" dfs.namenode.fs-limits.min-block-size to 16.";
|
||||||
|
|
||||||
static Configuration config;
|
static Configuration config;
|
||||||
static NameNode nameNode;
|
static NameNode nameNode;
|
||||||
static NamenodeProtocols nameNodeProto;
|
static NamenodeProtocol nameNodeProto;
|
||||||
|
static ClientProtocol clientProto;
|
||||||
|
static DatanodeProtocol dataNodeProto;
|
||||||
|
static RefreshUserMappingsProtocol refreshUserMappingsProto;
|
||||||
|
static String bpid = null;
|
||||||
|
|
||||||
|
private String namenodeUri = null; // NN URI to use, if specified
|
||||||
|
|
||||||
NNThroughputBenchmark(Configuration conf) throws IOException {
|
NNThroughputBenchmark(Configuration conf) throws IOException {
|
||||||
config = conf;
|
config = conf;
|
||||||
|
@ -264,7 +285,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
for(StatsDaemon d : daemons)
|
for(StatsDaemon d : daemons)
|
||||||
d.start();
|
d.start();
|
||||||
} finally {
|
} finally {
|
||||||
while(isInPorgress()) {
|
while(isInProgress()) {
|
||||||
// try {Thread.sleep(500);} catch (InterruptedException e) {}
|
// try {Thread.sleep(500);} catch (InterruptedException e) {}
|
||||||
}
|
}
|
||||||
elapsedTime = Time.now() - start;
|
elapsedTime = Time.now() - start;
|
||||||
|
@ -275,7 +296,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isInPorgress() {
|
private boolean isInProgress() {
|
||||||
for(StatsDaemon d : daemons)
|
for(StatsDaemon d : daemons)
|
||||||
if(d.isInProgress())
|
if(d.isInProgress())
|
||||||
return true;
|
return true;
|
||||||
|
@ -283,10 +304,10 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
void cleanUp() throws IOException {
|
void cleanUp() throws IOException {
|
||||||
nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
|
clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
|
||||||
false);
|
false);
|
||||||
if(!keepResults)
|
if(!keepResults)
|
||||||
nameNodeProto.delete(getBaseDir(), true);
|
clientProto.delete(getBaseDir(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
int getNumOpsExecuted() {
|
int getNumOpsExecuted() {
|
||||||
|
@ -360,6 +381,12 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
args.remove(ugrcIndex);
|
args.remove(ugrcIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
namenodeUri = StringUtils.popOptionWithArgument("-namenode", args);
|
||||||
|
} catch (IllegalArgumentException iae) {
|
||||||
|
printUsage();
|
||||||
|
}
|
||||||
|
|
||||||
String type = args.get(1);
|
String type = args.get(1);
|
||||||
if(OP_ALL_NAME.equals(type)) {
|
if(OP_ALL_NAME.equals(type)) {
|
||||||
type = getOpName();
|
type = getOpName();
|
||||||
|
@ -418,7 +445,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
void benchmarkOne() throws IOException {
|
void benchmarkOne() throws IOException {
|
||||||
for(int idx = 0; idx < opsPerThread; idx++) {
|
for(int idx = 0; idx < opsPerThread; idx++) {
|
||||||
if((localNumOpsExecuted+1) % statsOp.ugcRefreshCount == 0)
|
if((localNumOpsExecuted+1) % statsOp.ugcRefreshCount == 0)
|
||||||
nameNodeProto.refreshUserToGroupsMappings();
|
refreshUserMappingsProto.refreshUserToGroupsMappings();
|
||||||
long stat = statsOp.executeOp(daemonId, idx, arg1);
|
long stat = statsOp.executeOp(daemonId, idx, arg1);
|
||||||
localNumOpsExecuted++;
|
localNumOpsExecuted++;
|
||||||
localCumulativeTime += stat;
|
localCumulativeTime += stat;
|
||||||
|
@ -484,10 +511,10 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
@Override
|
@Override
|
||||||
long executeOp(int daemonId, int inputIdx, String ignore)
|
long executeOp(int daemonId, int inputIdx, String ignore)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
|
clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
|
||||||
false);
|
false);
|
||||||
long start = Time.now();
|
long start = Time.now();
|
||||||
nameNodeProto.delete(BASE_DIR_NAME, true);
|
clientProto.delete(BASE_DIR_NAME, true);
|
||||||
long end = Time.now();
|
long end = Time.now();
|
||||||
return end-start;
|
return end-start;
|
||||||
}
|
}
|
||||||
|
@ -553,7 +580,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
@Override
|
@Override
|
||||||
void generateInputs(int[] opsPerThread) throws IOException {
|
void generateInputs(int[] opsPerThread) throws IOException {
|
||||||
assert opsPerThread.length == numThreads : "Error opsPerThread.length";
|
assert opsPerThread.length == numThreads : "Error opsPerThread.length";
|
||||||
nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
|
clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
|
||||||
false);
|
false);
|
||||||
// int generatedFileIdx = 0;
|
// int generatedFileIdx = 0;
|
||||||
LOG.info("Generate " + numOpsRequired + " intputs for " + getOpName());
|
LOG.info("Generate " + numOpsRequired + " intputs for " + getOpName());
|
||||||
|
@ -588,13 +615,13 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
long start = Time.now();
|
long start = Time.now();
|
||||||
// dummyActionNoSynch(fileIdx);
|
// dummyActionNoSynch(fileIdx);
|
||||||
nameNodeProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
|
clientProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
|
||||||
clientName, new EnumSetWritable<CreateFlag>(EnumSet
|
clientName, new EnumSetWritable<CreateFlag>(EnumSet
|
||||||
.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true,
|
.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true,
|
||||||
replication, BLOCK_SIZE, null);
|
replication, BLOCK_SIZE, CryptoProtocolVersion.supported());
|
||||||
long end = Time.now();
|
long end = Time.now();
|
||||||
for(boolean written = !closeUponCreate; !written;
|
for(boolean written = !closeUponCreate; !written;
|
||||||
written = nameNodeProto.complete(fileNames[daemonId][inputIdx],
|
written = clientProto.complete(fileNames[daemonId][inputIdx],
|
||||||
clientName, null, HdfsConstants.GRANDFATHER_INODE_ID));
|
clientName, null, HdfsConstants.GRANDFATHER_INODE_ID));
|
||||||
return end-start;
|
return end-start;
|
||||||
}
|
}
|
||||||
|
@ -657,7 +684,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
@Override
|
@Override
|
||||||
void generateInputs(int[] opsPerThread) throws IOException {
|
void generateInputs(int[] opsPerThread) throws IOException {
|
||||||
assert opsPerThread.length == numThreads : "Error opsPerThread.length";
|
assert opsPerThread.length == numThreads : "Error opsPerThread.length";
|
||||||
nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
|
clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
|
||||||
false);
|
false);
|
||||||
LOG.info("Generate " + numOpsRequired + " inputs for " + getOpName());
|
LOG.info("Generate " + numOpsRequired + " inputs for " + getOpName());
|
||||||
dirPaths = new String[numThreads][];
|
dirPaths = new String[numThreads][];
|
||||||
|
@ -685,7 +712,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
long executeOp(int daemonId, int inputIdx, String clientName)
|
long executeOp(int daemonId, int inputIdx, String clientName)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
long start = Time.now();
|
long start = Time.now();
|
||||||
nameNodeProto.mkdirs(dirPaths[daemonId][inputIdx],
|
clientProto.mkdirs(dirPaths[daemonId][inputIdx],
|
||||||
FsPermission.getDefault(), true);
|
FsPermission.getDefault(), true);
|
||||||
long end = Time.now();
|
long end = Time.now();
|
||||||
return end-start;
|
return end-start;
|
||||||
|
@ -757,11 +784,11 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
}
|
}
|
||||||
// use the same files for open
|
// use the same files for open
|
||||||
super.generateInputs(opsPerThread);
|
super.generateInputs(opsPerThread);
|
||||||
if(nameNodeProto.getFileInfo(opCreate.getBaseDir()) != null
|
if(clientProto.getFileInfo(opCreate.getBaseDir()) != null
|
||||||
&& nameNodeProto.getFileInfo(getBaseDir()) == null) {
|
&& clientProto.getFileInfo(getBaseDir()) == null) {
|
||||||
nameNodeProto.rename(opCreate.getBaseDir(), getBaseDir());
|
clientProto.rename(opCreate.getBaseDir(), getBaseDir());
|
||||||
}
|
}
|
||||||
if(nameNodeProto.getFileInfo(getBaseDir()) == null) {
|
if(clientProto.getFileInfo(getBaseDir()) == null) {
|
||||||
throw new IOException(getBaseDir() + " does not exist.");
|
throw new IOException(getBaseDir() + " does not exist.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -773,7 +800,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
long executeOp(int daemonId, int inputIdx, String ignore)
|
long executeOp(int daemonId, int inputIdx, String ignore)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
long start = Time.now();
|
long start = Time.now();
|
||||||
nameNodeProto.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
|
clientProto.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
|
||||||
long end = Time.now();
|
long end = Time.now();
|
||||||
return end-start;
|
return end-start;
|
||||||
}
|
}
|
||||||
|
@ -803,7 +830,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
long executeOp(int daemonId, int inputIdx, String ignore)
|
long executeOp(int daemonId, int inputIdx, String ignore)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
long start = Time.now();
|
long start = Time.now();
|
||||||
nameNodeProto.delete(fileNames[daemonId][inputIdx], false);
|
clientProto.delete(fileNames[daemonId][inputIdx], false);
|
||||||
long end = Time.now();
|
long end = Time.now();
|
||||||
return end-start;
|
return end-start;
|
||||||
}
|
}
|
||||||
|
@ -833,7 +860,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
long executeOp(int daemonId, int inputIdx, String ignore)
|
long executeOp(int daemonId, int inputIdx, String ignore)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
long start = Time.now();
|
long start = Time.now();
|
||||||
nameNodeProto.getFileInfo(fileNames[daemonId][inputIdx]);
|
clientProto.getFileInfo(fileNames[daemonId][inputIdx]);
|
||||||
long end = Time.now();
|
long end = Time.now();
|
||||||
return end-start;
|
return end-start;
|
||||||
}
|
}
|
||||||
|
@ -877,7 +904,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
long executeOp(int daemonId, int inputIdx, String ignore)
|
long executeOp(int daemonId, int inputIdx, String ignore)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
long start = Time.now();
|
long start = Time.now();
|
||||||
nameNodeProto.rename(fileNames[daemonId][inputIdx],
|
clientProto.rename(fileNames[daemonId][inputIdx],
|
||||||
destNames[daemonId][inputIdx]);
|
destNames[daemonId][inputIdx]);
|
||||||
long end = Time.now();
|
long end = Time.now();
|
||||||
return end-start;
|
return end-start;
|
||||||
|
@ -933,14 +960,14 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
new DataStorage(nsInfo),
|
new DataStorage(nsInfo),
|
||||||
new ExportedBlockKeys(), VersionInfo.getVersion());
|
new ExportedBlockKeys(), VersionInfo.getVersion());
|
||||||
// register datanode
|
// register datanode
|
||||||
dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
|
dnRegistration = dataNodeProto.registerDatanode(dnRegistration);
|
||||||
|
dnRegistration.setNamespaceInfo(nsInfo);
|
||||||
//first block reports
|
//first block reports
|
||||||
storage = new DatanodeStorage(DatanodeStorage.generateUuid());
|
storage = new DatanodeStorage(DatanodeStorage.generateUuid());
|
||||||
final StorageBlockReport[] reports = {
|
final StorageBlockReport[] reports = {
|
||||||
new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
|
new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
|
||||||
};
|
};
|
||||||
nameNodeProto.blockReport(dnRegistration,
|
dataNodeProto.blockReport(dnRegistration, bpid, reports,
|
||||||
nameNode.getNamesystem().getBlockPoolId(), reports,
|
|
||||||
new BlockReportContext(1, 0, System.nanoTime()));
|
new BlockReportContext(1, 0, System.nanoTime()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -953,7 +980,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
// TODO:FEDERATION currently a single block pool is supported
|
// TODO:FEDERATION currently a single block pool is supported
|
||||||
StorageReport[] rep = { new StorageReport(storage, false,
|
StorageReport[] rep = { new StorageReport(storage, false,
|
||||||
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
|
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
|
||||||
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, rep,
|
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
|
||||||
0L, 0L, 0, 0, 0, null).getCommands();
|
0L, 0L, 0, 0, 0, null).getCommands();
|
||||||
if(cmds != null) {
|
if(cmds != null) {
|
||||||
for (DatanodeCommand cmd : cmds ) {
|
for (DatanodeCommand cmd : cmds ) {
|
||||||
|
@ -1002,7 +1029,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
// register datanode
|
// register datanode
|
||||||
StorageReport[] rep = { new StorageReport(storage,
|
StorageReport[] rep = { new StorageReport(storage,
|
||||||
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
|
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
|
||||||
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
|
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
|
||||||
rep, 0L, 0L, 0, 0, 0, null).getCommands();
|
rep, 0L, 0L, 0, 0, 0, null).getCommands();
|
||||||
if (cmds != null) {
|
if (cmds != null) {
|
||||||
for (DatanodeCommand cmd : cmds) {
|
for (DatanodeCommand cmd : cmds) {
|
||||||
|
@ -1041,8 +1068,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
null) };
|
null) };
|
||||||
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
|
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
|
||||||
targetStorageID, rdBlocks) };
|
targetStorageID, rdBlocks) };
|
||||||
nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode
|
dataNodeProto.blockReceivedAndDeleted(receivedDNReg, bpid, report);
|
||||||
.getNamesystem().getBlockPoolId(), report);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return blocks.length;
|
return blocks.length;
|
||||||
|
@ -1133,15 +1159,15 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
FileNameGenerator nameGenerator;
|
FileNameGenerator nameGenerator;
|
||||||
nameGenerator = new FileNameGenerator(getBaseDir(), 100);
|
nameGenerator = new FileNameGenerator(getBaseDir(), 100);
|
||||||
String clientName = getClientName(007);
|
String clientName = getClientName(007);
|
||||||
nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
|
clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
|
||||||
false);
|
false);
|
||||||
for(int idx=0; idx < nrFiles; idx++) {
|
for(int idx=0; idx < nrFiles; idx++) {
|
||||||
String fileName = nameGenerator.getNextFileName("ThroughputBench");
|
String fileName = nameGenerator.getNextFileName("ThroughputBench");
|
||||||
nameNodeProto.create(fileName, FsPermission.getDefault(), clientName,
|
clientProto.create(fileName, FsPermission.getDefault(), clientName,
|
||||||
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication,
|
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication,
|
||||||
BLOCK_SIZE, null);
|
BLOCK_SIZE, CryptoProtocolVersion.supported());
|
||||||
ExtendedBlock lastBlock = addBlocks(fileName, clientName);
|
ExtendedBlock lastBlock = addBlocks(fileName, clientName);
|
||||||
nameNodeProto.complete(fileName, clientName, lastBlock, HdfsConstants.GRANDFATHER_INODE_ID);
|
clientProto.complete(fileName, clientName, lastBlock, HdfsConstants.GRANDFATHER_INODE_ID);
|
||||||
}
|
}
|
||||||
// prepare block reports
|
// prepare block reports
|
||||||
for(int idx=0; idx < nrDatanodes; idx++) {
|
for(int idx=0; idx < nrDatanodes; idx++) {
|
||||||
|
@ -1153,7 +1179,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
ExtendedBlock prevBlock = null;
|
ExtendedBlock prevBlock = null;
|
||||||
for(int jdx = 0; jdx < blocksPerFile; jdx++) {
|
for(int jdx = 0; jdx < blocksPerFile; jdx++) {
|
||||||
LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName,
|
LocatedBlock loc = clientProto.addBlock(fileName, clientName,
|
||||||
prevBlock, null, HdfsConstants.GRANDFATHER_INODE_ID, null);
|
prevBlock, null, HdfsConstants.GRANDFATHER_INODE_ID, null);
|
||||||
prevBlock = loc.getBlock();
|
prevBlock = loc.getBlock();
|
||||||
for(DatanodeInfo dnInfo : loc.getLocations()) {
|
for(DatanodeInfo dnInfo : loc.getLocations()) {
|
||||||
|
@ -1164,8 +1190,8 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null) };
|
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null) };
|
||||||
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
|
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
|
||||||
datanodes[dnIdx].storage.getStorageID(), rdBlocks) };
|
datanodes[dnIdx].storage.getStorageID(), rdBlocks) };
|
||||||
nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
|
dataNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration,
|
||||||
.getBlock().getBlockPoolId(), report);
|
bpid, report);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return prevBlock;
|
return prevBlock;
|
||||||
|
@ -1186,8 +1212,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
long start = Time.now();
|
long start = Time.now();
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
dn.storage, dn.getBlockReportList()) };
|
dn.storage, dn.getBlockReportList()) };
|
||||||
nameNodeProto.blockReport(dn.dnRegistration,
|
dataNodeProto.blockReport(dn.dnRegistration, bpid, report,
|
||||||
nameNode.getNamesystem().getBlockPoolId(), report,
|
|
||||||
new BlockReportContext(1, 0, System.nanoTime()));
|
new BlockReportContext(1, 0, System.nanoTime()));
|
||||||
long end = Time.now();
|
long end = Time.now();
|
||||||
return end-start;
|
return end-start;
|
||||||
|
@ -1318,7 +1343,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
LOG.info("Datanode " + dn + " is decommissioned.");
|
LOG.info("Datanode " + dn + " is decommissioned.");
|
||||||
}
|
}
|
||||||
excludeFile.close();
|
excludeFile.close();
|
||||||
nameNodeProto.refreshNodes();
|
clientProto.refreshNodes();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1414,8 +1439,6 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
|
|
||||||
// Start the NameNode
|
// Start the NameNode
|
||||||
String[] argv = new String[] {};
|
String[] argv = new String[] {};
|
||||||
nameNode = NameNode.createNameNode(argv, config);
|
|
||||||
nameNodeProto = nameNode.getRpcServer();
|
|
||||||
|
|
||||||
List<OperationStatsBase> ops = new ArrayList<OperationStatsBase>();
|
List<OperationStatsBase> ops = new ArrayList<OperationStatsBase>();
|
||||||
OperationStatsBase opStat = null;
|
OperationStatsBase opStat = null;
|
||||||
|
@ -1456,6 +1479,29 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
opStat = new CleanAllStats(args);
|
opStat = new CleanAllStats(args);
|
||||||
ops.add(opStat);
|
ops.add(opStat);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (namenodeUri == null) {
|
||||||
|
nameNode = NameNode.createNameNode(argv, config);
|
||||||
|
NamenodeProtocols nnProtos = nameNode.getRpcServer();
|
||||||
|
nameNodeProto = nnProtos;
|
||||||
|
clientProto = nnProtos;
|
||||||
|
dataNodeProto = nnProtos;
|
||||||
|
refreshUserMappingsProto = nnProtos;
|
||||||
|
bpid = nameNode.getNamesystem().getBlockPoolId();
|
||||||
|
} else {
|
||||||
|
FileSystem.setDefaultUri(getConf(), namenodeUri);
|
||||||
|
DistributedFileSystem dfs = (DistributedFileSystem)
|
||||||
|
FileSystem.get(getConf());
|
||||||
|
final URI nnUri = new URI(namenodeUri);
|
||||||
|
nameNodeProto = DFSTestUtil.getNamenodeProtocolProxy(config, nnUri,
|
||||||
|
UserGroupInformation.getCurrentUser());
|
||||||
|
clientProto = dfs.getClient().getNamenode();
|
||||||
|
dataNodeProto = new DatanodeProtocolClientSideTranslatorPB(
|
||||||
|
NameNode.getAddress(nnUri), config);
|
||||||
|
refreshUserMappingsProto =
|
||||||
|
DFSTestUtil.getRefreshUserMappingsProtocolProxy(config, nnUri);
|
||||||
|
getBlockPoolId(dfs);
|
||||||
|
}
|
||||||
if(ops.size() == 0)
|
if(ops.size() == 0)
|
||||||
printUsage();
|
printUsage();
|
||||||
// run each benchmark
|
// run each benchmark
|
||||||
|
@ -1476,6 +1522,12 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void getBlockPoolId(DistributedFileSystem unused)
|
||||||
|
throws IOException {
|
||||||
|
final NamespaceInfo nsInfo = nameNodeProto.versionRequest();
|
||||||
|
bpid = nsInfo.getBlockPoolID();
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
NNThroughputBenchmark bench = null;
|
NNThroughputBenchmark bench = null;
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue