HBASE-576 Investigate IPC performance

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@703013 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2008-10-08 22:49:50 +00:00
parent 9607acc45e
commit c75ea989d6
9 changed files with 194 additions and 38 deletions

View File

@ -26,6 +26,7 @@ Release 0.19.0 - Unreleased
(Doğacan Güney via Stack)
HBASE-908 Add approximate counting to CountingBloomFilter
(Andrzej Bialecki via Stack)
HBASE-576 Investigate IPC performance
NEW FEATURES
HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters]

View File

@ -126,6 +126,11 @@ public class HbaseObjectWritable implements Writable, Configurable {
addToMap(RowResult.class, code++);
addToMap(HRegionInfo[].class, code++);
addToMap(MapWritable.class, code++);
try {
addToMap(Class.forName("[Lorg.apache.hadoop.hbase.io.RowResult;"), code++);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
private Class<?> declaredClass;

View File

@ -38,8 +38,9 @@ public interface HMasterInterface extends VersionedProtocol {
* -- HADOOP-2495 and then to 3 when we changed the RPC to send codes instead
* of actual class names (HADOOP-2519).
* <p>Version 4 when we moved to all byte arrays (HBASE-42).
* <p>Version 5 HBASE-576.
*/
public static final long versionID = 4L;
public static final long versionID = 5L;
/** @return true if master is available */
public boolean isMasterRunning();

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
/**
* HRegionServers interact with the HMasterRegionInterface to report on local
* goings-on and to obtain data-handling instructions from the HMaster.
* <p>Changes here need to be reflected in HbaseObjectWritable HbaseRPC#Invoker.
*/
public interface HMasterRegionInterface extends VersionedProtocol {
/**
@ -38,8 +39,9 @@ public interface HMasterRegionInterface extends VersionedProtocol {
* MapWritable instead of a HbaseMapWritable as part of HBASE-82 changes.
* Version 3 was when HMsg was refactored so it could carry optional
* messages (HBASE-504).
* <p>HBASE-576 we moved this to 4.
*/
public static final long versionID = 3L;
public static final long versionID = 4L;
/**
* Called when a region server first starts

View File

@ -37,8 +37,9 @@ public interface HRegionInterface extends VersionedProtocol {
/**
* Protocol version.
* Upped to 5 when we added scanner caching
* <p>HBASE-576, we moved this to 6.
*/
public static final long versionID = 5L;
public static final long versionID = 6L;
/**
* Get metainfo about an HRegion

View File

@ -30,6 +30,8 @@ import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
@ -40,8 +42,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.HBaseClient;
@ -82,11 +83,28 @@ public class HbaseRPC {
private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HbaseRPC");
private HbaseRPC() {} // no public ctor
private HbaseRPC() {
super();
} // no public ctor
/** A method invocation, including the method name and its parameters.*/
private static class Invocation implements Writable, Configurable {
// Here we maintain two static maps of method names to code and vice versa.
private static final Map<Byte, String> CODE_TO_METHODNAME =
new HashMap<Byte, String>();
private static final Map<String, Byte> METHODNAME_TO_CODE =
new HashMap<String, Byte>();
// Special code that means 'not-encoded'.
private static final byte NOT_ENCODED = 0;
static {
byte code = NOT_ENCODED + 1;
code = addToMap(VersionedProtocol.class, code);
code = addToMap(HMasterInterface.class, code);
code = addToMap(HMasterRegionInterface.class, code);
code = addToMap(TransactionalRegionInterface.class, code);
}
private String methodName;
@SuppressWarnings("unchecked")
private Class[] parameterClasses;
@ -94,7 +112,9 @@ public class HbaseRPC {
private Configuration conf;
/** default constructor */
public Invocation() {}
public Invocation() {
super();
}
/**
* @param method
@ -117,21 +137,23 @@ public class HbaseRPC {
public Object[] getParameters() { return parameters; }
public void readFields(DataInput in) throws IOException {
methodName = Text.readString(in);
byte code = in.readByte();
methodName = CODE_TO_METHODNAME.get(Byte.valueOf(code));
parameters = new Object[in.readInt()];
parameterClasses = new Class[parameters.length];
ObjectWritable objectWritable = new ObjectWritable();
HbaseObjectWritable objectWritable = new HbaseObjectWritable();
for (int i = 0; i < parameters.length; i++) {
parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
parameters[i] = HbaseObjectWritable.readObject(in, objectWritable,
this.conf);
parameterClasses[i] = objectWritable.getDeclaredClass();
}
}
public void write(DataOutput out) throws IOException {
Text.writeString(out, methodName);
writeMethodNameCode(out, this.methodName);
out.writeInt(parameterClasses.length);
for (int i = 0; i < parameterClasses.length; i++) {
ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
HbaseObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
conf);
}
}
@ -158,6 +180,53 @@ public class HbaseRPC {
return this.conf;
}
private static void addToMap(final String name, final byte code) {
if (METHODNAME_TO_CODE.containsKey(name)) {
return;
}
METHODNAME_TO_CODE.put(name, Byte.valueOf(code));
CODE_TO_METHODNAME.put(Byte.valueOf(code), name);
}
/*
* @param c Class whose methods we'll add to the map of methods to codes
* (and vice versa).
* @param code Current state of the byte code.
* @return State of <code>code</code> when this method is done.
*/
private static byte addToMap(final Class<?> c, final byte code) {
byte localCode = code;
Method [] methods = c.getMethods();
// There are no guarantees about the order in which items are returned in
// so do a sort (Was seeing that sort was one way on one server and then
// another on different server).
Arrays.sort(methods, new Comparator<Method>() {
public int compare(Method left, Method right) {
return left.getName().compareTo(right.getName());
}
});
for (int i = 0; i < methods.length; i++) {
addToMap(methods[i].getName(), localCode++);
}
return localCode;
}
/*
* Write out the code byte for passed Class.
* @param out
* @param c
* @throws IOException
*/
static void writeMethodNameCode(final DataOutput out, final String methodname)
throws IOException {
Byte code = METHODNAME_TO_CODE.get(methodname);
if (code == null) {
LOG.error("Unsupported type " + methodname);
throw new UnsupportedOperationException("No code for unexpected " +
methodname);
}
out.writeByte(code.byteValue());
}
}
/* Cache a client using its socket factory as the hash key */
@ -181,7 +250,7 @@ public class HbaseRPC {
// per-job, we choose (a).
Client client = clients.get(factory);
if (client == null) {
client = new HBaseClient(ObjectWritable.class, conf, factory);
client = new HBaseClient(HbaseObjectWritable.class, conf, factory);
clients.put(factory, client);
} else {
((HBaseClient)client).incCount();
@ -217,7 +286,7 @@ public class HbaseRPC {
}
}
private static ClientCache CLIENTS=new ClientCache();
private static ClientCache CLIENTS = new ClientCache();
private static class Invoker implements InvocationHandler {
private InetSocketAddress address;
@ -242,7 +311,7 @@ public class HbaseRPC {
Method method, Object[] args)
throws Throwable {
long startTime = System.currentTimeMillis();
ObjectWritable value = (ObjectWritable)
HbaseObjectWritable value = (HbaseObjectWritable)
client.call(new Invocation(method, args), address, ticket);
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
@ -379,8 +448,8 @@ public class HbaseRPC {
*/
public static VersionedProtocol getProxy(Class<?> protocol,
long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory) throws IOException {
Configuration conf, SocketFactory factory)
throws IOException {
VersionedProtocol proxy =
(VersionedProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
@ -452,7 +521,7 @@ public class HbaseRPC {
(Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
for (int i = 0; i < values.length; i++)
if (wrappedValues[i] != null)
values[i] = ((ObjectWritable)wrappedValues[i]).get();
values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
return values;
} finally {
@ -545,7 +614,6 @@ public class HbaseRPC {
try {
Invocation call = (Invocation)param;
if (verbose) log("Call: " + call);
Method method =
implementation.getMethod(call.getMethodName(),
call.getParameterClasses());
@ -573,7 +641,7 @@ public class HbaseRPC {
if (verbose) log("Return: "+value);
return new ObjectWritable(method.getReturnType(), value);
return new HbaseObjectWritable(method.getReturnType(), value);
} catch (InvocationTargetException e) {
Throwable target = e.getTargetException();

View File

@ -29,8 +29,10 @@ import org.apache.hadoop.hbase.io.RowResult;
*
*/
public interface TransactionalRegionInterface extends HRegionInterface {
/** Interface version number */
public static final long versionID = 1L;
/** Interface version number
* Moved to 2 for hbase-576.
*/
public static final long versionID = 2L;
/**
* Sent to initiate a transaction.

View File

@ -22,6 +22,7 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.io.PrintStream;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
@ -51,7 +52,8 @@ import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.log4j.Logger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
@ -71,12 +73,12 @@ import org.apache.log4j.Logger;
* runs an individual client. Each client does about 1GB of data.
*/
public class PerformanceEvaluation implements HConstants {
static final Logger LOG =
Logger.getLogger(PerformanceEvaluation.class.getName());
private static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
private static final int ROW_LENGTH = 1000;
private static final int ONE_GB = 1024 * 1024 * 1000;
private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH;
static final byte [] COLUMN_NAME = Bytes.toBytes(COLUMN_FAMILY_STR + "data");
protected static HTableDescriptor tableDescriptor;
@ -102,6 +104,7 @@ public class PerformanceEvaluation implements HConstants {
volatile HBaseConfiguration conf;
private boolean miniCluster = false;
private boolean nomapred = false;
private int N = 1;
private int R = ROWS_PER_GB;
private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
@ -223,10 +226,68 @@ public class PerformanceEvaluation implements HConstants {
private void runNIsMoreThanOne(final String cmd)
throws IOException {
checkTable(new HBaseAdmin(conf));
if (this.nomapred) {
doMultipleClients(cmd);
} else {
doMapReduce(cmd);
}
}
// Run a mapreduce job. Run as many maps as asked-for clients.
// Before we start up the job, write out an input file with instruction
// per client regards which row they are to start on.
/*
* Run all clients in this vm each to its own thread.
* @param cmd Command to run.
* @throws IOException
*/
@SuppressWarnings("unused")
private void doMultipleClients(final String cmd) throws IOException {
final List<Thread> threads = new ArrayList<Thread>(this.N);
final int perClientRows = R/N;
for (int i = 0; i < this.N; i++) {
Thread t = new Thread (Integer.toString(i)) {
@Override
public void run() {
super.run();
PerformanceEvaluation pe = new PerformanceEvaluation(conf);
int index = Integer.parseInt(getName());
try {
long elapsedTime = pe.runOneClient(cmd, index * perClientRows,
perClientRows, perClientRows,
new Status() {
public void setStatus(final String msg) throws IOException {
LOG.info("client-" + getName() + " " + msg);
}
});
LOG.info("Finished " + getName() + " in " + elapsedTime +
"ms writing " + perClientRows + " rows");
} catch (IOException e) {
throw new RuntimeException(e);
}
}
};
threads.add(t);
}
for (Thread t: threads) {
t.start();
}
for (Thread t: threads) {
while(t.isAlive()) {
try {
t.join();
} catch (InterruptedException e) {
LOG.debug("Interrupted, continuing" + e.toString());
}
}
}
}
/*
* Run a mapreduce job. Run as many maps as asked-for clients.
* Before we start up the job, write out an input file with instruction
* per client regards which row they are to start on.
* @param cmd Command to run.
* @throws IOException
*/
private void doMapReduce(final String cmd) throws IOException {
Path inputDir = writeInputFile(this.conf);
this.conf.set(EvaluationMapTask.CMD_KEY, cmd);
JobConf job = new JobConf(this.conf, this.getClass());
@ -559,7 +620,7 @@ public class PerformanceEvaluation implements HConstants {
if (cmd.equals(RANDOM_READ_MEM)) {
// For this one test, so all fits in memory, make R smaller (See
// pg. 9 of BigTable paper).
R = (ONE_GB / 10) * N;
R = (this.R / 10) * N;
}
MiniHBaseCluster hbaseMiniCluster = null;
@ -574,7 +635,6 @@ public class PerformanceEvaluation implements HConstants {
conf.set(HConstants.HBASE_DIR, parentdir.toString());
fs.mkdirs(parentdir);
FSUtils.setVersion(fs, parentdir);
hbaseMiniCluster = new MiniHBaseCluster(this.conf, N);
}
@ -604,13 +664,17 @@ public class PerformanceEvaluation implements HConstants {
System.err.println(message);
}
System.err.println("Usage: java " + this.getClass().getName() +
"[--master=host:port] [--miniCluster] <command> <nclients>");
" [--master=HOST:PORT] \\");
System.err.println(" [--miniCluster] [--nomapred] [--rows=ROWS] <command> <nclients>");
System.err.println();
System.err.println("Options:");
System.err.println(" master Specify host and port of HBase " +
"cluster master. If not present,");
System.err.println(" address is read from configuration");
System.err.println(" miniCluster Run the test on an HBaseMiniCluster");
System.err.println(" nomapred Run multiple clients using threads " +
"(rather than use mapreduce)");
System.err.println(" rows Rows each client runs. Default: One million");
System.err.println();
System.err.println("Command:");
System.err.println(" randomRead Run random read test");
@ -643,7 +707,7 @@ public class PerformanceEvaluation implements HConstants {
}
// Set total number of rows to write.
R = ROWS_PER_GB * N;
this.R = this.R * N;
}
private int doCommandLine(final String[] args) {
@ -676,6 +740,18 @@ public class PerformanceEvaluation implements HConstants {
continue;
}
final String nmr = "--nomapred";
if (cmd.startsWith(nmr)) {
this.nomapred = true;
continue;
}
final String rows = "--rows=";
if (cmd.startsWith(rows)) {
this.R = Integer.parseInt(cmd.substring(rows.length()));
continue;
}
if (COMMANDS.contains(cmd)) {
getArgs(i + 1, args);
runTest(cmd);
@ -699,6 +775,6 @@ public class PerformanceEvaluation implements HConstants {
public static void main(final String[] args) {
HBaseConfiguration c = new HBaseConfiguration();
System.exit(new PerformanceEvaluation(c).
doCommandLine(args));
doCommandLine(args));
}
}

View File

@ -59,7 +59,7 @@ public class TestCompare extends TestCase {
*/
public void testHStoreKeyBorderCases() {
HRegionInfo info = new HRegionInfo(new HTableDescriptor("testtable"),
HConstants.EMPTY_BYTE_ARRAY,HConstants.EMPTY_BYTE_ARRAY);
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
HStoreKey rowA = new HStoreKey("testtable,www.hbase.org/,1234",
"", Long.MAX_VALUE, info);
HStoreKey rowB = new HStoreKey("testtable,www.hbase.org/%20,99999",