From c75ea989d657b10616b7a2230501b7021f7ebe18 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Wed, 8 Oct 2008 22:49:50 +0000 Subject: [PATCH] HBASE-576 Investigate IPC performance git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@703013 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + .../hadoop/hbase/io/HbaseObjectWritable.java | 5 + .../hadoop/hbase/ipc/HMasterInterface.java | 5 +- .../hbase/ipc/HMasterRegionInterface.java | 4 +- .../hadoop/hbase/ipc/HRegionInterface.java | 5 +- .../org/apache/hadoop/hbase/ipc/HbaseRPC.java | 102 +++++++++++++++--- .../ipc/TransactionalRegionInterface.java | 6 +- .../hadoop/hbase/PerformanceEvaluation.java | 102 +++++++++++++++--- .../org/apache/hadoop/hbase/TestCompare.java | 2 +- 9 files changed, 194 insertions(+), 38 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 2eec2186e5e..7e96b7d51a1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -26,6 +26,7 @@ Release 0.19.0 - Unreleased (Doğacan Güney via Stack) HBASE-908 Add approximate counting to CountingBloomFilter (Andrzej Bialecki via Stack) + HBASE-576 Investigate IPC performance NEW FEATURES HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters] diff --git a/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java b/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java index 07ca061431e..e2f9adb31eb 100644 --- a/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java +++ b/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java @@ -126,6 +126,11 @@ public class HbaseObjectWritable implements Writable, Configurable { addToMap(RowResult.class, code++); addToMap(HRegionInfo[].class, code++); addToMap(MapWritable.class, code++); + try { + addToMap(Class.forName("[Lorg.apache.hadoop.hbase.io.RowResult;"), code++); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } } private Class declaredClass; diff --git a/src/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java 
b/src/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java index dbe4e9b2563..ef7af43374d 100644 --- a/src/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java +++ b/src/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java @@ -38,8 +38,9 @@ public interface HMasterInterface extends VersionedProtocol { * -- HADOOP-2495 and then to 3 when we changed the RPC to send codes instead * of actual class names (HADOOP-2519). *

<p>Version 4 when we moved to all byte arrays (HBASE-42). + *

<p>Version 5 HBASE-576. */ - public static final long versionID = 4L; + public static final long versionID = 5L; /** @return true if master is available */ public boolean isMasterRunning(); @@ -126,4 +127,4 @@ public interface HMasterInterface extends VersionedProtocol { * @return address of server that serves the root region */ public HServerAddress findRootRegion(); -} \ No newline at end of file +} diff --git a/src/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java b/src/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java index 975aa8af004..26e1119de21 100644 --- a/src/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java +++ b/src/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionInfo; /** * HRegionServers interact with the HMasterRegionInterface to report on local * goings-on and to obtain data-handling instructions from the HMaster. + *

<p>Changes here need to be reflected in HbaseObjectWritable HbaseRPC#Invoker. */ public interface HMasterRegionInterface extends VersionedProtocol { /** @@ -38,8 +39,9 @@ public interface HMasterRegionInterface extends VersionedProtocol { * MapWritable instead of a HbaseMapWritable as part of HBASE-82 changes. * Version 3 was when HMsg was refactored so it could carry optional * messages (HBASE-504). + *

<p>HBASE-576 we moved this to 4. */ - public static final long versionID = 3L; + public static final long versionID = 4L; /** * Called when a region server first starts diff --git a/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java b/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java index 9e4bb7a7d6d..08242a1ee5c 100644 --- a/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java +++ b/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java @@ -37,8 +37,9 @@ public interface HRegionInterface extends VersionedProtocol { /** * Protocol version. * Upped to 5 when we added scanner caching + *

<p>HBASE-576, we moved this to 6. */ - public static final long versionID = 5L; + public static final long versionID = 6L; /** * Get metainfo about an HRegion @@ -220,4 +221,4 @@ public interface HRegionInterface extends VersionedProtocol { */ public void unlockRow(final byte [] regionName, final long lockId) throws IOException; -} \ No newline at end of file +} diff --git a/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java b/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java index 6d6e97400a0..de4b52915b3 100644 --- a/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java +++ b/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java @@ -30,6 +30,8 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; @@ -40,8 +42,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.RetriesExhaustedException; -import org.apache.hadoop.io.ObjectWritable; -import org.apache.hadoop.io.Text; +import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.HBaseClient; @@ -82,11 +83,28 @@ public class HbaseRPC { private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.HbaseRPC"); - private HbaseRPC() {} // no public ctor + private HbaseRPC() { + super(); + } // no public ctor /** A method invocation, including the method name and its parameters.*/ private static class Invocation implements Writable, Configurable { + // Here we maintain two static maps of method names to code and vice versa. + private static final Map<Byte, String> CODE_TO_METHODNAME = + new HashMap<Byte, String>(); + private static final Map<String, Byte> METHODNAME_TO_CODE = + new HashMap<String, Byte>(); + // Special code that means 'not-encoded'. 
+ private static final byte NOT_ENCODED = 0; + static { + byte code = NOT_ENCODED + 1; + code = addToMap(VersionedProtocol.class, code); + code = addToMap(HMasterInterface.class, code); + code = addToMap(HMasterRegionInterface.class, code); + code = addToMap(TransactionalRegionInterface.class, code); + } + private String methodName; @SuppressWarnings("unchecked") private Class[] parameterClasses; @@ -94,7 +112,9 @@ public class HbaseRPC { private Configuration conf; /** default constructor */ - public Invocation() {} + public Invocation() { + super(); + } /** * @param method @@ -117,21 +137,23 @@ public class HbaseRPC { public Object[] getParameters() { return parameters; } public void readFields(DataInput in) throws IOException { - methodName = Text.readString(in); + byte code = in.readByte(); + methodName = CODE_TO_METHODNAME.get(Byte.valueOf(code)); parameters = new Object[in.readInt()]; parameterClasses = new Class[parameters.length]; - ObjectWritable objectWritable = new ObjectWritable(); + HbaseObjectWritable objectWritable = new HbaseObjectWritable(); for (int i = 0; i < parameters.length; i++) { - parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf); + parameters[i] = HbaseObjectWritable.readObject(in, objectWritable, + this.conf); parameterClasses[i] = objectWritable.getDeclaredClass(); } } public void write(DataOutput out) throws IOException { - Text.writeString(out, methodName); + writeMethodNameCode(out, this.methodName); out.writeInt(parameterClasses.length); for (int i = 0; i < parameterClasses.length; i++) { - ObjectWritable.writeObject(out, parameters[i], parameterClasses[i], + HbaseObjectWritable.writeObject(out, parameters[i], parameterClasses[i], conf); } } @@ -157,7 +179,54 @@ public class HbaseRPC { public Configuration getConf() { return this.conf; } + + private static void addToMap(final String name, final byte code) { + if (METHODNAME_TO_CODE.containsKey(name)) { + return; + } + METHODNAME_TO_CODE.put(name, 
Byte.valueOf(code)); + CODE_TO_METHODNAME.put(Byte.valueOf(code), name); + } + + /* + * @param c Class whose methods we'll add to the map of methods to codes + * (and vice versa). + * @param code Current state of the byte code. + * @return State of code when this method is done. + */ + private static byte addToMap(final Class c, final byte code) { + byte localCode = code; + Method [] methods = c.getMethods(); + // There are no guarantees about the order in which items are returned in + // so do a sort (Was seeing that sort was one way on one server and then + // another on different server). + Arrays.sort(methods, new Comparator<Method>() { + public int compare(Method left, Method right) { + return left.getName().compareTo(right.getName()); + } + }); + for (int i = 0; i < methods.length; i++) { + addToMap(methods[i].getName(), localCode++); + } + return localCode; + } + /* + * Write out the code byte for passed Class. + * @param out + * @param c + * @throws IOException + */ + static void writeMethodNameCode(final DataOutput out, final String methodname) + throws IOException { + Byte code = METHODNAME_TO_CODE.get(methodname); + if (code == null) { + LOG.error("Unsupported type " + methodname); + throw new UnsupportedOperationException("No code for unexpected " + + methodname); + } + out.writeByte(code.byteValue()); + } } /* Cache a client using its socket factory as the hash key */ @@ -181,7 +250,7 @@ public class HbaseRPC { // per-job, we choose (a). 
Client client = clients.get(factory); if (client == null) { - client = new HBaseClient(ObjectWritable.class, conf, factory); + client = new HBaseClient(HbaseObjectWritable.class, conf, factory); clients.put(factory, client); } else { ((HBaseClient)client).incCount(); @@ -217,7 +286,7 @@ public class HbaseRPC { } } - private static ClientCache CLIENTS=new ClientCache(); + private static ClientCache CLIENTS = new ClientCache(); private static class Invoker implements InvocationHandler { private InetSocketAddress address; @@ -242,7 +311,7 @@ public class HbaseRPC { Method method, Object[] args) throws Throwable { long startTime = System.currentTimeMillis(); - ObjectWritable value = (ObjectWritable) + HbaseObjectWritable value = (HbaseObjectWritable) client.call(new Invocation(method, args), address, ticket); long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); @@ -379,8 +448,8 @@ public class HbaseRPC { */ public static VersionedProtocol getProxy(Class protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, - Configuration conf, SocketFactory factory) throws IOException { - + Configuration conf, SocketFactory factory) + throws IOException { VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[] { protocol }, @@ -452,7 +521,7 @@ public class HbaseRPC { (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length); for (int i = 0; i < values.length; i++) if (wrappedValues[i] != null) - values[i] = ((ObjectWritable)wrappedValues[i]).get(); + values[i] = ((HbaseObjectWritable)wrappedValues[i]).get(); return values; } finally { @@ -545,7 +614,6 @@ public class HbaseRPC { try { Invocation call = (Invocation)param; if (verbose) log("Call: " + call); - Method method = implementation.getMethod(call.getMethodName(), call.getParameterClasses()); @@ -573,7 +641,7 @@ public class HbaseRPC { if (verbose) log("Return: "+value); - 
return new ObjectWritable(method.getReturnType(), value); + return new HbaseObjectWritable(method.getReturnType(), value); } catch (InvocationTargetException e) { Throwable target = e.getTargetException(); diff --git a/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java b/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java index a611d518f46..2ea9d8ff90f 100644 --- a/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java +++ b/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java @@ -29,8 +29,10 @@ import org.apache.hadoop.hbase.io.RowResult; * */ public interface TransactionalRegionInterface extends HRegionInterface { - /** Interface version number */ - public static final long versionID = 1L; + /** Interface version number + * Moved to 2 for hbase-576. + */ + public static final long versionID = 2L; /** * Sent to initiate a transaction. diff --git a/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java b/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java index d05e9c2a12f..bc4b37cd62b 100644 --- a/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase; import java.io.IOException; import java.io.PrintStream; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; @@ -51,7 +52,8 @@ import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; -import org.apache.log4j.Logger; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** @@ -71,12 +73,12 @@ import org.apache.log4j.Logger; * runs an individual client. Each client does about 1GB of data. 
*/ public class PerformanceEvaluation implements HConstants { - static final Logger LOG = - Logger.getLogger(PerformanceEvaluation.class.getName()); + private static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName()); private static final int ROW_LENGTH = 1000; private static final int ONE_GB = 1024 * 1024 * 1000; private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH; + static final byte [] COLUMN_NAME = Bytes.toBytes(COLUMN_FAMILY_STR + "data"); protected static HTableDescriptor tableDescriptor; @@ -102,6 +104,7 @@ public class PerformanceEvaluation implements HConstants { volatile HBaseConfiguration conf; private boolean miniCluster = false; + private boolean nomapred = false; private int N = 1; private int R = ROWS_PER_GB; private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); @@ -223,10 +226,68 @@ public class PerformanceEvaluation implements HConstants { private void runNIsMoreThanOne(final String cmd) throws IOException { checkTable(new HBaseAdmin(conf)); - - // Run a mapreduce job. Run as many maps as asked-for clients. - // Before we start up the job, write out an input file with instruction - // per client regards which row they are to start on. + if (this.nomapred) { + doMultipleClients(cmd); + } else { + doMapReduce(cmd); + } + } + + /* + * Run all clients in this vm each to its own thread. + * @param cmd Command to run. 
+ * @throws IOException + */ + @SuppressWarnings("unused") + private void doMultipleClients(final String cmd) throws IOException { + final List<Thread> threads = new ArrayList<Thread>(this.N); + final int perClientRows = R/N; + for (int i = 0; i < this.N; i++) { + Thread t = new Thread (Integer.toString(i)) { + @Override + public void run() { + super.run(); + PerformanceEvaluation pe = new PerformanceEvaluation(conf); + int index = Integer.parseInt(getName()); + try { + long elapsedTime = pe.runOneClient(cmd, index * perClientRows, + perClientRows, perClientRows, + new Status() { + public void setStatus(final String msg) throws IOException { + LOG.info("client-" + getName() + " " + msg); + } + }); + LOG.info("Finished " + getName() + " in " + elapsedTime + + "ms writing " + perClientRows + " rows"); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + threads.add(t); + } + for (Thread t: threads) { + t.start(); + } + for (Thread t: threads) { + while(t.isAlive()) { + try { + t.join(); + } catch (InterruptedException e) { + LOG.debug("Interrupted, continuing" + e.toString()); + } + } + } + } + + /* + * Run a mapreduce job. Run as many maps as asked-for clients. + * Before we start up the job, write out an input file with instruction + * per client regards which row they are to start on. + * @param cmd Command to run. + * @throws IOException + */ + private void doMapReduce(final String cmd) throws IOException { Path inputDir = writeInputFile(this.conf); this.conf.set(EvaluationMapTask.CMD_KEY, cmd); JobConf job = new JobConf(this.conf, this.getClass()); @@ -274,7 +335,7 @@ public class PerformanceEvaluation implements HConstants { } return subdir; } - + /* * A test. * Subclass to particularize what happens per row. @@ -559,7 +620,7 @@ public class PerformanceEvaluation implements HConstants { if (cmd.equals(RANDOM_READ_MEM)) { // For this one test, so all fits in memory, make R smaller (See // pg. 9 of BigTable paper). 
- R = (ONE_GB / 10) * N; + R = (this.R / 10) * N; } MiniHBaseCluster hbaseMiniCluster = null; @@ -574,7 +635,6 @@ public class PerformanceEvaluation implements HConstants { conf.set(HConstants.HBASE_DIR, parentdir.toString()); fs.mkdirs(parentdir); FSUtils.setVersion(fs, parentdir); - hbaseMiniCluster = new MiniHBaseCluster(this.conf, N); } @@ -604,13 +664,17 @@ public class PerformanceEvaluation implements HConstants { System.err.println(message); } System.err.println("Usage: java " + this.getClass().getName() + - "[--master=host:port] [--miniCluster] "); + " [--master=HOST:PORT] \\"); + System.err.println(" [--miniCluster] [--nomapred] [--rows=ROWS] "); System.err.println(); System.err.println("Options:"); System.err.println(" master Specify host and port of HBase " + "cluster master. If not present,"); System.err.println(" address is read from configuration"); System.err.println(" miniCluster Run the test on an HBaseMiniCluster"); + System.err.println(" nomapred Run multiple clients using threads " + + "(rather than use mapreduce)"); + System.err.println(" rows Rows each client runs. Default: One million"); System.err.println(); System.err.println("Command:"); System.err.println(" randomRead Run random read test"); @@ -643,7 +707,7 @@ public class PerformanceEvaluation implements HConstants { } // Set total number of rows to write. 
- R = ROWS_PER_GB * N; + this.R = this.R * N; } private int doCommandLine(final String[] args) { @@ -675,6 +739,18 @@ public class PerformanceEvaluation implements HConstants { this.miniCluster = true; continue; } + + final String nmr = "--nomapred"; + if (cmd.startsWith(nmr)) { + this.nomapred = true; + continue; + } + + final String rows = "--rows="; + if (cmd.startsWith(rows)) { + this.R = Integer.parseInt(cmd.substring(rows.length())); + continue; + } if (COMMANDS.contains(cmd)) { getArgs(i + 1, args); @@ -699,6 +775,6 @@ public class PerformanceEvaluation implements HConstants { public static void main(final String[] args) { HBaseConfiguration c = new HBaseConfiguration(); System.exit(new PerformanceEvaluation(c). - doCommandLine(args)); + doCommandLine(args)); } } diff --git a/src/test/org/apache/hadoop/hbase/TestCompare.java b/src/test/org/apache/hadoop/hbase/TestCompare.java index 822a34f6549..1592b9e0fc5 100644 --- a/src/test/org/apache/hadoop/hbase/TestCompare.java +++ b/src/test/org/apache/hadoop/hbase/TestCompare.java @@ -59,7 +59,7 @@ public class TestCompare extends TestCase { */ public void testHStoreKeyBorderCases() { HRegionInfo info = new HRegionInfo(new HTableDescriptor("testtable"), - HConstants.EMPTY_BYTE_ARRAY,HConstants.EMPTY_BYTE_ARRAY); + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY); HStoreKey rowA = new HStoreKey("testtable,www.hbase.org/,1234", "", Long.MAX_VALUE, info); HStoreKey rowB = new HStoreKey("testtable,www.hbase.org/%20,99999",