HADOOP-8070. Add a standalone benchmark for RPC call performance. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1244185 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1903e544a4
commit
82ffe0e732
|
@ -84,6 +84,8 @@ Trunk (unreleased changes)
|
||||||
HADOOP-7988. Upper case in hostname part of the principals doesn't work with
|
HADOOP-7988. Upper case in hostname part of the principals doesn't work with
|
||||||
kerberos. (jitendra)
|
kerberos. (jitendra)
|
||||||
|
|
||||||
|
HADOOP-8070. Add a standalone benchmark for RPC call performance. (todd)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
HADOOP-8018. Hudson auto test for HDFS has started throwing javadoc
|
HADOOP-8018. Hudson auto test for HDFS has started throwing javadoc
|
||||||
|
|
|
@ -42,6 +42,7 @@
|
||||||
import java.nio.channels.WritableByteChannel;
|
import java.nio.channels.WritableByteChannel;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -92,6 +93,8 @@
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
|
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
|
||||||
* parameter, and return a {@link Writable} as their value. A service runs on
|
* parameter, and return a {@link Writable} as their value. A service runs on
|
||||||
* a port and is defined by a parameter class and a value class.
|
* a port and is defined by a parameter class and a value class.
|
||||||
|
@ -318,6 +321,12 @@ public RpcMetrics getRpcMetrics() {
|
||||||
return rpcMetrics;
|
return rpcMetrics;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
Iterable<? extends Thread> getHandlers() {
|
||||||
|
return Arrays.asList(handlers);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Refresh the service authorization ACL for the service handled by this server.
|
* Refresh the service authorization ACL for the service handled by this server.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -0,0 +1,420 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.management.ManagementFactory;
|
||||||
|
import java.lang.management.ThreadMXBean;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import org.apache.commons.cli.CommandLine;
|
||||||
|
import org.apache.commons.cli.CommandLineParser;
|
||||||
|
import org.apache.commons.cli.GnuParser;
|
||||||
|
import org.apache.commons.cli.HelpFormatter;
|
||||||
|
import org.apache.commons.cli.OptionBuilder;
|
||||||
|
import org.apache.commons.cli.Options;
|
||||||
|
import org.apache.commons.cli.ParseException;
|
||||||
|
import org.apache.hadoop.conf.Configurable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
import org.apache.hadoop.ipc.RPC.Server;
|
||||||
|
import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl;
|
||||||
|
import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService;
|
||||||
|
import org.apache.hadoop.ipc.TestRPC.TestProtocol;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.test.MultithreadedTestUtil;
|
||||||
|
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
|
||||||
|
import org.apache.hadoop.util.Tool;
|
||||||
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
|
import com.google.protobuf.BlockingService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Benchmark for protobuf RPC.
|
||||||
|
* Run with --help option for usage.
|
||||||
|
*/
|
||||||
|
public class RPCCallBenchmark implements Tool, Configurable {
|
||||||
|
private Configuration conf;
|
||||||
|
private AtomicLong callCount = new AtomicLong(0);
|
||||||
|
private static ThreadMXBean threadBean =
|
||||||
|
ManagementFactory.getThreadMXBean();
|
||||||
|
|
||||||
|
private static class MyOptions {
|
||||||
|
private boolean failed = false;
|
||||||
|
private int serverThreads = 0;
|
||||||
|
private int serverReaderThreads = 1;
|
||||||
|
private int clientThreads = 0;
|
||||||
|
private String host = "0.0.0.0";
|
||||||
|
private int port = 12345;
|
||||||
|
public int secondsToRun = 15;
|
||||||
|
private int msgSize = 1024;
|
||||||
|
public Class<? extends RpcEngine> rpcEngine =
|
||||||
|
WritableRpcEngine.class;
|
||||||
|
|
||||||
|
private MyOptions(String args[]) {
|
||||||
|
try {
|
||||||
|
Options opts = buildOptions();
|
||||||
|
CommandLineParser parser = new GnuParser();
|
||||||
|
CommandLine line = parser.parse(opts, args, true);
|
||||||
|
processOptions(line, opts);
|
||||||
|
validateOptions();
|
||||||
|
} catch (ParseException e) {
|
||||||
|
System.err.println(e.getMessage());
|
||||||
|
System.err.println("Try \"--help\" option for details.");
|
||||||
|
failed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void validateOptions() throws ParseException {
|
||||||
|
if (serverThreads <= 0 && clientThreads <= 0) {
|
||||||
|
throw new ParseException("Must specify at least -c or -s");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("static-access")
|
||||||
|
private Options buildOptions() {
|
||||||
|
Options opts = new Options();
|
||||||
|
opts.addOption(
|
||||||
|
OptionBuilder.withLongOpt("serverThreads").hasArg(true)
|
||||||
|
.withArgName("numthreads")
|
||||||
|
.withDescription("number of server threads (handlers) to run (or 0 to not run server)")
|
||||||
|
.create("s"));
|
||||||
|
opts.addOption(
|
||||||
|
OptionBuilder.withLongOpt("serverReaderThreads").hasArg(true)
|
||||||
|
.withArgName("threads")
|
||||||
|
.withDescription("number of server reader threads to run")
|
||||||
|
.create("r"));
|
||||||
|
|
||||||
|
|
||||||
|
opts.addOption(
|
||||||
|
OptionBuilder.withLongOpt("clientThreads").hasArg(true)
|
||||||
|
.withArgName("numthreads")
|
||||||
|
.withDescription("number of client threads to run (or 0 to not run client)")
|
||||||
|
.create("c"));
|
||||||
|
|
||||||
|
opts.addOption(
|
||||||
|
OptionBuilder.withLongOpt("messageSize").hasArg(true)
|
||||||
|
.withArgName("bytes")
|
||||||
|
.withDescription("size of call parameter in bytes")
|
||||||
|
.create("m"));
|
||||||
|
|
||||||
|
opts.addOption(
|
||||||
|
OptionBuilder.withLongOpt("time").hasArg(true)
|
||||||
|
.withArgName("seconds")
|
||||||
|
.withDescription("number of seconds to run clients for")
|
||||||
|
.create("t"));
|
||||||
|
opts.addOption(
|
||||||
|
OptionBuilder.withLongOpt("port").hasArg(true)
|
||||||
|
.withArgName("port")
|
||||||
|
.withDescription("port to listen or connect on")
|
||||||
|
.create("p"));
|
||||||
|
opts.addOption(
|
||||||
|
OptionBuilder.withLongOpt("host").hasArg(true)
|
||||||
|
.withArgName("addr")
|
||||||
|
.withDescription("host to listen or connect on")
|
||||||
|
.create('h'));
|
||||||
|
|
||||||
|
opts.addOption(
|
||||||
|
OptionBuilder.withLongOpt("engine").hasArg(true)
|
||||||
|
.withArgName("writable|protobuf")
|
||||||
|
.withDescription("engine to use")
|
||||||
|
.create('e'));
|
||||||
|
|
||||||
|
opts.addOption(
|
||||||
|
OptionBuilder.withLongOpt("help").hasArg(false)
|
||||||
|
.withDescription("show this screen")
|
||||||
|
.create('?'));
|
||||||
|
|
||||||
|
return opts;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void processOptions(CommandLine line, Options opts)
|
||||||
|
throws ParseException {
|
||||||
|
if (line.hasOption("help") || line.hasOption('?')) {
|
||||||
|
HelpFormatter formatter = new HelpFormatter();
|
||||||
|
System.out.println("Protobuf IPC benchmark.");
|
||||||
|
System.out.println();
|
||||||
|
formatter.printHelp(100,
|
||||||
|
"java ... PBRPCBenchmark [options]",
|
||||||
|
"\nSupported options:", opts, "");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (line.hasOption('s')) {
|
||||||
|
serverThreads = Integer.parseInt(line.getOptionValue('s'));
|
||||||
|
}
|
||||||
|
if (line.hasOption('r')) {
|
||||||
|
serverReaderThreads = Integer.parseInt(line.getOptionValue('r'));
|
||||||
|
}
|
||||||
|
if (line.hasOption('c')) {
|
||||||
|
clientThreads = Integer.parseInt(line.getOptionValue('c'));
|
||||||
|
}
|
||||||
|
if (line.hasOption('t')) {
|
||||||
|
secondsToRun = Integer.parseInt(line.getOptionValue('t'));
|
||||||
|
}
|
||||||
|
if (line.hasOption('m')) {
|
||||||
|
msgSize = Integer.parseInt(line.getOptionValue('m'));
|
||||||
|
}
|
||||||
|
if (line.hasOption('p')) {
|
||||||
|
port = Integer.parseInt(line.getOptionValue('p'));
|
||||||
|
}
|
||||||
|
if (line.hasOption('h')) {
|
||||||
|
host = line.getOptionValue('h');
|
||||||
|
}
|
||||||
|
if (line.hasOption('e')) {
|
||||||
|
String eng = line.getOptionValue('e');
|
||||||
|
if ("protobuf".equals(eng)) {
|
||||||
|
rpcEngine = ProtobufRpcEngine.class;
|
||||||
|
} else if ("writable".equals(eng)) {
|
||||||
|
rpcEngine = WritableRpcEngine.class;
|
||||||
|
} else {
|
||||||
|
throw new ParseException("invalid engine: " + eng);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
String[] remainingArgs = line.getArgs();
|
||||||
|
if (remainingArgs.length != 0) {
|
||||||
|
throw new ParseException("Extra arguments: " +
|
||||||
|
Joiner.on(" ").join(remainingArgs));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "rpcEngine=" + rpcEngine + "\nserverThreads=" + serverThreads
|
||||||
|
+ "\nserverReaderThreads=" + serverReaderThreads + "\nclientThreads="
|
||||||
|
+ clientThreads + "\nhost=" + host + "\nport=" + port
|
||||||
|
+ "\nsecondsToRun=" + secondsToRun + "\nmsgSize=" + msgSize;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
private Server startServer(MyOptions opts) throws IOException {
|
||||||
|
if (opts.serverThreads <= 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
|
||||||
|
opts.serverReaderThreads);
|
||||||
|
|
||||||
|
RPC.Server server;
|
||||||
|
// Get RPC server for server side implementation
|
||||||
|
if (opts.rpcEngine == ProtobufRpcEngine.class) {
|
||||||
|
// Create server side implementation
|
||||||
|
PBServerImpl serverImpl = new PBServerImpl();
|
||||||
|
BlockingService service = TestProtobufRpcProto
|
||||||
|
.newReflectiveBlockingService(serverImpl);
|
||||||
|
|
||||||
|
server = RPC.getServer(TestRpcService.class, service,
|
||||||
|
opts.host, opts.port, opts.serverThreads, false, conf, null);
|
||||||
|
} else if (opts.rpcEngine == WritableRpcEngine.class) {
|
||||||
|
server = RPC.getServer(TestProtocol.class, new TestRPC.TestImpl(),
|
||||||
|
opts.host, opts.port, opts.serverThreads, false, conf, null);
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException("Bad engine: " + opts.rpcEngine);
|
||||||
|
}
|
||||||
|
server.start();
|
||||||
|
return server;
|
||||||
|
}
|
||||||
|
|
||||||
|
private long getTotalCpuTime(Iterable<? extends Thread> threads) {
|
||||||
|
long total = 0;
|
||||||
|
for (Thread t : threads) {
|
||||||
|
long tid = t.getId();
|
||||||
|
total += threadBean.getThreadCpuTime(tid);
|
||||||
|
}
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int run(String[] args) throws Exception {
|
||||||
|
MyOptions opts = new MyOptions(args);
|
||||||
|
if (opts.failed) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set RPC engine to the configured RPC engine
|
||||||
|
RPC.setProtocolEngine(conf, TestRpcService.class, opts.rpcEngine);
|
||||||
|
|
||||||
|
Server server = startServer(opts);
|
||||||
|
try {
|
||||||
|
|
||||||
|
TestContext ctx = setupClientTestContext(opts);
|
||||||
|
if (ctx != null) {
|
||||||
|
long totalCalls = 0;
|
||||||
|
ctx.startThreads();
|
||||||
|
long veryStart = System.nanoTime();
|
||||||
|
|
||||||
|
// Loop printing results every second until the specified
|
||||||
|
// time has elapsed
|
||||||
|
for (int i = 0; i < opts.secondsToRun ; i++) {
|
||||||
|
long st = System.nanoTime();
|
||||||
|
ctx.waitFor(1000);
|
||||||
|
long et = System.nanoTime();
|
||||||
|
long ct = callCount.getAndSet(0);
|
||||||
|
totalCalls += ct;
|
||||||
|
double callsPerSec = (ct * 1000000000)/(et - st);
|
||||||
|
System.out.println("Calls per second: " + callsPerSec);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Print results
|
||||||
|
|
||||||
|
if (totalCalls > 0) {
|
||||||
|
long veryEnd = System.nanoTime();
|
||||||
|
double callsPerSec =
|
||||||
|
(totalCalls * 1000000000)/(veryEnd - veryStart);
|
||||||
|
long cpuNanosClient = getTotalCpuTime(ctx.getTestThreads());
|
||||||
|
long cpuNanosServer = -1;
|
||||||
|
if (server != null) {
|
||||||
|
cpuNanosServer = getTotalCpuTime(server.getHandlers());;
|
||||||
|
}
|
||||||
|
System.out.println("====== Results ======");
|
||||||
|
System.out.println("Options:\n" + opts);
|
||||||
|
System.out.println("Total calls per second: " + callsPerSec);
|
||||||
|
System.out.println("CPU time per call on client: " +
|
||||||
|
(cpuNanosClient / totalCalls) + " ns");
|
||||||
|
if (server != null) {
|
||||||
|
System.out.println("CPU time per call on server: " +
|
||||||
|
(cpuNanosServer / totalCalls) + " ns");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
System.out.println("No calls!");
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx.stop();
|
||||||
|
} else {
|
||||||
|
while (true) {
|
||||||
|
Thread.sleep(10000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (server != null) {
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private TestContext setupClientTestContext(final MyOptions opts)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
if (opts.clientThreads <= 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set up a separate proxy for each client thread,
|
||||||
|
// rather than making them share TCP pipes.
|
||||||
|
int numProxies = opts.clientThreads;
|
||||||
|
final RpcServiceWrapper proxies[] = new RpcServiceWrapper[numProxies];
|
||||||
|
for (int i = 0; i < numProxies; i++) {
|
||||||
|
proxies[i] =
|
||||||
|
UserGroupInformation.createUserForTesting("proxy-" + i,new String[]{})
|
||||||
|
.doAs(new PrivilegedExceptionAction<RpcServiceWrapper>() {
|
||||||
|
@Override
|
||||||
|
public RpcServiceWrapper run() throws Exception {
|
||||||
|
return createRpcClient(opts);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create an echo message of the desired length
|
||||||
|
final StringBuilder msgBuilder = new StringBuilder(opts.msgSize);
|
||||||
|
for (int c = 0; c < opts.msgSize; c++) {
|
||||||
|
msgBuilder.append('x');
|
||||||
|
}
|
||||||
|
final String echoMessage = msgBuilder.toString();
|
||||||
|
|
||||||
|
// Create the clients in a test context
|
||||||
|
TestContext ctx = new TestContext();
|
||||||
|
for (int i = 0; i < opts.clientThreads; i++) {
|
||||||
|
final RpcServiceWrapper proxy = proxies[i % numProxies];
|
||||||
|
|
||||||
|
ctx.addThread(new MultithreadedTestUtil.RepeatingTestThread(ctx) {
|
||||||
|
@Override
|
||||||
|
public void doAnAction() throws Exception {
|
||||||
|
proxy.doEcho(echoMessage);
|
||||||
|
callCount.incrementAndGet();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return ctx;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple interface that can be implemented either by the
|
||||||
|
* protobuf or writable implementations.
|
||||||
|
*/
|
||||||
|
private interface RpcServiceWrapper {
|
||||||
|
public String doEcho(String msg) throws Exception;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a client proxy for the specified engine.
|
||||||
|
*/
|
||||||
|
private RpcServiceWrapper createRpcClient(MyOptions opts) throws IOException {
|
||||||
|
InetSocketAddress addr = NetUtils.createSocketAddr(opts.host, opts.port);
|
||||||
|
|
||||||
|
if (opts.rpcEngine == ProtobufRpcEngine.class) {
|
||||||
|
final TestRpcService proxy = RPC.getProxy(TestRpcService.class, 0, addr, conf);
|
||||||
|
return new RpcServiceWrapper() {
|
||||||
|
@Override
|
||||||
|
public String doEcho(String msg) throws Exception {
|
||||||
|
EchoRequestProto req = EchoRequestProto.newBuilder()
|
||||||
|
.setMessage(msg)
|
||||||
|
.build();
|
||||||
|
EchoResponseProto responseProto = proxy.echo(null, req);
|
||||||
|
return responseProto.getMessage();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} else if (opts.rpcEngine == WritableRpcEngine.class) {
|
||||||
|
final TestProtocol proxy = (TestProtocol)RPC.getProxy(
|
||||||
|
TestProtocol.class, TestProtocol.versionID, addr, conf);
|
||||||
|
return new RpcServiceWrapper() {
|
||||||
|
@Override
|
||||||
|
public String doEcho(String msg) throws Exception {
|
||||||
|
return proxy.echo(msg);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException("unsupported engine: " + opts.rpcEngine);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String []args) throws Exception {
|
||||||
|
int rc = ToolRunner.run(new RPCCallBenchmark(), args);
|
||||||
|
System.exit(rc);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setConf(Configuration conf) {
|
||||||
|
this.conf = conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Configuration getConf() {
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,53 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
|
public class TestRPCCallBenchmark {
|
||||||
|
|
||||||
|
@Test(timeout=20000)
|
||||||
|
public void testBenchmarkWithWritable() throws Exception {
|
||||||
|
int rc = ToolRunner.run(new RPCCallBenchmark(),
|
||||||
|
new String[] {
|
||||||
|
"--clientThreads", "30",
|
||||||
|
"--serverThreads", "30",
|
||||||
|
"--time", "5",
|
||||||
|
"--serverReaderThreads", "4",
|
||||||
|
"--messageSize", "1024",
|
||||||
|
"--engine", "writable"});
|
||||||
|
assertEquals(0, rc);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=20000)
|
||||||
|
public void testBenchmarkWithProto() throws Exception {
|
||||||
|
int rc = ToolRunner.run(new RPCCallBenchmark(),
|
||||||
|
new String[] {
|
||||||
|
"--clientThreads", "30",
|
||||||
|
"--serverThreads", "30",
|
||||||
|
"--time", "5",
|
||||||
|
"--serverReaderThreads", "4",
|
||||||
|
"--messageSize", "1024",
|
||||||
|
"--engine", "protobuf"});
|
||||||
|
assertEquals(0, rc);
|
||||||
|
}
|
||||||
|
}
|
|
@ -164,6 +164,10 @@ public void stop() throws Exception {
|
||||||
}
|
}
|
||||||
checkException();
|
checkException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Iterable<? extends Thread> getTestThreads() {
|
||||||
|
return testThreads;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue