HADOOP-1468 Add HBase batch update to reduce RPC overhead (restrict batches to a single row at a time)

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@560014 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jim Kellerman 2007-07-26 21:58:22 +00:00
parent 5b1bd1f8f2
commit a9acbeab08
5 changed files with 190 additions and 228 deletions

View File

@ -77,3 +77,5 @@ Trunk (unreleased changes)
region server
49. HADOOP-1646 RegionServer OOME's under sustained, substantial loading by
10 concurrent clients
50. HADOOP-1468 Add HBase batch update to reduce RPC overhead (restrict batches
to a single row at a time)

View File

@ -22,13 +22,15 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -61,99 +63,9 @@ public class HClient implements HConstants {
int numRetries;
HMasterInterface master;
private final Configuration conf;
private volatile long currentLockId;
private AtomicLong currentLockId;
private Class<? extends HRegionInterface> serverInterfaceClass;
protected class BatchHandler {
private HashMap<RegionLocation, BatchUpdate> regionToBatch;
private HashMap<Long, BatchUpdate> lockToBatch;
/** constructor */
public BatchHandler() {
this.regionToBatch = new HashMap<RegionLocation, BatchUpdate>();
this.lockToBatch = new HashMap<Long, BatchUpdate>();
}
/**
* Start a batch row insertion/update.
*
* Manages multiple batch updates that are targeted for multiple servers,
* should the rows span several region servers.
*
* No changes are committed until the client commits the batch operation via
* HClient.batchCommit().
*
* The entire batch update can be abandoned by calling HClient.batchAbort();
*
* Callers to this method are given a handle that corresponds to the row being
* changed. The handle must be supplied on subsequent put or delete calls so
* that the row can be identified.
*
* @param row Name of row to start update against.
* @return Row lockid.
*/
public synchronized long startUpdate(Text row) {
RegionLocation info = getRegionLocation(row);
BatchUpdate batch = regionToBatch.get(info);
if(batch == null) {
batch = new BatchUpdate();
regionToBatch.put(info, batch);
}
long lockid = batch.startUpdate(row);
lockToBatch.put(lockid, batch);
return lockid;
}
/**
* Change the value for the specified column
*
* @param lockid lock id returned from startUpdate
* @param column column whose value is being set
* @param value new value for column
*/
public synchronized void put(long lockid, Text column, byte[] value) {
BatchUpdate batch = lockToBatch.get(lockid);
if (batch == null) {
throw new IllegalArgumentException("invalid lock id " + lockid);
}
batch.put(lockid, column, value);
}
/**
* Delete the value for a column
*
* @param lockid - lock id returned from startUpdate
* @param column - name of column whose value is to be deleted
*/
public synchronized void delete(long lockid, Text column) {
BatchUpdate batch = lockToBatch.get(lockid);
if (batch == null) {
throw new IllegalArgumentException("invalid lock id " + lockid);
}
batch.delete(lockid, column);
}
/**
* Finalize a batch mutation
*
* @param timestamp time to associate with all the changes
* @throws IOException
*/
public synchronized void commit(long timestamp) throws IOException {
try {
for(Map.Entry<RegionLocation, BatchUpdate> e: regionToBatch.entrySet()) {
RegionLocation r = e.getKey();
HRegionInterface server = getHRegionConnection(r.serverAddress);
server.batchUpdate(r.regionInfo.getRegionName(), timestamp,
e.getValue());
}
} catch (RemoteException e) {
throw RemoteExceptionHandler.decodeRemoteException(e);
}
}
}
private BatchHandler batch;
private AtomicReference<BatchUpdate> batch;
/*
* Data structure that holds current location for a region and its info.
@ -606,8 +518,8 @@ public class HClient implements HConstants {
*/
public HClient(Configuration conf) {
this.conf = conf;
this.batch = null;
this.currentLockId = -1;
this.batch = new AtomicReference<BatchUpdate>();
this.currentLockId = new AtomicLong(-1L);
this.pause = conf.getLong("hbase.client.pause", 30 * 1000);
this.numRetries = conf.getInt("hbase.client.retries.number", 5);
@ -1159,7 +1071,7 @@ public class HClient implements HConstants {
if(tableName == null || tableName.getLength() == 0) {
throw new IllegalArgumentException("table name cannot be null or zero length");
}
if(this.currentLockId != -1 || batch != null) {
if(this.currentLockId.get() != -1L || batch.get() != null) {
throw new IllegalStateException("update in progress");
}
this.currentTableServers = tableServers.getTableServers(tableName);
@ -1481,51 +1393,90 @@ public class HClient implements HConstants {
*
* No changes are committed until the call to commitBatchUpdate returns.
* A call to abortBatchUpdate will abandon the entire batch.
*
* Note that in batch mode, calls to commit or abort are ignored.
*
* @param row name of row to be updated
* @return lockid to be used in subsequent put, delete and commit calls
*/
public synchronized void startBatchUpdate() {
if(this.currentTableServers == null) {
public synchronized long startBatchUpdate(final Text row) {
if (this.currentTableServers == null) {
throw new IllegalStateException("Must open table first");
}
if(batch == null) {
batch = new BatchHandler();
if (batch.get() != null) {
throw new IllegalStateException("batch update in progress");
}
batch.set(new BatchUpdate());
return batch.get().startUpdate(row);
}
/**
* Abort a batch mutation
* @param lockid lock id returned by startBatchUpdate
*/
public synchronized void abortBatch() {
batch = null;
public synchronized void abortBatch(final long lockid) {
BatchUpdate u = batch.get();
if (u == null) {
throw new IllegalStateException("no batch update in progress");
}
if (u.getLockid() != lockid) {
throw new IllegalArgumentException("invalid lock id " + lockid);
}
batch.set(null);
}
/**
* Finalize a batch mutation
*
* @param lockid lock id returned by startBatchUpdate
* @throws IOException
*/
public synchronized void commitBatch() throws IOException {
commitBatch(System.currentTimeMillis());
public void commitBatch(final long lockid) throws IOException {
commitBatch(lockid, System.currentTimeMillis());
}
/**
* Finalize a batch mutation
*
* @param lockid lock id returned by startBatchUpdate
* @param timestamp time to associate with all the changes
* @throws IOException
*/
public synchronized void commitBatch(long timestamp) throws IOException {
if(batch == null) {
public synchronized void commitBatch(final long lockid, final long timestamp)
throws IOException {
BatchUpdate u = batch.get();
if (u == null) {
throw new IllegalStateException("no batch update in progress");
}
if (u.getLockid() != lockid) {
throw new IllegalArgumentException("invalid lock id " + lockid);
}
try {
batch.commit(timestamp);
for (int tries = 0; tries < numRetries; tries++) {
RegionLocation r = getRegionLocation(u.getRow());
HRegionInterface server = getHRegionConnection(r.serverAddress);
try {
server.batchUpdate(r.getRegionInfo().getRegionName(), timestamp, u);
break;
} catch (IOException e) {
if (tries < numRetries -1) {
reloadCurrentTable(r);
} else {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
throw e;
}
}
try {
Thread.sleep(pause);
} catch (InterruptedException e) {
}
}
} finally {
batch = null;
batch.set(null);
}
}
@ -1545,26 +1496,27 @@ public class HClient implements HConstants {
* @throws IOException
*/
public synchronized long startUpdate(final Text row) throws IOException {
if(this.currentLockId != -1) {
if (this.currentLockId.get() != -1L) {
throw new IllegalStateException("update in progress");
}
if(batch != null) {
return batch.startUpdate(row);
if (batch.get() != null) {
throw new IllegalStateException("batch update in progress");
}
for(int tries = 0; tries < numRetries; tries++) {
for (int tries = 0; tries < numRetries; tries++) {
IOException e = null;
RegionLocation info = getRegionLocation(row);
try {
currentServer = getHRegionConnection(info.serverAddress);
currentRegion = info.regionInfo.regionName;
clientid = rand.nextLong();
this.currentLockId = currentServer.startUpdate(currentRegion, clientid, row);
this.currentLockId.set(
currentServer.startUpdate(currentRegion, clientid, row));
break;
} catch (IOException ex) {
e = ex;
}
if(tries < numRetries - 1) {
if (tries < numRetries - 1) {
try {
Thread.sleep(this.pause);
@ -1577,13 +1529,13 @@ public class HClient implements HConstants {
e = ex;
}
} else {
if(e instanceof RemoteException) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
throw e;
}
}
return this.currentLockId;
return this.currentLockId.get();
}
/**
@ -1596,29 +1548,29 @@ public class HClient implements HConstants {
* @throws IOException
*/
public void put(long lockid, Text column, byte val[]) throws IOException {
if(val == null) {
if (val == null) {
throw new IllegalArgumentException("value cannot be null");
}
if(batch != null) {
batch.put(lockid, column, val);
if (batch.get() != null) {
batch.get().put(lockid, column, val);
return;
}
if(lockid != this.currentLockId) {
if (lockid != this.currentLockId.get()) {
throw new IllegalArgumentException("invalid lockid");
}
try {
this.currentServer.put(this.currentRegion, this.clientid, lockid, column,
val);
} catch(IOException e) {
} catch (IOException e) {
try {
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
} catch(IOException e2) {
} catch (IOException e2) {
LOG.warn(e2);
}
this.currentServer = null;
this.currentRegion = null;
if(e instanceof RemoteException) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
throw e;
@ -1633,18 +1585,18 @@ public class HClient implements HConstants {
* @throws IOException
*/
public void delete(long lockid, Text column) throws IOException {
if(batch != null) {
batch.delete(lockid, column);
if (batch.get() != null) {
batch.get().delete(lockid, column);
return;
}
if(lockid != this.currentLockId) {
if (lockid != this.currentLockId.get()) {
throw new IllegalArgumentException("invalid lockid");
}
try {
this.currentServer.delete(this.currentRegion, this.clientid, lockid,
column);
} catch(IOException e) {
} catch (IOException e) {
try {
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
} catch(IOException e2) {
@ -1652,7 +1604,7 @@ public class HClient implements HConstants {
}
this.currentServer = null;
this.currentRegion = null;
if(e instanceof RemoteException) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
throw e;
@ -1666,24 +1618,25 @@ public class HClient implements HConstants {
* @throws IOException
*/
public void abort(long lockid) throws IOException {
if(batch != null) {
if (batch.get() != null) {
abortBatch(lockid);
return;
}
if(lockid != this.currentLockId) {
if (lockid != this.currentLockId.get()) {
throw new IllegalArgumentException("invalid lockid");
}
try {
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
} catch(IOException e) {
} catch (IOException e) {
this.currentServer = null;
this.currentRegion = null;
if(e instanceof RemoteException) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
throw e;
} finally {
this.currentLockId = -1;
this.currentLockId.set(-1L);
}
}
@ -1705,11 +1658,12 @@ public class HClient implements HConstants {
* @throws IOException
*/
public void commit(long lockid, long timestamp) throws IOException {
if(batch != null) {
if (batch.get() != null) {
commitBatch(lockid, timestamp);
return;
}
if(lockid != this.currentLockId) {
if (lockid != this.currentLockId.get()) {
throw new IllegalArgumentException("invalid lockid");
}
try {
@ -1724,7 +1678,7 @@ public class HClient implements HConstants {
}
throw e;
} finally {
this.currentLockId = -1;
this.currentLockId.set(-1L);
}
}
@ -1735,24 +1689,24 @@ public class HClient implements HConstants {
* @throws IOException
*/
public void renewLease(long lockid) throws IOException {
if(batch != null) {
if (batch.get() != null) {
return;
}
if(lockid != this.currentLockId) {
if (lockid != this.currentLockId.get()) {
throw new IllegalArgumentException("invalid lockid");
}
try {
this.currentServer.renewLease(lockid, this.clientid);
} catch(IOException e) {
} catch (IOException e) {
try {
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
} catch(IOException e2) {
} catch (IOException e2) {
LOG.warn(e2);
}
this.currentServer = null;
this.currentRegion = null;
if(e instanceof RemoteException) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
throw e;
@ -1770,7 +1724,7 @@ public class HClient implements HConstants {
private Text startRow;
private long scanTime;
private boolean closed;
private volatile RegionLocation[] regions;
private AtomicReferenceArray<RegionLocation> regions;
@SuppressWarnings("hiding")
private int currentRegion;
private HRegionInterface server;
@ -1791,7 +1745,8 @@ public class HClient implements HConstants {
Collection<RegionLocation> info =
currentTableServers.tailMap(firstServer).values();
this.regions = info.toArray(new RegionLocation[info.size()]);
this.regions = new AtomicReferenceArray<RegionLocation>(
info.toArray(new RegionLocation[info.size()]));
}
ClientScanner(Text[] columns, Text startRow, long timestamp,
@ -1821,14 +1776,14 @@ public class HClient implements HConstants {
this.scannerId = -1L;
}
this.currentRegion += 1;
if(this.currentRegion == this.regions.length) {
if(this.currentRegion == this.regions.length()) {
close();
return false;
}
try {
for(int tries = 0; tries < numRetries; tries++) {
RegionLocation info = this.regions[currentRegion];
this.server = getHRegionConnection(this.regions[currentRegion].serverAddress);
RegionLocation info = this.regions.get(currentRegion);
this.server = getHRegionConnection(info.serverAddress);
try {
if (this.filter == null) {

View File

@ -976,23 +976,20 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
*/
public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
throws IOException {
for(Map.Entry<Text, ArrayList<BatchOperation>> e: b) {
Text row = e.getKey();
long clientid = rand.nextLong();
long lockid = startUpdate(regionName, clientid, row);
for(BatchOperation op: e.getValue()) {
switch(op.getOp()) {
case BatchOperation.PUT_OP:
put(regionName, clientid, lockid, op.getColumn(), op.getValue());
break;
case BatchOperation.DELETE_OP:
delete(regionName, clientid, lockid, op.getColumn());
break;
}
long clientid = rand.nextLong();
long lockid = startUpdate(regionName, clientid, b.getRow());
for(BatchOperation op: b) {
switch(op.getOp()) {
case BatchOperation.PUT_OP:
put(regionName, clientid, lockid, op.getColumn(), op.getValue());
break;
case BatchOperation.DELETE_OP:
delete(regionName, clientid, lockid, op.getColumn());
break;
}
commit(regionName, clientid, lockid, timestamp);
}
commit(regionName, clientid, lockid, timestamp);
}
/**

View File

@ -23,9 +23,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.io.Text;
@ -38,25 +36,38 @@ import org.apache.hadoop.io.Writable;
* can result in multiple BatchUpdate objects if the batch contains rows that
* are served by multiple region servers.
*/
public class BatchUpdate implements Writable,
Iterable<Map.Entry<Text, ArrayList<BatchOperation>>> {
public class BatchUpdate implements Writable, Iterable<BatchOperation> {
// used to generate lock ids
private Random rand;
// the row being updated
private Text row;
// used on client side to map lockid to a set of row updates
private HashMap<Long, ArrayList<BatchOperation>> lockToRowOps;
// the lockid
private long lockid;
// the operations for each row
private HashMap<Text, ArrayList<BatchOperation>> operations;
// the batched operations
private ArrayList<BatchOperation> operations;
/** constructor */
public BatchUpdate() {
this.rand = new Random();
this.lockToRowOps = new HashMap<Long, ArrayList<BatchOperation>>();
this.operations = new HashMap<Text, ArrayList<BatchOperation>>();
this.row = new Text();
this.lockid = -1L;
this.operations = new ArrayList<BatchOperation>();
}
/** @return the lock id */
public long getLockid() {
return lockid;
}
/** @return the row */
public Text getRow() {
return row;
}
/**
* Start a batch row insertion/update.
*
@ -66,21 +77,15 @@ Iterable<Map.Entry<Text, ArrayList<BatchOperation>>> {
* The entire batch update can be abandoned by calling HClient.batchAbort();
*
* Callers to this method are given a handle that corresponds to the row being
* changed. The handle must be supplied on subsequent put or delete calls so
* that the row can be identified.
* changed. The handle must be supplied on subsequent put or delete calls.
*
* @param row Name of row to start update against.
* @return Row lockid.
*/
public synchronized long startUpdate(Text row) {
Long lockid = Long.valueOf(Math.abs(rand.nextLong()));
ArrayList<BatchOperation> ops = operations.get(row);
if(ops == null) {
ops = new ArrayList<BatchOperation>();
operations.put(row, ops);
}
lockToRowOps.put(lockid, ops);
return lockid.longValue();
public synchronized long startUpdate(final Text row) {
this.row = row;
this.lockid = Long.valueOf(Math.abs(rand.nextLong()));
return this.lockid;
}
/**
@ -90,12 +95,12 @@ Iterable<Map.Entry<Text, ArrayList<BatchOperation>>> {
* @param column - column whose value is being set
* @param val - new value for column
*/
public synchronized void put(long lockid, Text column, byte val[]) {
ArrayList<BatchOperation> ops = lockToRowOps.get(lockid);
if(ops == null) {
throw new IllegalArgumentException("no row for lockid " + lockid);
public synchronized void put(final long lockid, final Text column,
final byte val[]) {
if(this.lockid != lockid) {
throw new IllegalArgumentException("invalid lockid " + lockid);
}
ops.add(new BatchOperation(column, val));
operations.add(new BatchOperation(column, val));
}
/**
@ -104,12 +109,11 @@ Iterable<Map.Entry<Text, ArrayList<BatchOperation>>> {
* @param lockid - lock id returned from startUpdate
* @param column - name of column whose value is to be deleted
*/
public synchronized void delete(long lockid, Text column) {
ArrayList<BatchOperation> ops = lockToRowOps.get(lockid);
if(ops == null) {
throw new IllegalArgumentException("no row for lockid " + lockid);
public synchronized void delete(final long lockid, final Text column) {
if(this.lockid != lockid) {
throw new IllegalArgumentException("invalid lockid " + lockid);
}
ops.add(new BatchOperation(column));
operations.add(new BatchOperation(column));
}
//
@ -117,11 +121,10 @@ Iterable<Map.Entry<Text, ArrayList<BatchOperation>>> {
//
/**
* @return Iterator<Map.Entry<Text, ArrayList<BatchOperation>>>
* Text row -> ArrayList<BatchOperation> changes
* @return Iterator<BatchOperation>
*/
public Iterator<Map.Entry<Text, ArrayList<BatchOperation>>> iterator() {
return operations.entrySet().iterator();
public Iterator<BatchOperation> iterator() {
return operations.iterator();
}
//
@ -132,20 +135,12 @@ Iterable<Map.Entry<Text, ArrayList<BatchOperation>>> {
* {@inheritDoc}
*/
public void readFields(DataInput in) throws IOException {
row.readFields(in);
int nOps = in.readInt();
for (int i = 0; i < nOps; i++) {
Text row = new Text();
row.readFields(in);
int nRowOps = in.readInt();
ArrayList<BatchOperation> rowOps = new ArrayList<BatchOperation>();
for(int j = 0; j < nRowOps; j++) {
BatchOperation op = new BatchOperation();
op.readFields(in);
rowOps.add(op);
}
operations.put(row, rowOps);
BatchOperation op = new BatchOperation();
op.readFields(in);
operations.add(op);
}
}
@ -153,16 +148,10 @@ Iterable<Map.Entry<Text, ArrayList<BatchOperation>>> {
* {@inheritDoc}
*/
public void write(DataOutput out) throws IOException {
row.write(out);
out.writeInt(operations.size());
for (Map.Entry<Text, ArrayList<BatchOperation>> e: operations.entrySet()) {
e.getKey().write(out);
ArrayList<BatchOperation> ops = e.getValue();
out.writeInt(ops.size());
for(BatchOperation op: ops) {
op.write(out);
}
for (BatchOperation op: operations) {
op.write(out);
}
}
}

View File

@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase;
import java.io.UnsupportedEncodingException;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.io.Text;
@ -29,11 +30,21 @@ import org.apache.hadoop.io.Text;
public class TestBatchUpdate extends HBaseClusterTestCase {
private static final String CONTENTS_STR = "contents:";
private static final Text CONTENTS = new Text(CONTENTS_STR);
private static final byte[] value = { 1, 2, 3, 4 };
private byte[] value;
private HTableDescriptor desc = null;
private HClient client = null;
/** constructor */
public TestBatchUpdate() {
try {
value = "abcd".getBytes(HConstants.UTF8_ENCODING);
} catch (UnsupportedEncodingException e) {
fail();
}
}
/**
* {@inheritDoc}
*/
@ -56,7 +67,7 @@ public class TestBatchUpdate extends HBaseClusterTestCase {
/** the test case */
public void testBatchUpdate() {
try {
client.commitBatch();
client.commitBatch(-1L);
} catch (IllegalStateException e) {
// expected
@ -65,7 +76,7 @@ public class TestBatchUpdate extends HBaseClusterTestCase {
fail();
}
client.startBatchUpdate();
long lockid = client.startBatchUpdate(new Text("row1"));
try {
client.openTable(HConstants.META_TABLE_NAME);
@ -77,14 +88,22 @@ public class TestBatchUpdate extends HBaseClusterTestCase {
fail();
}
try {
long lockid = client.startUpdate(new Text("row1"));
try {
@SuppressWarnings("unused")
long dummy = client.startUpdate(new Text("row2"));
} catch (IllegalStateException e) {
// expected
} catch (Exception e) {
e.printStackTrace();
fail();
}
client.put(lockid, CONTENTS, value);
client.delete(lockid, CONTENTS);
client.commitBatch(lockid);
lockid = client.startUpdate(new Text("row2"));
lockid = client.startBatchUpdate(new Text("row2"));
client.put(lockid, CONTENTS, value);
client.commitBatch();
client.commit(lockid);
Text[] columns = { CONTENTS };
HScannerInterface scanner = client.obtainScanner(columns, new Text());