HBASE-748 Add an efficient way to batch update many rows

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@705770 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2008-10-17 22:03:29 +00:00
parent f5ded90780
commit a3b452e9b0
13 changed files with 334 additions and 51 deletions

View File

@ -54,6 +54,7 @@ Release 0.19.0 - Unreleased
(Andrzej Bialecki via Stack)
OPTIMIZATIONS
HBASE-748 Add an efficient way to batch update many rows
HBASE-887 Fix a hotspot in scanners
Release 0.18.0 - September 21st, 2008

View File

@ -51,6 +51,13 @@
<description>The address for the hbase master web UI
</description>
</property>
<property>
<name>hbase.client.write.buffer</name>
<value>10485760</value>
<description>Size of the write buffer in bytes. A bigger buffer takes more
memory but reduces the number of RPC.
</description>
</property>
<property>
<name>hbase.master.meta.thread.rescanfrequency</name>
<value>60000</value>

View File

@ -138,4 +138,16 @@ public interface HConnection {
*/
public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
throws IOException, RuntimeException;
/**
* Pass in a ServerCallable with your particular bit of logic defined and
* this method will pass it to the defined region server.
* @param <T> the type of the return value
* @param callable
* @return an object of type T
* @throws IOException
* @throws RuntimeException
*/
public <T> T getRegionServerForWithoutRetries(ServerCallable<T> callable)
throws IOException, RuntimeException;
}

View File

