HBASE-10960 Enhance HBase Thrift 1 to include "append" and "checkAndPut" operations (Srikanth Srungarapu)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1590152 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Hsieh 2014-04-25 21:22:55 +00:00
parent dcae5488cd
commit ca303c699d
5 changed files with 4072 additions and 1178 deletions

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@ -77,6 +78,7 @@ import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.thrift.generated.IOError;
import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
import org.apache.hadoop.hbase.thrift.generated.Mutation;
import org.apache.hadoop.hbase.thrift.generated.TAppend;
import org.apache.hadoop.hbase.thrift.generated.TCell;
import org.apache.hadoop.hbase.thrift.generated.TIncrement;
import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
@ -1486,6 +1488,58 @@ public class ThriftServerRunner implements Runnable {
increment(tinc);
}
}
@Override
public List<TCell> append(TAppend tappend) throws IOError, TException {
if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
throw new TException("Must supply a table and a row key; can't append");
}
try {
HTable table = getTable(tappend.getTable());
Append append = ThriftUtilities.appendFromThrift(tappend);
Result result = table.append(append);
return ThriftUtilities.cellFromHBase(result.rawCells());
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(e.getMessage());
}
}
@Override
public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
IllegalArgument, TException {
Put put;
try {
put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
addAttributes(put, attributes);
byte[][] famAndQf = KeyValue.parseColumn(getBytes(mput.column));
put.addImmutable(famAndQf[0], famAndQf[1], mput.value != null ? getBytes(mput.value)
: HConstants.EMPTY_BYTE_ARRAY);
put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
} catch (IllegalArgumentException e) {
LOG.warn(e.getMessage(), e);
throw new IllegalArgument(e.getMessage());
}
HTable table = null;
try {
table = getTable(tableName);
byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
return table.checkAndPut(getBytes(row), famAndQf[0], famAndQf[1],
value != null ? getBytes(value) : HConstants.EMPTY_BYTE_ARRAY, put);
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(e.getMessage());
} catch (IllegalArgumentException e) {
LOG.warn(e.getMessage(), e);
throw new IllegalArgument(e.getMessage());
}
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.thrift;
import static org.apache.hadoop.hbase.util.Bytes.getBytes;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@ -28,12 +30,14 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
import org.apache.hadoop.hbase.thrift.generated.TAppend;
import org.apache.hadoop.hbase.thrift.generated.TCell;
import org.apache.hadoop.hbase.thrift.generated.TColumn;
import org.apache.hadoop.hbase.thrift.generated.TIncrement;
@ -202,4 +206,28 @@ public class ThriftUtilities {
inc.addColumn(famAndQf[0], famAndQf[1], tincrement.getAmmount());
return inc;
}
/**
* From a {@link TAppend} create an {@link Append}.
* @param tappend the Thrift version of an append.
* @return an increment that the {@link TAppend} represented.
*/
public static Append appendFromThrift(TAppend tappend) {
Append append = new Append(tappend.getRow());
List<ByteBuffer> columns = tappend.getColumns();
List<ByteBuffer> values = tappend.getValues();
if (columns.size() != values.size()) {
throw new IllegalArgumentException(
"Sizes of columns and values in tappend object are not matching");
}
int length = columns.size();
for (int i = 0; i < length; i++) {
byte[][] famAndQf = KeyValue.parseColumn(getBytes(columns.get(i)));
append.add(famAndQf[0], famAndQf[1], getBytes(values.get(i)));
}
return append;
}
}

View File

@ -153,6 +153,16 @@ struct TScan {
9:optional bool reversed
}
/**
* An Append object is used to specify the parameters for performing the append operation.
*/
struct TAppend {
1:Text table,
2:Text row,
3:list<Text> columns,
4:list<Text> values
}
//
// Exceptions
//
@ -923,4 +933,43 @@ service Hbase {
1:Text row,
) throws (1:IOError io)
/**
* Appends values to one or more columns within a single row.
*
* @return values of columns after the append operation.
*/
list<TCell> append(
/** The single append operation to apply */
1:TAppend append,
) throws (1:IOError io)
/**
* Atomically checks if a row/family/qualifier value matches the expected
* value. If it does, it adds the corresponding mutation operation for put.
*
* @return true if the new put was executed, false otherwise
*/
bool checkAndPut(
/** name of table */
1:Text tableName,
/** row key */
2:Text row,
/** column name */
3:Text column,
/** the expected value for the column parameter, if not
provided the check is for the non-existence of the
column in question */
5:Text value
/** mutation for the put */
6:Mutation mput,
/** Mutation attributes */
7:map<Text, Text> attributes
) throws (1:IOError io, 2:IllegalArgument ia)
}

View File

@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.thrift;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.thrift.generated.IOError;
import org.apache.hadoop.hbase.thrift.generated.Mutation;
import org.apache.hadoop.hbase.thrift.generated.TAppend;
import org.apache.hadoop.hbase.thrift.generated.TCell;
import org.apache.hadoop.hbase.thrift.generated.TIncrement;
import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
@ -122,6 +124,8 @@ public class TestThriftServer {
doTestFilterRegistration();
doTestGetRegionInfo();
doTestIncrements();
doTestAppend();
doTestCheckAndPut();
}
/**
@ -631,6 +635,68 @@ public class TestThriftServer {
}
}
/**
* Appends the value to a cell and checks that the cell value is updated properly.
*
* @throws Exception
*/
public static void doTestAppend() throws Exception {
ThriftServerRunner.HBaseHandler handler =
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
handler.createTable(tableAname, getColumnDescriptors());
try {
List<Mutation> mutations = new ArrayList<Mutation>(1);
mutations.add(new Mutation(false, columnAname, valueAname, true));
handler.mutateRow(tableAname, rowAname, mutations, null);
List<ByteBuffer> columnList = new ArrayList<ByteBuffer>();
columnList.add(columnAname);
List<ByteBuffer> valueList = new ArrayList<ByteBuffer>();
valueList.add(valueBname);
TAppend append = new TAppend(tableAname, rowAname, columnList, valueList);
handler.append(append);
TRowResult rowResult = handler.getRow(tableAname, rowAname, null).get(0);
assertEquals(rowAname, rowResult.row);
assertArrayEquals(Bytes.add(valueAname.array(), valueBname.array()),
rowResult.columns.get(columnAname).value.array());
} finally {
handler.disableTable(tableAname);
handler.deleteTable(tableAname);
}
}
/**
* Check that checkAndPut fails if the cell does not exist, then put in the cell, then check that
* the checkAndPut succeeds.
*
* @throws Exception
*/
public static void doTestCheckAndPut() throws Exception {
ThriftServerRunner.HBaseHandler handler =
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
handler.createTable(tableAname, getColumnDescriptors());
try {
List<Mutation> mutations = new ArrayList<Mutation>(1);
mutations.add(new Mutation(false, columnAname, valueAname, true));
Mutation putB = (new Mutation(false, columnBname, valueBname, true));
assertFalse(handler.checkAndPut(tableAname, rowAname, columnAname, valueAname, putB, null));
handler.mutateRow(tableAname, rowAname, mutations, null);
assertTrue(handler.checkAndPut(tableAname, rowAname, columnAname, valueAname, putB, null));
TRowResult rowResult = handler.getRow(tableAname, rowAname, null).get(0);
assertEquals(rowAname, rowResult.row);
assertEquals(valueBname, rowResult.columns.get(columnBname).value);
} finally {
handler.disableTable(tableAname);
handler.deleteTable(tableAname);
}
}
/**
*
* @return a List of ColumnDescriptors for use in creating a table. Has one