From 0e8bbbf2f79b631fdb68411c50a26e41cedf724d Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Sun, 30 Dec 2007 22:22:16 +0000 Subject: [PATCH] HADOOP-2495 Minor performance improvements: Slim-down BatchOperation, etc. git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@607602 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + conf/hbase-default.xml | 7 + .../hadoop/hbase/HConnectionManager.java | 10 +- src/java/org/apache/hadoop/hbase/HLog.java | 15 +- src/java/org/apache/hadoop/hbase/HMaster.java | 4 +- .../apache/hadoop/hbase/HMasterInterface.java | 7 +- src/java/org/apache/hadoop/hbase/HRegion.java | 8 +- .../apache/hadoop/hbase/HRegionServer.java | 6 +- .../org/apache/hadoop/hbase/HStoreKey.java | 11 +- .../hadoop/hbase/io/BatchOperation.java | 99 ++-- .../apache/hadoop/hbase/io/BatchUpdate.java | 18 +- .../hadoop/hbase/io/HbaseObjectWritable.java | 274 +++++++++++ .../apache/hadoop/hbase/io/TextSequence.java | 5 +- .../org/apache/hadoop/hbase/ipc/HbaseRPC.java | 442 ++++++++++++++++++ .../hadoop/hbase/io/TestTextSequence.java | 19 - 15 files changed, 808 insertions(+), 118 deletions(-) create mode 100644 src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java create mode 100644 src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java diff --git a/CHANGES.txt b/CHANGES.txt index cd5c7e5d17a..42fb3097b1c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -22,6 +22,7 @@ Trunk (unreleased changes) HADOOP-2479 Save on number of Text object creations HADOOP-2485 Make mapfile index interval configurable (Set default to 32 instead of 128) + HADOOP-2495 Minor performance improvements: Slim-down BatchOperation, etc. BUG FIXES HADOOP-2059 In tests, exceptions in min dfs shutdown should not fail test diff --git a/conf/hbase-default.xml b/conf/hbase-default.xml index ced5a408555..26b4b1b25ea 100644 --- a/conf/hbase-default.xml +++ b/conf/hbase-default.xml @@ -211,6 +211,13 @@ skip every nth index member when reading back the index into memory. + <property> + <name>hbase.io.seqfile.compression.type</name> + <value>NONE</value> + <description>The compression type for hbase sequencefile.Writers + such as hlog.
+ </description> + </property> diff --git a/src/java/org/apache/hadoop/hbase/HConnectionManager.java b/src/java/org/apache/hadoop/hbase/HConnectionManager.java index 5dad7963528..7a54a701b6a 100644 --- a/src/java/org/apache/hadoop/hbase/HConnectionManager.java +++ b/src/java/org/apache/hadoop/hbase/HConnectionManager.java @@ -34,11 +34,11 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.ipc.HbaseRPC; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; /** @@ -177,7 +177,7 @@ public class HConnectionManager implements HConstants { MASTER_ADDRESS, DEFAULT_MASTER_ADDRESS)); try { - HMasterInterface tryMaster = (HMasterInterface)RPC.getProxy( + HMasterInterface tryMaster = (HMasterInterface)HbaseRPC.getProxy( HMasterInterface.class, HMasterInterface.versionID, masterLocation.getInetSocketAddress(), this.conf); @@ -360,13 +360,11 @@ public class HConnectionManager implements HConstants { try { versionId = serverInterfaceClass.getDeclaredField("versionID").getLong(server); - } catch (IllegalAccessException e) { // Should never happen unless visibility of versionID changes throw new UnsupportedOperationException( "Unable to open a connection to a " + serverInterfaceClass.getName() + " server.", e); - } catch (NoSuchFieldException e) { // Should never happen unless versionID field name changes in HRegionInterface throw new UnsupportedOperationException( @@ -375,13 +373,11 @@ public class HConnectionManager implements HConstants { } try { - server = (HRegionInterface) RPC.waitForProxy(serverInterfaceClass, + server = (HRegionInterface)HbaseRPC.waitForProxy(serverInterfaceClass, versionId, regionServer.getInetSocketAddress(), this.conf); - } catch (RemoteException e) { throw RemoteExceptionHandler.decodeRemoteException(e); } - this.servers.put(regionServer.toString(), server); } } diff --git a/src/java/org/apache/hadoop/hbase/HLog.java b/src/java/org/apache/hadoop/hbase/HLog.java index 327bafe818b..ef5f88fc5b6 100644 --- a/src/java/org/apache/hadoop/hbase/HLog.java +++ b/src/java/org/apache/hadoop/hbase/HLog.java @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.SequenceFile.Reader; /** @@ -177,7 +178,7 @@ public class HLog implements HConstants { "; map content " + logWriters.toString()); } w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class, - HLogEdit.class); + HLogEdit.class, getCompressionType(conf)); // Use copy of regionName; the regionName object is reused inside // HStoreKey.getRegionName so its content changes as we iterate. logWriters.put(new Text(regionName), w); @@ -238,6 +239,16 @@ public class HLog implements HConstants { fs.mkdirs(dir); rollWriter(); } + + /** + * Get the compression type for the hlog files. + * @param c Configuration to use. + * @return the kind of compression to use + */ + private static CompressionType getCompressionType(final Configuration c) { + String name = c.get("hbase.io.seqfile.compression.type"); + return name == null?
CompressionType.NONE: CompressionType.valueOf(name); + } /** * Called by HRegionServer when it opens a new region to ensure that log @@ -298,7 +309,7 @@ public class HLog implements HConstants { } Path newPath = computeFilename(filenum++); this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath, - HLogKey.class, HLogEdit.class); + HLogKey.class, HLogEdit.class, getCompressionType(this.conf)); LOG.info("new log writer created at " + newPath); // Can we delete any of the old log files? diff --git a/src/java/org/apache/hadoop/hbase/HMaster.java b/src/java/org/apache/hadoop/hbase/HMaster.java index c87b51e28b5..0fdc86bb328 100644 --- a/src/java/org/apache/hadoop/hbase/HMaster.java +++ b/src/java/org/apache/hadoop/hbase/HMaster.java @@ -51,6 +51,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.ipc.HbaseRPC; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.InfoServer; import org.apache.hadoop.hbase.util.Sleeper; @@ -59,7 +60,6 @@ import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.Server; @@ -919,7 +919,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, this.serverLeases = new Leases(this.leaseTimeout, conf.getInt("hbase.master.lease.thread.wakefrequency", 15 * 1000)); - this.server = RPC.getServer(this, address.getBindAddress(), + this.server = HbaseRPC.getServer(this, address.getBindAddress(), address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), false, conf); diff --git a/src/java/org/apache/hadoop/hbase/HMasterInterface.java b/src/java/org/apache/hadoop/hbase/HMasterInterface.java index 98440df3830..743ed005998 100644 --- a/src/java/org/apache/hadoop/hbase/HMasterInterface.java +++ b/src/java/org/apache/hadoop/hbase/HMasterInterface.java @@ -30,8 +30,11 @@ import java.io.IOException; * tables. */ public interface HMasterInterface extends VersionedProtocol { - /** Interface version */ - public static final long versionID = 1L; + /** + * Interface version. + * Version was incremented to 2 when we brought the hadoop RPC local to hbase. 
+ */ + public static final long versionID = 2L; /** @return true if master is available */ public boolean isMasterRunning(); diff --git a/src/java/org/apache/hadoop/hbase/HRegion.java b/src/java/org/apache/hadoop/hbase/HRegion.java index 95d49187f0c..c6d7211034e 100644 --- a/src/java/org/apache/hadoop/hbase/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/HRegion.java @@ -1155,15 +1155,12 @@ public class HRegion implements HConstants { for (BatchOperation op: b) { HStoreKey key = new HStoreKey(row, op.getColumn(), commitTime); byte[] val = null; - switch(op.getOp()) { - case PUT: + if (op.isPut()) { val = op.getValue(); if (HLogEdit.isDeleted(val)) { throw new IOException("Cannot insert value: " + val); } - break; - - case DELETE: + } else { if (timestamp == LATEST_TIMESTAMP) { // Save off these deletes if (deletes == null) { @@ -1173,7 +1170,6 @@ public class HRegion implements HConstants { } else { val = HLogEdit.deleteBytes.get(); } - break; } if (val != null) { localput(lockid, key, val); diff --git a/src/java/org/apache/hadoop/hbase/HRegionServer.java b/src/java/org/apache/hadoop/hbase/HRegionServer.java index 49bf9e088c1..18a7b813b78 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/HRegionServer.java @@ -54,6 +54,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.ipc.HbaseRPC; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.InfoServer; import org.apache.hadoop.hbase.util.Sleeper; @@ -62,7 +63,6 @@ import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.DNS; import org.apache.hadoop.util.StringUtils; @@ -661,7 +661,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { this.workerThread = new Thread(worker); this.sleeper = new Sleeper(this.msgInterval, this.stopRequested); // Server to handle client requests - this.server = RPC.getServer(this, address.getBindAddress(), + this.server = HbaseRPC.getServer(this, address.getBindAddress(), address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), false, conf); this.serverInfo = new HServerInfo(new HServerAddress( @@ -1060,7 +1060,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { LOG.debug("Telling master we are up"); } // Do initial RPC setup. 
- this.hbaseMaster = (HMasterRegionInterface)RPC.waitForProxy( + this.hbaseMaster = (HMasterRegionInterface)HbaseRPC.waitForProxy( HMasterRegionInterface.class, HMasterRegionInterface.versionID, new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(), this.conf); diff --git a/src/java/org/apache/hadoop/hbase/HStoreKey.java b/src/java/org/apache/hadoop/hbase/HStoreKey.java index edde010e7b0..512aef77756 100644 --- a/src/java/org/apache/hadoop/hbase/HStoreKey.java +++ b/src/java/org/apache/hadoop/hbase/HStoreKey.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.io.TextSequence; import org.apache.hadoop.io.*; import java.io.*; +import java.nio.ByteBuffer; /** * A Key for a stored row */ @@ -225,7 +226,7 @@ public class HStoreKey implements WritableComparable { // Comparable public int compareTo(Object o) { - HStoreKey other = (HStoreKey) o; + HStoreKey other = (HStoreKey)o; int result = this.row.compareTo(other.row); if (result != 0) { return result; } @@ -322,9 +323,11 @@ public class HStoreKey implements WritableComparable { private static int getColonOffset(final Text col) throws InvalidColumnNameException { int offset = -1; - for (int i = 0; i < col.getLength(); i++) { - if (col.charAt(i) == COLUMN_FAMILY_DELIMITER) { - offset = i; + ByteBuffer bb = ByteBuffer.wrap(col.getBytes()); + for (int lastPosition = bb.position(); bb.hasRemaining(); + lastPosition = bb.position()) { + if (Text.bytesToCodePoint(bb) == COLUMN_FAMILY_DELIMITER) { + offset = lastPosition; break; } } diff --git a/src/java/org/apache/hadoop/hbase/io/BatchOperation.java b/src/java/org/apache/hadoop/hbase/io/BatchOperation.java index 4dfe46e0f23..47e5287065f 100644 --- a/src/java/org/apache/hadoop/hbase/io/BatchOperation.java +++ b/src/java/org/apache/hadoop/hbase/io/BatchOperation.java @@ -27,56 +27,38 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; /** - * Batch update operations such as put, delete, and deleteAll. + * Batch update operation. + * + * If value is null, it's a DELETE operation. If it's non-null, it's a PUT. + * This object is purposely bare-bones because many instances are created + * during bulk uploads. We have one class for DELETEs and PUTs rather than + * a class per type because it makes the serialization easier. + * @see BatchUpdate */ public class BatchOperation implements Writable { - /** - * Operation types. - * @see org.apache.hadoop.io.SequenceFile.Writer - */ - public static enum Operation { - /** update a field */ - PUT, - /** delete a field */ - DELETE} - - private Operation op; private Text column; - private byte[] value; - /** default constructor used by Writable */ + // A null value defines DELETE operations. + private byte[] value; + + /** Default constructor used by Writable */ public BatchOperation() { this(new Text()); } /** - * Creates a DELETE operation - * + * Creates a DELETE batch operation. * @param column column name */ public BatchOperation(final Text column) { - this(Operation.DELETE, column, null); + this(column, null); } /** - * Creates a PUT operation - * + * Creates a batch operation. * @param column column name - * @param value column value + * @param value column value. If non-null, this is a PUT operation.
*/ public BatchOperation(final Text column, final byte [] value) { - this(Operation.PUT, column, value); - } - - /** - * Creates a put operation - * - * @param operation the operation (put or get) - * @param column column name - * @param value column value - */ - public BatchOperation(final Operation operation, final Text column, - final byte[] value) { - this.op = operation; this.column = column; this.value = value; } @@ -85,47 +67,42 @@ public class BatchOperation implements Writable { * @return the column */ public Text getColumn() { - return column; - } - - /** - * @return the operation - */ - public Operation getOp() { - return this.op; + return this.column; } /** * @return the value */ public byte[] getValue() { - return value; + return this.value; } - - // - // Writable - // /** - * {@inheritDoc} + * @return True if this is a PUT operation (this.value is not null). */ - public void readFields(DataInput in) throws IOException { - int ordinal = in.readInt(); - this.op = Operation.values()[ordinal]; - column.readFields(in); - if (this.op == Operation.PUT) { - value = new byte[in.readInt()]; - in.readFully(value); + public boolean isPut() { + return this.value != null; + } + + // Writable methods + + // This is a hotspot when deserializing incoming client submissions. + // In Performance Evaluation sequentialWrite, 70% of object allocations are + // done in here. + public void readFields(final DataInput in) throws IOException { + this.column.readFields(in); + // Is there a value to read? + if (in.readBoolean()) { + this.value = new byte[in.readInt()]; + in.readFully(this.value); } } - /** - * {@inheritDoc} - */ - public void write(DataOutput out) throws IOException { - out.writeInt(this.op.ordinal()); - column.write(out); - if (this.op == Operation.PUT) { + public void write(final DataOutput out) throws IOException { + this.column.write(out); + boolean p = isPut(); + out.writeBoolean(p); + if (p) { out.writeInt(value.length); out.write(value); } diff --git a/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java b/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java index 006db9a8103..64dd6747796 100644 --- a/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java +++ b/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java @@ -99,13 +99,17 @@ public class BatchUpdate implements Writable, Iterable { * * @param lid lock id returned from startUpdate * @param column column whose value is being set - * @param val new value for column + * @param val new value for column. Cannot be null (can be empty). */ public synchronized void put(final long lid, final Text column, final byte val[]) { if(this.lockid != lid) { throw new IllegalArgumentException("invalid lockid " + lid); } + if (val == null) { + // If null, the PUT becomes a DELETE operation.
+ throw new IllegalArgumentException("Passed value cannot be null"); + } operations.add(new BatchOperation(column, val)); } @@ -138,10 +142,7 @@ public class BatchUpdate implements Writable, Iterable { // Writable // - /** - * {@inheritDoc} - */ - public void readFields(DataInput in) throws IOException { + public void readFields(final DataInput in) throws IOException { row.readFields(in); int nOps = in.readInt(); for (int i = 0; i < nOps; i++) { @@ -151,14 +152,11 @@ public class BatchUpdate implements Writable, Iterable { } } - /** - * {@inheritDoc} - */ - public void write(DataOutput out) throws IOException { + public void write(final DataOutput out) throws IOException { row.write(out); out.writeInt(operations.size()); for (BatchOperation op: operations) { op.write(out); } } -} +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java b/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java new file mode 100644 index 00000000000..2a8fbf1f3de --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io; + +import java.lang.reflect.Array; + +import java.io.*; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; + +/** A polymorphic Writable that writes an instance with it's class name. + * Handles arrays, strings and primitive types without a Writable wrapper. + * + * This is a copy of the hadoop version renamed. Removes UTF8 (HADOOP-414). + * Using Text intead of UTF-8 saves ~2% CPU between reading and writing objects + * running a short sequentialWrite Performance Evaluation test just in + * ObjectWritable alone; more when we're doing randomRead-ing. Other + * optimizations include our passing codes for classes instead of the + * actual class names themselves. + * + *
<p>
Has other optimizations passing codes instead of class names. + */ +public class HbaseObjectWritable implements Writable, Configurable { + + private Class declaredClass; + private Object instance; + private Configuration conf; + + public HbaseObjectWritable() {} + + public HbaseObjectWritable(Object instance) { + set(instance); + } + + public HbaseObjectWritable(Class declaredClass, Object instance) { + this.declaredClass = declaredClass; + this.instance = instance; + } + + /** Return the instance, or null if none. */ + public Object get() { return instance; } + + /** Return the class this is meant to be. */ + public Class getDeclaredClass() { return declaredClass; } + + /** Reset the instance. */ + public void set(Object instance) { + this.declaredClass = instance.getClass(); + this.instance = instance; + } + + public String toString() { + return "OW[class=" + declaredClass + ",value=" + instance + "]"; + } + + + public void readFields(DataInput in) throws IOException { + readObject(in, this, this.conf); + } + + public void write(DataOutput out) throws IOException { + writeObject(out, instance, declaredClass, conf); + } + + private static final Map<String, Class<?>> PRIMITIVE_NAMES = new HashMap<String, Class<?>>(); + static { + PRIMITIVE_NAMES.put("boolean", Boolean.TYPE); + PRIMITIVE_NAMES.put("byte", Byte.TYPE); + PRIMITIVE_NAMES.put("char", Character.TYPE); + PRIMITIVE_NAMES.put("short", Short.TYPE); + PRIMITIVE_NAMES.put("int", Integer.TYPE); + PRIMITIVE_NAMES.put("long", Long.TYPE); + PRIMITIVE_NAMES.put("float", Float.TYPE); + PRIMITIVE_NAMES.put("double", Double.TYPE); + PRIMITIVE_NAMES.put("void", Void.TYPE); + } + + private static class NullInstance extends Configured implements Writable { + private Class declaredClass; + public NullInstance() { super(null); } + public NullInstance(Class declaredClass, Configuration conf) { + super(conf); + this.declaredClass = declaredClass; + } + public void readFields(DataInput in) throws IOException { + String className = Text.readString(in); + declaredClass = PRIMITIVE_NAMES.get(className); + if (declaredClass == null) { + try { + declaredClass = getConf().getClassByName(className); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e.toString()); + } + } + } + public void write(DataOutput out) throws IOException { + Text.writeString(out, declaredClass.getName()); + } + } + + /** Write a {@link Writable}, {@link String}, primitive type, or an array of + * the preceding.
*/ + public static void writeObject(DataOutput out, Object instance, + Class declaredClass, + Configuration conf) throws IOException { + + if (instance == null) { // null + instance = new NullInstance(declaredClass, conf); + declaredClass = Writable.class; + } + + Text.writeString(out, declaredClass.getName()); // always write declared + + if (declaredClass.isArray()) { // array + int length = Array.getLength(instance); + out.writeInt(length); + for (int i = 0; i < length; i++) { + writeObject(out, Array.get(instance, i), + declaredClass.getComponentType(), conf); + } + + } else if (declaredClass == String.class) { // String + Text.writeString(out, (String)instance); + + } else if (declaredClass.isPrimitive()) { // primitive type + + if (declaredClass == Boolean.TYPE) { // boolean + out.writeBoolean(((Boolean)instance).booleanValue()); + } else if (declaredClass == Character.TYPE) { // char + out.writeChar(((Character)instance).charValue()); + } else if (declaredClass == Byte.TYPE) { // byte + out.writeByte(((Byte)instance).byteValue()); + } else if (declaredClass == Short.TYPE) { // short + out.writeShort(((Short)instance).shortValue()); + } else if (declaredClass == Integer.TYPE) { // int + out.writeInt(((Integer)instance).intValue()); + } else if (declaredClass == Long.TYPE) { // long + out.writeLong(((Long)instance).longValue()); + } else if (declaredClass == Float.TYPE) { // float + out.writeFloat(((Float)instance).floatValue()); + } else if (declaredClass == Double.TYPE) { // double + out.writeDouble(((Double)instance).doubleValue()); + } else if (declaredClass == Void.TYPE) { // void + } else { + throw new IllegalArgumentException("Not a primitive: "+declaredClass); + } + } else if (declaredClass.isEnum()) { // enum + Text.writeString(out, ((Enum)instance).name()); + } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable + Text.writeString(out, instance.getClass().getName()); + ((Writable)instance).write(out); + + } else { + throw new IOException("Can't write: "+instance+" as "+declaredClass); + } + } + + + /** Read a {@link Writable}, {@link String}, primitive type, or an array of + * the preceding. */ + public static Object readObject(DataInput in, Configuration conf) + throws IOException { + return readObject(in, null, conf); + } + + /** Read a {@link Writable}, {@link String}, primitive type, or an array of + * the preceding. 
*/ + @SuppressWarnings("unchecked") + public static Object readObject(DataInput in, HbaseObjectWritable objectWritable, Configuration conf) + throws IOException { + String className = Text.readString(in); + Class declaredClass = PRIMITIVE_NAMES.get(className); + if (declaredClass == null) { + try { + declaredClass = conf.getClassByName(className); + } catch (ClassNotFoundException e) { + throw new RuntimeException("readObject can't find class", e); + } + } + + Object instance; + + if (declaredClass.isPrimitive()) { // primitive types + + if (declaredClass == Boolean.TYPE) { // boolean + instance = Boolean.valueOf(in.readBoolean()); + } else if (declaredClass == Character.TYPE) { // char + instance = Character.valueOf(in.readChar()); + } else if (declaredClass == Byte.TYPE) { // byte + instance = Byte.valueOf(in.readByte()); + } else if (declaredClass == Short.TYPE) { // short + instance = Short.valueOf(in.readShort()); + } else if (declaredClass == Integer.TYPE) { // int + instance = Integer.valueOf(in.readInt()); + } else if (declaredClass == Long.TYPE) { // long + instance = Long.valueOf(in.readLong()); + } else if (declaredClass == Float.TYPE) { // float + instance = Float.valueOf(in.readFloat()); + } else if (declaredClass == Double.TYPE) { // double + instance = Double.valueOf(in.readDouble()); + } else if (declaredClass == Void.TYPE) { // void + instance = null; + } else { + throw new IllegalArgumentException("Not a primitive: "+declaredClass); + } + + } else if (declaredClass.isArray()) { // array + int length = in.readInt(); + instance = Array.newInstance(declaredClass.getComponentType(), length); + for (int i = 0; i < length; i++) { + Array.set(instance, i, readObject(in, conf)); + } + + } else if (declaredClass == String.class) { // String + instance = Text.readString(in); + } else if (declaredClass.isEnum()) { // enum + instance = Enum.valueOf((Class) declaredClass, Text.readString(in)); + } else { // Writable + Class instanceClass = null; + try { + instanceClass = conf.getClassByName(Text.readString(in)); + } catch (ClassNotFoundException e) { + throw new RuntimeException("readObject can't find class", e); + } + + Writable writable = WritableFactories.newInstance(instanceClass, conf); + writable.readFields(in); + instance = writable; + + if (instanceClass == NullInstance.class) { // null + declaredClass = ((NullInstance)instance).declaredClass; + instance = null; + } + } + + if (objectWritable != null) { // store values + objectWritable.declaredClass = declaredClass; + objectWritable.instance = instance; + } + + return instance; + + } + + public void setConf(Configuration conf) { + this.conf = conf; + } + + public Configuration getConf() { + return this.conf; + } + +} diff --git a/src/java/org/apache/hadoop/hbase/io/TextSequence.java b/src/java/org/apache/hadoop/hbase/io/TextSequence.java index bff362bcd31..a34adf57ee2 100644 --- a/src/java/org/apache/hadoop/hbase/io/TextSequence.java +++ b/src/java/org/apache/hadoop/hbase/io/TextSequence.java @@ -127,8 +127,9 @@ public class TextSequence extends Text { public int hashCode() { int hash = 1; - for (int i = this.start; i < getLength(); i++) - hash = (31 * hash) + this.delegatee.getBytes()[i]; + byte [] b = this.delegatee.getBytes(); + for (int i = this.start, length = getLength(); i < length; i++) + hash = (31 * hash) + b[i]; return hash; } diff --git a/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java b/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java new file mode 100644 index 00000000000..dd3f270fd4a --- /dev/null +++ 
b/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java @@ -0,0 +1,442 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.lang.reflect.Proxy; +import java.lang.reflect.Method; +import java.lang.reflect.Array; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; + +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; +import java.io.*; +import java.util.Map; +import java.util.HashMap; +import java.util.Collection; + +import javax.net.SocketFactory; + +import org.apache.commons.logging.*; + +import org.apache.hadoop.hbase.io.HbaseObjectWritable; +import org.apache.hadoop.io.*; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.conf.*; + +/** A simple RPC mechanism. + * + * This is a local hbase copy of the hadoop RPC so we can do things like + * address HADOOP-414 for hbase-only and try other hbase-specific + * optimizations like using our own version of ObjectWritable. Class has been + * renamed to avoid confusing it w/ hadoop versions. + * + *
<p>
Below are continued the class comments from hadoop RPC class. + * + * A protocol is a Java interface. All parameters and return types must + * be one of: + * + *

<ul> <li>a primitive type, <code>boolean</code>, <code>byte</code>,
+ * <code>char</code>, <code>short</code>, <code>int</code>, <code>long</code>,
+ * <code>float</code>, <code>double</code>, or <code>void</code>; or</li>
+ *
+ * <li>a {@link String}; or</li>
+ *
+ * <li>a {@link Writable}; or</li>
+ *
+ * <li>an array of the above types</li> </ul>
+ * + * All methods in the protocol should throw only IOException. No field data of + * the protocol instance is transmitted. + * + * @see org.apache.hadoop.ipc.RPC + */ +public class HbaseRPC { + private static final Log LOG = + LogFactory.getLog("org.apache.hadoop.ipc.RPC"); + + private HbaseRPC() {} // no public ctor + + + /** A method invocation, including the method name and its parameters.*/ + private static class Invocation implements Writable, Configurable { + private String methodName; + private Class[] parameterClasses; + private Object[] parameters; + private Configuration conf; + + public Invocation() {} + + public Invocation(Method method, Object[] parameters) { + this.methodName = method.getName(); + this.parameterClasses = method.getParameterTypes(); + this.parameters = parameters; + } + + /** The name of the method invoked. */ + public String getMethodName() { return methodName; } + + /** The parameter classes. */ + public Class[] getParameterClasses() { return parameterClasses; } + + /** The parameter instances. */ + public Object[] getParameters() { return parameters; } + + public void readFields(DataInput in) throws IOException { + methodName = Text.readString(in); + parameters = new Object[in.readInt()]; + parameterClasses = new Class[parameters.length]; + HbaseObjectWritable objectWritable = new HbaseObjectWritable(); + for (int i = 0; i < parameters.length; i++) { + parameters[i] = HbaseObjectWritable.readObject(in, objectWritable, this.conf); + parameterClasses[i] = objectWritable.getDeclaredClass(); + } + } + + public void write(DataOutput out) throws IOException { + Text.writeString(out, methodName); + out.writeInt(parameterClasses.length); + for (int i = 0; i < parameterClasses.length; i++) { + HbaseObjectWritable.writeObject(out, parameters[i], parameterClasses[i], + conf); + } + } + + public String toString() { + StringBuffer buffer = new StringBuffer(); + buffer.append(methodName); + buffer.append("("); + for (int i = 0; i < parameters.length; i++) { + if (i != 0) + buffer.append(", "); + buffer.append(parameters[i]); + } + buffer.append(")"); + return buffer.toString(); + } + + public void setConf(Configuration conf) { + this.conf = conf; + } + + public Configuration getConf() { + return this.conf; + } + + } + + private static Map CLIENTS = + new HashMap(); + + private static synchronized Client getClient(Configuration conf, + SocketFactory factory) { + // Construct & cache client. The configuration is only used for timeout, + // and Clients have connection pools. So we can either (a) lose some + // connection pooling and leak sockets, or (b) use the same timeout for all + // configurations. Since the IPC is usually intended globally, not + // per-job, we choose (a). + Client client = CLIENTS.get(factory); + if (client == null) { + client = new Client(HbaseObjectWritable.class, conf, factory); + CLIENTS.put(factory, client); + } + return client; + } + + /** + * Construct & cache client with the default SocketFactory. + * @param conf + * @return + */ + private static Client getClient(Configuration conf) { + return getClient(conf, SocketFactory.getDefault()); + } + + /** + * Stop all RPC client connections + */ + public static synchronized void stopClient(){ + for (Client client : CLIENTS.values()) + client.stop(); + CLIENTS.clear(); + } + + /* + * remove specified client from the list of clients. 
*/ + static synchronized void removeClients() { + CLIENTS.clear(); + } + + static synchronized Collection allClients() { + return CLIENTS.values(); + } + + private static class Invoker implements InvocationHandler { + private InetSocketAddress address; + private UserGroupInformation ticket; + private Client client; + + public Invoker(InetSocketAddress address, UserGroupInformation ticket, + Configuration conf, SocketFactory factory) { + this.address = address; + this.ticket = ticket; + this.client = getClient(conf, factory); + } + + public Object invoke(Object proxy, Method method, Object[] args) + throws Throwable { + long startTime = System.currentTimeMillis(); + HbaseObjectWritable value = (HbaseObjectWritable) + client.call(new Invocation(method, args), address, ticket); + long callTime = System.currentTimeMillis() - startTime; + LOG.debug("Call: " + method.getName() + " " + callTime); + return value.get(); + } + } + + /** + * A version mismatch for the RPC protocol. + */ + public static class VersionMismatch extends IOException { + private String interfaceName; + private long clientVersion; + private long serverVersion; + + /** + * Create a version mismatch exception + * @param interfaceName the name of the protocol mismatch + * @param clientVersion the client's version of the protocol + * @param serverVersion the server's version of the protocol + */ + public VersionMismatch(String interfaceName, long clientVersion, + long serverVersion) { + super("Protocol " + interfaceName + " version mismatch. (client = " + + clientVersion + ", server = " + serverVersion + ")"); + this.interfaceName = interfaceName; + this.clientVersion = clientVersion; + this.serverVersion = serverVersion; + } + + /** + * Get the interface name + * @return the java class name + * (eg. org.apache.hadoop.mapred.InterTrackerProtocol) + */ + public String getInterfaceName() { + return interfaceName; + } + + /** + * Get the client's preferred version + */ + public long getClientVersion() { + return clientVersion; + } + + /** + * Get the server's agreed-to version. + */ + public long getServerVersion() { + return serverVersion; + } + } + + public static VersionedProtocol waitForProxy(Class protocol, + long clientVersion, + InetSocketAddress addr, + Configuration conf + ) throws IOException { + while (true) { + try { + return getProxy(protocol, clientVersion, addr, conf); + } catch(ConnectException se) { // namenode has not been started + LOG.info("Server at " + addr + " not available yet, Zzzzz..."); + } catch(SocketTimeoutException te) { // namenode is busy + LOG.info("Problem connecting to server: " + addr); + } + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + // IGNORE + } + } + } + /** Construct a client-side proxy object that implements the named protocol, + * talking to a server at the named address. */ + public static VersionedProtocol getProxy(Class protocol, + long clientVersion, InetSocketAddress addr, Configuration conf, + SocketFactory factory) throws IOException { + return getProxy(protocol, clientVersion, addr, null, conf, factory); + } + + /** Construct a client-side proxy object that implements the named protocol, + * talking to a server at the named address.
*/ + public static VersionedProtocol getProxy(Class protocol, + long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, + Configuration conf, SocketFactory factory) throws IOException { + + VersionedProtocol proxy = + (VersionedProtocol) Proxy.newProxyInstance( + protocol.getClassLoader(), new Class[] { protocol }, + new Invoker(addr, ticket, conf, factory)); + long serverVersion = proxy.getProtocolVersion(protocol.getName(), + clientVersion); + if (serverVersion == clientVersion) { + return proxy; + } else { + throw new VersionMismatch(protocol.getName(), clientVersion, + serverVersion); + } + } + + /** + * Construct a client-side proxy object with the default SocketFactory + * + * @param protocol + * @param clientVersion + * @param addr + * @param conf + * @return a proxy instance + * @throws IOException + */ + public static VersionedProtocol getProxy(Class protocol, + long clientVersion, InetSocketAddress addr, Configuration conf) + throws IOException { + + return getProxy(protocol, clientVersion, addr, conf, NetUtils + .getDefaultSocketFactory(conf)); + } + + /** Expert: Make multiple, parallel calls to a set of servers. */ + public static Object[] call(Method method, Object[][] params, + InetSocketAddress[] addrs, Configuration conf) + throws IOException { + + Invocation[] invocations = new Invocation[params.length]; + for (int i = 0; i < params.length; i++) + invocations[i] = new Invocation(method, params[i]); + Writable[] wrappedValues = getClient(conf).call(invocations, addrs); + + if (method.getReturnType() == Void.TYPE) { + return null; + } + + Object[] values = + (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length); + for (int i = 0; i < values.length; i++) + if (wrappedValues[i] != null) + values[i] = ((HbaseObjectWritable)wrappedValues[i]).get(); + + return values; + } + + /** Construct a server for a protocol implementation instance listening on a + * port and address. */ + public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) + throws IOException { + return getServer(instance, bindAddress, port, 1, false, conf); + } + + /** Construct a server for a protocol implementation instance listening on a + * port and address. */ + public static Server getServer(final Object instance, final String bindAddress, final int port, + final int numHandlers, + final boolean verbose, Configuration conf) + throws IOException { + return new Server(instance, conf, bindAddress, port, numHandlers, verbose); + } + + /** An RPC Server. */ + public static class Server extends org.apache.hadoop.ipc.Server { + private Object instance; + private Class implementation; + private boolean verbose; + + /** Construct an RPC server. + * @param instance the instance whose methods will be called + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on + */ + public Server(Object instance, Configuration conf, String bindAddress, int port) + throws IOException { + this(instance, conf, bindAddress, port, 1, false); + } + + /** Construct an RPC server. 
+ * @param instance the instance whose methods will be called + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on + * @param numHandlers the number of method handler threads to run + * @param verbose whether each call should be logged + */ + public Server(Object instance, Configuration conf, String bindAddress, int port, + int numHandlers, boolean verbose) throws IOException { + super(bindAddress, port, Invocation.class, numHandlers, conf); + this.instance = instance; + this.implementation = instance.getClass(); + this.verbose = verbose; + } + + public Writable call(Writable param) throws IOException { + try { + Invocation call = (Invocation)param; + if (verbose) log("Call: " + call); + + Method method = + implementation.getMethod(call.getMethodName(), + call.getParameterClasses()); + + long startTime = System.currentTimeMillis(); + Object value = method.invoke(instance, call.getParameters()); + long callTime = System.currentTimeMillis() - startTime; + LOG.debug("Served: " + call.getMethodName() + " " + callTime); + if (verbose) log("Return: "+value); + + return new HbaseObjectWritable(method.getReturnType(), value); + + } catch (InvocationTargetException e) { + Throwable target = e.getTargetException(); + if (target instanceof IOException) { + throw (IOException)target; + } else { + IOException ioe = new IOException(target.toString()); + ioe.setStackTrace(target.getStackTrace()); + throw ioe; + } + } catch (Throwable e) { + IOException ioe = new IOException(e.toString()); + ioe.setStackTrace(e.getStackTrace()); + throw ioe; + } + } + } + + private static void log(String value) { + if (value!= null && value.length() > 55) + value = value.substring(0, 55)+"..."; + LOG.info(value); + } +} diff --git a/src/test/org/apache/hadoop/hbase/io/TestTextSequence.java b/src/test/org/apache/hadoop/hbase/io/TestTextSequence.java index b41713c5a6f..7fc2d8c2ac0 100644 --- a/src/test/org/apache/hadoop/hbase/io/TestTextSequence.java +++ b/src/test/org/apache/hadoop/hbase/io/TestTextSequence.java @@ -19,11 +19,6 @@ */ package org.apache.hadoop.hbase.io; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; - import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.io.Text; @@ -55,18 +50,4 @@ public class TestTextSequence extends HBaseTestCase { assertTrue(ts.compareTo(family) == 0); assertTrue(ts.equals(family)); } - - public void testSerialize() throws Exception { - final Text t = new Text(getName()); - final TextSequence ts = new TextSequence(t, 1, 3); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dao = new DataOutputStream(baos); - ts.write(dao); - dao.close(); - ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); - DataInputStream dis = new DataInputStream(bais); - TextSequence deserializeTs = new TextSequence(); - deserializeTs.readFields(dis); - assertTrue(ts.equals(deserializeTs)); - } } \ No newline at end of file
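Note on the slimmed BatchOperation wire format: the old encoding spent four bytes per operation on the Operation enum ordinal; the new one writes the column, then a single boolean saying whether a value follows (true = PUT, false = DELETE). The round-trip below is an illustrative sketch, not part of the patch; the column name and value bytes are made-up test data.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import org.apache.hadoop.hbase.io.BatchOperation;
    import org.apache.hadoop.io.Text;

    public class BatchOperationRoundTrip {
      public static void main(String[] args) throws Exception {
        // Non-null value, so this is a PUT; a null value would mean DELETE.
        BatchOperation op = new BatchOperation(new Text("info:name"), "v1".getBytes());
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        op.write(out); // column, then boolean isPut, then length-prefixed value
        out.close();
        DataInputStream in =
          new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        BatchOperation copy = new BatchOperation();
        copy.readFields(in); // no enum ordinal left to decode
        System.out.println(copy.isPut() + " " + copy.getColumn()); // true info:name
      }
    }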
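Note on hbase.io.seqfile.compression.type: HLog.getCompressionType() hands the raw string to SequenceFile.CompressionType.valueOf(), so the value must be one of the enum names NONE, RECORD or BLOCK; absent or NONE means uncompressed, as shipped in hbase-default.xml. A hypothetical hbase-site.xml override that turns on block compression for the hlog:

    <property>
      <name>hbase.io.seqfile.compression.type</name>
      <value>BLOCK</value>
    </property>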
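Note on HbaseObjectWritable: like the hadoop ObjectWritable it copies, it prefixes every instance with its declared class name (Text-encoded here rather than UTF8), which is what lets HbaseRPC's Invocation marshal arbitrary parameter lists. A minimal round-trip sketch, again illustrative rather than part of the patch:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.io.HbaseObjectWritable;

    public class ObjectWritableRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        // Each write records the declared class, then the instance data.
        HbaseObjectWritable.writeObject(out, "hello", String.class, conf);
        HbaseObjectWritable.writeObject(out, Integer.valueOf(42), Integer.TYPE, conf);
        out.close();
        DataInputStream in =
          new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(HbaseObjectWritable.readObject(in, conf)); // hello
        System.out.println(HbaseObjectWritable.readObject(in, conf)); // 42
      }
    }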