HADOOP-2495 Minor performance improvements: Slim-down BatchOperation, etc.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@607602 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2007-12-30 22:22:16 +00:00
parent 6b005b4f3f
commit 0e8bbbf2f7
15 changed files with 808 additions and 118 deletions

View File

@@ -22,6 +22,7 @@ Trunk (unreleased changes)
HADOOP-2479 Save on number of Text object creations
HADOOP-2485 Make mapfile index interval configurable (Set default to 32
instead of 128)
HADOOP-2495 Minor performance improvements: Slim-down BatchOperation, etc.
BUG FIXES
HADOOP-2059 In tests, exceptions in min dfs shutdown should not fail test

View File

@@ -211,6 +211,13 @@
skip every nth index member when reading back the index into memory.
</description>
</property>
<property>
<name>hbase.io.seqfile.compression.type</name>
<value>NONE</value>
<description>The compression type for hbase sequencefile.Writers
such as hlog.
</description>
</property>
<!-- HbaseShell Configurations -->
<property>

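For illustration (not part of this changeset): a minimal sketch of setting this property and resolving it to a SequenceFile CompressionType. The helper simply mirrors the lookup the new HLog.getCompressionType performs in the next file, defaulting to NONE when the property is unset; the class name is made up.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile.CompressionType;

public class CompressionTypeExample {
  // Mirrors HLog.getCompressionType: an unset property means NONE.
  static CompressionType resolve(final Configuration c) {
    String name = c.get("hbase.io.seqfile.compression.type");
    return name == null ? CompressionType.NONE : CompressionType.valueOf(name);
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    System.out.println(resolve(conf));   // NONE: property unset in a bare Configuration
    conf.set("hbase.io.seqfile.compression.type", "BLOCK");
    System.out.println(resolve(conf));   // BLOCK
  }
}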
View File

@@ -34,11 +34,11 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.HbaseRPC;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
/**
@@ -177,7 +177,7 @@ public class HConnectionManager implements HConstants {
MASTER_ADDRESS, DEFAULT_MASTER_ADDRESS));
try {
HMasterInterface tryMaster = (HMasterInterface)RPC.getProxy(
HMasterInterface tryMaster = (HMasterInterface)HbaseRPC.getProxy(
HMasterInterface.class, HMasterInterface.versionID,
masterLocation.getInetSocketAddress(), this.conf);
@@ -360,13 +360,11 @@ public class HConnectionManager implements HConstants {
try {
versionId =
serverInterfaceClass.getDeclaredField("versionID").getLong(server);
} catch (IllegalAccessException e) {
// Should never happen unless visibility of versionID changes
throw new UnsupportedOperationException(
"Unable to open a connection to a " +
serverInterfaceClass.getName() + " server.", e);
} catch (NoSuchFieldException e) {
// Should never happen unless versionID field name changes in HRegionInterface
throw new UnsupportedOperationException(
@@ -375,13 +373,11 @@ public class HConnectionManager implements HConstants {
}
try {
server = (HRegionInterface) RPC.waitForProxy(serverInterfaceClass,
server = (HRegionInterface)HbaseRPC.waitForProxy(serverInterfaceClass,
versionId, regionServer.getInetSocketAddress(), this.conf);
} catch (RemoteException e) {
throw RemoteExceptionHandler.decodeRemoteException(e);
}
this.servers.put(regionServer.toString(), server);
}
}
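As an aside (not part of the patch), the try/catch above reads the static versionID field reflectively so the same code path works for any server interface class. A small, self-contained sketch of that pattern, using a hypothetical protocol interface in place of HRegionInterface:

public class VersionIdLookup {
  /** Hypothetical protocol; the real code reads HRegionInterface.versionID. */
  public interface ExampleProtocol {
    long versionID = 3L;
  }

  public static void main(String[] args) {
    Class<?> serverInterfaceClass = ExampleProtocol.class;
    long versionId;
    try {
      versionId = serverInterfaceClass.getDeclaredField("versionID").getLong(null);
    } catch (IllegalAccessException e) {
      // Should never happen unless visibility of versionID changes.
      throw new UnsupportedOperationException("Unable to read versionID from " +
        serverInterfaceClass.getName(), e);
    } catch (NoSuchFieldException e) {
      // Should never happen unless the field name changes.
      throw new UnsupportedOperationException("Unable to read versionID from " +
        serverInterfaceClass.getName(), e);
    }
    System.out.println("versionID = " + versionId);   // versionID = 3
  }
}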

View File

@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Reader;
/**
@@ -177,7 +178,7 @@ public class HLog implements HConstants {
"; map content " + logWriters.toString());
}
w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class,
HLogEdit.class);
HLogEdit.class, getCompressionType(conf));
// Use copy of regionName; regionName object is reused inside
// HStoreKey.getRegionName so its content changes as we iterate.
logWriters.put(new Text(regionName), w);
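As an aside (not part of the patch), the defensive copy above matters because logWriters is keyed on Text and Text is mutable: inserting the reused instance directly would strand the map entry once its content changes. A small, hypothetical demonstration of that failure mode:

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.Text;

public class MutableKeyDemo {
  public static void main(String[] args) {
    Map<Text, String> writers = new HashMap<Text, String>();
    Text regionName = new Text("region-a");

    writers.put(regionName, "writer-a");   // entry hashed on the bytes of "region-a"
    regionName.set("region-b");            // same Text instance reused for the next region

    System.out.println(writers.containsKey(new Text("region-a")));  // false: entry stranded
    System.out.println(writers.containsKey(regionName));            // false: wrong bucket
    // A defensive copy, writers.put(new Text(regionName), ...), avoids this.
  }
}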
@@ -239,6 +240,16 @@
rollWriter();
}
/**
* Get the compression type for the hlog files.
* @param c Configuration to use.
* @return the kind of compression to use
*/
private static CompressionType getCompressionType(final Configuration c) {
String name = c.get("hbase.io.seqfile.compression.type");
return name == null? CompressionType.NONE: CompressionType.valueOf(name);
}
/**
* Called by HRegionServer when it opens a new region to ensure that log
* sequence numbers are always greater than the latest sequence number of the
@@ -298,7 +309,7 @@
}
Path newPath = computeFilename(filenum++);
this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
HLogKey.class, HLogEdit.class);
HLogKey.class, HLogEdit.class, getCompressionType(this.conf));
LOG.info("new log writer created at " + newPath);
// Can we delete any of the old log files?

View File

@@ -51,6 +51,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.HbaseRPC;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.InfoServer;
import org.apache.hadoop.hbase.util.Sleeper;
@@ -59,7 +60,6 @@ import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
@@ -919,7 +919,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
this.serverLeases = new Leases(this.leaseTimeout,
conf.getInt("hbase.master.lease.thread.wakefrequency", 15 * 1000));
this.server = RPC.getServer(this, address.getBindAddress(),
this.server = HbaseRPC.getServer(this, address.getBindAddress(),
address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
false, conf);

View File

@@ -30,8 +30,11 @@ import java.io.IOException;
* tables.
*/
public interface HMasterInterface extends VersionedProtocol {
/** Interface version */
public static final long versionID = 1L;
/**
* Interface version.
* Version was incremented to 2 when we brought the hadoop RPC local to hbase.
*/
public static final long versionID = 2L;
/** @return true if master is available */
public boolean isMasterRunning();

View File

@@ -1155,15 +1155,12 @@ public class HRegion implements HConstants {
for (BatchOperation op: b) {
HStoreKey key = new HStoreKey(row, op.getColumn(), commitTime);
byte[] val = null;
switch(op.getOp()) {
case PUT:
if (op.isPut()) {
val = op.getValue();
if (HLogEdit.isDeleted(val)) {
throw new IOException("Cannot insert value: " + val);
}
break;
case DELETE:
} else {
if (timestamp == LATEST_TIMESTAMP) {
// Save off these deletes
if (deletes == null) {
@@ -1173,7 +1170,6 @@
} else {
val = HLogEdit.deleteBytes.get();
}
break;
}
if (val != null) {
localput(lockid, key, val);

View File

@@ -54,6 +54,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.HbaseRPC;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.InfoServer;
import org.apache.hadoop.hbase.util.Sleeper;
@@ -62,7 +63,6 @@ import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.StringUtils;
@@ -661,7 +661,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
this.workerThread = new Thread(worker);
this.sleeper = new Sleeper(this.msgInterval, this.stopRequested);
// Server to handle client requests
this.server = RPC.getServer(this, address.getBindAddress(),
this.server = HbaseRPC.getServer(this, address.getBindAddress(),
address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
false, conf);
this.serverInfo = new HServerInfo(new HServerAddress(
@@ -1060,7 +1060,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
LOG.debug("Telling master we are up");
}
// Do initial RPC setup.
this.hbaseMaster = (HMasterRegionInterface)RPC.waitForProxy(
this.hbaseMaster = (HMasterRegionInterface)HbaseRPC.waitForProxy(
HMasterRegionInterface.class, HMasterRegionInterface.versionID,
new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
this.conf);

View File

@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.io.TextSequence;
import org.apache.hadoop.io.*;
import java.io.*;
import java.nio.ByteBuffer;
/**
* A Key for a stored row
@@ -322,9 +323,11 @@ public class HStoreKey implements WritableComparable {
private static int getColonOffset(final Text col)
throws InvalidColumnNameException {
int offset = -1;
for (int i = 0; i < col.getLength(); i++) {
if (col.charAt(i) == COLUMN_FAMILY_DELIMITER) {
offset = i;
ByteBuffer bb = ByteBuffer.wrap(col.getBytes());
for (int lastPosition = bb.position(); bb.hasRemaining();
lastPosition = bb.position()) {
if (Text.bytesToCodePoint(bb) == COLUMN_FAMILY_DELIMITER) {
offset = lastPosition;
break;
}
}
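As an aside (not part of the patch), the rewritten scan walks the Text bytes as code points with Text.bytesToCodePoint and records byte offsets, instead of indexing with charAt. A standalone, hypothetical version of the same scan, returning the byte offset of the first ':' in a Text:

import java.nio.ByteBuffer;
import org.apache.hadoop.io.Text;

public class ColonOffsetDemo {
  static final char COLUMN_FAMILY_DELIMITER = ':';

  // Hypothetical standalone version of the scan in HStoreKey.getColonOffset:
  // returns the byte offset of the first delimiter code point, or -1.
  static int colonOffset(final Text col) {
    ByteBuffer bb = ByteBuffer.wrap(col.getBytes(), 0, col.getLength());
    for (int lastPosition = bb.position(); bb.hasRemaining();
        lastPosition = bb.position()) {
      if (Text.bytesToCodePoint(bb) == COLUMN_FAMILY_DELIMITER) {
        return lastPosition;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    System.out.println(colonOffset(new Text("info:regioninfo")));  // 4
    System.out.println(colonOffset(new Text("nodelimiter")));      // -1
  }
}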

View File

@@ -27,56 +27,38 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
/**
* Batch update operations such as put, delete, and deleteAll.
* Batch update operation.
*
* If value is null, it's a DELETE operation. If it's non-null, it's a PUT.
* This object is purposely bare-bones because many instances are created
* during bulk uploads. We have one class for DELETEs and PUTs rather than
* a class per type because it makes the serialization easier.
* @see BatchUpdate
*/
public class BatchOperation implements Writable {
/**
* Operation types.
* @see org.apache.hadoop.io.SequenceFile.Writer
*/
public static enum Operation {
/** update a field */
PUT,
/** delete a field */
DELETE}
private Operation op;
private Text column;
// A null value defines DELETE operations.
private byte[] value;
/** default constructor used by Writable */
/** Default constructor used by Writable */
public BatchOperation() {
this(new Text());
}
/**
* Creates a DELETE operation
*
* Creates a DELETE batch operation.
* @param column column name
*/
public BatchOperation(final Text column) {
this(Operation.DELETE, column, null);
this(column, null);
}
/**
* Creates a PUT operation
*
* Create a batch operation.
* @param column column name
* @param value column value
* @param value column value. If non-null, this is a PUT operation.
*/
public BatchOperation(final Text column, final byte [] value) {
this(Operation.PUT, column, value);
}
/**
* Creates a put operation
*
* @param operation the operation (put or get)
* @param column column name
* @param value column value
*/
public BatchOperation(final Operation operation, final Text column,
final byte[] value) {
this.op = operation;
this.column = column;
this.value = value;
}
@@ -85,47 +67,42 @@ public class BatchOperation implements Writable {
* @return the column
*/
public Text getColumn() {
return column;
}
/**
* @return the operation
*/
public Operation getOp() {
return this.op;
return this.column;
}
/**
* @return the value
*/
public byte[] getValue() {
return value;
}
//
// Writable
//
/**
* {@inheritDoc}
*/
public void readFields(DataInput in) throws IOException {
int ordinal = in.readInt();
this.op = Operation.values()[ordinal];
column.readFields(in);
if (this.op == Operation.PUT) {
value = new byte[in.readInt()];
in.readFully(value);
}
return this.value;
}
/**
* {@inheritDoc}
* @return True if this is a PUT operation (this.value is not null).
*/
public void write(DataOutput out) throws IOException {
out.writeInt(this.op.ordinal());
column.write(out);
if (this.op == Operation.PUT) {
public boolean isPut() {
return this.value != null;
}
// Writable methods
// This is a hotspot when deserializing incoming client submissions.
// In Performance Evaluation sequentialWrite, 70% of object allocations are
// done in here.
public void readFields(final DataInput in) throws IOException {
this.column.readFields(in);
// Is there a value to read?
if (in.readBoolean()) {
this.value = new byte[in.readInt()];
in.readFully(this.value);
}
}
public void write(final DataOutput out) throws IOException {
this.column.write(out);
boolean p = isPut();
out.writeBoolean(p);
if (p) {
out.writeInt(value.length);
out.write(value);
}
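As an aside (not part of the patch), the slimmed-down wire format is now just the column, a boolean put/delete flag, and the value bytes when present, rather than an int enum ordinal per operation. A hypothetical round-trip of that layout with plain DataOutput/DataInput (the column name and value are made up):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.io.Text;

public class BatchOperationWireFormat {
  public static void main(String[] args) throws Exception {
    // Serialize: column, then a boolean "is put" flag, then value bytes only for puts.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    new Text("info:server").write(out);
    byte[] value = "10.0.0.1:60020".getBytes("UTF-8");
    out.writeBoolean(true);          // PUT; a DELETE writes false and stops here
    out.writeInt(value.length);
    out.write(value);
    out.close();

    // Deserialize the same layout.
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
    Text column = new Text();
    column.readFields(in);
    byte[] readValue = null;
    if (in.readBoolean()) {          // only puts carry a value
      readValue = new byte[in.readInt()];
      in.readFully(readValue);
    }
    System.out.println(column + " -> " + new String(readValue, "UTF-8"));
  }
}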

View File

@@ -99,13 +99,17 @@ public class BatchUpdate implements Writable, Iterable<BatchOperation> {
*
* @param lid lock id returned from startUpdate
* @param column column whose value is being set
* @param val new value for column
* @param val new value for column. Cannot be null (can be empty).
*/
public synchronized void put(final long lid, final Text column,
final byte val[]) {
if(this.lockid != lid) {
throw new IllegalArgumentException("invalid lockid " + lid);
}
if (val == null) {
// If null, the PUT becomes a DELETE operation.
throw new IllegalArgumentException("Passed value cannot be null");
}
operations.add(new BatchOperation(column, val));
}
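As an aside (not part of the patch), with the enum gone a null value is the only thing that distinguishes a DELETE from a PUT, which is why put() must now reject nulls up front. A short sketch using the BatchOperation constructors shown earlier:

import org.apache.hadoop.hbase.io.BatchOperation;
import org.apache.hadoop.io.Text;

public class PutVersusDelete {
  public static void main(String[] args) {
    Text column = new Text("info:server");

    BatchOperation put = new BatchOperation(column, "value".getBytes());
    BatchOperation delete = new BatchOperation(column);   // null value == DELETE

    System.out.println(put.isPut());      // true
    System.out.println(delete.isPut());   // false
    // BatchUpdate.put(lockid, column, null) would silently build the second form,
    // which is why it now throws IllegalArgumentException instead.
  }
}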
@@ -138,10 +142,7 @@ public class BatchUpdate implements Writable, Iterable<BatchOperation> {
// Writable
//
/**
* {@inheritDoc}
*/
public void readFields(DataInput in) throws IOException {
public void readFields(final DataInput in) throws IOException {
row.readFields(in);
int nOps = in.readInt();
for (int i = 0; i < nOps; i++) {
@@ -151,10 +152,7 @@ public class BatchUpdate implements Writable, Iterable<BatchOperation> {
}
}
/**
* {@inheritDoc}
*/
public void write(DataOutput out) throws IOException {
public void write(final DataOutput out) throws IOException {
row.write(out);
out.writeInt(operations.size());
for (BatchOperation op: operations) {

View File

@@ -0,0 +1,274 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.lang.reflect.Array;
import java.io.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
/** A polymorphic Writable that writes an instance with its class name.
* Handles arrays, strings and primitive types without a Writable wrapper.
*
* This is a copy of the hadoop version renamed. Removes UTF8 (HADOOP-414).
* Using Text instead of UTF-8 saves ~2% CPU between reading and writing objects
* running a short sequentialWrite Performance Evaluation test just in
* ObjectWritable alone; more when we're doing randomRead-ing. Other
* optimizations include our passing codes for classes instead of the
* actual class names themselves.
*
* <p>Has other optimizations passing codes instead of class names.
*/
public class HbaseObjectWritable implements Writable, Configurable {
private Class declaredClass;
private Object instance;
private Configuration conf;
public HbaseObjectWritable() {}
public HbaseObjectWritable(Object instance) {
set(instance);
}
public HbaseObjectWritable(Class declaredClass, Object instance) {
this.declaredClass = declaredClass;
this.instance = instance;
}
/** Return the instance, or null if none. */
public Object get() { return instance; }
/** Return the class this is meant to be. */
public Class getDeclaredClass() { return declaredClass; }
/** Reset the instance. */
public void set(Object instance) {
this.declaredClass = instance.getClass();
this.instance = instance;
}
public String toString() {
return "OW[class=" + declaredClass + ",value=" + instance + "]";
}
public void readFields(DataInput in) throws IOException {
readObject(in, this, this.conf);
}
public void write(DataOutput out) throws IOException {
writeObject(out, instance, declaredClass, conf);
}
private static final Map<String, Class<?>> PRIMITIVE_NAMES = new HashMap<String, Class<?>>();
static {
PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
PRIMITIVE_NAMES.put("byte", Byte.TYPE);
PRIMITIVE_NAMES.put("char", Character.TYPE);
PRIMITIVE_NAMES.put("short", Short.TYPE);
PRIMITIVE_NAMES.put("int", Integer.TYPE);
PRIMITIVE_NAMES.put("long", Long.TYPE);
PRIMITIVE_NAMES.put("float", Float.TYPE);
PRIMITIVE_NAMES.put("double", Double.TYPE);
PRIMITIVE_NAMES.put("void", Void.TYPE);
}
private static class NullInstance extends Configured implements Writable {
private Class<?> declaredClass;
public NullInstance() { super(null); }
public NullInstance(Class declaredClass, Configuration conf) {
super(conf);
this.declaredClass = declaredClass;
}
public void readFields(DataInput in) throws IOException {
String className = Text.readString(in);
declaredClass = PRIMITIVE_NAMES.get(className);
if (declaredClass == null) {
try {
declaredClass = getConf().getClassByName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e.toString());
}
}
}
public void write(DataOutput out) throws IOException {
Text.writeString(out, declaredClass.getName());
}
}
/** Write a {@link Writable}, {@link String}, primitive type, or an array of
* the preceding. */
public static void writeObject(DataOutput out, Object instance,
Class declaredClass,
Configuration conf) throws IOException {
if (instance == null) { // null
instance = new NullInstance(declaredClass, conf);
declaredClass = Writable.class;
}
Text.writeString(out, declaredClass.getName()); // always write declared
if (declaredClass.isArray()) { // array
int length = Array.getLength(instance);
out.writeInt(length);
for (int i = 0; i < length; i++) {
writeObject(out, Array.get(instance, i),
declaredClass.getComponentType(), conf);
}
} else if (declaredClass == String.class) { // String
Text.writeString(out, (String)instance);
} else if (declaredClass.isPrimitive()) { // primitive type
if (declaredClass == Boolean.TYPE) { // boolean
out.writeBoolean(((Boolean)instance).booleanValue());
} else if (declaredClass == Character.TYPE) { // char
out.writeChar(((Character)instance).charValue());
} else if (declaredClass == Byte.TYPE) { // byte
out.writeByte(((Byte)instance).byteValue());
} else if (declaredClass == Short.TYPE) { // short
out.writeShort(((Short)instance).shortValue());
} else if (declaredClass == Integer.TYPE) { // int
out.writeInt(((Integer)instance).intValue());
} else if (declaredClass == Long.TYPE) { // long
out.writeLong(((Long)instance).longValue());
} else if (declaredClass == Float.TYPE) { // float
out.writeFloat(((Float)instance).floatValue());
} else if (declaredClass == Double.TYPE) { // double
out.writeDouble(((Double)instance).doubleValue());
} else if (declaredClass == Void.TYPE) { // void
} else {
throw new IllegalArgumentException("Not a primitive: "+declaredClass);
}
} else if (declaredClass.isEnum()) { // enum
Text.writeString(out, ((Enum)instance).name());
} else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable
Text.writeString(out, instance.getClass().getName());
((Writable)instance).write(out);
} else {
throw new IOException("Can't write: "+instance+" as "+declaredClass);
}
}
/** Read a {@link Writable}, {@link String}, primitive type, or an array of
* the preceding. */
public static Object readObject(DataInput in, Configuration conf)
throws IOException {
return readObject(in, null, conf);
}
/** Read a {@link Writable}, {@link String}, primitive type, or an array of
* the preceding. */
@SuppressWarnings("unchecked")
public static Object readObject(DataInput in, HbaseObjectWritable objectWritable, Configuration conf)
throws IOException {
String className = Text.readString(in);
Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
if (declaredClass == null) {
try {
declaredClass = conf.getClassByName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException("readObject can't find class", e);
}
}
Object instance;
if (declaredClass.isPrimitive()) { // primitive types
if (declaredClass == Boolean.TYPE) { // boolean
instance = Boolean.valueOf(in.readBoolean());
} else if (declaredClass == Character.TYPE) { // char
instance = Character.valueOf(in.readChar());
} else if (declaredClass == Byte.TYPE) { // byte
instance = Byte.valueOf(in.readByte());
} else if (declaredClass == Short.TYPE) { // short
instance = Short.valueOf(in.readShort());
} else if (declaredClass == Integer.TYPE) { // int
instance = Integer.valueOf(in.readInt());
} else if (declaredClass == Long.TYPE) { // long
instance = Long.valueOf(in.readLong());
} else if (declaredClass == Float.TYPE) { // float
instance = Float.valueOf(in.readFloat());
} else if (declaredClass == Double.TYPE) { // double
instance = Double.valueOf(in.readDouble());
} else if (declaredClass == Void.TYPE) { // void
instance = null;
} else {
throw new IllegalArgumentException("Not a primitive: "+declaredClass);
}
} else if (declaredClass.isArray()) { // array
int length = in.readInt();
instance = Array.newInstance(declaredClass.getComponentType(), length);
for (int i = 0; i < length; i++) {
Array.set(instance, i, readObject(in, conf));
}
} else if (declaredClass == String.class) { // String
instance = Text.readString(in);
} else if (declaredClass.isEnum()) { // enum
instance = Enum.valueOf((Class<? extends Enum>) declaredClass, Text.readString(in));
} else { // Writable
Class instanceClass = null;
try {
instanceClass = conf.getClassByName(Text.readString(in));
} catch (ClassNotFoundException e) {
throw new RuntimeException("readObject can't find class", e);
}
Writable writable = WritableFactories.newInstance(instanceClass, conf);
writable.readFields(in);
instance = writable;
if (instanceClass == NullInstance.class) { // null
declaredClass = ((NullInstance)instance).declaredClass;
instance = null;
}
}
if (objectWritable != null) { // store values
objectWritable.declaredClass = declaredClass;
objectWritable.instance = instance;
}
return instance;
}
public void setConf(Configuration conf) {
this.conf = conf;
}
public Configuration getConf() {
return this.conf;
}
}
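As an aside (not part of the patch), a small sketch of round-tripping a value through HbaseObjectWritable the way Invocation parameters travel over the wire; the example value and stream setup are illustrative only.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.io.Text;

public class ObjectWritableRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Write a Text instance along with its class name.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    HbaseObjectWritable.writeObject(out, new Text("row-0001"), Text.class, conf);
    out.close();

    // Read it back; the declared class is recovered from the stream.
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
    Object restored = HbaseObjectWritable.readObject(in, conf);
    System.out.println(restored);                       // row-0001
    System.out.println(restored.getClass().getName());  // org.apache.hadoop.io.Text
  }
}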

View File

@@ -127,8 +127,9 @@ public class TextSequence extends Text {
public int hashCode() {
int hash = 1;
for (int i = this.start; i < getLength(); i++)
hash = (31 * hash) + this.delegatee.getBytes()[i];
byte [] b = this.delegatee.getBytes();
for (int i = this.start, length = getLength(); i < length; i++)
hash = (31 * hash) + b[i];
return hash;
}

View File

@@ -0,0 +1,442 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.lang.reflect.Proxy;
import java.lang.reflect.Method;
import java.lang.reflect.Array;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.io.*;
import java.util.Map;
import java.util.HashMap;
import java.util.Collection;
import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.conf.*;
/** A simple RPC mechanism.
*
* This is a local hbase copy of the hadoop RPC so we can do things like
* address HADOOP-414 for hbase-only and try other hbase-specific
* optimizations like using our own version of ObjectWritable. Class has been
* renamed to avoid confusing it w/ hadoop versions.
*
* <p>Below are continued the class comments from hadoop RPC class.
*
* A <i>protocol</i> is a Java interface. All parameters and return types must
* be one of:
*
* <ul> <li>a primitive type, <code>boolean</code>, <code>byte</code>,
* <code>char</code>, <code>short</code>, <code>int</code>, <code>long</code>,
* <code>float</code>, <code>double</code>, or <code>void</code>; or</li>
*
* <li>a {@link String}; or</li>
*
* <li>a {@link Writable}; or</li>
*
* <li>an array of the above types</li> </ul>
*
* All methods in the protocol should throw only IOException. No field data of
* the protocol instance is transmitted.
*
* @see org.apache.hadoop.ipc.RPC
*/
public class HbaseRPC {
private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.RPC");
private HbaseRPC() {} // no public ctor
/** A method invocation, including the method name and its parameters.*/
private static class Invocation implements Writable, Configurable {
private String methodName;
private Class[] parameterClasses;
private Object[] parameters;
private Configuration conf;
public Invocation() {}
public Invocation(Method method, Object[] parameters) {
this.methodName = method.getName();
this.parameterClasses = method.getParameterTypes();
this.parameters = parameters;
}
/** The name of the method invoked. */
public String getMethodName() { return methodName; }
/** The parameter classes. */
public Class[] getParameterClasses() { return parameterClasses; }
/** The parameter instances. */
public Object[] getParameters() { return parameters; }
public void readFields(DataInput in) throws IOException {
methodName = Text.readString(in);
parameters = new Object[in.readInt()];
parameterClasses = new Class[parameters.length];
HbaseObjectWritable objectWritable = new HbaseObjectWritable();
for (int i = 0; i < parameters.length; i++) {
parameters[i] = HbaseObjectWritable.readObject(in, objectWritable, this.conf);
parameterClasses[i] = objectWritable.getDeclaredClass();
}
}
public void write(DataOutput out) throws IOException {
Text.writeString(out, methodName);
out.writeInt(parameterClasses.length);
for (int i = 0; i < parameterClasses.length; i++) {
HbaseObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
conf);
}
}
public String toString() {
StringBuffer buffer = new StringBuffer();
buffer.append(methodName);
buffer.append("(");
for (int i = 0; i < parameters.length; i++) {
if (i != 0)
buffer.append(", ");
buffer.append(parameters[i]);
}
buffer.append(")");
return buffer.toString();
}
public void setConf(Configuration conf) {
this.conf = conf;
}
public Configuration getConf() {
return this.conf;
}
}
private static Map<SocketFactory, Client> CLIENTS =
new HashMap<SocketFactory, Client>();
private static synchronized Client getClient(Configuration conf,
SocketFactory factory) {
// Construct & cache client. The configuration is only used for timeout,
// and Clients have connection pools. So we can either (a) lose some
// connection pooling and leak sockets, or (b) use the same timeout for all
// configurations. Since the IPC is usually intended globally, not
// per-job, we choose (b).
Client client = CLIENTS.get(factory);
if (client == null) {
client = new Client(HbaseObjectWritable.class, conf, factory);
CLIENTS.put(factory, client);
}
return client;
}
/**
* Construct & cache client with the default SocketFactory.
* @param conf configuration to use
* @return a cached Client that uses the default SocketFactory
*/
private static Client getClient(Configuration conf) {
return getClient(conf, SocketFactory.getDefault());
}
/**
* Stop all RPC client connections
*/
public static synchronized void stopClient(){
for (Client client : CLIENTS.values())
client.stop();
CLIENTS.clear();
}
/*
* Remove all cached clients.
*/
static synchronized void removeClients() {
CLIENTS.clear();
}
static synchronized Collection allClients() {
return CLIENTS.values();
}
private static class Invoker implements InvocationHandler {
private InetSocketAddress address;
private UserGroupInformation ticket;
private Client client;
public Invoker(InetSocketAddress address, UserGroupInformation ticket,
Configuration conf, SocketFactory factory) {
this.address = address;
this.ticket = ticket;
this.client = getClient(conf, factory);
}
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
long startTime = System.currentTimeMillis();
HbaseObjectWritable value = (HbaseObjectWritable)
client.call(new Invocation(method, args), address, ticket);
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
return value.get();
}
}
/**
* A version mismatch for the RPC protocol.
*/
public static class VersionMismatch extends IOException {
private String interfaceName;
private long clientVersion;
private long serverVersion;
/**
* Create a version mismatch exception
* @param interfaceName the name of the protocol mismatch
* @param clientVersion the client's version of the protocol
* @param serverVersion the server's version of the protocol
*/
public VersionMismatch(String interfaceName, long clientVersion,
long serverVersion) {
super("Protocol " + interfaceName + " version mismatch. (client = " +
clientVersion + ", server = " + serverVersion + ")");
this.interfaceName = interfaceName;
this.clientVersion = clientVersion;
this.serverVersion = serverVersion;
}
/**
* Get the interface name
* @return the java class name
* (eg. org.apache.hadoop.mapred.InterTrackerProtocol)
*/
public String getInterfaceName() {
return interfaceName;
}
/**
* Get the client's preferred version
*/
public long getClientVersion() {
return clientVersion;
}
/**
* Get the server's agreed to version.
*/
public long getServerVersion() {
return serverVersion;
}
}
public static VersionedProtocol waitForProxy(Class protocol,
long clientVersion,
InetSocketAddress addr,
Configuration conf
) throws IOException {
while (true) {
try {
return getProxy(protocol, clientVersion, addr, conf);
} catch(ConnectException se) { // server has not been started
LOG.info("Server at " + addr + " not available yet, Zzzzz...");
} catch(SocketTimeoutException te) { // server is busy
LOG.info("Problem connecting to server: " + addr);
}
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
// IGNORE
}
}
}
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
public static VersionedProtocol getProxy(Class<?> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf,
SocketFactory factory) throws IOException {
return getProxy(protocol, clientVersion, addr, null, conf, factory);
}
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
public static VersionedProtocol getProxy(Class<?> protocol,
long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory) throws IOException {
VersionedProtocol proxy =
(VersionedProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
new Invoker(addr, ticket, conf, factory));
long serverVersion = proxy.getProtocolVersion(protocol.getName(),
clientVersion);
if (serverVersion == clientVersion) {
return proxy;
} else {
throw new VersionMismatch(protocol.getName(), clientVersion,
serverVersion);
}
}
/**
* Construct a client-side proxy object with the default SocketFactory
*
* @param protocol
* @param clientVersion
* @param addr
* @param conf
* @return a proxy instance
* @throws IOException
*/
public static VersionedProtocol getProxy(Class<?> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf)
throws IOException {
return getProxy(protocol, clientVersion, addr, conf, NetUtils
.getDefaultSocketFactory(conf));
}
/** Expert: Make multiple, parallel calls to a set of servers. */
public static Object[] call(Method method, Object[][] params,
InetSocketAddress[] addrs, Configuration conf)
throws IOException {
Invocation[] invocations = new Invocation[params.length];
for (int i = 0; i < params.length; i++)
invocations[i] = new Invocation(method, params[i]);
Writable[] wrappedValues = getClient(conf).call(invocations, addrs);
if (method.getReturnType() == Void.TYPE) {
return null;
}
Object[] values =
(Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
for (int i = 0; i < values.length; i++)
if (wrappedValues[i] != null)
values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
return values;
}
/** Construct a server for a protocol implementation instance listening on a
* port and address. */
public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf)
throws IOException {
return getServer(instance, bindAddress, port, 1, false, conf);
}
/** Construct a server for a protocol implementation instance listening on a
* port and address. */
public static Server getServer(final Object instance, final String bindAddress, final int port,
final int numHandlers,
final boolean verbose, Configuration conf)
throws IOException {
return new Server(instance, conf, bindAddress, port, numHandlers, verbose);
}
/** An RPC Server. */
public static class Server extends org.apache.hadoop.ipc.Server {
private Object instance;
private Class<?> implementation;
private boolean verbose;
/** Construct an RPC server.
* @param instance the instance whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
*/
public Server(Object instance, Configuration conf, String bindAddress, int port)
throws IOException {
this(instance, conf, bindAddress, port, 1, false);
}
/** Construct an RPC server.
* @param instance the instance whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run
* @param verbose whether each call should be logged
*/
public Server(Object instance, Configuration conf, String bindAddress, int port,
int numHandlers, boolean verbose) throws IOException {
super(bindAddress, port, Invocation.class, numHandlers, conf);
this.instance = instance;
this.implementation = instance.getClass();
this.verbose = verbose;
}
public Writable call(Writable param) throws IOException {
try {
Invocation call = (Invocation)param;
if (verbose) log("Call: " + call);
Method method =
implementation.getMethod(call.getMethodName(),
call.getParameterClasses());
long startTime = System.currentTimeMillis();
Object value = method.invoke(instance, call.getParameters());
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Served: " + call.getMethodName() + " " + callTime);
if (verbose) log("Return: "+value);
return new HbaseObjectWritable(method.getReturnType(), value);
} catch (InvocationTargetException e) {
Throwable target = e.getTargetException();
if (target instanceof IOException) {
throw (IOException)target;
} else {
IOException ioe = new IOException(target.toString());
ioe.setStackTrace(target.getStackTrace());
throw ioe;
}
} catch (Throwable e) {
IOException ioe = new IOException(e.toString());
ioe.setStackTrace(e.getStackTrace());
throw ioe;
}
}
}
private static void log(String value) {
if (value!= null && value.length() > 55)
value = value.substring(0, 55)+"...";
LOG.info(value);
}
}
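As an aside (not part of the patch), a hypothetical protocol plus the client and server calls that would exercise this class. The interface name, port, and address below are made up; versionID and the parameter-type rules (primitives, String, Writable, arrays) are the requirements described in the class comment above.

import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ipc.HbaseRPC;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.VersionedProtocol;

public class HbaseRPCExample {
  /** Hypothetical protocol: parameters limited to primitives, String, Writable, or arrays. */
  public interface EchoProtocol extends VersionedProtocol {
    long versionID = 1L;
    Text echo(Text message) throws IOException;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Server side: export an implementation on a made-up port.
    EchoProtocol impl = new EchoProtocol() {
      public Text echo(Text message) { return message; }
      public long getProtocolVersion(String protocol, long clientVersion) {
        return versionID;
      }
    };
    HbaseRPC.Server server = HbaseRPC.getServer(impl, "0.0.0.0", 60123, conf);
    server.start();

    // Client side: getProxy checks the server's version against versionID and
    // throws HbaseRPC.VersionMismatch if they differ.
    EchoProtocol proxy = (EchoProtocol) HbaseRPC.getProxy(EchoProtocol.class,
      EchoProtocol.versionID, new InetSocketAddress("127.0.0.1", 60123), conf);
    System.out.println(proxy.echo(new Text("hello")));

    server.stop();
  }
}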

View File

@@ -19,11 +19,6 @@
*/
package org.apache.hadoop.hbase.io;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.io.Text;
@@ -55,18 +50,4 @@ public class TestTextSequence extends HBaseTestCase {
assertTrue(ts.compareTo(family) == 0);
assertTrue(ts.equals(family));
}
public void testSerialize() throws Exception {
final Text t = new Text(getName());
final TextSequence ts = new TextSequence(t, 1, 3);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dao = new DataOutputStream(baos);
ts.write(dao);
dao.close();
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
DataInputStream dis = new DataInputStream(bais);
TextSequence deserializeTs = new TextSequence();
deserializeTs.readFields(dis);
assertTrue(ts.equals(deserializeTs));
}
}