HBASE-2997 Performance fixes - profiler driven
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@997519 13f79535-47bb-0310-9956-ffa450edef68
parent 5581a249e8
commit 009f461738
@@ -961,6 +961,7 @@ Release 0.21.0 - Unreleased
 OPTIMIZATIONS
   HBASE-410   [testing] Speed up the test suite
   HBASE-2041  Change WAL default configuration values
+  HBASE-2997  Performance fixes - profiler driven


 Release 0.20.0 - Tue Sep 8 12:53:05 PDT 2009
@@ -186,6 +186,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
         desc.values.entrySet()) {
       this.values.put(e.getKey(), e.getValue());
     }
+    setMaxVersions(desc.getMaxVersions());
   }

   /**
@@ -371,12 +372,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
   }

   /** @return maximum number of versions */
-  public synchronized int getMaxVersions() {
-    if (this.cachedMaxVersions == -1) {
-      String value = getValue(HConstants.VERSIONS);
-      this.cachedMaxVersions = (value != null)?
-        Integer.valueOf(value).intValue(): DEFAULT_VERSIONS;
-    }
+  public int getMaxVersions() {
     return this.cachedMaxVersions;
   }

@@ -385,6 +381,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
    */
   public void setMaxVersions(int maxVersions) {
     setValue(HConstants.VERSIONS, Integer.toString(maxVersions));
+    cachedMaxVersions = maxVersions;
   }

   /**
@@ -641,12 +638,12 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
       ImmutableBytesWritable value = new ImmutableBytesWritable();
       key.readFields(in);
       value.readFields(in);


       // in version 8, the BloomFilter setting changed from bool to enum
       if (version < 8 && Bytes.toString(key.get()).equals(BLOOMFILTER)) {
         value.set(Bytes.toBytes(
             Boolean.getBoolean(Bytes.toString(value.get()))
-              ? BloomType.ROW.toString()
+              ? BloomType.ROW.toString()
               : BloomType.NONE.toString()));
       }

@@ -656,6 +653,9 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
       // Convert old values.
       setValue(COMPRESSION, Compression.Algorithm.NONE.getName());
     }
+    String value = getValue(HConstants.VERSIONS);
+    this.cachedMaxVersions = (value != null)?
+      Integer.valueOf(value).intValue(): DEFAULT_VERSIONS;
   }
 }

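The HColumnDescriptor changes above memoize the parsed VERSIONS setting: getMaxVersions() had been re-parsing the stored string under synchronized on every call, and is now a plain read of cachedMaxVersions, which the copy constructor, setMaxVersions() and readFields() keep in sync. A minimal sketch of that parse-once pattern, using hypothetical names rather than the real HColumnDescriptor API:

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not the real HColumnDescriptor): keep the raw string form for
// serialization, but cache the parsed int so the getter becomes a plain field read.
class VersionedDescriptorSketch {
  private final Map<String, String> values = new HashMap<>();
  private int cachedMaxVersions = 3;   // assumed default, analogous to DEFAULT_VERSIONS

  void setMaxVersions(int maxVersions) {
    values.put("VERSIONS", Integer.toString(maxVersions));
    cachedMaxVersions = maxVersions;   // keep the cache in sync with the string value
  }

  int getMaxVersions() {
    return cachedMaxVersions;          // hot path: no parsing, no synchronization
  }
}
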
@@ -202,6 +202,10 @@ public class KeyValue implements Writable, HeapSize {
   private int offset = 0;
   private int length = 0;

+  // the row cached
+  private byte [] rowCache = null;
+
+
   /** Here be dragons **/

   // used to achieve atomic operations in the memstore.
@@ -673,8 +677,13 @@ public class KeyValue implements Writable, HeapSize {
   /**
    * @return Length of key portion.
    */
+  private int keyLength = 0;
+
   public int getKeyLength() {
-    return Bytes.toInt(this.bytes, this.offset);
+    if (keyLength == 0) {
+      keyLength = Bytes.toInt(this.bytes, this.offset);
+    }
+    return keyLength;
   }

   /**
@@ -861,19 +870,25 @@ public class KeyValue implements Writable, HeapSize {
    * @return Row in a new byte array.
    */
   public byte [] getRow() {
-    int o = getRowOffset();
-    short l = getRowLength();
-    byte [] result = new byte[l];
-    System.arraycopy(getBuffer(), o, result, 0, l);
-    return result;
+    if (rowCache == null) {
+      int o = getRowOffset();
+      short l = getRowLength();
+      rowCache = new byte[l];
+      System.arraycopy(getBuffer(), o, rowCache, 0, l);
+    }
+    return rowCache;
   }

   /**
    *
    * @return Timestamp
    */
+  private long timestampCache = -1;
   public long getTimestamp() {
-    return getTimestamp(getKeyLength());
+    if (timestampCache == -1) {
+      timestampCache = getTimestamp(getKeyLength());
+    }
+    return timestampCache;
   }

   /**
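The KeyValue changes apply lazy, compute-once caching to fields decoded from the immutable backing byte[]: the key length, the row array and the timestamp are each decoded on first access and reused afterwards. A rough sketch of the pattern on a made-up record type (not the real KeyValue):

// Hypothetical sketch of the lazy-decode caching above: the backing bytes never change,
// so a decoded field can be computed on first access and handed back on every later call.
class ImmutableRecordSketch {
  private final byte[] bytes;
  private byte[] rowCache = null;      // null means "row not decoded yet"
  private long timestampCache = -1;    // -1 means "timestamp not decoded yet"
  private int keyLength = 0;           // 0 means "key length not decoded yet"

  ImmutableRecordSketch(byte[] bytes) {
    this.bytes = bytes;
  }

  int getKeyLength() {
    if (keyLength == 0) {
      keyLength = decodeKeyLength();   // decode once, reuse afterwards
    }
    return keyLength;
  }

  byte[] getRow() {
    if (rowCache == null) {
      rowCache = decodeRow();          // allocate and copy only on the first call
    }
    return rowCache;
  }

  long getTimestamp() {
    if (timestampCache == -1) {
      timestampCache = decodeTimestamp();
    }
    return timestampCache;
  }

  // Stand-ins for the real offset/length arithmetic over this.bytes.
  private int decodeKeyLength() { return bytes.length; }
  private byte[] decodeRow() { return new byte[] { bytes[0] }; }
  private long decodeTimestamp() { return bytes[bytes.length - 1]; }
}

The sentinel values (0 for the key length, -1 for the timestamp, null for the row) stand for "not decoded yet"; if a real value ever equals its sentinel the field is simply re-decoded, trading a rare redundant decode for not having to carry a separate "already computed" flag.
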
@@ -1194,7 +1194,8 @@ public class HFile {
         return null;
       }
       return new KeyValue(this.block.array(),
-          this.block.arrayOffset() + this.block.position() - 8);
+          this.block.arrayOffset() + this.block.position() - 8,
+          this.currKeyLen+this.currValueLen+8);
     }

     public ByteBuffer getKey() {
@@ -1238,16 +1239,17 @@ public class HFile {
           return false;
         }
         block = reader.readBlock(this.currBlock, this.cacheBlocks, this.pread);
-        currKeyLen = block.getInt();
-        currValueLen = block.getInt();
+        currKeyLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position(), 4);
+        currValueLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position()+4, 4);
+        block.position(block.position()+8);
         blockFetches++;
         return true;
       }
       // LOG.debug("rem:" + block.remaining() + " p:" + block.position() +
       // " kl: " + currKeyLen + " kv: " + currValueLen);

-      currKeyLen = block.getInt();
-      currValueLen = block.getInt();
+      currKeyLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position(), 4);
+      currValueLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position()+4, 4);
+      block.position(block.position()+8);
       return true;
     }

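In the HFile scanner, two ByteBuffer.getInt() calls per KeyValue are replaced by Bytes.toInt() reads against the block's backing array plus a single explicit position(+8), cutting per-call ByteBuffer overhead in this hot loop. A hedged sketch of the two read styles, with a local toInt() helper standing in for HBase's Bytes.toInt() and assuming an array-backed (heap) buffer, as HFile blocks are:

import java.nio.ByteBuffer;

// Sketch of the two read styles contrasted in the hunk above.
class BlockReadSketch {
  static int toInt(byte[] b, int off) {
    return ((b[off] & 0xff) << 24) | ((b[off + 1] & 0xff) << 16)
         | ((b[off + 2] & 0xff) << 8) | (b[off + 3] & 0xff);
  }

  static void readLengthsViaByteBuffer(ByteBuffer block, int[] out) {
    out[0] = block.getInt();               // each call checks bounds and moves the position
    out[1] = block.getInt();
  }

  static void readLengthsViaArray(ByteBuffer block, int[] out) {
    byte[] a = block.array();
    int p = block.arrayOffset() + block.position();
    out[0] = toInt(a, p);                  // read both ints straight from the backing array...
    out[1] = toInt(a, p + 4);
    block.position(block.position() + 8);  // ...then advance the position once
  }
}
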
@@ -480,22 +480,6 @@ public class HBaseRPC {
     }
   }

-  /**
-   * Construct a server for a protocol implementation instance listening on a
-   * port and address.
-   *
-   * @param instance instance
-   * @param bindAddress bind address
-   * @param port port to bind to
-   * @param conf configuration
-   * @return Server
-   * @throws IOException e
-   */
-  public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf)
-    throws IOException {
-    return getServer(instance, bindAddress, port, 1, false, conf);
-  }
-
   /**
    * Construct a server for a protocol implementation instance listening on a
    * port and address.
@@ -509,32 +493,22 @@ public class HBaseRPC {
    * @return Server
    * @throws IOException e
    */
-  public static Server getServer(final Object instance, final String bindAddress, final int port,
+  public static Server getServer(final Object instance,
+                                 final Class<?>[] ifaces,
+                                 final String bindAddress, final int port,
                                  final int numHandlers,
                                  final boolean verbose, Configuration conf)
     throws IOException {
-    return new Server(instance, conf, bindAddress, port, numHandlers, verbose);
+    return new Server(instance, ifaces, conf, bindAddress, port, numHandlers, verbose);
   }

   /** An RPC Server. */
   public static class Server extends HBaseServer {
     private Object instance;
     private Class<?> implementation;
+    private Class<?> ifaces[];
     private boolean verbose;

-    /**
-     * Construct an RPC server.
-     * @param instance the instance whose methods will be called
-     * @param conf the configuration to use
-     * @param bindAddress the address to bind on to listen for connection
-     * @param port the port to listen for connections on
-     * @throws IOException e
-     */
-    public Server(Object instance, Configuration conf, String bindAddress, int port)
-      throws IOException {
-      this(instance, conf, bindAddress, port, 1, false);
-    }
-
     private static String classNameBase(String className) {
       String[] names = className.split("\\.", -1);
       if (names == null || names.length == 0) {
@@ -552,12 +526,19 @@ public class HBaseRPC {
      * @param verbose whether each call should be logged
      * @throws IOException e
      */
-    public Server(Object instance, Configuration conf, String bindAddress, int port,
+    public Server(Object instance, final Class<?>[] ifaces,
+                  Configuration conf, String bindAddress, int port,
                   int numHandlers, boolean verbose) throws IOException {
       super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()));
       this.instance = instance;
       this.implementation = instance.getClass();
+
       this.verbose = verbose;
+
+      this.ifaces = ifaces;
+
+      // create metrics for the advertised interfaces this server implements.
+      this.rpcMetrics.createMetrics(this.ifaces);
     }

     @Override
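The getServer()/Server signatures now carry the list of protocol interfaces the instance exports, and the constructor hands that list to rpcMetrics.createMetrics() so per-method metrics exist before the first call arrives; the HMaster and HRegionServer call sites further down pass their interfaces explicitly. A small illustrative sketch of that wiring, with made-up class names in place of HBaseServer and the real metrics class:

// Illustrative sketch (hypothetical names, not the real HBase classes): the server records
// which protocol interfaces it exports and registers them with its metrics object during
// construction, before any handler thread can run.
interface MetricsRegistrySketch {
  void createMetrics(Class<?>[] ifaces);
}

class RpcServerSketch {
  private final Object instance;           // the object whose methods will be invoked
  private final Class<?>[] ifaces;         // the protocols it advertises
  private final MetricsRegistrySketch rpcMetrics;

  RpcServerSketch(Object instance, Class<?>[] ifaces, MetricsRegistrySketch rpcMetrics) {
    this.instance = instance;
    this.ifaces = ifaces;
    this.rpcMetrics = rpcMetrics;
    this.rpcMetrics.createMetrics(this.ifaces);  // one metric per advertised method, up front
  }
}
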
@@ -92,14 +92,26 @@ public class HBaseRpcMetrics implements Updater {
     return new MetricsTimeVaryingRate(key, this.registry);
   }

-  public synchronized void inc(String name, int amt) {
+  public void inc(String name, int amt) {
     MetricsTimeVaryingRate m = get(name);
     if (m == null) {
-      m = create(name);
+      LOG.warn("Got inc() request for method that doesnt exist: " +
+        name);
+      return; // ignore methods that dont exist.
     }
     m.inc(amt);
   }

+  public void createMetrics(Class<?> []ifaces) {
+    for (Class<?> iface : ifaces) {
+      Method[] methods = iface.getMethods();
+      for (Method method : methods) {
+        if (get(method.getName()) == null)
+          create(method.getName());
+      }
+    }
+  }
+
   /**
    * Push the metrics to the monitoring subsystem on doUpdate() call.
    * @param context ctx
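With every advertised method registered up front by createMetrics(), inc() no longer needs to be synchronized or to create metrics lazily; a lookup miss is just logged and ignored. A sketch of that hot-path shape, using plain counters and hypothetical names instead of MetricsTimeVaryingRate:

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the shape above with plain counters: the map is fully populated before the
// server takes traffic and never mutated afterwards, so the per-call path needs no lock.
class MethodCountersSketch {
  private final Map<String, AtomicLong> metrics = new HashMap<>();

  void createMetrics(Class<?>[] ifaces) {        // called once, at construction time
    for (Class<?> iface : ifaces) {
      for (Method method : iface.getMethods()) {
        if (metrics.get(method.getName()) == null) {
          metrics.put(method.getName(), new AtomicLong());
        }
      }
    }
  }

  void inc(String name, int amt) {               // hot path: unsynchronized lookup
    AtomicLong counter = metrics.get(name);
    if (counter == null) {
      System.err.println("inc() for a method that doesn't exist: " + name);
      return;                                    // ignore unknown methods, never create here
    }
    counter.addAndGet(amt);
  }
}

The trade-off mirrors the patch itself: a call whose method is not covered by an advertised interface is no longer counted, only logged.
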
@@ -182,8 +182,10 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
      */
     HServerAddress a = new HServerAddress(getMyAddress(this.conf));
     int numHandlers = conf.getInt("hbase.regionserver.handler.count", 10);
-    this.rpcServer = HBaseRPC.getServer(this, a.getBindAddress(), a.getPort(),
-      numHandlers, false, conf);
+    this.rpcServer = HBaseRPC.getServer(this,
+      new Class<?>[]{HMasterInterface.class, HMasterRegionInterface.class},
+      a.getBindAddress(), a.getPort(),
+      numHandlers, false, conf);
     this.address = new HServerAddress(rpcServer.getListenerAddress());

     // set the thread name now we have an address
@@ -316,10 +316,17 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
     this.abortRequested = false;
     this.stopped = false;

+
+    //HRegionInterface,
+    //HBaseRPCErrorHandler, Runnable, Watcher, Stoppable, OnlineRegions
+
     // Server to handle client requests
-    this.server = HBaseRPC.getServer(this, address.getBindAddress(), address
-        .getPort(), conf.getInt("hbase.regionserver.handler.count", 10), false,
-        conf);
+    this.server = HBaseRPC.getServer(this,
+        new Class<?>[]{HRegionInterface.class, HBaseRPCErrorHandler.class,
+        OnlineRegions.class},
+        address.getBindAddress(),
+        address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
+        false, conf);
     this.server.setErrorHandler(this);
     // Address is giving a default IP for the moment. Will be changed after
     // calling the master.
@@ -665,15 +672,15 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
         + this.serverInfo.getServerAddress() + ", Now=" + hsa.toString());
       this.serverInfo.setServerAddress(hsa);
     }
-
-    // hack! Maps DFSClient => RegionServer for logs. HDFS made this
+
+    // hack! Maps DFSClient => RegionServer for logs. HDFS made this
     // config param for task trackers, but we can piggyback off of it.
     if (this.conf.get("mapred.task.id") == null) {
       this.conf.set("mapred.task.id",
         "hb_rs_" + this.serverInfo.getServerName() + "_" +
         System.currentTimeMillis());
     }
-
+
     // Master sent us hbase.rootdir to use. Should be fully qualified
     // path with file system specification included. Set 'fs.defaultFS'
     // to match the filesystem on hbase.rootdir else underlying hadoop hdfs
@@ -2202,7 +2209,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
       byte[] regionName = e.getKey();
       List<Action> actionsForRegion = e.getValue();
       // sort based on the row id - this helps in the case where we reach the
-      // end of a region, so that we don't have to try the rest of the
+      // end of a region, so that we don't have to try the rest of the
       // actions in the list.
       Collections.sort(actionsForRegion);
       Row action = null;
@@ -397,7 +397,7 @@ public class MemStore implements HeapSize {
       KeyValue kv = it.next();

       // if this isnt the row we are interested in, then bail:
-      if (!firstKv.matchingRow(kv)) {
+      if (!firstKv.matchingColumn(family,qualifier) || !firstKv.matchingRow(kv) ) {
         break; // rows dont match, bail.
       }

@@ -430,7 +430,7 @@ public class MemStore implements HeapSize {
       }

       // if this isnt the row we are interested in, then bail:
-      if (!firstKv.matchingRow(kv)) {
+      if (!firstKv.matchingColumn(family,qualifier) || !firstKv.matchingRow(kv)) {
         break; // rows dont match, bail.
       }

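The MemStore change strengthens the early-bail test in these scan loops: the loop now also breaks when the column no longer matches, not just when it leaves the row of interest, so fewer irrelevant KeyValues are examined. A small self-contained sketch of that early-exit idea over a sorted set (illustrative strings, not the real KeyValue comparator):

import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;

// Sketch of the early-bail pattern above: when walking a sorted set from a start key,
// stop at the first element that can no longer be relevant instead of checking the rest.
class EarlyBailSketch {
  static int countMatching(SortedSet<String> sorted, String row, String column) {
    String start = row + "/" + column;
    int n = 0;
    for (Iterator<String> it = sorted.tailSet(start).iterator(); it.hasNext();) {
      String key = it.next();
      if (!key.startsWith(start)) {
        break;  // row/column no longer match: everything after is irrelevant, bail
      }
      n++;
    }
    return n;
  }

  public static void main(String[] args) {
    SortedSet<String> s = new TreeSet<>();
    s.add("r1/c1/1"); s.add("r1/c1/2"); s.add("r1/c2/1"); s.add("r2/c1/1");
    System.out.println(countMatching(s, "r1", "c1"));  // prints 2
  }
}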