From 18a53dfd6b3c70fad26423e5b12d36f8d71dcffc Mon Sep 17 00:00:00 2001
From: Todd Lipcon
Date: Sun, 13 Jun 2010 18:54:25 +0000
Subject: [PATCH] HBASE-2353. Batch puts should sync HLog as few times as possible

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@954285 13f79535-47bb-0310-9956-ffa450edef68
---
 CHANGES.txt                                        |   1 +
 .../org/apache/hadoop/hbase/HConstants.java        |  10 +-
 .../hadoop/hbase/regionserver/HRegion.java         | 369 +++++++++++++-----
 .../hbase/regionserver/HRegionServer.java          |  35 +-
 .../hadoop/hbase/regionserver/wal/HLog.java        |   2 +
 .../hadoop/hbase/MultithreadedTestUtil.java        |  32 +-
 .../hadoop/hbase/TestAcidGuarantees.java           |   6 +-
 .../hbase/regionserver/TestHRegion.java            | 105 +++++
 8 files changed, 440 insertions(+), 120 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 1e481d08941..cb97069cc92 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -693,6 +693,7 @@ Release 0.21.0 - Unreleased
                (Lars Francke via Stack)
    HBASE-2468  Improvements to prewarm META cache on clients
                (Mingjie Lai via Stack)
+   HBASE-2353  Batch puts should sync HLog as few times as possible
 
 NEW FEATURES
    HBASE-1961  HBase EC2 scripts
diff --git a/src/main/java/org/apache/hadoop/hbase/HConstants.java b/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 1e59533af7c..f5d3e949c0f 100644
--- a/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -26,6 +26,15 @@ import org.apache.hadoop.hbase.util.Bytes;
  * HConstants holds a bunch of HBase-related constants
  */
 public final class HConstants {
+  /**
+   * Status codes used for return values of bulk operations.
+   */
+  public enum OperationStatusCode {
+    NOT_RUN,
+    SUCCESS,
+    BAD_FAMILY,
+    FAILURE;
+  }
 
   /** long constant for zero */
   public static final Long ZERO_L = Long.valueOf(0L);
@@ -342,5 +351,4 @@ public final class HConstants {
   private HConstants() {
     // Can't be instantiated with this ctor.
   }
-
 }
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index adc505b485c..06e022cb410 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -54,16 +55,20 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Constructor;
 import java.util.AbstractList;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -1240,7 +1245,7 @@ public class HRegion implements HeapSize { // , Writable{
     try {
       byte [] row = delete.getRow();
       // If we did not pass an existing row lock, obtain a new one
-      lid = getLock(lockid, row);
+      lid = getLock(lockid, row, true);
 
       // All edits for the given row (across all column families) must happen atomically.
       prepareDelete(delete);
@@ -1265,7 +1270,6 @@ public class HRegion implements HeapSize { // , Writable{
     boolean flush = false;
 
     updatesLock.readLock().lock();
-    ReadWriteConsistencyControl.WriteEntry w = null;
 
     try {
@@ -1275,7 +1279,6 @@ public class HRegion implements HeapSize { // , Writable{
         List<KeyValue> kvs = e.getValue();
         Map<byte [], Integer> kvCount = new TreeMap<byte [], Integer>(Bytes.BYTES_COMPARATOR);
 
-        Store store = getStore(family);
         for (KeyValue kv: kvs) {
           //  Check if time is LATEST, change to time of most recent addition if so
           //  This is expensive.
@@ -1315,50 +1318,24 @@ public class HRegion implements HeapSize { // , Writable{
       }
 
       if (writeToWAL) {
-        //
         // write/sync to WAL should happen before we touch memstore.
         //
         // If order is reversed, i.e. we write to memstore first, and
         // for some reason fail to write/sync to commit log, the memstore
         // will contain uncommitted transactions.
         //
-        // bunch up all edits across all column families into a
         // single WALEdit.
         WALEdit walEdit = new WALEdit();
-        for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
-          List<KeyValue> kvs = e.getValue();
-          for (KeyValue kv : kvs) {
-            walEdit.add(kv);
-          }
-        }
-        // append the edit to WAL. The append also does the sync.
-        if (!walEdit.isEmpty()) {
-          this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
+        addFamilyMapToWALEdit(familyMap, byteNow, walEdit);
+        this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
             walEdit, now);
-        }
       }
 
       // Now make changes to the memstore.
-
-      long size = 0;
-      w = rwcc.beginMemstoreInsert();
-
-      for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
-
-        byte[] family = e.getKey();
-        List<KeyValue> kvs = e.getValue();
-
-        Store store = getStore(family);
-        for (KeyValue kv: kvs) {
-          kv.setMemstoreTS(w.getWriteNumber());
-          size = this.memstoreSize.addAndGet(store.delete(kv));
-        }
-      }
-      flush = isFlushSize(size);
+      long addedSize = applyFamilyMapToMemstore(familyMap);
+      flush = isFlushSize(memstoreSize.addAndGet(addedSize));
     } finally {
-      if (w != null) rwcc.completeMemstoreInsert(w);
-
       this.updatesLock.readLock().unlock();
     }
@@ -1419,7 +1396,7 @@ public class HRegion implements HeapSize { // , Writable{
     // invokes a HRegion#abort.
     byte [] row = put.getRow();
     // If we did not pass an existing row lock, obtain a new one
-    Integer lid = getLock(lockid, row);
+    Integer lid = getLock(lockid, row, true);
 
     try {
       // All edits for the given row (across all column families) must happen atomically.
@@ -1432,6 +1409,162 @@ public class HRegion implements HeapSize { // , Writable{
     }
   }
 
+  /**
+   * Struct-like class that tracks the progress of a batch operation,
+   * accumulating status codes and tracking the index at which processing
+   * is proceeding.
+   */
+  private static class BatchOperationInProgress<T> {
+    T[] operations;
+    OperationStatusCode[] retCodes;
+    int nextIndexToProcess = 0;
+
+    public BatchOperationInProgress(T[] operations) {
+      this.operations = operations;
+      retCodes = new OperationStatusCode[operations.length];
+      Arrays.fill(retCodes, OperationStatusCode.NOT_RUN);
+    }
+
+    public boolean isDone() {
+      return nextIndexToProcess == operations.length;
+    }
+  }
+
+  /**
+   * Perform a batch put with no pre-specified locks
+   * @see HRegion#put(Pair[])
+   */
+  public OperationStatusCode[] put(Put[] puts) throws IOException {
+    @SuppressWarnings("unchecked")
+    Pair<Put, Integer> putsAndLocks[] = new Pair[puts.length];
+
+    for (int i = 0; i < puts.length; i++) {
+      putsAndLocks[i] = new Pair<Put, Integer>(puts[i], null);
+    }
+    return put(putsAndLocks);
+  }
+
+  /**
+   * Perform a batch of puts.
+   * @param putsAndLocks the list of puts paired with their requested lock IDs.
+   * @throws IOException
+   */
+  public OperationStatusCode[] put(Pair<Put, Integer>[] putsAndLocks) throws IOException {
+    BatchOperationInProgress<Pair<Put, Integer>> batchOp =
+      new BatchOperationInProgress<Pair<Put, Integer>>(putsAndLocks);
+
+    while (!batchOp.isDone()) {
+      checkReadOnly();
+      checkResources();
+
+      long newSize;
+      splitsAndClosesLock.readLock().lock();
+      try {
+        long addedSize = doMiniBatchPut(batchOp);
+        newSize = memstoreSize.addAndGet(addedSize);
+      } finally {
+        splitsAndClosesLock.readLock().unlock();
+      }
+      if (isFlushSize(newSize)) {
+        requestFlush();
+      }
+    }
+    return batchOp.retCodes;
+  }
+
+  private long doMiniBatchPut(BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    byte[] byteNow = Bytes.toBytes(now);
+
+    /** Keep track of the locks we hold so we can release them in finally clause */
+    List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
+    // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
+    int firstIndex = batchOp.nextIndexToProcess;
+    int lastIndexExclusive = firstIndex;
+    boolean success = false;
+    try {
+      // ------------------------------------
+      // STEP 1. Try to acquire as many locks as we can, and ensure
+      // we acquire at least one.
+ // ---------------------------------- + int numReadyToWrite = 0; + while (lastIndexExclusive < batchOp.operations.length) { + Pair nextPair = batchOp.operations[lastIndexExclusive]; + Put put = nextPair.getFirst(); + Integer providedLockId = nextPair.getSecond(); + + // Check the families in the put. If bad, skip this one. + try { + checkFamilies(put.getFamilyMap().keySet()); + } catch (NoSuchColumnFamilyException nscf) { + LOG.warn("No such column family in batch put", nscf); + batchOp.retCodes[lastIndexExclusive] = OperationStatusCode.BAD_FAMILY; + lastIndexExclusive++; + continue; + } + + // If we haven't got any rows in our batch, we should block to + // get the next one. + boolean shouldBlock = numReadyToWrite == 0; + Integer acquiredLockId = getLock(providedLockId, put.getRow(), shouldBlock); + if (acquiredLockId == null) { + // We failed to grab another lock + assert !shouldBlock : "Should never fail to get lock when blocking"; + break; // stop acquiring more rows for this batch + } + if (providedLockId == null) { + acquiredLocks.add(acquiredLockId); + } + lastIndexExclusive++; + numReadyToWrite++; + } + // We've now grabbed as many puts off the list as we can + assert numReadyToWrite > 0; + + // ------------------------------------ + // STEP 2. Write to WAL + // ---------------------------------- + WALEdit walEdit = new WALEdit(); + for (int i = firstIndex; i < lastIndexExclusive; i++) { + // Skip puts that were determined to be invalid during preprocessing + if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue; + + Put p = batchOp.operations[i].getFirst(); + if (!p.getWriteToWAL()) continue; + addFamilyMapToWALEdit(p.getFamilyMap(), byteNow, walEdit); + } + + // Append the edit to WAL + this.log.append(regionInfo, regionInfo.getTableDesc().getName(), + walEdit, now); + + // ------------------------------------ + // STEP 3. Write back to memstore + // ---------------------------------- + long addedSize = 0; + for (int i = firstIndex; i < lastIndexExclusive; i++) { + if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue; + + Put p = batchOp.operations[i].getFirst(); + addedSize += applyFamilyMapToMemstore(p.getFamilyMap()); + batchOp.retCodes[i] = OperationStatusCode.SUCCESS; + } + success = true; + return addedSize; + } finally { + for (Integer toRelease : acquiredLocks) { + releaseRowLock(toRelease); + } + if (!success) { + for (int i = firstIndex; i < lastIndexExclusive; i++) { + if (batchOp.retCodes[i] == OperationStatusCode.NOT_RUN) { + batchOp.retCodes[i] = OperationStatusCode.FAILURE; + } + } + } + batchOp.nextIndexToProcess = lastIndexExclusive; + } + } //TODO, Think that gets/puts and deletes should be refactored a bit so that //the getting of the lock happens before, so that you would just pass it into @@ -1467,7 +1600,7 @@ public class HRegion implements HeapSize { // , Writable{ get.addColumn(family, qualifier); // Lock row - Integer lid = getLock(lockId, get.getRow()); + Integer lid = getLock(lockId, get.getRow(), true); List result = new ArrayList(); try { result = get(get); @@ -1619,65 +1752,24 @@ public class HRegion implements HeapSize { // , Writable{ byte[] byteNow = Bytes.toBytes(now); boolean flush = false; this.updatesLock.readLock().lock(); - ReadWriteConsistencyControl.WriteEntry w = null; try { - WALEdit walEdit = new WALEdit(); - - // check if column families are valid; - // check if any timestampupdates are needed; - // and if writeToWAL is set, then also collapse edits into a single list. 
-      for (Map.Entry<byte[], List<KeyValue>> e: familyMap.entrySet()) {
-        List<KeyValue> edits = e.getValue();
-        byte[] family = e.getKey();
-
-        // is this a valid column family?
-        checkFamily(family);
-
-        // update timestamp on keys if required.
-        if (updateKeys(edits, byteNow)) {
-          if (writeToWAL) {
-            // bunch up all edits across all column families into a
-            // single WALEdit.
-            for (KeyValue kv : edits) {
-              walEdit.add(kv);
-            }
-          }
-        }
-      }
-
-      // append to and sync WAL
-      if (!walEdit.isEmpty()) {
-        //
-        // write/sync to WAL should happen before we touch memstore.
-        //
-        // If order is reversed, i.e. we write to memstore first, and
-        // for some reason fail to write/sync to commit log, the memstore
-        // will contain uncommitted transactions.
-        //
+      checkFamilies(familyMap.keySet());
+
+      // write/sync to WAL should happen before we touch memstore.
+      //
+      // If order is reversed, i.e. we write to memstore first, and
+      // for some reason fail to write/sync to commit log, the memstore
+      // will contain uncommitted transactions.
+      if (writeToWAL) {
+        WALEdit walEdit = new WALEdit();
+        addFamilyMapToWALEdit(familyMap, byteNow, walEdit);
         this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
           walEdit, now);
       }
 
-      long size = 0;
-
-      w = rwcc.beginMemstoreInsert();
-
-      // now make changes to the memstore
-      for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
-        byte[] family = e.getKey();
-        List<KeyValue> edits = e.getValue();
-
-        Store store = getStore(family);
-        for (KeyValue kv: edits) {
-          kv.setMemstoreTS(w.getWriteNumber());
-          size = this.memstoreSize.addAndGet(store.add(kv));
-        }
-      }
-      flush = isFlushSize(size);
+      long addedSize = applyFamilyMapToMemstore(familyMap);
+      flush = isFlushSize(memstoreSize.addAndGet(addedSize));
     } finally {
-      if (w != null) rwcc.completeMemstoreInsert(w);
-
       this.updatesLock.readLock().unlock();
     }
 
     if (flush) {
@@ -1686,6 +1778,70 @@ public class HRegion implements HeapSize { // , Writable{
     }
   }
 
+  /**
+   * Atomically apply the given map of family->edits to the memstore.
+   * This handles the consistency control on its own, but the caller
+   * should already have locked updatesLock.readLock(). This also does
+   * not check the families for validity.
+   *
+   * @return the additional memory usage of the memstore caused by the
+   * new entries.
+   */
+  private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap) {
+    ReadWriteConsistencyControl.WriteEntry w = null;
+    long size = 0;
+    try {
+      w = rwcc.beginMemstoreInsert();
+
+      for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
+        byte[] family = e.getKey();
+        List<KeyValue> edits = e.getValue();
+
+        Store store = getStore(family);
+        for (KeyValue kv: edits) {
+          kv.setMemstoreTS(w.getWriteNumber());
+          size += store.add(kv);
+        }
+      }
+    } finally {
+      rwcc.completeMemstoreInsert(w);
+    }
+    return size;
+  }
+
+  /**
+   * Check the collection of families for validity.
+   * @throws NoSuchColumnFamilyException if a family does not exist.
+   */
+  private void checkFamilies(Collection<byte[]> families)
+  throws NoSuchColumnFamilyException {
+    for (byte[] family : families) {
+      checkFamily(family);
+    }
+  }
+
+  /**
+   * Append the given map of family->edits to a WALEdit data structure.
+   * Also updates the timestamps of the edits where they have not
+   * been specified by the user. This does not write to the HLog itself.
+   * @param familyMap map of family->edits
+   * @param byteNow timestamp to use when unspecified
+   * @param walEdit the destination entry to append into
+   */
+  private void addFamilyMapToWALEdit(Map<byte[], List<KeyValue>> familyMap,
+      byte[] byteNow, WALEdit walEdit) {
+    for (List<KeyValue> edits : familyMap.values()) {
+      // update timestamp on keys if required.
+      if (updateKeys(edits, byteNow)) {
+        // bunch up all edits across all column families into a
+        // single WALEdit.
+        for (KeyValue kv : edits) {
+          walEdit.add(kv);
+        }
+      }
+    }
+  }
+
   private void requestFlush() {
     if (this.flushListener == null) {
       return;
     }
@@ -1776,6 +1932,27 @@ public class HRegion implements HeapSize { // , Writable{
    * @return The id of the held lock.
    */
   public Integer obtainRowLock(final byte [] row) throws IOException {
+    return internalObtainRowLock(row, true);
+  }
+
+  /**
+   * Tries to obtain a row lock on the given row, but does not block if the
+   * row lock is not available. If the lock is not available, returns null.
+   * Otherwise behaves the same as the above method.
+   * @see HRegion#obtainRowLock(byte[])
+   */
+  public Integer tryObtainRowLock(final byte[] row) throws IOException {
+    return internalObtainRowLock(row, false);
+  }
+
+  /**
+   * Obtains or tries to obtain the given row lock.
+   * @param waitForLock if true, will block until the lock is available.
+   *   Otherwise, just tries to obtain the lock and returns
+   *   null if unavailable.
+   */
+  private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
+  throws IOException {
     checkRow(row);
     splitsAndClosesLock.readLock().lock();
     try {
@@ -1784,6 +1961,9 @@ public class HRegion implements HeapSize { // , Writable{
       }
       synchronized (lockedRows) {
         while (lockedRows.contains(row)) {
+          if (!waitForLock) {
+            return null;
+          }
           try {
             lockedRows.wait();
           } catch (InterruptedException ie) {
@@ -1815,7 +1995,7 @@ public class HRegion implements HeapSize { // , Writable{
       splitsAndClosesLock.readLock().unlock();
     }
   }
-  
+
   /**
    * Used by unit tests.
    * @param lockid
@@ -1844,7 +2024,7 @@ public class HRegion implements HeapSize { // , Writable{
    * @param lockid
    * @return boolean
    */
-  private boolean isRowLocked(final Integer lockid) {
+  boolean isRowLocked(final Integer lockid) {
     synchronized (lockedRows) {
       if (lockIds.get(lockid) != null) {
         return true;
@@ -1856,14 +2036,17 @@ public class HRegion implements HeapSize { // , Writable{
   /**
    * Returns existing row lock if found, otherwise
    * obtains a new row lock and returns it.
-   * @param lockid
-   * @return lockid
+   * @param lockid requested by the user, or null if the user didn't already hold lock
+   * @param row the row to lock
+   * @param waitForLock if true, will block until the lock is available, otherwise will
+   *   simply return null if it could not acquire the lock.
+   * @return lockid or null if waitForLock is false and the lock was unavailable.
    */
-  private Integer getLock(Integer lockid, byte [] row)
+  private Integer getLock(Integer lockid, byte [] row, boolean waitForLock)
   throws IOException {
     Integer lid = null;
     if (lockid == null) {
-      lid = obtainRowLock(row);
+      lid = internalObtainRowLock(row, waitForLock);
     } else {
       if (!isRowLocked(lockid)) {
         throw new IOException("Invalid row lock");
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index efdc0d70622..bca819e2b10 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.UnknownRowLockException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HMsg.Type;
 import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.client.Delete;
@@ -97,6 +98,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.InfoServer;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Sleeper;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
@@ -1678,34 +1680,33 @@ public class HRegionServer implements HRegionInterface,
 
   public int put(final byte[] regionName, final List<Put> puts)
   throws IOException {
-    // Count of Puts processed.
-    int i = 0;
     checkOpen();
     HRegion region = null;
-    boolean writeToWAL = true;
     try {
       region = getRegion(regionName);
       if (!region.getRegionInfo().isMetaTable()) {
         this.cacheFlusher.reclaimMemStoreMemory();
       }
-      for (Put put: puts) {
-        this.requestCount.incrementAndGet();
-        Integer lock = getLockFromId(put.getLockId());
-        writeToWAL &= put.getWriteToWAL();
-        region.put(put, lock);
+
+      @SuppressWarnings("unchecked")
+      Pair<Put, Integer>[] putsWithLocks = new Pair[puts.size()];
+
+      int i = 0;
+      for (Put p : puts) {
+        Integer lock = getLockFromId(p.getLockId());
+        putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
       }
-
-    } catch (WrongRegionException ex) {
-      LOG.debug("Batch puts: " + i, ex);
-      return i;
-    } catch (NotServingRegionException ex) {
-      LOG.debug("Batch puts interrupted at index=" + i + " because:" +
-          ex.getMessage());
-      return i;
+
+      this.requestCount.addAndGet(puts.size());
+      OperationStatusCode[] codes = region.put(putsWithLocks);
+      for (i = 0; i < codes.length; i++) {
+        if (codes[i] != OperationStatusCode.SUCCESS)
+          return i;
+      }
+      return -1;
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
-    return -1;
   }
 
   private boolean checkAndMutate(final byte[] regionName, final byte [] row,
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index ba0482064bc..05cf17f8e10 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -806,6 +806,8 @@ public class HLog implements Syncable {
   public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
     final long now)
   throws IOException {
+    if (edits.isEmpty()) return;
+
     byte[] regionName = info.getRegionName();
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
IOException("Cannot append; log is closed"); diff --git a/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java b/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java index 870f925f65a..7c062d7ffbb 100644 --- a/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java +++ b/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java @@ -81,18 +81,23 @@ public abstract class MultithreadedTestUtil { threadDoneCount++; } - public void stop() throws InterruptedException { + public void stop() throws Exception { synchronized (this) { stopped = true; } for (TestThread t : testThreads) { t.join(); } + checkException(); } } + /** + * A thread that can be added to a test context, and properly + * passes exceptions through. + */ public static abstract class TestThread extends Thread { - final TestContext ctx; + protected final TestContext ctx; protected boolean stopped; public TestThread(TestContext ctx) { @@ -101,19 +106,34 @@ public abstract class MultithreadedTestUtil { public void run() { try { - while (ctx.shouldRun() && !stopped) { - doAnAction(); - } + doWork(); } catch (Throwable t) { ctx.threadFailed(t); } ctx.threadDone(); } + public abstract void doWork() throws Exception; + protected void stopTestThread() { this.stopped = true; } - + } + + /** + * A test thread that performs a repeating operation. + */ + public static abstract class RepeatingTestThread extends TestThread { + public RepeatingTestThread(TestContext ctx) { + super(ctx); + } + + public final void doWork() throws Exception { + while (ctx.shouldRun() && !stopped) { + doAnAction(); + } + } + public abstract void doAnAction() throws Exception; } } diff --git a/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java b/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java index 8a5206c8a5a..75f3c8b4dc4 100644 --- a/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java +++ b/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java @@ -28,7 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; -import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; +import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; @@ -65,7 +65,7 @@ public class TestAcidGuarantees { util = new HBaseTestingUtility(); } - public static class AtomicityWriter extends TestThread { + public static class AtomicityWriter extends RepeatingTestThread { Random rand = new Random(); byte data[] = new byte[10]; byte targetRow[]; @@ -95,7 +95,7 @@ public class TestAcidGuarantees { } } - public static class AtomicityReader extends TestThread { + public static class AtomicityReader extends RepeatingTestThread { byte targetRow[]; byte targetFamilies[][]; HTable table; diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index e75767d0abd..1c1cd4be200 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -27,9 +27,12 @@ import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; +import 
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -44,11 +47,16 @@ import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -58,6 +66,7 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 
 /**
@@ -328,6 +337,102 @@ public class TestHRegion extends HBaseTestCase {
     assertTrue(exception);
   }
 
+  @SuppressWarnings("unchecked")
+  public void testBatchPut() throws Exception {
+    byte[] b = Bytes.toBytes(getName());
+    byte[] cf = Bytes.toBytes("cf");
+    byte[] qual = Bytes.toBytes("qual");
+    byte[] val = Bytes.toBytes("val");
+    initHRegion(b, getName(), cf);
+
+    assertEquals(0, HLog.getSyncOps());
+
+    LOG.info("First a batch put with all valid puts");
+    final Put[] puts = new Put[10];
+    for (int i = 0; i < 10; i++) {
+      puts[i] = new Put(Bytes.toBytes("row_" + i));
+      puts[i].add(cf, qual, val);
+    }
+
+    OperationStatusCode[] codes = this.region.put(puts);
+    assertEquals(10, codes.length);
+    for (int i = 0; i < 10; i++) {
+      assertEquals(OperationStatusCode.SUCCESS, codes[i]);
+    }
+    assertEquals(1, HLog.getSyncOps());
+
+    LOG.info("Next a batch put with one invalid family");
+    puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
+    codes = this.region.put(puts);
+    assertEquals(10, codes.length);
+    for (int i = 0; i < 10; i++) {
+      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
+          OperationStatusCode.SUCCESS, codes[i]);
+    }
+    assertEquals(1, HLog.getSyncOps());
+
+    LOG.info("Next a batch put that has to break into two batches to avoid a lock");
+    Integer lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
+
+    MultithreadedTestUtil.TestContext ctx =
+      new MultithreadedTestUtil.TestContext(HBaseConfiguration.create());
+    final AtomicReference<OperationStatusCode[]> retFromThread =
+      new AtomicReference<OperationStatusCode[]>();
+    TestThread putter = new TestThread(ctx) {
+      @Override
+      public void doWork() throws IOException {
+        retFromThread.set(region.put(puts));
+      }
+    };
+    LOG.info("...starting put thread while holding lock");
+    ctx.addThread(putter);
+    ctx.startThreads();
+
+    LOG.info("...waiting for put thread to sync first time");
+    long startWait = System.currentTimeMillis();
+    while (HLog.getSyncOps() == 0) {
+      Thread.sleep(100);
+      if (System.currentTimeMillis() - startWait > 10000) {
+        fail("Timed out waiting for thread to sync first minibatch");
+      }
+    }
+    LOG.info("...releasing row lock, which should let put thread continue");
+    region.releaseRowLock(lockedRow);
+    LOG.info("...joining on thread");
+    ctx.stop();
+    LOG.info("...checking that next batch was synced");
+    assertEquals(1, HLog.getSyncOps());
+    codes = retFromThread.get();
+    for (int i = 0; i < 10; i++) {
+      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
+          OperationStatusCode.SUCCESS, codes[i]);
+    }
+
+    LOG.info("Next, a batch put which uses an already-held lock");
+    lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
+    LOG.info("...obtained row lock");
+    List<Pair<Put, Integer>> putsAndLocks = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      Pair<Put, Integer> pair = new Pair<Put, Integer>(puts[i], null);
+      if (i == 2) pair.setSecond(lockedRow);
+      putsAndLocks.add(pair);
+    }
+
+    codes = region.put(putsAndLocks.toArray(new Pair[0]));
+    LOG.info("...performed put");
+    for (int i = 0; i < 10; i++) {
+      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
+          OperationStatusCode.SUCCESS, codes[i]);
+    }
+    // Make sure we didn't do an extra batch
+    assertEquals(1, HLog.getSyncOps());
+
+    // Make sure we still hold lock
+    assertTrue(region.isRowLocked(lockedRow));
+    LOG.info("...releasing lock");
+    region.releaseRowLock(lockedRow);
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // checkAndMutate tests
   //////////////////////////////////////////////////////////////////////////////
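
For illustration only, outside the patch itself: a minimal client-side sketch of the write path this change speeds up. The table name "mytable", family "cf", and the row/value contents below are placeholders, not anything the patch defines. HTable.put(List<Put>) groups the puts by region server; on the server, HRegionServer.put(byte[], List<Put>) now hands its whole list to HRegion.put(Pair<Put, Integer>[]), which commits each mini-batch with one combined WALEdit append and HLog sync instead of one sync per Put.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BatchPutExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "mytable");   // placeholder table name

        List<Put> puts = new ArrayList<Put>(1000);
        for (int i = 0; i < 1000; i++) {
          Put p = new Put(Bytes.toBytes("row_" + i));
          // placeholder family/qualifier/value
          p.add(Bytes.toBytes("cf"), Bytes.toBytes("qual"), Bytes.toBytes("val_" + i));
          puts.add(p);
        }

        // One call submits the whole batch; with this patch, each region
        // involved appends a combined WALEdit and syncs the HLog once per
        // mini-batch rather than once per Put.
        table.put(puts);
        table.flushCommits();   // push anything still in the client write buffer
      }
    }

The split-batch behavior that testBatchPut asserts falls out of STEP 1 of doMiniBatchPut: only the first row lock of a mini-batch is acquired blocking and the rest are try-locked, so a row lock held elsewhere ends the current mini-batch (one sync) and the next mini-batch begins with a blocking acquire on the contested row.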