HADOOP-1468 Add HBase batch update to reduce RPC overhead

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@556754 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jim Kellerman 2007-07-16 22:19:59 +00:00
parent 5463de47b3
commit cbb844d5f0
9 changed files with 795 additions and 86 deletions

View File

@ -63,3 +63,5 @@ Trunk (unreleased changes)
39. HADOOP-1581 Un-openable tablename bug
40. HADOOP-1607 [shell] Clear screen command (Edward Yoon via Stack)
41. HADOOP-1614 [hbase] HClient does not protect itself from simultaneous updates
42. HADOOP-1468 Add HBase batch update to reduce RPC overhead

View File

@ -22,6 +22,7 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
@ -33,6 +34,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
@ -62,10 +64,102 @@ public class HClient implements HConstants {
private long currentLockId;
private Class<? extends HRegionInterface> serverInterfaceClass;
protected class BatchHandler {
private HashMap<RegionLocation, BatchUpdate> regionToBatch;
private HashMap<Long, BatchUpdate> lockToBatch;
/** constructor */
public BatchHandler() {
this.regionToBatch = new HashMap<RegionLocation, BatchUpdate>();
this.lockToBatch = new HashMap<Long, BatchUpdate>();
}
/**
* Start a batch row insertion/update.
*
* Manages multiple batch updates that are targeted for multiple servers,
* should the rows span several region servers.
*
* No changes are committed until the client commits the batch operation via
* HClient.batchCommit().
*
* The entire batch update can be abandoned by calling HClient.batchAbort();
*
* Callers to this method are given a handle that corresponds to the row being
* changed. The handle must be supplied on subsequent put or delete calls so
* that the row can be identified.
*
* @param row Name of row to start update against.
* @return Row lockid.
*/
public synchronized long startUpdate(Text row) {
RegionLocation info = getRegionLocation(row);
BatchUpdate batch = regionToBatch.get(info);
if(batch == null) {
batch = new BatchUpdate();
regionToBatch.put(info, batch);
}
long lockid = batch.startUpdate(row);
lockToBatch.put(lockid, batch);
return lockid;
}
/**
* Change the value for the specified column
*
* @param lockid lock id returned from startUpdate
* @param column column whose value is being set
* @param value new value for column
*/
public synchronized void put(long lockid, Text column, byte[] value) {
BatchUpdate batch = lockToBatch.get(lockid);
if (batch == null) {
throw new IllegalArgumentException("invalid lock id " + lockid);
}
batch.put(lockid, column, value);
}
/**
* Delete the value for a column
*
* @param lockid - lock id returned from startUpdate
* @param column - name of column whose value is to be deleted
*/
public synchronized void delete(long lockid, Text column) {
BatchUpdate batch = lockToBatch.get(lockid);
if (batch == null) {
throw new IllegalArgumentException("invalid lock id " + lockid);
}
batch.delete(lockid, column);
}
/**
* Finalize a batch mutation
*
* @param timestamp time to associate with all the changes
* @throws IOException
*/
public synchronized void commit(long timestamp) throws IOException {
try {
for(Map.Entry<RegionLocation, BatchUpdate> e: regionToBatch.entrySet()) {
RegionLocation r = e.getKey();
HRegionInterface server = getHRegionConnection(r.serverAddress);
server.batchUpdate(r.regionInfo.getRegionName(), timestamp,
e.getValue());
}
} catch (RemoteException e) {
throw RemoteExceptionHandler.decodeRemoteException(e);
}
}
}
private BatchHandler batch;
/*
* Data structure that holds current location for a region and its info.
*/
protected static class RegionLocation {
@SuppressWarnings("unchecked")
protected static class RegionLocation implements Comparable {
HRegionInfo regionInfo;
HServerAddress serverAddress;
@ -84,18 +178,48 @@ public class HClient implements HConstants {
}
/**
* @return HRegionInfo
* {@inheritDoc}
*/
@Override
public boolean equals(Object o) {
return this.compareTo(o) == 0;
}
/**
* {@inheritDoc}
*/
@Override
public int hashCode() {
int result = this.regionInfo.hashCode();
result ^= this.serverAddress.hashCode();
return result;
}
/** @return HRegionInfo */
public HRegionInfo getRegionInfo(){
return regionInfo;
}
/**
* @return HServerAddress
*/
/** @return HServerAddress */
public HServerAddress getServerAddress(){
return serverAddress;
}
//
// Comparable
//
/**
* {@inheritDoc}
*/
public int compareTo(Object o) {
RegionLocation other = (RegionLocation) o;
int result = this.regionInfo.compareTo(other.regionInfo);
if(result == 0) {
result = this.serverAddress.compareTo(other.serverAddress);
}
return result;
}
}
// Map tableName -> (Map startRow -> (HRegionInfo, HServerAddress)
@ -121,6 +245,7 @@ public class HClient implements HConstants {
*/
public HClient(Configuration conf) {
this.conf = conf;
this.batch = null;
this.currentLockId = -1;
this.pause = conf.getLong("hbase.client.pause", 30 * 1000);
@ -222,8 +347,6 @@ public class HClient implements HConstants {
*
* @param desc table descriptor for table
*
* @throws RemoteException if exception occurred on remote side of
* connection.
* @throws IllegalArgumentException if the table name is reserved
* @throws MasterNotRunningException if master is not running
* @throws NoServerForRegionException if root region is not being served
@ -254,8 +377,6 @@ public class HClient implements HConstants {
*
* @param desc table descriptor for table
*
* @throws RemoteException if exception occurred on remote side of
* connection.
* @throws IllegalArgumentException if the table name is reserved
* @throws MasterNotRunningException if master is not running
* @throws NoServerForRegionException if root region is not being served
@ -610,7 +731,7 @@ public class HClient implements HConstants {
if(tableName == null || tableName.getLength() == 0) {
throw new IllegalArgumentException("table name cannot be null or zero length");
}
if(this.currentLockId != -1) {
if(this.currentLockId != -1 || batch != null) {
throw new IllegalStateException("update in progress");
}
this.tableServers = getTableServers(tableName);
@ -1315,6 +1436,59 @@ public class HClient implements HConstants {
return new ClientScanner(columns, startRow, timestamp, filter);
}
/**
* Start a batch of row insertions/updates.
*
* No changes are committed until the call to commitBatchUpdate returns.
* A call to abortBatchUpdate will abandon the entire batch.
*
* Note that in batch mode, calls to commit or abort are ignored.
*/
public synchronized void startBatchUpdate() {
if(this.tableServers == null) {
throw new IllegalStateException("Must open table first");
}
if(batch == null) {
batch = new BatchHandler();
}
}
/**
* Abort a batch mutation
*/
public synchronized void abortBatch() {
batch = null;
}
/**
* Finalize a batch mutation
*
* @throws IOException
*/
public synchronized void commitBatch() throws IOException {
commitBatch(System.currentTimeMillis());
}
/**
* Finalize a batch mutation
*
* @param timestamp time to associate with all the changes
* @throws IOException
*/
public synchronized void commitBatch(long timestamp) throws IOException {
if(batch == null) {
throw new IllegalStateException("no batch update in progress");
}
try {
batch.commit(timestamp);
} finally {
batch = null;
}
}
/**
* Start an atomic row insertion/update. No changes are committed until the
* call to commit() returns. A call to abort() will abandon any updates in progress.
@ -1334,6 +1508,9 @@ public class HClient implements HConstants {
if(this.currentLockId != -1) {
throw new IllegalStateException("update in progress");
}
if(batch != null) {
return batch.startUpdate(row);
}
for(int tries = 0; tries < numRetries; tries++) {
IOException e = null;
RegionLocation info = getRegionLocation(row);
@ -1379,6 +1556,14 @@ public class HClient implements HConstants {
* @throws IOException
*/
public void put(long lockid, Text column, byte val[]) throws IOException {
if(val == null) {
throw new IllegalArgumentException("value cannot be null");
}
if(batch != null) {
batch.put(lockid, column, val);
return;
}
if(lockid != this.currentLockId) {
throw new IllegalArgumentException("invalid lockid");
}
@ -1408,6 +1593,11 @@ public class HClient implements HConstants {
* @throws IOException
*/
public void delete(long lockid, Text column) throws IOException {
if(batch != null) {
batch.delete(lockid, column);
return;
}
if(lockid != this.currentLockId) {
throw new IllegalArgumentException("invalid lockid");
}
@ -1436,6 +1626,10 @@ public class HClient implements HConstants {
* @throws IOException
*/
public void abort(long lockid) throws IOException {
if(batch != null) {
return;
}
if(lockid != this.currentLockId) {
throw new IllegalArgumentException("invalid lockid");
}
@ -1471,6 +1665,10 @@ public class HClient implements HConstants {
* @throws IOException
*/
public void commit(long lockid, long timestamp) throws IOException {
if(batch != null) {
return;
}
if(lockid != this.currentLockId) {
throw new IllegalArgumentException("invalid lockid");
}
@ -1497,6 +1695,13 @@ public class HClient implements HConstants {
* @throws IOException
*/
public void renewLease(long lockid) throws IOException {
if(batch != null) {
return;
}
if(lockid != this.currentLockId) {
throw new IllegalArgumentException("invalid lockid");
}
try {
this.currentServer.renewLease(lockid, this.clientid);
} catch(IOException e) {

View File

@ -34,13 +34,39 @@ import org.apache.hadoop.io.WritableComparable;
* HRegions' table descriptor, etc.
*/
public class HRegionInfo implements WritableComparable {
/** delimiter used between portions of a region name */
public static final char DELIMITER = ',';
/**
* Extracts table name prefix from a region name.
* Presumes region names are ASCII characters only.
* @param regionName A region name.
* @return The table prefix of a region name.
*/
public static Text getTableNameFromRegionName(final Text regionName) {
int index = -1;
byte [] bytes = regionName.getBytes();
for (int i = 0; i < bytes.length; i++) {
if (((char) bytes[i]) == DELIMITER) {
index = i;
break;
}
}
if (index == -1) {
throw new IllegalArgumentException(regionName.toString() + " does not " +
"contain " + DELIMITER + " character");
}
byte [] tableName = new byte[index];
System.arraycopy(bytes, 0, tableName, 0, index);
return new Text(tableName);
}
Text regionName;
long regionId;
Text startKey;
Text endKey;
boolean offLine;
HTableDescriptor tableDesc;
public static final char DELIMITER = ',';
/** Default constructor - creates empty object */
public HRegionInfo() {
@ -100,6 +126,34 @@ public class HRegionInfo implements WritableComparable {
this.offLine = false;
}
/** @return the endKey */
public Text getEndKey(){
return endKey;
}
/** @return the regionId */
public long getRegionId(){
return regionId;
}
/** @return the regionName */
public Text getRegionName(){
return regionName;
}
/** @return the startKey */
public Text getStartKey(){
return startKey;
}
/** @return the tableDesc */
public HTableDescriptor getTableDesc(){
return tableDesc;
}
/**
* {@inheritDoc}
*/
@Override
public String toString() {
return "regionname: " + this.regionName.toString() + ", startKey: <" +
@ -107,11 +161,17 @@ public class HRegionInfo implements WritableComparable {
this.tableDesc.toString() + "}";
}
/**
* {@inheritDoc}
*/
@Override
public boolean equals(Object o) {
return this.compareTo(o) == 0;
}
/**
* {@inheritDoc}
*/
@Override
public int hashCode() {
int result = this.regionName.hashCode();
@ -123,10 +183,13 @@ public class HRegionInfo implements WritableComparable {
return result;
}
//////////////////////////////////////////////////////////////////////////////
//
// Writable
//////////////////////////////////////////////////////////////////////////////
//
/**
* {@inheritDoc}
*/
public void write(DataOutput out) throws IOException {
out.writeLong(regionId);
tableDesc.write(out);
@ -136,6 +199,9 @@ public class HRegionInfo implements WritableComparable {
out.writeBoolean(offLine);
}
/**
* {@inheritDoc}
*/
public void readFields(DataInput in) throws IOException {
this.regionId = in.readLong();
this.tableDesc.readFields(in);
@ -145,69 +211,13 @@ public class HRegionInfo implements WritableComparable {
this.offLine = in.readBoolean();
}
/**
* @return the endKey
*/
public Text getEndKey(){
return endKey;
}
/**
* @return the regionId
*/
public long getRegionId(){
return regionId;
}
/**
* @return the regionName
*/
public Text getRegionName(){
return regionName;
}
/**
* Extracts table name prefix from a region name.
* Presumes region names are ASCII characters only.
* @param regionName A region name.
* @return The table prefix of a region name.
*/
public static Text getTableNameFromRegionName(final Text regionName) {
int index = -1;
byte [] bytes = regionName.getBytes();
for (int i = 0; i < bytes.length; i++) {
if (((char) bytes[i]) == DELIMITER) {
index = i;
break;
}
}
if (index == -1) {
throw new IllegalArgumentException(regionName.toString() + " does not " +
"contain " + DELIMITER + " character");
}
byte [] tableName = new byte[index];
System.arraycopy(bytes, 0, tableName, 0, index);
return new Text(tableName);
}
/**
* @return the startKey
*/
public Text getStartKey(){
return startKey;
}
/**
* @return the tableDesc
*/
public HTableDescriptor getTableDesc(){
return tableDesc;
}
/////////////////////////////////////////////////////////////////////////////
//
// Comparable
/////////////////////////////////////////////////////////////////////////////
//
/**
* {@inheritDoc}
*/
public int compareTo(Object o) {
HRegionInfo other = (HRegionInfo) o;

View File

@ -22,6 +22,7 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.VersionedProtocol;
@ -211,6 +212,16 @@ public interface HRegionInterface extends VersionedProtocol {
long timestamp, RowFilterInterface filter)
throws IOException;
/**
* Applies a batch of updates via one RPC
*
* @param regionName name of the region to update
* @param timestamp the time to be associated with the changes
* @param b BatchUpdate
* @throws IOException
*/
public void batchUpdate(Text regionName, long timestamp, BatchUpdate b) throws IOException;
/**
* Get the next set of values
*

View File

@ -38,6 +38,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.BatchOperation;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException;
@ -987,6 +989,29 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
return getRegion(regionName).getRegionInfo();
}
/**
* {@inheritDoc}
*/
public void batchUpdate(Text regionName, long timestamp, BatchUpdate b) throws IOException {
for(Map.Entry<Text, ArrayList<BatchOperation>> e: b) {
Text row = e.getKey();
long clientid = rand.nextLong();
long lockid = startUpdate(regionName, clientid, row);
for(BatchOperation op: e.getValue()) {
switch(op.getOp()) {
case BatchOperation.PUT_OP:
put(regionName, clientid, lockid, op.getColumn(), op.getValue());
break;
case BatchOperation.DELETE_OP:
delete(regionName, clientid, lockid, op.getColumn());
break;
}
}
commit(regionName, clientid, lockid, timestamp);
}
}
/**
* {@inheritDoc}
*/

View File

@ -24,25 +24,35 @@ import org.apache.hadoop.io.*;
import java.io.*;
import java.net.InetSocketAddress;
/*******************************************************************************
/**
* HServerAddress is a "label" for a HBase server that combines the host
* name and port number.
******************************************************************************/
public class HServerAddress implements Writable {
*/
public class HServerAddress implements WritableComparable {
private InetSocketAddress address;
private String stringValue;
String stringValue;
/** Empty constructor, used for Writable */
public HServerAddress() {
this.address = null;
this.stringValue = null;
}
/**
* Construct a HServerAddress from an InetSocketAddress
* @param address InetSocketAddress of server
*/
public HServerAddress(InetSocketAddress address) {
this.address = address;
this.stringValue = address.getAddress().getHostAddress() + ":" +
address.getPort();
}
/**
* Construct a HServerAddress from a string of the form hostname:port
*
* @param hostAndPort format 'hostname:port'
*/
public HServerAddress(String hostAndPort) {
int colonIndex = hostAndPort.indexOf(':');
if(colonIndex < 0) {
@ -55,11 +65,21 @@ public class HServerAddress implements Writable {
this.stringValue = hostAndPort;
}
/**
* Construct a HServerAddress from hostname, port number
* @param bindAddress host name
* @param port port number
*/
public HServerAddress(String bindAddress, int port) {
this.address = new InetSocketAddress(bindAddress, port);
this.stringValue = bindAddress + ":" + port;
}
/**
* Construct a HServerAddress from another HServerAddress
*
* @param other the HServerAddress to copy from
*/
public HServerAddress(HServerAddress other) {
String bindAddress = other.getBindAddress();
int port = other.getPort();
@ -67,26 +87,54 @@ public class HServerAddress implements Writable {
stringValue = bindAddress + ":" + port;
}
/** @return host name */
public String getBindAddress() {
return address.getAddress().getHostAddress();
}
/** @return port number */
public int getPort() {
return address.getPort();
}
/** @return the InetSocketAddress */
public InetSocketAddress getInetSocketAddress() {
return address;
}
/**
* {@inheritDoc}
*/
@Override
public String toString() {
return (stringValue == null ? "" : stringValue);
}
//////////////////////////////////////////////////////////////////////////////
// Writable
//////////////////////////////////////////////////////////////////////////////
/**
* {@inheritDoc}
*/
@Override
public boolean equals(Object o) {
return this.compareTo(o) == 0;
}
/**
* {@inheritDoc}
*/
@Override
public int hashCode() {
int result = this.address.hashCode();
result ^= this.stringValue.hashCode();
return result;
}
//
// Writable
//
/**
* {@inheritDoc}
*/
public void readFields(DataInput in) throws IOException {
String bindAddress = in.readUTF();
int port = in.readInt();
@ -101,6 +149,9 @@ public class HServerAddress implements Writable {
}
}
/**
* {@inheritDoc}
*/
public void write(DataOutput out) throws IOException {
if(address == null) {
out.writeUTF("");
@ -111,4 +162,16 @@ public class HServerAddress implements Writable {
out.writeInt(address.getPort());
}
}
//
// Comparable
//
/**
* {@inheritDoc}
*/
public int compareTo(Object o) {
HServerAddress other = (HServerAddress) o;
return this.toString().compareTo(other.toString());
}
}

View File

@ -0,0 +1,121 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
/**
* batch update operation
*/
public class BatchOperation implements Writable {
/** put operation */
public static final int PUT_OP = 1;
/** delete operation */
public static final int DELETE_OP = 2;
private int op;
private Text column;
private byte[] value;
/** default constructor used by Writable */
public BatchOperation() {
this.op = 0;
this.column = new Text();
this.value = null;
}
/**
* Creates a put operation
*
* @param column column name
* @param value column value
*/
public BatchOperation(Text column, byte[] value) {
this.op = PUT_OP;
this.column = column;
this.value = value;
}
/**
* Creates a delete operation
*
* @param column name of column to delete
*/
public BatchOperation(Text column) {
this.op = DELETE_OP;
this.column = column;
this.value = null;
}
/**
* @return the column
*/
public Text getColumn() {
return column;
}
/**
* @return the operation
*/
public int getOp() {
return op;
}
/**
* @return the value
*/
public byte[] getValue() {
return value;
}
//
// Writable
//
/**
* {@inheritDoc}
*/
public void readFields(DataInput in) throws IOException {
op = in.readInt();
column.readFields(in);
if(op == PUT_OP) {
value = new byte[in.readInt()];
in.readFully(value);
}
}
/**
* {@inheritDoc}
*/
public void write(DataOutput out) throws IOException {
out.writeInt(op);
column.write(out);
if(op == PUT_OP) {
out.writeInt(value.length);
out.write(value);
}
}
}

View File

@ -0,0 +1,168 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
/**
* A Writable object that contains a series of BatchOperations
*
* There is one BatchUpdate object per server, so a series of batch operations
* can result in multiple BatchUpdate objects if the batch contains rows that
* are served by multiple region servers.
*/
public class BatchUpdate implements Writable,
Iterable<Map.Entry<Text, ArrayList<BatchOperation>>> {
// used to generate lock ids
private Random rand;
// used on client side to map lockid to a set of row updates
private HashMap<Long, ArrayList<BatchOperation>> lockToRowOps;
// the operations for each row
private HashMap<Text, ArrayList<BatchOperation>> operations;
/** constructor */
public BatchUpdate() {
this.rand = new Random();
this.lockToRowOps = new HashMap<Long, ArrayList<BatchOperation>>();
this.operations = new HashMap<Text, ArrayList<BatchOperation>>();
}
/**
* Start a batch row insertion/update.
*
* No changes are committed until the client commits the batch operation via
* HClient.batchCommit().
*
* The entire batch update can be abandoned by calling HClient.batchAbort();
*
* Callers to this method are given a handle that corresponds to the row being
* changed. The handle must be supplied on subsequent put or delete calls so
* that the row can be identified.
*
* @param row Name of row to start update against.
* @return Row lockid.
*/
public synchronized long startUpdate(Text row) {
Long lockid = Long.valueOf(Math.abs(rand.nextLong()));
ArrayList<BatchOperation> ops = operations.get(row);
if(ops == null) {
ops = new ArrayList<BatchOperation>();
operations.put(row, ops);
}
lockToRowOps.put(lockid, ops);
return lockid.longValue();
}
/**
* Change a value for the specified column
*
* @param lockid - lock id returned from startUpdate
* @param column - column whose value is being set
* @param val - new value for column
*/
public synchronized void put(long lockid, Text column, byte val[]) {
ArrayList<BatchOperation> ops = lockToRowOps.get(lockid);
if(ops == null) {
throw new IllegalArgumentException("no row for lockid " + lockid);
}
ops.add(new BatchOperation(column, val));
}
/**
* Delete the value for a column
*
* @param lockid - lock id returned from startUpdate
* @param column - name of column whose value is to be deleted
*/
public synchronized void delete(long lockid, Text column) {
ArrayList<BatchOperation> ops = lockToRowOps.get(lockid);
if(ops == null) {
throw new IllegalArgumentException("no row for lockid " + lockid);
}
ops.add(new BatchOperation(column));
}
//
// Iterable
//
/**
* @return Iterator<Map.Entry<Text, ArrayList<BatchOperation>>>
* Text row -> ArrayList<BatchOperation> changes
*/
public Iterator<Map.Entry<Text, ArrayList<BatchOperation>>> iterator() {
return operations.entrySet().iterator();
}
//
// Writable
//
/**
* {@inheritDoc}
*/
public void readFields(DataInput in) throws IOException {
int nOps = in.readInt();
for (int i = 0; i < nOps; i++) {
Text row = new Text();
row.readFields(in);
int nRowOps = in.readInt();
ArrayList<BatchOperation> rowOps = new ArrayList<BatchOperation>();
for(int j = 0; j < nRowOps; j++) {
BatchOperation op = new BatchOperation();
op.readFields(in);
rowOps.add(op);
}
operations.put(row, rowOps);
}
}
/**
* {@inheritDoc}
*/
public void write(DataOutput out) throws IOException {
out.writeInt(operations.size());
for (Map.Entry<Text, ArrayList<BatchOperation>> e: operations.entrySet()) {
e.getKey().write(out);
ArrayList<BatchOperation> ops = e.getValue();
out.writeInt(ops.size());
for(BatchOperation op: ops) {
op.write(out);
}
}
}
}

View File

@ -0,0 +1,104 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.io.Text;
/**
* Test batch updates
*/
public class TestBatchUpdate extends HBaseClusterTestCase {
private static final String CONTENTS_STR = "contents:";
private static final Text CONTENTS = new Text(CONTENTS_STR);
private static final byte[] value = { 1, 2, 3, 4 };
private HTableDescriptor desc = null;
private HClient client = null;
/**
* {@inheritDoc}
*/
@Override
public void setUp() throws Exception {
super.setUp();
this.client = new HClient(conf);
this.desc = new HTableDescriptor("test");
desc.addFamily(new HColumnDescriptor(CONTENTS_STR));
try {
client.createTable(desc);
client.openTable(desc.getName());
} catch (Exception e) {
e.printStackTrace();
fail();
}
}
/** the test case */
public void testBatchUpdate() {
try {
client.commitBatch();
} catch (IllegalStateException e) {
// expected
} catch (Exception e) {
e.printStackTrace();
fail();
}
client.startBatchUpdate();
try {
client.openTable(HConstants.META_TABLE_NAME);
} catch (IllegalStateException e) {
// expected
} catch (Exception e) {
e.printStackTrace();
fail();
}
try {
long lockid = client.startUpdate(new Text("row1"));
client.put(lockid, CONTENTS, value);
client.delete(lockid, CONTENTS);
lockid = client.startUpdate(new Text("row2"));
client.put(lockid, CONTENTS, value);
client.commitBatch();
Text[] columns = { CONTENTS };
HScannerInterface scanner = client.obtainScanner(columns, new Text());
HStoreKey key = new HStoreKey();
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
while(scanner.next(key, results)) {
for(Map.Entry<Text, byte[]> e: results.entrySet()) {
System.out.println(key + ": row: " + e.getKey() + " value: " +
new String(e.getValue()));
}
}
} catch (Exception e) {
e.printStackTrace();
fail();
}
}
}