HBASE-6043 Add Increment Coalescing in thrift.

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1344034 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2012-05-29 22:31:54 +00:00
parent 522dd51ef9
commit dfab705a3b
6 changed files with 2165 additions and 349 deletions
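For context, here is a minimal sketch of how a client might call the new API, assuming a running HBase Thrift gateway on localhost:9090 with the unframed binary protocol; the host, port, and table/row/column names below are illustrative only and not part of this commit.

import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.thrift.generated.TIncrement;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class IncrementClientSketch {
  public static void main(String[] args) throws Exception {
    // Connect to the Thrift gateway (unframed binary protocol assumed).
    TTransport transport = new TSocket("localhost", 9090);
    transport.open();
    Hbase.Client client = new Hbase.Client(new TBinaryProtocol(transport));

    // Bump "family:qualifier" on row "rowA" of "tableA" by 1.
    client.increment(new TIncrement(
        ByteBuffer.wrap(Bytes.toBytes("tableA")),
        ByteBuffer.wrap(Bytes.toBytes("rowA")),
        ByteBuffer.wrap(Bytes.toBytes("family:qualifier")),
        1L));

    transport.close();
  }
}

Whether such an increment is applied immediately or queued for coalescing is decided entirely on the server side by the configuration introduced in this commit.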

View File

@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -76,6 +77,7 @@ import org.apache.hadoop.hbase.thrift.generated.IOError;
import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
import org.apache.hadoop.hbase.thrift.generated.Mutation;
import org.apache.hadoop.hbase.thrift.generated.TCell;
import org.apache.hadoop.hbase.thrift.generated.TIncrement;
import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
import org.apache.hadoop.hbase.thrift.generated.TScan;
@@ -118,6 +120,7 @@ public class ThriftServerRunner implements Runnable {
static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
public static final int DEFAULT_LISTEN_PORT = 9090;
@@ -411,6 +414,8 @@ public class ThriftServerRunner implements Runnable {
}
};
IncrementCoalescer coalescer = null;
/**
* Returns a list of all the column families for a given htable.
*
@@ -437,7 +442,7 @@ public class ThriftServerRunner implements Runnable {
* @throws IOException
* @throws IOError
*/
protected HTable getTable(final byte[] tableName) throws
public HTable getTable(final byte[] tableName) throws
IOException {
String table = new String(tableName);
Map<String, HTable> tables = threadLocalTables.get();
@@ -447,7 +452,7 @@ public class ThriftServerRunner implements Runnable {
return tables.get(table);
}
protected HTable getTable(final ByteBuffer tableName) throws IOException {
public HTable getTable(final ByteBuffer tableName) throws IOException {
return getTable(getBytes(tableName));
}
@@ -497,6 +502,7 @@ public class ThriftServerRunner implements Runnable {
protected HBaseHandler(final Configuration c) throws IOException {
this.conf = c;
scannerMap = new HashMap<Integer, ResultScanner>();
this.coalescer = new IncrementCoalescer(this);
}
/**
@@ -1399,7 +1405,43 @@ public class ThriftServerRunner implements Runnable {
private void initMetrics(ThriftMetrics metrics) {
this.metrics = metrics;
}
@Override
public void increment(TIncrement tincrement) throws IOError, TException {
if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
throw new TException("Must supply a table and a row key; can't increment");
}
if (conf.getBoolean(COALESCE_INC_KEY, false)) {
this.coalescer.queueIncrement(tincrement);
return;
}
try {
HTable table = getTable(tincrement.getTable());
Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
table.increment(inc);
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(e.getMessage());
}
}
@Override
public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
if (conf.getBoolean(COALESCE_INC_KEY, false)) {
this.coalescer.queueIncrements(tincrements);
return;
}
for (TIncrement tinc : tincrements) {
increment(tinc);
}
}
}
/**
* Adds all the attributes into the Operation object
*/
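The IncrementCoalescer class that the handler delegates to is added elsewhere in this commit and is not shown in this excerpt. To illustrate the general idea it implements (folding many increments of the same cell into a single pending delta that is flushed in one round trip), here is a deliberately simplified sketch; the names and structure are illustrative and do not reflect the actual IncrementCoalescer implementation.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative only: fold repeated increments of the same cell into one pending delta. */
class SimpleCoalescerSketch {
  // Key is "table/row/column"; value is the pending delta for that cell.
  private final Map<String, AtomicLong> pending = new ConcurrentHashMap<String, AtomicLong>();

  void queue(String table, String row, String column, long amount) {
    String key = table + "/" + row + "/" + column;
    AtomicLong delta = pending.get(key);
    if (delta == null) {
      AtomicLong fresh = new AtomicLong(0);
      AtomicLong raced = pending.putIfAbsent(key, fresh);
      delta = (raced == null) ? fresh : raced;
    }
    delta.addAndGet(amount);
  }

  /**
   * A background thread would periodically drain the pending deltas and apply
   * one increment per cell, e.g. via HTable.incrementColumnValue(...). Any
   * deltas still queued when the process dies are lost, which is the trade-off
   * the coalesceIncrement documentation warns about.
   */
  Map<String, AtomicLong> drain() {
    Map<String, AtomicLong> snapshot = new ConcurrentHashMap<String, AtomicLong>(pending);
    pending.clear(); // a real implementation must handle the race between copy and clear
    return snapshot;
  }
}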

View File

@@ -26,6 +26,7 @@ import java.util.TreeMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.regionserver.StoreFile;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
import org.apache.hadoop.hbase.thrift.generated.TCell;
import org.apache.hadoop.hbase.thrift.generated.TIncrement;
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
import org.apache.hadoop.hbase.util.Bytes;
@@ -156,4 +158,18 @@ public class ThriftUtilities {
Result [] result = { in };
return rowResultFromHBase(result);
}
/**
* From a {@link TIncrement} create an {@link Increment}.
* @param tincrement the Thrift version of an increment
* @return an increment that the {@link TIncrement} represented.
*/
public static Increment incrementFromThrift(TIncrement tincrement) {
Increment inc = new Increment(tincrement.getRow());
byte[][] famAndQf = KeyValue.parseColumn(tincrement.getColumn());
if (famAndQf.length < 1) return null;
byte[] qual = famAndQf.length == 1 ? new byte[0] : famAndQf[1];
inc.addColumn(famAndQf[0], qual, tincrement.getAmmount());
return inc;
}
}
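A quick illustration of the conversion above; this is a code fragment (assuming java.nio.ByteBuffer, org.apache.hadoop.hbase.client.Increment, and org.apache.hadoop.hbase.util.Bytes are imported) and the table, row, and column names are arbitrary:

TIncrement tinc = new TIncrement(
    ByteBuffer.wrap(Bytes.toBytes("tableA")),
    ByteBuffer.wrap(Bytes.toBytes("rowA")),
    ByteBuffer.wrap(Bytes.toBytes("fam:counter")),
    5L);
// Produces an Increment for row "rowA", family "fam", qualifier "counter", amount 5;
// a column of just "fam:" maps to an empty qualifier.
Increment inc = ThriftUtilities.incrementFromThrift(tinc);
// The result can then be passed to HTable.increment(inc).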

View File

@@ -443,7 +443,7 @@ public class TRowResult implements org.apache.thrift.TBase<TRowResult, TRowResul
for (int _i9 = 0; _i9 < _map8.size; ++_i9)
{
ByteBuffer _key10; // required
TCell _val11; // required
TCell _val11; // optional
_key10 = iprot.readBinary();
_val11 = new TCell();
_val11.read(iprot);

View File

@@ -110,6 +110,16 @@ struct BatchMutation {
2:list<Mutation> mutations
}
/**
* For increments that are not incrementColumnValue
* equivalents.
*/
struct TIncrement {
1:Text table,
2:Text row,
3:Text column,
4:i64 ammount
}
/**
* Holds row name and then a map of columns to cells.
@@ -627,6 +637,23 @@ service Hbase {
3:map<Text, Text> attributes
) throws (1:IOError io)
/**
* Increment a cell by the amount.
* Increments can be applied asynchronously if hbase.regionserver.thrift.coalesceIncrement is set
* to true. False is the default. Set it to true if you need the extra performance and can accept
* some data loss if a Thrift server dies with increments still in its queue (see the
* configuration sketch after this excerpt).
*/
void increment(
/** The single increment to apply */
1:TIncrement increment
) throws (1:IOError io)
void incrementRows(
/** The list of increments */
1:list<TIncrement> increments
) throws (1:IOError io)
/**
* Completely delete the row's cells marked with a timestamp
* equal-to or older than the passed timestamp.
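The asynchronous behavior described in the increment() comment above is controlled by the hbase.regionserver.thrift.coalesceIncrement property (COALESCE_INC_KEY in ThriftServerRunner). Below is a minimal sketch of enabling it programmatically, mirroring what the test further down does; the same property can also be set in hbase-site.xml. The class name here is illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class EnableCoalescingSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Off by default. When true, the Thrift server queues increments and folds
    // repeated increments of the same cell together before applying them,
    // trading durability (queued increments die with the server) for throughput.
    conf.setBoolean("hbase.regionserver.thrift.coalesceIncrement", true);
    // ... this configuration is then the one handed to the Thrift server at startup.
  }
}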

View File

@@ -37,12 +37,14 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler;
import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.thrift.generated.IOError;
import org.apache.hadoop.hbase.thrift.generated.Mutation;
import org.apache.hadoop.hbase.thrift.generated.TCell;
import org.apache.hadoop.hbase.thrift.generated.TIncrement;
import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
import org.apache.hadoop.hbase.util.Bytes;
@@ -70,11 +72,15 @@ public class TestThriftServer {
private static ByteBuffer asByteBuffer(String i) {
return ByteBuffer.wrap(Bytes.toBytes(i));
}
private static ByteBuffer asByteBuffer(long l) {
return ByteBuffer.wrap(Bytes.toBytes(l));
}
// Static names for tables, columns, rows, and values
private static ByteBuffer tableAname = asByteBuffer("tableA");
private static ByteBuffer tableBname = asByteBuffer("tableB");
private static ByteBuffer columnAname = asByteBuffer("columnA:");
private static ByteBuffer columnAAname = asByteBuffer("columnA:A");
private static ByteBuffer columnBname = asByteBuffer("columnB:");
private static ByteBuffer rowAname = asByteBuffer("rowA");
private static ByteBuffer rowBname = asByteBuffer("rowB");
@@ -82,9 +88,11 @@ public class TestThriftServer {
private static ByteBuffer valueBname = asByteBuffer("valueB");
private static ByteBuffer valueCname = asByteBuffer("valueC");
private static ByteBuffer valueDname = asByteBuffer("valueD");
private static ByteBuffer valueEname = asByteBuffer(100L);
@BeforeClass
public static void beforeClass() throws Exception {
UTIL.getConfiguration().setBoolean(ThriftServerRunner.COALESCE_INC_KEY, true);
UTIL.startMiniCluster();
}
@@ -112,6 +120,7 @@ public class TestThriftServer {
doTestGetTableRegions();
doTestFilterRegistration();
doTestGetRegionInfo();
doTestIncrements();
}
/**
@@ -237,6 +246,44 @@ public class TestThriftServer {
return record.getMetric(name).longValue();
}
public void doTestIncrements() throws Exception {
ThriftServerRunner.HBaseHandler handler =
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
createTestTables(handler);
doTestIncrements(handler);
dropTestTables(handler);
}
public static void doTestIncrements(HBaseHandler handler) throws Exception {
List<Mutation> mutations = new ArrayList<Mutation>(1);
mutations.add(new Mutation(false, columnAAname, valueEname, true));
mutations.add(new Mutation(false, columnAname, valueEname, true));
handler.mutateRow(tableAname, rowAname, mutations, null);
handler.mutateRow(tableAname, rowBname, mutations, null);
List<TIncrement> increments = new ArrayList<TIncrement>();
increments.add(new TIncrement(tableAname, rowBname, columnAAname, 7));
increments.add(new TIncrement(tableAname, rowBname, columnAAname, 7));
increments.add(new TIncrement(tableAname, rowBname, columnAAname, 7));
int numIncrements = 60000;
for (int i = 0; i < numIncrements; i++) {
handler.increment(new TIncrement(tableAname, rowAname, columnAname, 2));
handler.incrementRows(increments);
}
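// Each loop iteration adds 2 to rowA/columnA and 3 * 7 = 21 to rowB/columnAA;
// both cells were seeded with 100 by the mutations above. Coalesced increments
// are applied asynchronously, so give the coalescer a moment to flush before
// reading the values back.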
Thread.sleep(1000);
long lv = handler.get(tableAname, rowAname, columnAname, null).get(0).value.getLong();
assertEquals((100 + (2 * numIncrements)), lv);
lv = handler.get(tableAname, rowBname, columnAAname, null).get(0).value.getLong();
assertEquals((100 + (3 * 7 * numIncrements)), lv);
assertTrue(handler.coalescer.getSuccessfulCoalescings() > 0);
}
/**
* Tests adding a series of Mutations and BatchMutations, including a
* delete mutation. Also tests data retrieval, and getting back multiple