HBASE-746 Batching row mutations via thrift
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@679561 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3b544ee93c
commit
f08923b0d6
|
@ -86,6 +86,16 @@ struct Mutation {
|
|||
3:Text value
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A BatchMutation object is used to apply a number of Mutations to a single row.
|
||||
*/
|
||||
struct BatchMutation {
|
||||
1:Text row,
|
||||
2:list<Mutation> mutations
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A ScanEntry contains the row, column, and value information for a scanner's
|
||||
* current location.
|
||||
|
@ -286,6 +296,31 @@ service Hbase {
|
|||
void mutateRowTs(1:Text tableName, 2:Text row, 3:list<Mutation> mutations, 4:i64 timestamp)
|
||||
throws (1:IOError io, 2:IllegalArgument ia)
|
||||
|
||||
/**
|
||||
* Apply a series of batches (each a series of mutations on a single row)
|
||||
* in a single transaction. If an exception is thrown, then the
|
||||
* transaction is aborted. Default current timestamp is used, and
|
||||
* all entries will have an identical timestamp.
|
||||
*
|
||||
* @param tableName name of table
|
||||
* @param rowBatches list of row batches
|
||||
*/
|
||||
void mutateRows(1:Text tableName, 2:list<BatchMutation> rowBatches)
|
||||
throws (1:IOError io, 2:IllegalArgument ia)
|
||||
|
||||
/**
|
||||
* Apply a series of batches (each a series of mutations on a single row)
|
||||
* in a single transaction. If an exception is thrown, then the
|
||||
* transaction is aborted. The specified timestamp is used, and
|
||||
* all entries will have an identical timestamp.
|
||||
*
|
||||
* @param tableName name of table
|
||||
* @param rowBatches list of row batches
|
||||
* @param timestamp timestamp
|
||||
*/
|
||||
void mutateRowsTs(1:Text tableName, 2:list<BatchMutation> rowBatches, 3:i64 timestamp)
|
||||
throws (1:IOError io, 2:IllegalArgument ia)
|
||||
|
||||
/**
|
||||
* Delete all cells that match the passed row and column.
|
||||
*
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.io.BatchUpdate;
|
|||
import org.apache.hadoop.hbase.io.Cell;
|
||||
import org.apache.hadoop.hbase.io.RowResult;
|
||||
import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
|
||||
import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
|
||||
import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.thrift.generated.Hbase;
|
||||
import org.apache.hadoop.hbase.thrift.generated.IOError;
|
||||
|
@ -430,6 +431,54 @@ public class ThriftServer {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public void mutateRows(byte[] tableName, ArrayList<BatchMutation> rowBatches)
|
||||
throws IOError, IllegalArgument, TException {
|
||||
mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP);
|
||||
}
|
||||
|
||||
public void mutateRowsTs(byte[] tableName, ArrayList<BatchMutation> rowBatches, long timestamp)
|
||||
throws IOError, IllegalArgument, TException {
|
||||
ArrayList<BatchUpdate> batchUpdates = new ArrayList<BatchUpdate>();
|
||||
|
||||
for (BatchMutation batch : rowBatches) {
|
||||
byte[] row = batch.row;
|
||||
ArrayList<Mutation> mutations = batch.mutations;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("mutateRowTs: table=" + new String(tableName) + ", row="
|
||||
+ new String(row) + ", ts=" + timestamp + " mutations="
|
||||
+ mutations.size());
|
||||
for (Mutation m : mutations) {
|
||||
if (m.isDelete) {
|
||||
LOG.debug("mutateRowTs: : delete - " + getText(m.column));
|
||||
} else {
|
||||
LOG.debug("mutateRowTs: : put - " + getText(m.column) + " => "
|
||||
+ m.value);
|
||||
}
|
||||
}
|
||||
}
|
||||
BatchUpdate batchUpdate = new BatchUpdate(getText(row), timestamp);
|
||||
for (Mutation m : mutations) {
|
||||
if (m.isDelete) {
|
||||
batchUpdate.delete(getText(m.column));
|
||||
} else {
|
||||
batchUpdate.put(getText(m.column), m.value);
|
||||
}
|
||||
}
|
||||
batchUpdates.add(batchUpdate);
|
||||
}
|
||||
|
||||
HTable table = null;
|
||||
try {
|
||||
table = getTable(tableName);
|
||||
table.commit(batchUpdates);
|
||||
} catch (IOException e) {
|
||||
throw new IOError(e.getMessage());
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new IllegalArgument(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void scannerClose(int id) throws IOError, IllegalArgument {
|
||||
LOG.debug("scannerClose: id=" + id);
|
||||
Scanner scanner = getScanner(id);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue