HBASE-2353. Batch puts should sync HLog as few times as possible

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@954285 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2010-06-13 18:54:25 +00:00
parent d92c49629d
commit 18a53dfd6b
8 changed files with 440 additions and 120 deletions
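The change batches all the Puts that reach a region into mini-batches that share a single HLog append/sync instead of syncing once per Put. For context, the client-side pattern that benefits looks roughly like this (a minimal sketch assuming the 0.21-era HTable client API; the table, family, and qualifier names are made up):

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.util.Bytes;

  public class BatchPutExample {
    public static void main(String[] args) throws Exception {
      HTable table = new HTable(HBaseConfiguration.create(), "test_table");
      List<Put> puts = new ArrayList<Put>();
      for (int i = 0; i < 1000; i++) {
        Put p = new Put(Bytes.toBytes("row_" + i));
        p.add(Bytes.toBytes("cf"), Bytes.toBytes("qual"), Bytes.toBytes("val"));
        puts.add(p);
      }
      // Before this commit, each Put reaching the region server synced the HLog
      // on its own; with it, a whole mini-batch shares a single append/sync.
      table.put(puts);
      table.flushCommits();
    }
  }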

CHANGES.txt

@@ -693,6 +693,7 @@ Release 0.21.0 - Unreleased
       (Lars Francke via Stack)
   HBASE-2468  Improvements to prewarm META cache on clients
       (Mingjie Lai via Stack)
+  HBASE-2353  Batch puts should sync HLog as few times as possible

 NEW FEATURES
   HBASE-1961  HBase EC2 scripts

HConstants.java

@@ -26,6 +26,15 @@ import org.apache.hadoop.hbase.util.Bytes;
 * HConstants holds a bunch of HBase-related constants
 */
public final class HConstants {
/**
* Status codes used for return values of bulk operations.
*/
public enum OperationStatusCode {
NOT_RUN,
SUCCESS,
BAD_FAMILY,
FAILURE;
}
  /** long constant for zero */
  public static final Long ZERO_L = Long.valueOf(0L);
@@ -342,5 +351,4 @@ public final class HConstants {
  private HConstants() {
    // Can't be instantiated with this ctor.
  }
}
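A sketch of how a caller of the new bulk HRegion.put might interpret these codes (the handler methods are hypothetical; the returned array lines up index-for-index with the submitted operations):

  OperationStatusCode[] codes = region.put(puts);   // puts is a Put[]
  for (int i = 0; i < codes.length; i++) {
    switch (codes[i]) {
      case SUCCESS:    break;                       // applied and synced
      case BAD_FAMILY: handleBadFamily(puts[i]);    // hypothetical handler
                       break;
      case FAILURE:    retryLater(puts[i]);         // hypothetical handler
                       break;
      case NOT_RUN:    break;  // only seen if processing aborted before index i
    }
  }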

HRegion.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
@@ -54,16 +55,20 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;

+import com.google.common.collect.Lists;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.util.AbstractList;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -1240,7 +1245,7 @@ public class HRegion implements HeapSize { // , Writable{
    try {
      byte [] row = delete.getRow();
      // If we did not pass an existing row lock, obtain a new one
-     lid = getLock(lockid, row);
+     lid = getLock(lockid, row, true);

      // All edits for the given row (across all column families) must happen atomically.
      prepareDelete(delete);
@@ -1265,7 +1270,6 @@ public class HRegion implements HeapSize { // , Writable{
    boolean flush = false;

    updatesLock.readLock().lock();
-   ReadWriteConsistencyControl.WriteEntry w = null;
    try {
@@ -1275,7 +1279,6 @@ public class HRegion implements HeapSize { // , Writable{
        List<KeyValue> kvs = e.getValue();
        Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-       Store store = getStore(family);
        for (KeyValue kv: kvs) {
          // Check if time is LATEST, change to time of most recent addition if so
          // This is expensive.
@@ -1315,50 +1318,24 @@ public class HRegion implements HeapSize { // , Writable{
      }

      if (writeToWAL) {
-       //
        // write/sync to WAL should happen before we touch memstore.
        //
        // If order is reversed, i.e. we write to memstore first, and
        // for some reason fail to write/sync to commit log, the memstore
        // will contain uncommitted transactions.
        //
        // bunch up all edits across all column families into a
        // single WALEdit.
        WALEdit walEdit = new WALEdit();
-       for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
-         List<KeyValue> kvs = e.getValue();
-         for (KeyValue kv : kvs) {
-           walEdit.add(kv);
-         }
-       }
-       // append the edit to WAL. The append also does the sync.
-       if (!walEdit.isEmpty()) {
-         this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
-             walEdit, now);
-       }
+       addFamilyMapToWALEdit(familyMap, byteNow, walEdit);
+       this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
+           walEdit, now);
      }

      // Now make changes to the memstore.
-     long size = 0;
-     w = rwcc.beginMemstoreInsert();
-     for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
-       byte[] family = e.getKey();
-       List<KeyValue> kvs = e.getValue();
-       Store store = getStore(family);
-       for (KeyValue kv: kvs) {
-         kv.setMemstoreTS(w.getWriteNumber());
-         size = this.memstoreSize.addAndGet(store.delete(kv));
-       }
-     }
-     flush = isFlushSize(size);
+     long addedSize = applyFamilyMapToMemstore(familyMap);
+     flush = isFlushSize(memstoreSize.addAndGet(addedSize));
    } finally {
-     if (w != null) rwcc.completeMemstoreInsert(w);
      this.updatesLock.readLock().unlock();
    }
@@ -1419,7 +1396,7 @@ public class HRegion implements HeapSize { // , Writable{
    // invokes a HRegion#abort.
    byte [] row = put.getRow();
    // If we did not pass an existing row lock, obtain a new one
-   Integer lid = getLock(lockid, row);
+   Integer lid = getLock(lockid, row, true);
    try {
      // All edits for the given row (across all column families) must happen atomically.
@@ -1432,6 +1409,162 @@ public class HRegion implements HeapSize { // , Writable{
    }
  }
/**
* Struct-like class that tracks the progress of a batch operation,
* accumulating status codes and tracking the index at which processing
* is proceeding.
*/
private static class BatchOperationInProgress<T> {
T[] operations;
OperationStatusCode[] retCodes;
int nextIndexToProcess = 0;
public BatchOperationInProgress(T[] operations) {
this.operations = operations;
retCodes = new OperationStatusCode[operations.length];
Arrays.fill(retCodes, OperationStatusCode.NOT_RUN);
}
public boolean isDone() {
return nextIndexToProcess == operations.length;
}
}
/**
* Perform a batch put with no pre-specified locks
* @see HRegion#put(Pair[])
*/
public OperationStatusCode[] put(Put[] puts) throws IOException {
@SuppressWarnings("unchecked")
Pair<Put, Integer> putsAndLocks[] = new Pair[puts.length];
for (int i = 0; i < puts.length; i++) {
putsAndLocks[i] = new Pair<Put, Integer>(puts[i], null);
}
return put(putsAndLocks);
}
/**
* Perform a batch of puts.
* @param putsAndLocks the list of puts paired with their requested lock IDs.
* @throws IOException
*/
public OperationStatusCode[] put(Pair<Put, Integer>[] putsAndLocks) throws IOException {
BatchOperationInProgress<Pair<Put, Integer>> batchOp =
new BatchOperationInProgress<Pair<Put,Integer>>(putsAndLocks);
while (!batchOp.isDone()) {
checkReadOnly();
checkResources();
long newSize;
splitsAndClosesLock.readLock().lock();
try {
long addedSize = doMiniBatchPut(batchOp);
newSize = memstoreSize.addAndGet(addedSize);
} finally {
splitsAndClosesLock.readLock().unlock();
}
if (isFlushSize(newSize)) {
requestFlush();
}
}
return batchOp.retCodes;
}
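The loop above retries until every operation has a final code; each pass writes one mini-batch and therefore costs at most one HLog sync. A caller can mix rows it has already locked with rows it wants locked on its behalf, roughly like this (rowLockFor is a hypothetical lookup; compare the test near the end of this commit):

  List<Pair<Put, Integer>> putsAndLocks = Lists.newArrayList();
  for (Put p : puts) {
    Integer lock = rowLockFor(p);  // hypothetical; null means "lock it for me"
    putsAndLocks.add(new Pair<Put, Integer>(p, lock));
  }
  OperationStatusCode[] codes = region.put(putsAndLocks.toArray(new Pair[0]));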
private long doMiniBatchPut(BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
long now = EnvironmentEdgeManager.currentTimeMillis();
byte[] byteNow = Bytes.toBytes(now);
/** Keep track of the locks we hold so we can release them in finally clause */
List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
// We try to set up a batch in the range [firstIndex,lastIndexExclusive)
int firstIndex = batchOp.nextIndexToProcess;
int lastIndexExclusive = firstIndex;
boolean success = false;
try {
// ------------------------------------
// STEP 1. Try to acquire as many locks as we can, and ensure
// we acquire at least one.
// ----------------------------------
int numReadyToWrite = 0;
while (lastIndexExclusive < batchOp.operations.length) {
Pair<Put, Integer> nextPair = batchOp.operations[lastIndexExclusive];
Put put = nextPair.getFirst();
Integer providedLockId = nextPair.getSecond();
// Check the families in the put. If bad, skip this one.
try {
checkFamilies(put.getFamilyMap().keySet());
} catch (NoSuchColumnFamilyException nscf) {
LOG.warn("No such column family in batch put", nscf);
batchOp.retCodes[lastIndexExclusive] = OperationStatusCode.BAD_FAMILY;
lastIndexExclusive++;
continue;
}
// If we haven't got any rows in our batch, we should block to
// get the next one.
boolean shouldBlock = numReadyToWrite == 0;
Integer acquiredLockId = getLock(providedLockId, put.getRow(), shouldBlock);
if (acquiredLockId == null) {
// We failed to grab another lock
assert !shouldBlock : "Should never fail to get lock when blocking";
break; // stop acquiring more rows for this batch
}
if (providedLockId == null) {
acquiredLocks.add(acquiredLockId);
}
lastIndexExclusive++;
numReadyToWrite++;
}
// We've now grabbed as many puts off the list as we can
assert numReadyToWrite > 0;
// ------------------------------------
// STEP 2. Write to WAL
// ----------------------------------
WALEdit walEdit = new WALEdit();
for (int i = firstIndex; i < lastIndexExclusive; i++) {
// Skip puts that were determined to be invalid during preprocessing
if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
Put p = batchOp.operations[i].getFirst();
if (!p.getWriteToWAL()) continue;
addFamilyMapToWALEdit(p.getFamilyMap(), byteNow, walEdit);
}
// Append the edit to WAL
this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
walEdit, now);
// ------------------------------------
// STEP 3. Write back to memstore
// ----------------------------------
long addedSize = 0;
for (int i = firstIndex; i < lastIndexExclusive; i++) {
if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
Put p = batchOp.operations[i].getFirst();
addedSize += applyFamilyMapToMemstore(p.getFamilyMap());
batchOp.retCodes[i] = OperationStatusCode.SUCCESS;
}
success = true;
return addedSize;
} finally {
for (Integer toRelease : acquiredLocks) {
releaseRowLock(toRelease);
}
if (!success) {
for (int i = firstIndex; i < lastIndexExclusive; i++) {
if (batchOp.retCodes[i] == OperationStatusCode.NOT_RUN) {
batchOp.retCodes[i] = OperationStatusCode.FAILURE;
}
}
}
batchOp.nextIndexToProcess = lastIndexExclusive;
}
}
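The lock-acquisition strategy in STEP 1 (block for the first row so the batch always makes progress, then only tryLock for later rows) is the heart of the mini-batching. A self-contained toy model of that loop, using plain ReentrantLocks rather than HBase's lock-id machinery:

  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.locks.ReentrantLock;

  public class MiniBatchSketch {
    /** One lock per row, standing in for HRegion's internal row-lock table. */
    static ReentrantLock[] rowLocks;

    static void processAll(String[] ops) {
      int next = 0;
      while (next < ops.length) {              // one iteration == one mini-batch
        List<Integer> group = new ArrayList<Integer>();
        int i = next;
        rowLocks[i].lock();                    // block for the first row: progress guaranteed
        group.add(i++);
        while (i < ops.length && rowLocks[i].tryLock()) {
          group.add(i++);                      // opportunistically extend the batch
        }
        System.out.println("one WAL sync covers rows " + group);
        for (int row : group) {
          rowLocks[row].unlock();
        }
        next = i;                              // a contended row starts the next batch
      }
    }

    public static void main(String[] args) {
      rowLocks = new ReentrantLock[8];
      for (int i = 0; i < rowLocks.length; i++) rowLocks[i] = new ReentrantLock();
      processAll(new String[8]);
    }
  }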
  //TODO, Think that gets/puts and deletes should be refactored a bit so that
  //the getting of the lock happens before, so that you would just pass it into
@@ -1467,7 +1600,7 @@ public class HRegion implements HeapSize { // , Writable{
    get.addColumn(family, qualifier);
    // Lock row
-   Integer lid = getLock(lockId, get.getRow());
+   Integer lid = getLock(lockId, get.getRow(), true);
    List<KeyValue> result = new ArrayList<KeyValue>();
    try {
      result = get(get);
@@ -1619,65 +1752,24 @@ public class HRegion implements HeapSize { // , Writable{
    byte[] byteNow = Bytes.toBytes(now);
    boolean flush = false;
    this.updatesLock.readLock().lock();
-   ReadWriteConsistencyControl.WriteEntry w = null;
    try {
-     WALEdit walEdit = new WALEdit();
-     // check if column families are valid;
-     // check if any timestampupdates are needed;
-     // and if writeToWAL is set, then also collapse edits into a single list.
-     for (Map.Entry<byte[], List<KeyValue>> e: familyMap.entrySet()) {
-       List<KeyValue> edits = e.getValue();
-       byte[] family = e.getKey();
-       // is this a valid column family?
-       checkFamily(family);
-       // update timestamp on keys if required.
-       if (updateKeys(edits, byteNow)) {
-         if (writeToWAL) {
-           // bunch up all edits across all column families into a
-           // single WALEdit.
-           for (KeyValue kv : edits) {
-             walEdit.add(kv);
-           }
-         }
-       }
-     }
-     // append to and sync WAL
-     if (!walEdit.isEmpty()) {
-       //
-       // write/sync to WAL should happen before we touch memstore.
-       //
-       // If order is reversed, i.e. we write to memstore first, and
-       // for some reason fail to write/sync to commit log, the memstore
-       // will contain uncommitted transactions.
-       //
-       this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
-         walEdit, now);
-     }
-     long size = 0;
-     w = rwcc.beginMemstoreInsert();
-     // now make changes to the memstore
-     for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
-       byte[] family = e.getKey();
-       List<KeyValue> edits = e.getValue();
-       Store store = getStore(family);
-       for (KeyValue kv: edits) {
-         kv.setMemstoreTS(w.getWriteNumber());
-         size = this.memstoreSize.addAndGet(store.add(kv));
-       }
-     }
-     flush = isFlushSize(size);
+     checkFamilies(familyMap.keySet());
+     // write/sync to WAL should happen before we touch memstore.
+     //
+     // If order is reversed, i.e. we write to memstore first, and
+     // for some reason fail to write/sync to commit log, the memstore
+     // will contain uncommitted transactions.
+     if (writeToWAL) {
+       WALEdit walEdit = new WALEdit();
+       addFamilyMapToWALEdit(familyMap, byteNow, walEdit);
+       this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
+         walEdit, now);
+     }
+     long addedSize = applyFamilyMapToMemstore(familyMap);
+     flush = isFlushSize(memstoreSize.addAndGet(addedSize));
    } finally {
-     if (w != null) rwcc.completeMemstoreInsert(w);
      this.updatesLock.readLock().unlock();
    }
    if (flush) {
@@ -1686,6 +1778,70 @@ public class HRegion implements HeapSize { // , Writable{
    }
  }
/**
* Atomically apply the given map of family->edits to the memstore.
* This handles the consistency control on its own, but the caller
* should already have locked updatesLock.readLock(). This also does
* <b>not</b> check the families for validity.
*
* @return the additional memory usage of the memstore caused by the
* new entries.
*/
private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap) {
ReadWriteConsistencyControl.WriteEntry w = null;
long size = 0;
try {
w = rwcc.beginMemstoreInsert();
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
List<KeyValue> edits = e.getValue();
Store store = getStore(family);
for (KeyValue kv: edits) {
kv.setMemstoreTS(w.getWriteNumber());
size += store.add(kv);
}
}
} finally {
rwcc.completeMemstoreInsert(w);
}
return size;
}
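The begin/complete pairing above is what keeps readers from seeing half-applied batches. A toy model of the discipline, with the caveat that the real ReadWriteConsistencyControl tracks multiple concurrent writers rather than a single read point:

  import java.util.concurrent.atomic.AtomicLong;

  /** Toy model of the begin/complete discipline used above; not the real RWCC. */
  public class RwccSketch {
    private final AtomicLong nextWriteNumber = new AtomicLong(1);
    private volatile long readPoint = 0;

    long beginMemstoreInsert() {
      return nextWriteNumber.getAndIncrement();   // stamp for this writer's edits
    }

    void completeMemstoreInsert(long writeNumber) {
      readPoint = writeNumber;                    // simplification: single writer
    }

    void apply(long[] edits) {
      long w = beginMemstoreInsert();
      try {
        for (int i = 0; i < edits.length; i++) {
          edits[i] = w;                           // analogue of kv.setMemstoreTS(w)
        }
      } finally {
        completeMemstoreInsert(w);                // must run even if the loop throws
      }
    }
  }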
/**
* Check the collection of families for validity.
* @throws NoSuchColumnFamilyException if a family does not exist.
*/
private void checkFamilies(Collection<byte[]> families)
throws NoSuchColumnFamilyException {
for (byte[] family : families) {
checkFamily(family);
}
}
/**
* Append the given map of family->edits to a WALEdit data structure.
* Also updates the timestamps of the edits where they have not
* been specified by the user. This does not write to the HLog itself.
* @param familyMap map of family->edits
* @param byteNow timestamp to use when unspecified
* @param walEdit the destination entry to append into
*/
private void addFamilyMapToWALEdit(Map<byte[], List<KeyValue>> familyMap,
byte[] byteNow, WALEdit walEdit) {
for (List<KeyValue> edits : familyMap.values()) {
// update timestamp on keys if required.
if (updateKeys(edits, byteNow)) {
// bunch up all edits across all column families into a
// single WALEdit.
for (KeyValue kv : edits) {
walEdit.add(kv);
}
}
}
}
  private void requestFlush() {
    if (this.flushListener == null) {
      return;
    }
@@ -1776,6 +1932,27 @@ public class HRegion implements HeapSize { // , Writable{
   * @return The id of the held lock.
   */
  public Integer obtainRowLock(final byte [] row) throws IOException {
return internalObtainRowLock(row, true);
}
/**
* Tries to obtain a row lock on the given row, but does not block if the
* row lock is not available. If the lock is not available, returns null.
* Otherwise behaves the same as the above method.
* @see HRegion#obtainRowLock(byte[])
*/
public Integer tryObtainRowLock(final byte[] row) throws IOException {
return internalObtainRowLock(row, false);
}
/**
* Obtains or tries to obtain the given row lock.
* @param waitForLock if true, will block until the lock is available.
* Otherwise, just tries to obtain the lock and returns
* null if unavailable.
*/
private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
throws IOException {
    checkRow(row);
    splitsAndClosesLock.readLock().lock();
    try {
@@ -1784,6 +1961,9 @@ public class HRegion implements HeapSize { // , Writable{
      }
      synchronized (lockedRows) {
        while (lockedRows.contains(row)) {
if (!waitForLock) {
return null;
}
          try {
            lockedRows.wait();
          } catch (InterruptedException ie) {
@@ -1815,7 +1995,7 @@ public class HRegion implements HeapSize { // , Writable{
      splitsAndClosesLock.readLock().unlock();
    }
  }

  /**
   * Used by unit tests.
   * @param lockid
@@ -1844,7 +2024,7 @@ public class HRegion implements HeapSize { // , Writable{
   * @param lockid
   * @return boolean
   */
- private boolean isRowLocked(final Integer lockid) {
+ boolean isRowLocked(final Integer lockid) {
    synchronized (lockedRows) {
      if (lockIds.get(lockid) != null) {
        return true;
@@ -1856,14 +2036,17 @@ public class HRegion implements HeapSize { // , Writable{
  /**
   * Returns existing row lock if found, otherwise
   * obtains a new row lock and returns it.
-  * @param lockid
-  * @return lockid
+  * @param lockid requested by the user, or null if the user didn't already hold lock
+  * @param row the row to lock
+  * @param waitForLock if true, will block until the lock is available, otherwise will
+  *   simply return null if it could not acquire the lock.
+  * @return lockid or null if waitForLock is false and the lock was unavailable.
   */
- private Integer getLock(Integer lockid, byte [] row)
+ private Integer getLock(Integer lockid, byte [] row, boolean waitForLock)
  throws IOException {
    Integer lid = null;
    if (lockid == null) {
-     lid = obtainRowLock(row);
+     lid = internalObtainRowLock(row, waitForLock);
    } else {
      if (!isRowLocked(lockid)) {
        throw new IOException("Invalid row lock");

HRegionServer.java

@@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.UnknownRowLockException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HMsg.Type;
import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.client.Delete;
@@ -97,6 +98,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.InfoServer;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
@@ -1678,34 +1680,33 @@ public class HRegionServer implements HRegionInterface,
  public int put(final byte[] regionName, final List<Put> puts)
  throws IOException {
-   // Count of Puts processed.
-   int i = 0;
    checkOpen();
    HRegion region = null;
-   boolean writeToWAL = true;
    try {
      region = getRegion(regionName);
      if (!region.getRegionInfo().isMetaTable()) {
        this.cacheFlusher.reclaimMemStoreMemory();
      }
-     for (Put put: puts) {
-       this.requestCount.incrementAndGet();
-       Integer lock = getLockFromId(put.getLockId());
-       writeToWAL &= put.getWriteToWAL();
-       region.put(put, lock);
-     }
-   } catch (WrongRegionException ex) {
-     LOG.debug("Batch puts: " + i, ex);
-     return i;
-   } catch (NotServingRegionException ex) {
-     LOG.debug("Batch puts interrupted at index=" + i + " because:" +
-       ex.getMessage());
-     return i;
+     @SuppressWarnings("unchecked")
+     Pair<Put, Integer>[] putsWithLocks = new Pair[puts.size()];
+     int i = 0;
+     for (Put p : puts) {
+       Integer lock = getLockFromId(p.getLockId());
+       putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
+     }
+     this.requestCount.addAndGet(puts.size());
+     OperationStatusCode[] codes = region.put(putsWithLocks);
+     for (i = 0; i < codes.length; i++) {
+       if (codes[i] != OperationStatusCode.SUCCESS)
+         return i;
+     }
+     return -1;
    } catch (Throwable t) {
      throw convertThrowableToIOE(cleanup(t));
    }
-   return -1;
  }
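The method now returns -1 when every Put succeeded, or the index of the first non-SUCCESS code so the client can resubmit the tail. A hedged sketch of a client-side retry loop built on that contract (server is an HRegionInterface proxy; MAX_RETRIES is an assumed constant, since retrying an unbounded number of times would spin on a permanently bad Put):

  int firstFailed = server.put(regionName, puts);
  int attempts = 0;
  while (firstFailed != -1 && attempts++ < MAX_RETRIES) {
    puts = puts.subList(firstFailed, puts.size());  // everything before it succeeded
    firstFailed = server.put(regionName, puts);
  }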
  private boolean checkAndMutate(final byte[] regionName, final byte [] row,

HLog.java

@@ -806,6 +806,8 @@ public class HLog implements Syncable {
  public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
    final long now)
  throws IOException {
+   if (edits.isEmpty()) return;
    byte[] regionName = info.getRegionName();
    if (this.closed) {
      throw new IOException("Cannot append; log is closed");

MultithreadedTestUtil.java

@@ -81,18 +81,23 @@ public abstract class MultithreadedTestUtil {
      threadDoneCount++;
    }

-   public void stop() throws InterruptedException {
+   public void stop() throws Exception {
      synchronized (this) {
        stopped = true;
      }
      for (TestThread t : testThreads) {
        t.join();
      }
+     checkException();
    }
  }

+ /**
+  * A thread that can be added to a test context, and properly
+  * passes exceptions through.
+  */
  public static abstract class TestThread extends Thread {
-   final TestContext ctx;
+   protected final TestContext ctx;
    protected boolean stopped;

    public TestThread(TestContext ctx) {
@@ -101,19 +106,34 @@ public abstract class MultithreadedTestUtil {
    public void run() {
      try {
-       while (ctx.shouldRun() && !stopped) {
-         doAnAction();
-       }
+       doWork();
      } catch (Throwable t) {
        ctx.threadFailed(t);
      }
      ctx.threadDone();
    }

+   public abstract void doWork() throws Exception;

    protected void stopTestThread() {
      this.stopped = true;
    }
+ }

+ /**
+  * A test thread that performs a repeating operation.
+  */
+ public static abstract class RepeatingTestThread extends TestThread {
+   public RepeatingTestThread(TestContext ctx) {
+     super(ctx);
+   }

+   public final void doWork() throws Exception {
+     while (ctx.shouldRun() && !stopped) {
+       doAnAction();
+     }
+   }

    public abstract void doAnAction() throws Exception;
  }
}
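Putting the pieces together, a test would drive the context and the new repeating thread roughly like this (a sketch modeled on the usage in TestAcidGuarantees and TestHRegion below):

  TestContext ctx = new TestContext(HBaseConfiguration.create());
  ctx.addThread(new RepeatingTestThread(ctx) {
    public void doAnAction() throws Exception {
      // one unit of repeated work; a throw here is captured by the context
    }
  });
  ctx.startThreads();
  Thread.sleep(1000);   // let the workers run for a while
  ctx.stop();           // joins all threads, then rethrows any captured failure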

TestAcidGuarantees.java

@@ -28,7 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
-import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
@@ -65,7 +65,7 @@ public class TestAcidGuarantees {
    util = new HBaseTestingUtility();
  }

- public static class AtomicityWriter extends TestThread {
+ public static class AtomicityWriter extends RepeatingTestThread {
    Random rand = new Random();
    byte data[] = new byte[10];
    byte targetRow[];
@@ -95,7 +95,7 @@ public class TestAcidGuarantees {
    }
  }

- public static class AtomicityReader extends TestThread {
+ public static class AtomicityReader extends RepeatingTestThread {
    byte targetRow[];
    byte targetFamilies[][];
    HTable table;

TestHRegion.java

@@ -27,9 +27,12 @@ import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
@@ -44,11 +47,16 @@ import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;

+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -58,6 +66,7 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;

/**
@@ -328,6 +337,102 @@ public class TestHRegion extends HBaseTestCase {
    assertTrue(exception);
  }
@SuppressWarnings("unchecked")
public void testBatchPut() throws Exception {
byte[] b = Bytes.toBytes(getName());
byte[] cf = Bytes.toBytes("cf");
byte[] qual = Bytes.toBytes("qual");
byte[] val = Bytes.toBytes("val");
initHRegion(b, getName(), cf);
assertEquals(0, HLog.getSyncOps());
LOG.info("First a batch put with all valid puts");
final Put[] puts = new Put[10];
for (int i = 0; i < 10; i++) {
puts[i] = new Put(Bytes.toBytes("row_" + i));
puts[i].add(cf, qual, val);
}
OperationStatusCode[] codes = this.region.put(puts);
assertEquals(10, codes.length);
for (int i = 0; i < 10; i++) {
assertEquals(OperationStatusCode.SUCCESS, codes[i]);
}
assertEquals(1, HLog.getSyncOps());
LOG.info("Next a batch put with one invalid family");
puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
codes = this.region.put(puts);
assertEquals(10, codes.length);
for (int i = 0; i < 10; i++) {
assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
OperationStatusCode.SUCCESS, codes[i]);
}
assertEquals(1, HLog.getSyncOps());
LOG.info("Next a batch put that has to break into two batches to avoid a lock");
Integer lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
MultithreadedTestUtil.TestContext ctx =
new MultithreadedTestUtil.TestContext(HBaseConfiguration.create());
final AtomicReference<OperationStatusCode[]> retFromThread =
new AtomicReference<OperationStatusCode[]>();
TestThread putter = new TestThread(ctx) {
@Override
public void doWork() throws IOException {
retFromThread.set(region.put(puts));
}
};
LOG.info("...starting put thread while holding lock");
ctx.addThread(putter);
ctx.startThreads();
LOG.info("...waiting for put thread to sync first time");
long startWait = System.currentTimeMillis();
while (HLog.getSyncOps() == 0) {
Thread.sleep(100);
if (System.currentTimeMillis() - startWait > 10000) {
fail("Timed out waiting for thread to sync first minibatch");
}
}
LOG.info("...releasing row lock, which should let put thread continue");
region.releaseRowLock(lockedRow);
LOG.info("...joining on thread");
ctx.stop();
LOG.info("...checking that next batch was synced");
assertEquals(1, HLog.getSyncOps());
codes = retFromThread.get();
for (int i = 0; i < 10; i++) {
assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
OperationStatusCode.SUCCESS, codes[i]);
}
LOG.info("Nexta, a batch put which uses an already-held lock");
lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
LOG.info("...obtained row lock");
List<Pair<Put, Integer>> putsAndLocks = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
Pair<Put, Integer> pair = new Pair<Put, Integer>(puts[i], null);
if (i == 2) pair.setSecond(lockedRow);
putsAndLocks.add(pair);
}
codes = region.put(putsAndLocks.toArray(new Pair[0]));
LOG.info("...performed put");
for (int i = 0; i < 10; i++) {
assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
OperationStatusCode.SUCCESS, codes[i]);
}
// Make sure we didn't do an extra batch
assertEquals(1, HLog.getSyncOps());
// Make sure we still hold lock
assertTrue(region.isRowLocked(lockedRow));
LOG.info("...releasing lock");
region.releaseRowLock(lockedRow);
}
  //////////////////////////////////////////////////////////////////////////////
  // checkAndMutate tests
  //////////////////////////////////////////////////////////////////////////////