@ -460,7 +460,6 @@ public class HConnectionManager implements HConstants {
final byte [] tableName, final byte [] row, boolean useCache)
throws IOException{
HRegionLocation location = null;
// if we're supposed to be using the cache, then check it for a possible
// hit. otherwise, delete any existing cached location so it won't
// interfere.
@ -472,7 +471,7 @@ public class HConnectionManager implements HConstants {
} else {
deleteCachedLocation(tableName, row);
}
// build the key of the meta region we should be looking for.
// the extra 9's on the end are necessary to allow "exact" matches
// without knowing the precise region names.
@ -879,6 +878,29 @@ public class HConnectionManager implements HConstants {
}
return null;
}
public <T> T getRegionServerForWithoutRetries(ServerCallable<T> callable)
throws IOException, RuntimeException {
getMaster();
try {
callable.instantiateServer(false);
return callable.call();
} catch (Throwable t) {
if (t instanceof UndeclaredThrowableException) {
t = t.getCause();
}
if (t instanceof RemoteException) {
t = RemoteExceptionHandler.decodeRemoteException((RemoteException) t);
}
if (t instanceof DoNotRetryIOException) {
throw (DoNotRetryIOException) t;
}
if (t instanceof IOException) {
throw (IOException) t;
}
}
return null;
}
void close(boolean stopProxy) {
if (master != null) {

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@ -54,6 +55,10 @@ public class HTable {
private final HConnection connection;
private final byte [] tableName;
private HBaseConfiguration configuration;
private ArrayList<BatchUpdate> writeBuffer;
private long writeBufferSize;
private boolean autoFlush;
private long currentWriteBufferSize;
private int scannerCaching;
/**
@ -103,6 +108,11 @@ public class HTable {
this.configuration = conf;
this.tableName = tableName;
this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
this.writeBuffer = new ArrayList<BatchUpdate>();
this.writeBufferSize =
this.configuration.getLong("hbase.client.write.buffer", 10485760);
this.autoFlush = true;
this.currentWriteBufferSize = 0;
this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 30);
}
@ -964,6 +974,7 @@ public class HTable {
/**
* Commit a BatchUpdate to the table.
* If autoFlush is false, the update is buffered
* @param batchUpdate
* @throws IOException
*/
@ -974,6 +985,7 @@ public class HTable {
/**
* Commit a BatchUpdate to the table using existing row lock.
* If autoFlush is false, the update is buffered
* @param batchUpdate
* @param rl Existing row lock
* @throws IOException
@ -982,43 +994,104 @@ public class HTable {
final RowLock rl)
throws IOException {
checkRowAndColumns(batchUpdate);
connection.getRegionServerWithRetries(
new ServerCallable<Boolean>(connection, tableName, batchUpdate.getRow()) {
public Boolean call() throws IOException {
long lockId = -1L;
if(rl != null) {
lockId = rl.getLockId();
}
server.batchUpdate(location.getRegionInfo().getRegionName(),
batchUpdate, lockId);
return null;
}
}
);
writeBuffer.add(batchUpdate);
currentWriteBufferSize += batchUpdate.getSize();
if(autoFlush || currentWriteBufferSize > writeBufferSize) {
flushCommits();
}
}
/**
* Commit a RowsBatchUpdate to the table.
* Commit a List of BatchUpdate to the table.
* If autoFlush is false, the updates are buffered
* @param batchUpdates
* @throws IOException
*/
public synchronized void commit(final List<BatchUpdate> batchUpdates)
throws IOException {
for (BatchUpdate batchUpdate : batchUpdates)
commit(batchUpdate,null);
public synchronized void commit(final List<BatchUpdate> batchUpdates)
throws IOException {
for(BatchUpdate bu : batchUpdates) {
checkRowAndColumns(bu);
writeBuffer.add(bu);
currentWriteBufferSize += bu.getSize();
}
if(autoFlush || currentWriteBufferSize > writeBufferSize) {
flushCommits();
}
}
/**
* Utility method that checks rows existence, length and
* columns well formedness.
* Commit to the table the buffer of BatchUpdate.
* Called automaticaly in the commit methods when autoFlush is true.
* @throws IOException
*/
public void flushCommits() throws IOException {
try {
// See HBASE-748 for pseudo code of this method
if (writeBuffer.isEmpty()) {
return;
}
Collections.sort(writeBuffer);
List<BatchUpdate> tempUpdates = new ArrayList<BatchUpdate>();
byte[] currentRegion = connection.getRegionLocation(tableName,
writeBuffer.get(0).getRow(), false).getRegionInfo().getRegionName();
byte[] region = currentRegion;
boolean isLastRow = false;
for (int i = 0; i < writeBuffer.size(); i++) {
BatchUpdate batchUpdate = writeBuffer.get(i);
tempUpdates.add(batchUpdate);
isLastRow = (i + 1) == writeBuffer.size();
if (!isLastRow) {
region = connection.getRegionLocation(tableName,
writeBuffer.get(i + 1).getRow(), false).getRegionInfo()
.getRegionName();
}
if (!Bytes.equals(currentRegion, region) || isLastRow) {
final BatchUpdate[] updates = tempUpdates.toArray(new BatchUpdate[0]);
int index = connection
.getRegionServerForWithoutRetries(new ServerCallable<Integer>(
connection, tableName, batchUpdate.getRow()) {
public Integer call() throws IOException {
int i = server.batchUpdates(location.getRegionInfo()
.getRegionName(), updates);
return i;
}
});
if (index != updates.length - 1) {
// Basic waiting time. If many updates are flushed, tests have shown
// that this is barely needed but when commiting 1 update this may
// get retried hundreds of times.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// continue
}
i = i - updates.length + index;
region = connection.getRegionLocation(tableName,
writeBuffer.get(i + 1).getRow(), true).getRegionInfo()
.getRegionName();
}
currentRegion = region;
tempUpdates.clear();
}
}
} finally {
currentWriteBufferSize = 0;
writeBuffer.clear();
}
}
/**
* Utility method that checks rows existence, length and columns well
* formedness.
*
* @param bu
* @throws IllegalArgumentException
* @throws IOException
*/
private void checkRowAndColumns(BatchUpdate bu)
throws IllegalArgumentException, IOException {
if (bu.getRow() == null ||
bu.getRow().length > HConstants.MAX_ROW_LENGTH) {
if (bu.getRow() == null || bu.getRow().length > HConstants.MAX_ROW_LENGTH) {
throw new IllegalArgumentException("Row key is invalid");
}
for (BatchOperation bo : bu) {
@ -1063,6 +1136,46 @@ public class HTable {
}
);
}
/**
* Get the value of autoFlush. If true, updates will not be buffered
* @return value of autoFlush
*/
public boolean isAutoFlush() {
return autoFlush;
}
/**
* Set if this instanciation of HTable will autoFlush
* @param autoFlush
*/
public void setAutoFlush(boolean autoFlush) {
this.autoFlush = autoFlush;
}
/**
* Get the maximum size in bytes of the write buffer for this HTable
* @return the size of the write buffer in bytes
*/
public long getWriteBufferSize() {
return writeBufferSize;
}
/**
* Set the size of the buffer in bytes
* @param writeBufferSize
*/
public void setWriteBufferSize(long writeBufferSize) {
this.writeBufferSize = writeBufferSize;
}
/**
* Get the write buffer
* @return the current write buffer
*/
public ArrayList<BatchUpdate> getWriteBuffer() {
return writeBuffer;
}
/**
* Implements the scanner interface for the HBase client.

View File

@ -28,7 +28,7 @@ import java.util.Iterator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/**
* A Writable object that contains a series of BatchOperations
@ -37,10 +37,12 @@ import org.apache.hadoop.io.Writable;
* can result in multiple BatchUpdate objects if the batch contains rows that
* are served by multiple region servers.
*/
public class BatchUpdate implements Writable, Iterable<BatchOperation> {
public class BatchUpdate implements WritableComparable<BatchUpdate>,
Iterable<BatchOperation> {
// the row being updated
private byte [] row = null;
private long size = 0;
// the batched operations
private ArrayList<BatchOperation> operations =
@ -95,6 +97,7 @@ public class BatchUpdate implements Writable, Iterable<BatchOperation> {
this.row = row;
this.timestamp = timestamp;
this.operations = new ArrayList<BatchOperation>();
this.size = (row == null)? 0: row.length;
}
/** @return the row */
@ -102,6 +105,13 @@ public class BatchUpdate implements Writable, Iterable<BatchOperation> {
return row;
}
/**
* @return BatchUpdate size in bytes.
*/
public long getSize() {
return size;
}
/**
* @return the timestamp this BatchUpdate will be committed with.
*/
@ -201,6 +211,7 @@ public class BatchUpdate implements Writable, Iterable<BatchOperation> {
// If null, the PUT becomes a DELETE operation.
throw new IllegalArgumentException("Passed value cannot be null");
}
size += val.length + column.length;
operations.add(new BatchOperation(column, val));
}
@ -265,6 +276,7 @@ public class BatchUpdate implements Writable, Iterable<BatchOperation> {
}
this.row = Bytes.readByteArray(in);
timestamp = in.readLong();
this.size = in.readLong();
int nOps = in.readInt();
for (int i = 0; i < nOps; i++) {
BatchOperation op = new BatchOperation();
@ -276,9 +288,14 @@ public class BatchUpdate implements Writable, Iterable<BatchOperation> {
public void write(final DataOutput out) throws IOException {
Bytes.writeByteArray(out, this.row);
out.writeLong(timestamp);
out.writeLong(this.size);
out.writeInt(operations.size());
for (BatchOperation op: operations) {
op.write(out);
}
}
}
public int compareTo(BatchUpdate o) {
return Bytes.compareTo(this.row, o.getRow());
}
}

View File

@ -131,6 +131,7 @@ public class HbaseObjectWritable implements Writable, Configurable {
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
addToMap(BatchUpdate[].class, code++);
}
private Class<?> declaredClass;

View File

@ -108,9 +108,19 @@ public interface HRegionInterface extends VersionedProtocol {
throws IOException;
/**
* Delete all cells that match the passed row and column and whose
* timestamp is equal-to or older than the passed timestamp.
*
* Applies a batch of updates via one RPC for many rows
*
* @param regionName name of the region to update
* @param b BatchUpdate[]
* @throws IOException
*/
public int batchUpdates(final byte[] regionName, final BatchUpdate[] b)
throws IOException;
/**
* Delete all cells that match the passed row and column and whose timestamp
* is equal-to or older than the passed timestamp.
*
* @param regionName region name
* @param row row key
* @param column column key

View File

@ -442,7 +442,6 @@ class ServerManager implements HConstants {
// it was assigned, and it's not a duplicate assignment, so take it out
// of the unassigned list.
master.regionManager.noLongerUnassigned(region);
if (region.isRootRegion()) {
// Store the Root Region location (in memory)
HServerAddress rootServer = serverInfo.getServerAddress();

View File

@ -1353,6 +1353,22 @@ public class HRegion implements HConstants {
// set() methods for client use.
//////////////////////////////////////////////////////////////////////////////
/**
* Batch update many rows and take splitsAndClosesLock so we don't get
* blocked while updating.
* @param bus
*/
public void batchUpdate(BatchUpdate[] bus) throws IOException {
splitsAndClosesLock.readLock().lock();
try {
for (BatchUpdate bu : bus) {
batchUpdate(bu, null);
}
} finally {
splitsAndClosesLock.readLock().unlock();
}
}
/**
* @param b
* @throws IOException
@ -1465,32 +1481,29 @@ public class HRegion implements HConstants {
* the notify.
*/
private void checkResources() {
if (this.memcacheSize.get() > this.blockingMemcacheSize) {
requestFlush();
doBlocking();
}
}
private synchronized void doBlocking() {
boolean blocked = false;
while (this.memcacheSize.get() > this.blockingMemcacheSize) {
requestFlush();
if (!blocked) {
LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
"' on region " + Bytes.toString(getRegionName()) + ": Memcache size " +
StringUtils.humanReadableInt(this.memcacheSize.get()) +
" is >= than blocking " +
StringUtils.humanReadableInt(this.blockingMemcacheSize) + " size");
"' on region " + Bytes.toString(getRegionName()) +
": Memcache size " +
StringUtils.humanReadableInt(this.memcacheSize.get()) +
" is >= than blocking " +
StringUtils.humanReadableInt(this.blockingMemcacheSize) + " size");
}
blocked = true;
try {
wait(threadWakeFrequency);
} catch (InterruptedException e) {
// continue;
synchronized(this) {
try {
wait(threadWakeFrequency);
} catch (InterruptedException e) {
// continue;
}
}
}
if (blocked) {
LOG.info("Unblocking updates for region " + this + " '" +
Thread.currentThread().getName() + "'");
LOG.info("Unblocking updates for region " + this + " '"
+ Thread.currentThread().getName() + "'");
}
}

View File

@ -1125,6 +1125,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
throws IOException {
if (b.getRow() == null)
throw new IllegalArgumentException("update has null row");
checkOpen();
this.requestCount.incrementAndGet();
HRegion region = getRegion(regionName);
@ -1141,6 +1142,33 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
}
public int batchUpdates(final byte[] regionName, final BatchUpdate[] b)
throws IOException {
int i = 0;
checkOpen();
try {
HRegion region = getRegion(regionName);
this.cacheFlusher.reclaimMemcacheMemory();
for (BatchUpdate batchUpdate : b) {
this.requestCount.incrementAndGet();
validateValuesLength(batchUpdate, region);
}
i+= b.length-1;
region.batchUpdate(b);
} catch (OutOfMemoryError error) {
abort();
LOG.fatal("Ran out of memory", error);
} catch(WrongRegionException ex) {
return i;
} catch (NotServingRegionException ex) {
return i;
} catch (IOException e) {
checkFileSystem();
throw e;
}
return i;
}
/**
* Utility method to verify values length
* @param batchUpdate The update to verify

View File

@ -372,11 +372,13 @@ public class PerformanceEvaluation implements HConstants {
void testSetup() throws IOException {
this.admin = new HBaseAdmin(conf);
this.table = new HTable(conf, tableDescriptor.getName());
this.table.setAutoFlush(false);
this.table.setWriteBufferSize(1024*1024*12);
}
@SuppressWarnings("unused")
void testTakedown() throws IOException {
// Empty
this.table.flushCommits();
}
/*

View File

@ -55,8 +55,8 @@ public class TestBatchUpdate extends HBaseClusterTestCase {
*/
public TestBatchUpdate() throws UnsupportedEncodingException {
super();
value = "abcd".getBytes(HConstants.UTF8_ENCODING);
smallValue = "a".getBytes(HConstants.UTF8_ENCODING);
value = Bytes.toBytes("abcd");
smallValue = Bytes.toBytes("a");
}
@Override
@ -153,4 +153,62 @@ public class TestBatchUpdate extends HBaseClusterTestCase {
fail("This is unexpected : " + e);
}
}
public void testRowsBatchUpdateBufferedOneFlush() {
table.setAutoFlush(false);
ArrayList<BatchUpdate> rowsUpdate = new ArrayList<BatchUpdate>();
for(int i = 0; i < NB_BATCH_ROWS*10; i++) {
BatchUpdate batchUpdate = new BatchUpdate("row"+i);
batchUpdate.put(CONTENTS, value);
rowsUpdate.add(batchUpdate);
}
try {
table.commit(rowsUpdate);
byte [][] columns = { CONTENTS };
Scanner scanner = table.getScanner(columns, HConstants.EMPTY_START_ROW);
int nbRows = 0;
for(RowResult row : scanner)
nbRows++;
assertEquals(0, nbRows);
scanner.close();
table.flushCommits();
scanner = table.getScanner(columns, HConstants.EMPTY_START_ROW);
nbRows = 0;
for(RowResult row : scanner)
nbRows++;
assertEquals(NB_BATCH_ROWS*10, nbRows);
} catch (IOException e) {
fail("This is unexpected : " + e);
}
}
public void testRowsBatchUpdateBufferedManyManyFlushes() {
table.setAutoFlush(false);
table.setWriteBufferSize(10);
ArrayList<BatchUpdate> rowsUpdate = new ArrayList<BatchUpdate>();
for(int i = 0; i < NB_BATCH_ROWS*10; i++) {
BatchUpdate batchUpdate = new BatchUpdate("row"+i);
batchUpdate.put(CONTENTS, value);
rowsUpdate.add(batchUpdate);
}
try {
table.commit(rowsUpdate);
table.flushCommits();
byte [][] columns = { CONTENTS };
Scanner scanner = table.getScanner(columns, HConstants.EMPTY_START_ROW);
int nbRows = 0;
for(RowResult row : scanner)
nbRows++;
assertEquals(NB_BATCH_ROWS*10, nbRows);
} catch (IOException e) {
fail("This is unexpected : " + e);
}
}
}