diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java index e36ee43d1a7..b678f5529da 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java @@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.OperationStatus; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; @@ -146,20 +145,19 @@ public class BulkDeleteEndpoint extends BulkDeleteService implements Coprocessor } } if (deleteRows.size() > 0) { - Pair[] deleteWithLockArr = new Pair[deleteRows.size()]; + Mutation[] deleteArr = new Mutation[deleteRows.size()]; int i = 0; for (List deleteRow : deleteRows) { - Delete delete = createDeleteMutation(deleteRow, deleteType, timestamp); - deleteWithLockArr[i++] = new Pair(delete, null); + deleteArr[i++] = createDeleteMutation(deleteRow, deleteType, timestamp); } - OperationStatus[] opStatus = region.batchMutate(deleteWithLockArr); + OperationStatus[] opStatus = region.batchMutate(deleteArr); for (i = 0; i < opStatus.length; i++) { if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) { break; } totalRowsDeleted++; if (deleteType == DeleteType.VERSION) { - byte[] versionsDeleted = deleteWithLockArr[i].getFirst().getAttribute( + byte[] versionsDeleted = deleteArr[i].getAttribute( NO_OF_VERSIONS_TO_DELETE); if (versionsDeleted != null) { totalVersionsDeleted += Bytes.toInt(versionsDeleted); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java index 9ea9ffda2a4..6839aefca14 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java @@ -253,12 +253,12 @@ public abstract class BaseRegionObserver implements RegionObserver { @Override public void preBatchMutate(final ObserverContext c, - final MiniBatchOperationInProgress> miniBatchOp) throws IOException { + final MiniBatchOperationInProgress miniBatchOp) throws IOException { } @Override public void postBatchMutate(final ObserverContext c, - final MiniBatchOperationInProgress> miniBatchOp) throws IOException { + final MiniBatchOperationInProgress miniBatchOp) throws IOException { } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 91b6eaee6aa..9f3cb8d7a82 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -554,7 +554,7 @@ public interface RegionObserver extends Coprocessor { * @throws IOException if an error occurred on the coprocessor */ void preBatchMutate(final ObserverContext c, - final MiniBatchOperationInProgress> miniBatchOp) throws IOException; + final MiniBatchOperationInProgress miniBatchOp) throws IOException; /** * This will be called after 
applying a batch of Mutations on a region. The Mutations are added to @@ -564,7 +564,7 @@ public interface RegionObserver extends Coprocessor { * @throws IOException if an error occurred on the coprocessor */ void postBatchMutate(final ObserverContext c, - final MiniBatchOperationInProgress> miniBatchOp) throws IOException; + final MiniBatchOperationInProgress miniBatchOp) throws IOException; /** * Called before checkAndPut diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e1bbfdf52ef..ed17c0a2aec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -34,7 +34,6 @@ import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.NavigableSet; -import java.util.Random; import java.util.Set; import java.util.TreeMap; import java.util.UUID; @@ -138,6 +137,7 @@ import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.util.StringUtils; import org.cliffc.high_scale_lib.Counter; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -223,12 +223,13 @@ public class HRegion implements HeapSize { // , Writable{ // Members ////////////////////////////////////////////////////////////////////////////// - private final ConcurrentHashMap lockedRows = - new ConcurrentHashMap(); - private final ConcurrentHashMap lockIds = - new ConcurrentHashMap(); - private final AtomicInteger lockIdGenerator = new AtomicInteger(1); - static private Random rand = new Random(); + // map from a locked row to the context for that lock including: + // - CountDownLatch for threads waiting on that row + // - the thread that owns the lock (allow reentrancy) + // - reference count of (reentrant) locks held by the thread + // - the row itself + private final ConcurrentHashMap lockedRows = + new ConcurrentHashMap(); protected final Map stores = new ConcurrentSkipListMap( Bytes.BYTES_RAWCOMPARATOR); @@ -1764,7 +1765,7 @@ public class HRegion implements HeapSize { // , Writable{ try { delete.getRow(); // All edits for the given row (across all column families) must happen atomically. - doBatchMutate(delete, null); + doBatchMutate(delete); } finally { closeRegionOperation(); } @@ -1787,7 +1788,7 @@ public class HRegion implements HeapSize { // , Writable{ delete.setFamilyMap(familyMap); delete.setClusterId(clusterId); delete.setDurability(durability); - doBatchMutate(delete, null); + doBatchMutate(delete); } /** @@ -1862,7 +1863,7 @@ public class HRegion implements HeapSize { // , Writable{ this.writeRequestsCount.increment(); try { // All edits for the given row (across all column families) must happen atomically. - doBatchMutate(put, null); + doBatchMutate(put); } finally { closeRegionOperation(); } @@ -1892,46 +1893,29 @@ public class HRegion implements HeapSize { // , Writable{ } /** - * Perform a batch put with no pre-specified locks - * @see HRegion#batchMutate(Pair[]) + * Perform a batch of mutations. + * It supports only Put and Delete mutations and will ignore other types passed. + * @param mutations the list of mutations + * @return an array of OperationStatus which internally contains the + * OperationStatusCode and the exceptionMessage if any. 
+ * @throws IOException */ - public OperationStatus[] put(Put[] puts) throws IOException { - @SuppressWarnings("unchecked") - Pair putsAndLocks[] = new Pair[puts.length]; - - for (int i = 0; i < puts.length; i++) { - putsAndLocks[i] = new Pair(puts[i], null); - } - return batchMutate(putsAndLocks); + public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException { + return batchMutate(mutations, false); } /** * Perform a batch of mutations. * It supports only Put and Delete mutations and will ignore other types passed. - * @param mutationsAndLocks - * the list of mutations paired with their requested lock IDs. + * @param mutations the list of mutations * @return an array of OperationStatus which internally contains the * OperationStatusCode and the exceptionMessage if any. * @throws IOException */ - public OperationStatus[] batchMutate( - Pair[] mutationsAndLocks) throws IOException { - return batchMutate(mutationsAndLocks, false); - } - - /** - * Perform a batch of mutations. - * It supports only Put and Delete mutations and will ignore other types passed. - * @param mutationsAndLocks - * the list of mutations paired with their requested lock IDs. - * @return an array of OperationStatus which internally contains the - * OperationStatusCode and the exceptionMessage if any. - * @throws IOException - */ - OperationStatus[] batchMutate(Pair[] mutationsAndLocks, boolean isReplay) + OperationStatus[] batchMutate(Mutation[] mutations, boolean isReplay) throws IOException { - BatchOperationInProgress> batchOp = - new BatchOperationInProgress>(mutationsAndLocks); + BatchOperationInProgress batchOp = + new BatchOperationInProgress(mutations); boolean initialized = false; @@ -1969,14 +1953,13 @@ public class HRegion implements HeapSize { // , Writable{ } - private void doPreMutationHook(BatchOperationInProgress> batchOp) + private void doPreMutationHook(BatchOperationInProgress batchOp) throws IOException { /* Run coprocessor pre hook outside of locks to avoid deadlock */ WALEdit walEdit = new WALEdit(); if (coprocessorHost != null) { for (int i = 0 ; i < batchOp.operations.length; i++) { - Pair nextPair = batchOp.operations[i]; - Mutation m = nextPair.getFirst(); + Mutation m = batchOp.operations[i]; if (m instanceof Put) { if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) { // pre hook says skip this Put @@ -2005,7 +1988,7 @@ public class HRegion implements HeapSize { // , Writable{ } @SuppressWarnings("unchecked") - private long doMiniBatchMutation(BatchOperationInProgress> batchOp, + private long doMiniBatchMutation(BatchOperationInProgress batchOp, boolean isInReplay) throws IOException { // variable to note if all Put items are for the same CF -- metrics related @@ -2024,7 +2007,7 @@ public class HRegion implements HeapSize { // , Writable{ boolean locked = false; /** Keep track of the locks we hold so we can release them in finally clause */ - List acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); + List acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); // reference family maps directly so coprocessors can mutate them if desired Map>[] familyMaps = new Map[batchOp.operations.length]; // We try to set up a batch in the range [firstIndex,lastIndexExclusive) @@ -2040,10 +2023,8 @@ public class HRegion implements HeapSize { // , Writable{ int numReadyToWrite = 0; long now = EnvironmentEdgeManager.currentTimeMillis(); while (lastIndexExclusive < batchOp.operations.length) { - Pair nextPair = 
batchOp.operations[lastIndexExclusive]; - Mutation mutation = nextPair.getFirst(); + Mutation mutation = batchOp.operations[lastIndexExclusive]; boolean isPutMutation = mutation instanceof Put; - Integer providedLockId = nextPair.getSecond(); Map> familyMap = mutation.getFamilyMap(); // store the family map reference to allow for mutations @@ -2081,25 +2062,25 @@ public class HRegion implements HeapSize { // , Writable{ lastIndexExclusive++; continue; } + // If we haven't got any rows in our batch, we should block to // get the next one. boolean shouldBlock = numReadyToWrite == 0; - Integer acquiredLockId = null; + RowLock rowLock = null; try { - acquiredLockId = getLock(providedLockId, mutation.getRow(), - shouldBlock); + rowLock = getRowLock(mutation.getRow(), shouldBlock); } catch (IOException ioe) { LOG.warn("Failed getting lock in batch put, row=" - + Bytes.toStringBinary(mutation.getRow()), ioe); + + Bytes.toStringBinary(mutation.getRow()), ioe); } - if (acquiredLockId == null) { + if (rowLock == null) { // We failed to grab another lock assert !shouldBlock : "Should never fail to get lock when blocking"; break; // stop acquiring more rows for this batch + } else { + acquiredRowLocks.add(rowLock); } - if (providedLockId == null) { - acquiredLocks.add(acquiredLockId); - } + lastIndexExclusive++; numReadyToWrite++; @@ -2141,7 +2122,7 @@ public class HRegion implements HeapSize { // , Writable{ if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) continue; - Mutation mutation = batchOp.operations[i].getFirst(); + Mutation mutation = batchOp.operations[i]; if (mutation instanceof Put) { updateKVTimestamps(familyMaps[i].values(), byteNow); noOfPuts++; @@ -2162,8 +2143,8 @@ public class HRegion implements HeapSize { // , Writable{ // calling the pre CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { - MiniBatchOperationInProgress> miniBatchOp = - new MiniBatchOperationInProgress>(batchOp.operations, + MiniBatchOperationInProgress miniBatchOp = + new MiniBatchOperationInProgress(batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L; } @@ -2198,7 +2179,7 @@ public class HRegion implements HeapSize { // , Writable{ } batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; - Mutation m = batchOp.operations[i].getFirst(); + Mutation m = batchOp.operations[i]; Durability tmpDur = getEffectiveDurability(m.getDurability()); if (tmpDur.ordinal() > durability.ordinal()) { durability = tmpDur; @@ -2221,10 +2202,10 @@ public class HRegion implements HeapSize { // , Writable{ // ------------------------- // STEP 5. Append the edit to WAL. Do not sync wal. // ------------------------- - Mutation first = batchOp.operations[firstIndex].getFirst(); + Mutation mutation = batchOp.operations[firstIndex]; if (walEdit.size() > 0) { txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(), - walEdit, first.getClusterId(), now, this.htableDescriptor); + walEdit, mutation.getClusterId(), now, this.htableDescriptor); } // ------------------------------- @@ -2234,12 +2215,8 @@ public class HRegion implements HeapSize { // , Writable{ this.updatesLock.readLock().unlock(); locked = false; } - if (acquiredLocks != null) { - for (Integer toRelease : acquiredLocks) { - releaseRowLock(toRelease); - } - acquiredLocks = null; - } + releaseRowLocks(acquiredRowLocks); + // ------------------------- // STEP 7. Sync wal. 
// ------------------------- @@ -2249,8 +2226,8 @@ public class HRegion implements HeapSize { // , Writable{ walSyncSuccessful = true; // calling the post CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { - MiniBatchOperationInProgress> miniBatchOp = - new MiniBatchOperationInProgress>(batchOp.operations, + MiniBatchOperationInProgress miniBatchOp = + new MiniBatchOperationInProgress(batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); coprocessorHost.postBatchMutate(miniBatchOp); } @@ -2274,7 +2251,7 @@ public class HRegion implements HeapSize { // , Writable{ != OperationStatusCode.SUCCESS) { continue; } - Mutation m = batchOp.operations[i].getFirst(); + Mutation m = batchOp.operations[i]; if (m instanceof Put) { coprocessorHost.postPut((Put) m, walEdit, m.getDurability()); } else { @@ -2296,12 +2273,7 @@ public class HRegion implements HeapSize { // , Writable{ if (locked) { this.updatesLock.readLock().unlock(); } - - if (acquiredLocks != null) { - for (Integer toRelease : acquiredLocks) { - releaseRowLock(toRelease); - } - } + releaseRowLocks(acquiredRowLocks); // See if the column families were consistent through the whole thing. // if they were then keep them. If they were not then pass a null. @@ -2378,8 +2350,8 @@ public class HRegion implements HeapSize { // , Writable{ checkFamily(family); get.addColumn(family, qualifier); - // Lock row - Integer lid = getLock(null, get.getRow(), true); + // Lock row - note that doBatchMutate will relock this row if called + RowLock rowLock = getRowLock(get.getRow()); // wait for all previous transactions to complete (with lock held) mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); List result = null; @@ -2425,27 +2397,23 @@ public class HRegion implements HeapSize { // , Writable{ if (matches) { // All edits for the given row (across all column families) must // happen atomically. - doBatchMutate((Mutation)w, lid); + doBatchMutate((Mutation)w); this.checkAndMutateChecksPassed.increment(); return true; } this.checkAndMutateChecksFailed.increment(); return false; } finally { - releaseRowLock(lid); + rowLock.release(); } } finally { closeRegionOperation(); } } - @SuppressWarnings("unchecked") - private void doBatchMutate(Mutation mutation, Integer lid) throws IOException, + private void doBatchMutate(Mutation mutation) throws IOException, org.apache.hadoop.hbase.exceptions.DoNotRetryIOException { - Pair[] mutateWithLocks = new Pair[] { - new Pair(mutation, lid) - }; - OperationStatus[] batchMutate = this.batchMutate(mutateWithLocks); + OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation }); if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) { throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg()); } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) { @@ -2621,7 +2589,7 @@ public class HRegion implements HeapSize { // , Writable{ Put p = new Put(row); p.setFamilyMap(familyMap); p.setClusterId(HConstants.DEFAULT_CLUSTER_ID); - doBatchMutate(p, null); + doBatchMutate(p); } /** @@ -2672,7 +2640,7 @@ public class HRegion implements HeapSize { // , Writable{ * called when a Put/Delete has updated memstore but subequently fails to update * the wal. This method is then invoked to rollback the memstore. 
   */
-  private void rollbackMemstore(BatchOperationInProgress> batchOp,
+  private void rollbackMemstore(BatchOperationInProgress batchOp,
     Map>[] familyMaps, int start, int end) {
     int kvsRolledback = 0;
@@ -3182,138 +3150,76 @@
   }

   /**
-   * Obtain a lock on the given row. Blocks until success.
-   *
-   * I know it's strange to have two mappings:
-   * <pre>
-   *   ROWS  ==> LOCKS
-   * </pre>
-   * as well as
-   * <pre>
-   *   LOCKS ==> ROWS
-   * </pre>
-   *
-   * <p>It would be more memory-efficient to just have one mapping;
-   * maybe we'll do that in the future.
-   *
-   * @param row Name of row to lock.
-   * @throws IOException
-   * @return The id of the held lock.
-   */
-  public Integer obtainRowLock(final byte [] row) throws IOException {
-    startRegionOperation();
-    this.writeRequestsCount.increment();
-    try {
-      return internalObtainRowLock(row, true);
-    } finally {
-      closeRegionOperation();
-    }
-  }
-
   /**
-   * Obtains or tries to obtain the given row lock.
+   * Tries to acquire a lock on the given row.
    * @param waitForLock if true, will block until the lock is available.
    *        Otherwise, just tries to obtain the lock and returns
    *        null if unavailable.
+   * @return the row lock if acquired,
+   *   null if waitForLock was false and the lock was not acquired
+   * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
    */
-  private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
-      throws IOException {
+  public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
     checkRow(row, "row lock");
     startRegionOperation();
     try {
       HashedBytes rowKey = new HashedBytes(row);
-      CountDownLatch rowLatch = new CountDownLatch(1);
+      RowLockContext rowLockContext = new RowLockContext(rowKey);

       // loop until we acquire the row lock (unless !waitForLock)
       while (true) {
-        CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
-        if (existingLatch == null) {
+        RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
+        if (existingContext == null) {
+          // Row is not already locked by any thread, use newly created context.
+          break;
+        } else if (existingContext.ownedByCurrentThread()) {
+          // Row is already locked by current thread, reuse existing context instead.
+          rowLockContext = existingContext;
           break;
         } else {
-          // row already locked
+          // Row is already locked by some other thread, give up or wait for it
           if (!waitForLock) {
             return null;
           }
           try {
-            if (!existingLatch.await(this.rowLockWaitDuration,
-                TimeUnit.MILLISECONDS)) {
-              throw new IOException("Timed out on getting lock for row="
-                  + Bytes.toStringBinary(row));
+            if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
+              throw new IOException("Timed out waiting for lock for row: " + rowKey);
             }
           } catch (InterruptedException ie) {
-            LOG.warn("internalObtainRowLock interrupted for row=" + Bytes.toStringBinary(row));
+            LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
            InterruptedIOException iie = new InterruptedIOException();
            iie.initCause(ie);
            throw iie;
           }
         }
       }
-
-      // loop until we generate an unused lock id
-      while (true) {
-        Integer lockId = lockIdGenerator.incrementAndGet();
-        HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
-        if (existingRowKey == null) {
-          return lockId;
-        } else {
-          // lockId already in use, jump generator to a new spot
-          lockIdGenerator.set(rand.nextInt());
-        }
-      }
+
+      // allocate new lock for this thread
+      return rowLockContext.newLock();
     } finally {
       closeRegionOperation();
     }
   }

   /**
-   * Release the row lock!
-   * @param lockId The lock ID to release.
+   * Acquires a lock on the given row.
+   * The same thread may acquire multiple locks on the same row.
+ * @return the acquired row lock + * @throws IOException if the lock could not be acquired after waiting */ - public void releaseRowLock(final Integer lockId) { - if (lockId == null) return; // null lock id, do nothing - HashedBytes rowKey = lockIds.remove(lockId); - if (rowKey == null) { - LOG.warn("Release unknown lockId: " + lockId); - return; - } - CountDownLatch rowLatch = lockedRows.remove(rowKey); - if (rowLatch == null) { - LOG.error("Releases row not locked, lockId: " + lockId + " row: " - + rowKey); - return; - } - rowLatch.countDown(); + public RowLock getRowLock(byte[] row) throws IOException { + return getRowLock(row, true); } /** - * See if row is currently locked. - * @param lockId - * @return boolean + * If the given list of row locks is not null, releases all locks. */ - boolean isRowLocked(final Integer lockId) { - return lockIds.containsKey(lockId); - } - - /** - * Returns existing row lock if found, otherwise - * obtains a new row lock and returns it. - * @param lockid requested by the user, or null if the user didn't already hold lock - * @param row the row to lock - * @param waitForLock if true, will block until the lock is available, otherwise will - * simply return null if it could not acquire the lock. - * @return lockid or null if waitForLock is false and the lock was unavailable. - */ - public Integer getLock(Integer lockid, byte [] row, boolean waitForLock) - throws IOException { - Integer lid = null; - if (lockid == null) { - lid = internalObtainRowLock(row, waitForLock); - } else { - if (!isRowLocked(lockid)) { - throw new IOException("Invalid row lock"); + public void releaseRowLocks(List rowLocks) { + if (rowLocks != null) { + for (RowLock rowLock : rowLocks) { + rowLock.release(); } - lid = lockid; + rowLocks.clear(); } - return lid; } /** @@ -4583,24 +4489,19 @@ public class HRegion implements HeapSize { // , Writable{ MultiVersionConsistencyControl.WriteEntry writeEntry = null; boolean locked = false; boolean walSyncSuccessful = false; - List acquiredLocks = null; + List acquiredRowLocks = null; long addedSize = 0; List mutations = new ArrayList(); Collection rowsToLock = processor.getRowsToLock(); try { // 2. Acquire the row lock(s) - acquiredLocks = new ArrayList(rowsToLock.size()); + acquiredRowLocks = new ArrayList(rowsToLock.size()); for (byte[] row : rowsToLock) { - // Attempt to lock all involved rows, fail if one lock times out - Integer lid = getLock(null, row, true); - if (lid == null) { - throw new IOException("Failed to acquire lock on " - + Bytes.toStringBinary(row)); - } - acquiredLocks.add(lid); + // Attempt to lock all involved rows, throw if any lock times out + acquiredRowLocks.add(getRowLock(row)); } // 3. Region lock - lock(this.updatesLock.readLock(), acquiredLocks.size()); + lock(this.updatesLock.readLock(), acquiredRowLocks.size()); locked = true; long now = EnvironmentEdgeManager.currentTimeMillis(); @@ -4635,12 +4536,8 @@ public class HRegion implements HeapSize { // , Writable{ } // 9. Release row lock(s) - if (acquiredLocks != null) { - for (Integer lid : acquiredLocks) { - releaseRowLock(lid); - } - acquiredLocks = null; - } + releaseRowLocks(acquiredRowLocks); + // 10. 
Sync edit log if (txid != 0) { syncOrDefer(txid, getEffectiveDurability(processor.useDurability())); @@ -4665,12 +4562,8 @@ public class HRegion implements HeapSize { // , Writable{ this.updatesLock.readLock().unlock(); locked = false; } - if (acquiredLocks != null) { - for (Integer lid : acquiredLocks) { - releaseRowLock(lid); - } - } - + // release locks if some were acquired but another timed out + releaseRowLocks(acquiredRowLocks); } // 12. Run post-process hook @@ -4765,125 +4658,129 @@ public class HRegion implements HeapSize { // , Writable{ startRegionOperation(Operation.APPEND); this.writeRequestsCount.increment(); WriteEntry w = null; + RowLock rowLock = null; try { - Integer lid = getLock(null, row, true); - lock(this.updatesLock.readLock()); - // wait for all prior MVCC transactions to finish - while we hold the row lock - // (so that we are guaranteed to see the latest state) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); - // now start my own transaction - w = mvcc.beginMemstoreInsert(); + rowLock = getRowLock(row); try { - long now = EnvironmentEdgeManager.currentTimeMillis(); - // Process each family - for (Map.Entry> family : append.getFamilyMap().entrySet()) { - - Store store = stores.get(family.getKey()); - List kvs = new ArrayList(family.getValue().size()); - - // Get previous values for all columns in this family - Get get = new Get(row); - for (Cell cell : family.getValue()) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - get.addColumn(family.getKey(), kv.getQualifier()); - } - List results = get(get, false); - - // Iterate the input columns and update existing values if they were - // found, otherwise add new column initialized to the append value - - // Avoid as much copying as possible. Every byte is copied at most - // once. 
- // Would be nice if KeyValue had scatter/gather logic - int idx = 0; - for (Cell cell : family.getValue()) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - KeyValue newKV; - if (idx < results.size() - && results.get(idx).matchingQualifier(kv.getBuffer(), - kv.getQualifierOffset(), kv.getQualifierLength())) { - KeyValue oldKv = results.get(idx); - // allocate an empty kv once - newKV = new KeyValue(row.length, kv.getFamilyLength(), - kv.getQualifierLength(), now, KeyValue.Type.Put, - oldKv.getValueLength() + kv.getValueLength()); - // copy in the value - System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(), - newKV.getBuffer(), newKV.getValueOffset(), - oldKv.getValueLength()); - System.arraycopy(kv.getBuffer(), kv.getValueOffset(), - newKV.getBuffer(), - newKV.getValueOffset() + oldKv.getValueLength(), - kv.getValueLength()); - idx++; - } else { - // allocate an empty kv once - newKV = new KeyValue(row.length, kv.getFamilyLength(), - kv.getQualifierLength(), now, KeyValue.Type.Put, - kv.getValueLength()); - // copy in the value - System.arraycopy(kv.getBuffer(), kv.getValueOffset(), - newKV.getBuffer(), newKV.getValueOffset(), - kv.getValueLength()); - } - // copy in row, family, and qualifier - System.arraycopy(kv.getBuffer(), kv.getRowOffset(), - newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength()); - System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(), - newKV.getBuffer(), newKV.getFamilyOffset(), - kv.getFamilyLength()); - System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(), - newKV.getBuffer(), newKV.getQualifierOffset(), - kv.getQualifierLength()); - - newKV.setMemstoreTS(w.getWriteNumber()); - kvs.add(newKV); - - // Append update to WAL - if (writeToWAL) { - if (walEdits == null) { - walEdits = new WALEdit(); - } - walEdits.add(newKV); - } - } - - //store the kvs to the temporary memstore before writing HLog - tempMemstore.put(store, kvs); - } - - // Actually write to WAL now - if (writeToWAL) { - // Using default cluster id, as this can only happen in the orginating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. 
- txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(), - walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(), - this.htableDescriptor); - } else { - recordMutationWithoutWal(append.getFamilyMap()); - } - - //Actually write to Memstore now - for (Map.Entry> entry : tempMemstore.entrySet()) { - Store store = entry.getKey(); - if (store.getFamily().getMaxVersions() == 1) { - // upsert if VERSIONS for this CF == 1 - size += store.upsert(entry.getValue(), getSmallestReadPoint()); - } else { - // otherwise keep older versions around - for (Cell cell: entry.getValue()) { + lock(this.updatesLock.readLock()); + // wait for all prior MVCC transactions to finish - while we hold the row lock + // (so that we are guaranteed to see the latest state) + mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + // now start my own transaction + w = mvcc.beginMemstoreInsert(); + try { + long now = EnvironmentEdgeManager.currentTimeMillis(); + // Process each family + for (Map.Entry> family : append.getFamilyMap().entrySet()) { + + Store store = stores.get(family.getKey()); + List kvs = new ArrayList(family.getValue().size()); + + // Get previous values for all columns in this family + Get get = new Get(row); + for (Cell cell : family.getValue()) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - size += store.add(kv); + get.addColumn(family.getKey(), kv.getQualifier()); } + List results = get(get, false); + + // Iterate the input columns and update existing values if they were + // found, otherwise add new column initialized to the append value + + // Avoid as much copying as possible. Every byte is copied at most + // once. + // Would be nice if KeyValue had scatter/gather logic + int idx = 0; + for (Cell cell : family.getValue()) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + KeyValue newKV; + if (idx < results.size() + && results.get(idx).matchingQualifier(kv.getBuffer(), + kv.getQualifierOffset(), kv.getQualifierLength())) { + KeyValue oldKv = results.get(idx); + // allocate an empty kv once + newKV = new KeyValue(row.length, kv.getFamilyLength(), + kv.getQualifierLength(), now, KeyValue.Type.Put, + oldKv.getValueLength() + kv.getValueLength()); + // copy in the value + System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(), + newKV.getBuffer(), newKV.getValueOffset(), + oldKv.getValueLength()); + System.arraycopy(kv.getBuffer(), kv.getValueOffset(), + newKV.getBuffer(), + newKV.getValueOffset() + oldKv.getValueLength(), + kv.getValueLength()); + idx++; + } else { + // allocate an empty kv once + newKV = new KeyValue(row.length, kv.getFamilyLength(), + kv.getQualifierLength(), now, KeyValue.Type.Put, + kv.getValueLength()); + // copy in the value + System.arraycopy(kv.getBuffer(), kv.getValueOffset(), + newKV.getBuffer(), newKV.getValueOffset(), + kv.getValueLength()); + } + // copy in row, family, and qualifier + System.arraycopy(kv.getBuffer(), kv.getRowOffset(), + newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength()); + System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(), + newKV.getBuffer(), newKV.getFamilyOffset(), + kv.getFamilyLength()); + System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(), + newKV.getBuffer(), newKV.getQualifierOffset(), + kv.getQualifierLength()); + + newKV.setMemstoreTS(w.getWriteNumber()); + kvs.add(newKV); + + // Append update to WAL + if (writeToWAL) { + if (walEdits == null) { + walEdits = new WALEdit(); + } + walEdits.add(newKV); + } + } + + //store the kvs to the temporary 
memstore before writing HLog + tempMemstore.put(store, kvs); } - allKVs.addAll(entry.getValue()); + + // Actually write to WAL now + if (writeToWAL) { + // Using default cluster id, as this can only happen in the orginating + // cluster. A slave cluster receives the final value (not the delta) + // as a Put. + txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(), + walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(), + this.htableDescriptor); + } else { + recordMutationWithoutWal(append.getFamilyMap()); + } + + //Actually write to Memstore now + for (Map.Entry> entry : tempMemstore.entrySet()) { + Store store = entry.getKey(); + if (store.getFamily().getMaxVersions() == 1) { + // upsert if VERSIONS for this CF == 1 + size += store.upsert(entry.getValue(), getSmallestReadPoint()); + } else { + // otherwise keep older versions around + for (Cell cell: entry.getValue()) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + size += store.add(kv); + } + } + allKVs.addAll(entry.getValue()); + } + size = this.addAndGetGlobalMemstoreSize(size); + flush = isFlushSize(size); + } finally { + this.updatesLock.readLock().unlock(); } - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); } finally { - this.updatesLock.readLock().unlock(); - releaseRowLock(lid); + rowLock.release(); } if (writeToWAL) { // sync the transaction log outside the rowlock @@ -4936,100 +4833,103 @@ public class HRegion implements HeapSize { // , Writable{ this.writeRequestsCount.increment(); WriteEntry w = null; try { - Integer lid = getLock(null, row, true); - lock(this.updatesLock.readLock()); - // wait for all prior MVCC transactions to finish - while we hold the row lock - // (so that we are guaranteed to see the latest state) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); - // now start my own transaction - w = mvcc.beginMemstoreInsert(); + RowLock rowLock = getRowLock(row); try { - long now = EnvironmentEdgeManager.currentTimeMillis(); - // Process each family - for (Map.Entry> family: - increment.getFamilyMap().entrySet()) { - - Store store = stores.get(family.getKey()); - List kvs = new ArrayList(family.getValue().size()); - - // Get previous values for all columns in this family - Get get = new Get(row); - for (Cell cell: family.getValue()) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - get.addColumn(family.getKey(), kv.getQualifier()); - } - get.setTimeRange(tr.getMin(), tr.getMax()); - List results = get(get, false); - - // Iterate the input columns and update existing values if they were - // found, otherwise add new column initialized to the increment amount - int idx = 0; - for (Cell cell: family.getValue()) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - long amount = Bytes.toLong(kv.getValue()); - byte [] qualifier = kv.getQualifier(); - if (idx < results.size() && results.get(idx).matchingQualifier(qualifier)) { - kv = results.get(idx); - if(kv.getValueLength() == Bytes.SIZEOF_LONG) { - amount += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(), Bytes.SIZEOF_LONG); - } else { - // throw DoNotRetryIOException instead of IllegalArgumentException - throw new org.apache.hadoop.hbase.exceptions.DoNotRetryIOException( - "Attempted to increment field that isn't 64 bits wide"); - } - idx++; - } - - // Append new incremented KeyValue to list - KeyValue newKV = - new KeyValue(row, family.getKey(), qualifier, now, Bytes.toBytes(amount)); - newKV.setMemstoreTS(w.getWriteNumber()); - kvs.add(newKV); - - // 
Prepare WAL updates - if (writeToWAL) { - if (walEdits == null) { - walEdits = new WALEdit(); - } - walEdits.add(newKV); - } - } - - //store the kvs to the temporary memstore before writing HLog - tempMemstore.put(store, kvs); - } - - // Actually write to WAL now - if (writeToWAL) { - // Using default cluster id, as this can only happen in the orginating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. - txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(), - walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(), - this.htableDescriptor); - } else { - recordMutationWithoutWal(increment.getFamilyMap()); - } - //Actually write to Memstore now - for (Map.Entry> entry : tempMemstore.entrySet()) { - Store store = entry.getKey(); - if (store.getFamily().getMaxVersions() == 1) { - // upsert if VERSIONS for this CF == 1 - size += store.upsert(entry.getValue(), getSmallestReadPoint()); - } else { - // otherwise keep older versions around - for (Cell cell : entry.getValue()) { + lock(this.updatesLock.readLock()); + // wait for all prior MVCC transactions to finish - while we hold the row lock + // (so that we are guaranteed to see the latest state) + mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + // now start my own transaction + w = mvcc.beginMemstoreInsert(); + try { + long now = EnvironmentEdgeManager.currentTimeMillis(); + // Process each family + for (Map.Entry> family: + increment.getFamilyMap().entrySet()) { + + Store store = stores.get(family.getKey()); + List kvs = new ArrayList(family.getValue().size()); + + // Get previous values for all columns in this family + Get get = new Get(row); + for (Cell cell: family.getValue()) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - size += store.add(kv); + get.addColumn(family.getKey(), kv.getQualifier()); } + get.setTimeRange(tr.getMin(), tr.getMax()); + List results = get(get, false); + + // Iterate the input columns and update existing values if they were + // found, otherwise add new column initialized to the increment amount + int idx = 0; + for (Cell cell: family.getValue()) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + long amount = Bytes.toLong(kv.getValue()); + byte [] qualifier = kv.getQualifier(); + if (idx < results.size() && results.get(idx).matchingQualifier(qualifier)) { + kv = results.get(idx); + if(kv.getValueLength() == Bytes.SIZEOF_LONG) { + amount += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(), Bytes.SIZEOF_LONG); + } else { + // throw DoNotRetryIOException instead of IllegalArgumentException + throw new org.apache.hadoop.hbase.exceptions.DoNotRetryIOException( + "Attempted to increment field that isn't 64 bits wide"); + } + idx++; + } + + // Append new incremented KeyValue to list + KeyValue newKV = + new KeyValue(row, family.getKey(), qualifier, now, Bytes.toBytes(amount)); + newKV.setMemstoreTS(w.getWriteNumber()); + kvs.add(newKV); + + // Prepare WAL updates + if (writeToWAL) { + if (walEdits == null) { + walEdits = new WALEdit(); + } + walEdits.add(newKV); + } + } + + //store the kvs to the temporary memstore before writing HLog + tempMemstore.put(store, kvs); } - allKVs.addAll(entry.getValue()); + + // Actually write to WAL now + if (writeToWAL) { + // Using default cluster id, as this can only happen in the orginating + // cluster. A slave cluster receives the final value (not the delta) + // as a Put. 
+ txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(), + walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(), + this.htableDescriptor); + } else { + recordMutationWithoutWal(increment.getFamilyMap()); + } + //Actually write to Memstore now + for (Map.Entry> entry : tempMemstore.entrySet()) { + Store store = entry.getKey(); + if (store.getFamily().getMaxVersions() == 1) { + // upsert if VERSIONS for this CF == 1 + size += store.upsert(entry.getValue(), getSmallestReadPoint()); + } else { + // otherwise keep older versions around + for (Cell cell : entry.getValue()) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + size += store.add(kv); + } + } + allKVs.addAll(entry.getValue()); + } + size = this.addAndGetGlobalMemstoreSize(size); + flush = isFlushSize(size); + } finally { + this.updatesLock.readLock().unlock(); } - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); } finally { - this.updatesLock.readLock().unlock(); - releaseRowLock(lid); + rowLock.release(); } if (writeToWAL) { // sync the transaction log outside the rowlock @@ -5069,22 +4969,32 @@ public class HRegion implements HeapSize { // , Writable{ public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + - (12 * Bytes.SIZEOF_LONG) + - 2 * Bytes.SIZEOF_BOOLEAN); + 38 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + + (11 * Bytes.SIZEOF_LONG) + + 4 * Bytes.SIZEOF_BOOLEAN); + // woefully out of date - currently missing: + // 1 x HashMap - coprocessorServiceHandlers + // 6 org.cliffc.high_scale_lib.Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL, + // checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount, + // writeRequestsCount, updatesBlockedMs + // 1 x HRegion$WriteState - writestate + // 1 x RegionCoprocessorHost - coprocessorHost + // 1 x RegionSplitPolicy - splitPolicy + // 1 x MetricsRegion - metricsRegion + // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper public static final long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.OBJECT + // closeLock (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing (3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL - ClassSize.ATOMIC_INTEGER + // lockIdGenerator - (3 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds, scannerReadPoints + (2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, scannerReadPoints WriteState.HEAP_SIZE + // writestate ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock ClassSize.ARRAYLIST + // recentFlushes MultiVersionConsistencyControl.FIXED_SIZE // mvcc + ClassSize.TREEMAP // maxSeqIdInStores + + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress ; @Override @@ -5093,7 +5003,7 @@ public class HRegion implements HeapSize { // , Writable{ for (Store store : this.stores.values()) { heapSize += store.heapSize(); } - // this does not take into account row locks, recent flushes, mvcc entries + // this does not take into account row locks, recent flushes, mvcc entries, and more return heapSize; } @@ -5657,4 +5567,68 @@ public class HRegion implements HeapSize { // , Writable{ */ void failedBulkLoad(byte[] family, String srcPath) throws IOException; } + + @VisibleForTesting class RowLockContext { + private final HashedBytes row; + private final CountDownLatch latch = new CountDownLatch(1); + private final Thread thread; + 
private int lockCount = 0; + + RowLockContext(HashedBytes row) { + this.row = row; + this.thread = Thread.currentThread(); + } + + boolean ownedByCurrentThread() { + return thread == Thread.currentThread(); + } + + RowLock newLock() { + lockCount++; + return new RowLock(this); + } + + void releaseLock() { + if (!ownedByCurrentThread()) { + throw new IllegalArgumentException("Lock held by thread: " + thread + + " cannot be released by different thread: " + Thread.currentThread()); + } + lockCount--; + if (lockCount == 0) { + // no remaining locks by the thread, unlock and allow other threads to access + RowLockContext existingContext = lockedRows.remove(row); + if (existingContext != this) { + throw new RuntimeException( + "Internal row lock state inconsistent, should not happen, row: " + row); + } + latch.countDown(); + } + } + } + + /** + * Row lock held by a given thread. + * One thread may acquire multiple locks on the same row simultaneously. + * The locks must be released by calling release() from the same thread. + */ + public class RowLock { + @VisibleForTesting final RowLockContext context; + private boolean released = false; + + @VisibleForTesting RowLock(RowLockContext context) { + this.context = context; + } + + /** + * Release the given lock. If there are no remaining locks held by the current thread + * then unlock the row and allow other threads to acquire the lock. + * @throws IllegalArgumentException if called by a different thread than the lock owning thread + */ + public void release() { + if (!released) { + context.releaseLock(); + released = true; + } + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 25e3657be81..d9f65ddbea9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -3956,8 +3956,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa */ protected void doBatchOp(final MultiResponse.Builder builder, final HRegion region, final List mutations, final CellScanner cells, boolean isReplay) { - @SuppressWarnings("unchecked") - Pair[] mutationsWithLocks = new Pair[mutations.size()]; + Mutation[] mArray = new Mutation[mutations.size()]; long before = EnvironmentEdgeManager.currentTimeMillis(); boolean batchContainsPuts = false, batchContainsDelete = false; try { @@ -3974,7 +3973,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa mutation = ProtobufUtil.toDelete(m, cells); batchContainsDelete = true; } - mutationsWithLocks[i++] = new Pair(mutation, null); + mArray[i++] = mutation; builder.addResult(result); } @@ -3983,7 +3982,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa cacheFlusher.reclaimMemStoreMemory(); } - OperationStatus codes[] = region.batchMutate(mutationsWithLocks, isReplay); + OperationStatus codes[] = region.batchMutate(mArray); for (i = 0; i < codes.length; i++) { switch (codes[i].getOperationStatusCode()) { case BAD_FAMILY: diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 9102aaa51ce..4fe11f4f5d9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -993,7 +993,7 @@ public class RegionCoprocessorHost * @throws IOException */ public boolean preBatchMutate( - final MiniBatchOperationInProgress> miniBatchOp) throws IOException { + final MiniBatchOperationInProgress miniBatchOp) throws IOException { boolean bypass = false; ObserverContext ctx = null; for (RegionEnvironment env : coprocessors) { @@ -1018,7 +1018,7 @@ public class RegionCoprocessorHost * @throws IOException */ public void postBatchMutate( - final MiniBatchOperationInProgress> miniBatchOp) throws IOException { + final MiniBatchOperationInProgress miniBatchOp) throws IOException { ObserverContext ctx = null; for (RegionEnvironment env : coprocessors) { if (env.getInstance() instanceof RegionObserver) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index 68dc969e619..06dd06afec3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -1061,7 +1061,7 @@ public class HBaseFsck extends Configured implements Tool { "You may need to restore the previously sidelined .META."); return false; } - meta.put(puts.toArray(new Put[0])); + meta.batchMutate(puts.toArray(new Put[0])); HRegion.closeHRegion(meta); LOG.info("Success! .META. table rebuilt."); LOG.info("Old .META. is moved into " + backupDir); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index 0a588afab1f..d6c0c97cd09 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -407,7 +407,7 @@ public class SimpleRegionObserver extends BaseRegionObserver { @Override public void preBatchMutate(ObserverContext c, - MiniBatchOperationInProgress> miniBatchOp) throws IOException { + MiniBatchOperationInProgress miniBatchOp) throws IOException { RegionCoprocessorEnvironment e = c.getEnvironment(); assertNotNull(e); assertNotNull(e.getRegion()); @@ -417,7 +417,7 @@ public class SimpleRegionObserver extends BaseRegionObserver { @Override public void postBatchMutate(final ObserverContext c, - final MiniBatchOperationInProgress> miniBatchOp) throws IOException { + final MiniBatchOperationInProgress miniBatchOp) throws IOException { RegionCoprocessorEnvironment e = c.getEnvironment(); assertNotNull(e); assertNotNull(e.getRegion()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java index 6d4cbe6c14a..eb230a30d41 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java @@ -59,11 +59,8 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; -import org.apache.hadoop.hbase.util.Pair; import org.junit.experimental.categories.Category; -import com.google.common.collect.Lists; - /** * Testing of HRegion.incrementColumnValue, 
HRegion.increment, @@ -528,16 +525,12 @@ public class TestAtomicOperation extends HBaseTestCase { final MockHRegion region = (MockHRegion) TestHRegion.initHRegion( Bytes.toBytes(tableName), tableName, conf, Bytes.toBytes(family)); - List> putsAndLocks = Lists.newArrayList(); Put[] puts = new Put[1]; Put put = new Put(Bytes.toBytes("r1")); put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10")); puts[0] = put; - Pair pair = new Pair(puts[0], null); - - putsAndLocks.add(pair); - - region.batchMutate(putsAndLocks.toArray(new Pair[0])); + + region.batchMutate(puts); MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf); ctx.addThread(new PutThread(ctx, region)); @@ -565,15 +558,12 @@ public class TestAtomicOperation extends HBaseTestCase { } public void doWork() throws Exception { - List> putsAndLocks = Lists.newArrayList(); Put[] puts = new Put[1]; Put put = new Put(Bytes.toBytes("r1")); put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50")); puts[0] = put; - Pair pair = new Pair(puts[0], null); - putsAndLocks.add(pair); testStep = TestStep.PUT_STARTED; - region.batchMutate(putsAndLocks.toArray(new Pair[0])); + region.batchMutate(puts); } } @@ -607,43 +597,50 @@ public class TestAtomicOperation extends HBaseTestCase { } @Override - public void releaseRowLock(Integer lockId) { - if (testStep == TestStep.INIT) { - super.releaseRowLock(lockId); - return; - } - - if (testStep == TestStep.PUT_STARTED) { - try { - testStep = TestStep.PUT_COMPLETED; - super.releaseRowLock(lockId); - // put has been written to the memstore and the row lock has been released, but the - // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of - // operations would cause the non-atomicity to show up: - // 1) Put releases row lock (where we are now) - // 2) CheckAndPut grabs row lock and reads the value prior to the put (10) - // because the MVCC has not advanced - // 3) Put advances MVCC - // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock - // (see below), and then wait some more to give the checkAndPut time to read the old - // value. - latch.await(); - Thread.sleep(1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - else if (testStep == TestStep.CHECKANDPUT_STARTED) { - super.releaseRowLock(lockId); - } - } - - @Override - public Integer getLock(Integer lockid, byte[] row, boolean waitForLock) throws IOException { + public RowLock getRowLock(final byte[] row, boolean waitForLock) throws IOException { if (testStep == TestStep.CHECKANDPUT_STARTED) { latch.countDown(); } - return super.getLock(lockid, row, waitForLock); + return new WrappedRowLock(super.getRowLock(row, waitForLock)); + } + + public class WrappedRowLock extends RowLock { + + private WrappedRowLock(RowLock rowLock) { + super(rowLock.context); + } + + @Override + public void release() { + if (testStep == TestStep.INIT) { + super.release(); + return; + } + + if (testStep == TestStep.PUT_STARTED) { + try { + testStep = TestStep.PUT_COMPLETED; + super.release(); + // put has been written to the memstore and the row lock has been released, but the + // MVCC has not been advanced. 
Prior to fixing HBASE-7051, the following order of + // operations would cause the non-atomicity to show up: + // 1) Put releases row lock (where we are now) + // 2) CheckAndPut grabs row lock and reads the value prior to the put (10) + // because the MVCC has not advanced + // 3) Put advances MVCC + // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock + // (see below), and then wait some more to give the checkAndPut time to read the old + // value. + latch.await(); + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + else if (testStep == TestStep.CHECKANDPUT_STARTED) { + super.release(); + } + } } } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 8937b17252d..3479a74572e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -72,7 +72,6 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -96,6 +95,7 @@ import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; +import org.apache.hadoop.hbase.regionserver.HRegion.RowLock; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; @@ -107,7 +107,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.hadoop.hbase.util.Threads; import org.junit.Assert; @@ -764,7 +763,6 @@ public class TestHRegion extends HBaseTestCase { } } - @SuppressWarnings("unchecked") public void testBatchPut() throws Exception { byte[] b = Bytes.toBytes(getName()); byte[] cf = Bytes.toBytes(COLUMN_FAMILY); @@ -783,7 +781,7 @@ public class TestHRegion extends HBaseTestCase { puts[i].add(cf, qual, val); } - OperationStatus[] codes = this.region.put(puts); + OperationStatus[] codes = this.region.batchMutate(puts); assertEquals(10, codes.length); for (int i = 0; i < 10; i++) { assertEquals(OperationStatusCode.SUCCESS, codes[i] @@ -794,7 +792,7 @@ public class TestHRegion extends HBaseTestCase { LOG.info("Next a batch put with one invalid family"); puts[5].add(Bytes.toBytes("BAD_CF"), qual, val); - codes = this.region.put(puts); + codes = this.region.batchMutate(puts); assertEquals(10, codes.length); for (int i = 0; i < 10; i++) { assertEquals((i == 5) ? 
OperationStatusCode.BAD_FAMILY : @@ -804,7 +802,7 @@ public class TestHRegion extends HBaseTestCase { metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source); LOG.info("Next a batch put that has to break into two batches to avoid a lock"); - Integer lockedRow = region.obtainRowLock(Bytes.toBytes("row_2")); + RowLock rowLock = region.getRowLock(Bytes.toBytes("row_2")); MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf); @@ -813,7 +811,7 @@ public class TestHRegion extends HBaseTestCase { TestThread putter = new TestThread(ctx) { @Override public void doWork() throws IOException { - retFromThread.set(region.put(puts)); + retFromThread.set(region.batchMutate(puts)); } }; LOG.info("...starting put thread while holding lock"); @@ -829,7 +827,7 @@ public class TestHRegion extends HBaseTestCase { } } LOG.info("...releasing row lock, which should let put thread continue"); - region.releaseRowLock(lockedRow); + rowLock.release(); LOG.info("...joining on thread"); ctx.stop(); LOG.info("...checking that next batch was synced"); @@ -840,29 +838,6 @@ public class TestHRegion extends HBaseTestCase { OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode()); } - LOG.info("Nexta, a batch put which uses an already-held lock"); - lockedRow = region.obtainRowLock(Bytes.toBytes("row_2")); - LOG.info("...obtained row lock"); - List> putsAndLocks = Lists.newArrayList(); - for (int i = 0; i < 10; i++) { - Pair pair = new Pair(puts[i], null); - if (i == 2) pair.setSecond(lockedRow); - putsAndLocks.add(pair); - } - - codes = region.batchMutate(putsAndLocks.toArray(new Pair[0])); - LOG.info("...performed put"); - for (int i = 0; i < 10; i++) { - assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : - OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode()); - } - // Make sure we didn't do an extra batch - metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 5, source); - - // Make sure we still hold lock - assertTrue(region.isRowLocked(lockedRow)); - LOG.info("...releasing lock"); - region.releaseRowLock(lockedRow); } finally { HRegion.closeHRegion(this.region); this.region = null; @@ -891,7 +866,7 @@ public class TestHRegion extends HBaseTestCase { puts[i].add(cf, qual, val); } - OperationStatus[] codes = this.region.put(puts); + OperationStatus[] codes = this.region.batchMutate(puts); assertEquals(10, codes.length); for (int i = 0; i < 10; i++) { assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i] diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java index f4e7c30a74d..9fae1ca2ad1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java @@ -233,7 +233,7 @@ public class TestParallelPut extends HBaseTestCase { put.add(fam1, qual1, value); in[0] = put; try { - OperationStatus[] ret = region.put(in); + OperationStatus[] ret = region.batchMutate(in); assertEquals(1, ret.length); assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode()); assertGet(rowkey, fam1, qual1, value);
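
Reviewer notes, appended after the patch rather than inline so the hunks above stay intact.

The core change replaces integer lock ids (and the lockIds reverse map, the lockIdGenerator, and the Random jump) with RowLock handles that are reentrant per thread. A minimal usage sketch against the signatures the patch adds to HRegion; the mutateRowAtomically wrapper and its arguments are illustrative, not part of the patch:

    import java.io.IOException;
    import org.apache.hadoop.hbase.regionserver.HRegion;
    import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;

    public class RowLockUsageSketch {
      static void mutateRowAtomically(HRegion region, byte[] row) throws IOException {
        RowLock outer = region.getRowLock(row);        // blocks until the row is free
        try {
          // Re-acquiring from the owning thread succeeds immediately (reentrant);
          // this is what lets checkAndMutate() hold the row lock while calling
          // doBatchMutate(), which locks the same row again internally.
          RowLock inner = region.getRowLock(row, true);
          try {
            // ... apply edits to the row while the lock is held ...
          } finally {
            inner.release();  // decrements the per-thread lock count
          }
        } finally {
          outer.release();    // count reaches 0: latch opens, other threads may lock the row
        }
      }
    }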
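To make the synchronization argument easy to check outside HBase, here is a self-contained distillation of the RowLockContext scheme using only java.util.concurrent. It mirrors the patch's logic: one context per locked key published via putIfAbsent, reentry by the owner thread through a plain count, and waiters parked on a CountDownLatch that opens exactly once when the count returns to zero. The class and method names are mine, and the timeout path returns false where the patch throws IOException:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class ReentrantKeyLock {
      private final ConcurrentHashMap<String, Context> locked =
          new ConcurrentHashMap<String, Context>();

      private static final class Context {
        final Thread owner = Thread.currentThread();
        final CountDownLatch latch = new CountDownLatch(1);
        int count = 0;  // only ever touched by the owner thread
      }

      public boolean lock(String key, long timeoutMs) throws InterruptedException {
        Context mine = new Context();
        while (true) {
          Context existing = locked.putIfAbsent(key, mine);
          if (existing == null) {                          // newly locked by this thread
            mine.count++;
            return true;
          }
          if (existing.owner == Thread.currentThread()) {  // reentrant acquisition
            existing.count++;
            return true;
          }
          if (!existing.latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            return false;                                  // timed out waiting for the holder
          }
          // latch opened: the entry was unpublished; loop and retry putIfAbsent
        }
      }

      public void unlock(String key) {
        Context ctx = locked.get(key);
        if (ctx == null || ctx.owner != Thread.currentThread()) {
          throw new IllegalMonitorStateException("not the lock owner for " + key);
        }
        if (--ctx.count == 0) {
          locked.remove(key);    // unpublish before opening the latch, as the patch does
          ctx.latch.countDown();
        }
      }
    }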
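processRowsWithLocks keeps its structure but now collects RowLock handles and frees them through the new releaseRowLocks(List) helper, which tolerates null and clears the list, so a second call from a finally block is harmless. A sketch of that acquire-all/release-all pattern, with illustrative names:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import org.apache.hadoop.hbase.regionserver.HRegion;
    import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;

    public class MultiRowLockSketch {
      static void withAllRowsLocked(HRegion region, Collection<byte[]> rows) throws IOException {
        List<RowLock> acquired = new ArrayList<RowLock>(rows.size());
        try {
          for (byte[] row : rows) {
            acquired.add(region.getRowLock(row));  // throws if any lock times out
          }
          // ... atomic work spanning all locked rows ...
        } finally {
          region.releaseRowLocks(acquired);        // releases every lock and clears the list
        }
      }
    }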
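Call-site migration is mechanical: everywhere a caller used to build a Pair array with null lock ids (BulkDeleteEndpoint, HRegionServer.doBatchOp, the tests), it now passes a bare Mutation[], and HRegion.put(Put[]) callers move to batchMutate as well. A sketch of the new shape, with illustrative names:

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.regionserver.HRegion;
    import org.apache.hadoop.hbase.regionserver.OperationStatus;

    public class BatchMutateMigrationSketch {
      // Row locks are taken and released inside batchMutate itself,
      // so there is nothing to pair with the mutations or to free afterwards.
      static OperationStatus[] deleteAll(HRegion region, List<Delete> deletes) throws IOException {
        Mutation[] batch = deletes.toArray(new Mutation[deletes.size()]);
        return region.batchMutate(batch);
      }
    }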
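One reading hazard: HTML extraction has stripped the generic type parameters throughout the patch text above, leaving fragments such as 'MiniBatchOperationInProgress>' and 'BatchOperationInProgress>'. Reconstructed from the accompanying Pair import removals, the observer hook signatures appear to change from MiniBatchOperationInProgress<Pair<Mutation, Integer>> to MiniBatchOperationInProgress<Mutation>. An override against the new shape, assuming the usual size() and getOperation(int) accessors:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;

    public class BatchHookSketch extends BaseRegionObserver {
      @Override
      public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
          MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
        for (int i = 0; i < miniBatchOp.size(); i++) {
          Mutation m = miniBatchOp.getOperation(i);  // previously getOperation(i).getFirst()
          if (m instanceof Put) {
            // inspect or adjust the Put before it is applied
          }
        }
      }
    }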