HBASE-8877 Reentrant row locks (Dave Latham)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1504002 13f79535-47bb-0310-9956-ffa450edef68
parent 07337ac9c8
commit 00175ae834
@@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
@@ -146,20 +145,19 @@ public class BulkDeleteEndpoint extends BulkDeleteService implements Coprocessor
           }
         }
         if (deleteRows.size() > 0) {
-          Pair<Mutation, Integer>[] deleteWithLockArr = new Pair[deleteRows.size()];
+          Mutation[] deleteArr = new Mutation[deleteRows.size()];
           int i = 0;
           for (List<KeyValue> deleteRow : deleteRows) {
-            Delete delete = createDeleteMutation(deleteRow, deleteType, timestamp);
-            deleteWithLockArr[i++] = new Pair<Mutation, Integer>(delete, null);
+            deleteArr[i++] = createDeleteMutation(deleteRow, deleteType, timestamp);
           }
-          OperationStatus[] opStatus = region.batchMutate(deleteWithLockArr);
+          OperationStatus[] opStatus = region.batchMutate(deleteArr);
           for (i = 0; i < opStatus.length; i++) {
             if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
               break;
             }
             totalRowsDeleted++;
             if (deleteType == DeleteType.VERSION) {
-              byte[] versionsDeleted = deleteWithLockArr[i].getFirst().getAttribute(
+              byte[] versionsDeleted = deleteArr[i].getAttribute(
                   NO_OF_VERSIONS_TO_DELETE);
               if (versionsDeleted != null) {
                 totalVersionsDeleted += Bytes.toInt(versionsDeleted);
@@ -253,12 +253,12 @@ public abstract class BaseRegionObserver implements RegionObserver {
 
   @Override
   public void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
   }
 
   @Override
   public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
   }
 
   @Override
@@ -554,7 +554,7 @@ public interface RegionObserver extends Coprocessor {
   * @throws IOException if an error occurred on the coprocessor
   */
  void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
-    final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException;
+    final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException;
 
  /**
   * This will be called after applying a batch of Mutations on a region. The Mutations are added to
@@ -564,7 +564,7 @@ public interface RegionObserver extends Coprocessor {
   * @throws IOException if an error occurred on the coprocessor
   */
  void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
-    final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException;
+    final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException;
 
  /**
   * Called before checkAndPut
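For context on the observer-facing change above, here is a minimal sketch (not part of this commit) of a coprocessor compiled against the new signature. The class name and counter are hypothetical, and it assumes MiniBatchOperationInProgress exposes size() and getOperation(int) accessors as in the surrounding code base.

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;

public class BatchCountingObserver extends BaseRegionObserver {
  private final AtomicLong batchedPuts = new AtomicLong();

  @Override
  public void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
    // Each entry is now a Mutation directly; no Pair.getFirst() unwrapping is needed.
    for (int i = 0; i < miniBatchOp.size(); i++) {
      Mutation m = miniBatchOp.getOperation(i);
      if (m instanceof Put) {
        batchedPuts.incrementAndGet();
      }
    }
  }
}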
@@ -34,7 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
-import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -138,6 +137,7 @@ import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.util.StringUtils;
 import org.cliffc.high_scale_lib.Counter;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -223,12 +223,13 @@ public class HRegion implements HeapSize { // , Writable{
   // Members
   //////////////////////////////////////////////////////////////////////////////
 
-  private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows =
-    new ConcurrentHashMap<HashedBytes, CountDownLatch>();
-  private final ConcurrentHashMap<Integer, HashedBytes> lockIds =
-    new ConcurrentHashMap<Integer, HashedBytes>();
-  private final AtomicInteger lockIdGenerator = new AtomicInteger(1);
-  static private Random rand = new Random();
+  // map from a locked row to the context for that lock including:
+  // - CountDownLatch for threads waiting on that row
+  // - the thread that owns the lock (allow reentrancy)
+  // - reference count of (reentrant) locks held by the thread
+  // - the row itself
+  private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
+    new ConcurrentHashMap<HashedBytes, RowLockContext>();
 
   protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
       Bytes.BYTES_RAWCOMPARATOR);
@@ -1764,7 +1765,7 @@ public class HRegion implements HeapSize { // , Writable{
     try {
       delete.getRow();
       // All edits for the given row (across all column families) must happen atomically.
-      doBatchMutate(delete, null);
+      doBatchMutate(delete);
     } finally {
       closeRegionOperation();
     }
@@ -1787,7 +1788,7 @@ public class HRegion implements HeapSize { // , Writable{
     delete.setFamilyMap(familyMap);
     delete.setClusterId(clusterId);
     delete.setDurability(durability);
-    doBatchMutate(delete, null);
+    doBatchMutate(delete);
   }
 
   /**
@@ -1862,7 +1863,7 @@ public class HRegion implements HeapSize { // , Writable{
     this.writeRequestsCount.increment();
     try {
       // All edits for the given row (across all column families) must happen atomically.
-      doBatchMutate(put, null);
+      doBatchMutate(put);
     } finally {
       closeRegionOperation();
     }
@@ -1892,46 +1893,29 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
   /**
-   * Perform a batch put with no pre-specified locks
-   * @see HRegion#batchMutate(Pair[])
+   * Perform a batch of mutations.
+   * It supports only Put and Delete mutations and will ignore other types passed.
+   * @param mutations the list of mutations
+   * @return an array of OperationStatus which internally contains the
+   *         OperationStatusCode and the exceptionMessage if any.
+   * @throws IOException
    */
-  public OperationStatus[] put(Put[] puts) throws IOException {
-    @SuppressWarnings("unchecked")
-    Pair<Mutation, Integer> putsAndLocks[] = new Pair[puts.length];
-
-    for (int i = 0; i < puts.length; i++) {
-      putsAndLocks[i] = new Pair<Mutation, Integer>(puts[i], null);
-    }
-    return batchMutate(putsAndLocks);
+  public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
+    return batchMutate(mutations, false);
   }
 
   /**
    * Perform a batch of mutations.
    * It supports only Put and Delete mutations and will ignore other types passed.
-   * @param mutationsAndLocks
-   *          the list of mutations paired with their requested lock IDs.
+   * @param mutations the list of mutations
    * @return an array of OperationStatus which internally contains the
    *         OperationStatusCode and the exceptionMessage if any.
    * @throws IOException
    */
-  public OperationStatus[] batchMutate(
-      Pair<Mutation, Integer>[] mutationsAndLocks) throws IOException {
-    return batchMutate(mutationsAndLocks, false);
-  }
-
-  /**
-   * Perform a batch of mutations.
-   * It supports only Put and Delete mutations and will ignore other types passed.
-   * @param mutationsAndLocks
-   *          the list of mutations paired with their requested lock IDs.
-   * @return an array of OperationStatus which internally contains the
-   *         OperationStatusCode and the exceptionMessage if any.
-   * @throws IOException
-   */
-  OperationStatus[] batchMutate(Pair<Mutation, Integer>[] mutationsAndLocks, boolean isReplay)
+  OperationStatus[] batchMutate(Mutation[] mutations, boolean isReplay)
       throws IOException {
-    BatchOperationInProgress<Pair<Mutation, Integer>> batchOp =
-      new BatchOperationInProgress<Pair<Mutation,Integer>>(mutationsAndLocks);
+    BatchOperationInProgress<Mutation> batchOp =
+      new BatchOperationInProgress<Mutation>(mutations);
 
     boolean initialized = false;
 
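A hedged caller-side sketch of the consolidated API above: code that previously used put(Put[]) or batchMutate(Pair[]) now builds a plain Mutation[] and inspects the returned OperationStatus entries. The helper class and method are invented for illustration, and it assumes OperationStatusCode is still the enum nested in HConstants.

import java.io.IOException;

import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.util.Bytes;

public final class BatchMutateSketch {
  static void applyBatch(HRegion region, byte[] family, byte[] qualifier) throws IOException {
    // Puts and Deletes travel in one plain Mutation[]; no Pair<Mutation, Integer> wrapping.
    Mutation[] batch = new Mutation[] {
        new Put(Bytes.toBytes("row1")).add(family, qualifier, Bytes.toBytes("v1")),
        new Delete(Bytes.toBytes("row2"))
    };
    OperationStatus[] statuses = region.batchMutate(batch);
    for (int i = 0; i < statuses.length; i++) {
      if (statuses[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
        throw new IOException("Mutation " + i + " failed: " + statuses[i].getExceptionMsg());
      }
    }
  }
}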
@@ -1969,14 +1953,13 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
 
-  private void doPreMutationHook(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp)
+  private void doPreMutationHook(BatchOperationInProgress<Mutation> batchOp)
       throws IOException {
     /* Run coprocessor pre hook outside of locks to avoid deadlock */
     WALEdit walEdit = new WALEdit();
     if (coprocessorHost != null) {
       for (int i = 0 ; i < batchOp.operations.length; i++) {
-        Pair<Mutation, Integer> nextPair = batchOp.operations[i];
-        Mutation m = nextPair.getFirst();
+        Mutation m = batchOp.operations[i];
         if (m instanceof Put) {
           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
             // pre hook says skip this Put
@@ -2005,7 +1988,7 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
   @SuppressWarnings("unchecked")
-  private long doMiniBatchMutation(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
+  private long doMiniBatchMutation(BatchOperationInProgress<Mutation> batchOp,
       boolean isInReplay) throws IOException {
 
     // variable to note if all Put items are for the same CF -- metrics related
@@ -2024,7 +2007,7 @@ public class HRegion implements HeapSize { // , Writable{
     boolean locked = false;
 
     /** Keep track of the locks we hold so we can release them in finally clause */
-    List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
+    List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
     // reference family maps directly so coprocessors can mutate them if desired
     Map<byte[], List<? extends Cell>>[] familyMaps = new Map[batchOp.operations.length];
     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
@@ -2040,10 +2023,8 @@ public class HRegion implements HeapSize { // , Writable{
     int numReadyToWrite = 0;
     long now = EnvironmentEdgeManager.currentTimeMillis();
     while (lastIndexExclusive < batchOp.operations.length) {
-      Pair<Mutation, Integer> nextPair = batchOp.operations[lastIndexExclusive];
-      Mutation mutation = nextPair.getFirst();
+      Mutation mutation = batchOp.operations[lastIndexExclusive];
       boolean isPutMutation = mutation instanceof Put;
-      Integer providedLockId = nextPair.getSecond();
 
       Map<byte[], List<? extends Cell>> familyMap = mutation.getFamilyMap();
       // store the family map reference to allow for mutations
@@ -2081,25 +2062,25 @@ public class HRegion implements HeapSize { // , Writable{
         lastIndexExclusive++;
         continue;
       }
 
       // If we haven't got any rows in our batch, we should block to
       // get the next one.
       boolean shouldBlock = numReadyToWrite == 0;
-      Integer acquiredLockId = null;
+      RowLock rowLock = null;
       try {
-        acquiredLockId = getLock(providedLockId, mutation.getRow(),
-          shouldBlock);
+        rowLock = getRowLock(mutation.getRow(), shouldBlock);
       } catch (IOException ioe) {
         LOG.warn("Failed getting lock in batch put, row="
           + Bytes.toStringBinary(mutation.getRow()), ioe);
       }
-      if (acquiredLockId == null) {
+      if (rowLock == null) {
         // We failed to grab another lock
         assert !shouldBlock : "Should never fail to get lock when blocking";
         break; // stop acquiring more rows for this batch
+      } else {
+        acquiredRowLocks.add(rowLock);
       }
-      if (providedLockId == null) {
-        acquiredLocks.add(acquiredLockId);
-      }
 
       lastIndexExclusive++;
       numReadyToWrite++;
@@ -2141,7 +2122,7 @@ public class HRegion implements HeapSize { // , Writable{
       if (batchOp.retCodeDetails[i].getOperationStatusCode()
           != OperationStatusCode.NOT_RUN) continue;
 
-      Mutation mutation = batchOp.operations[i].getFirst();
+      Mutation mutation = batchOp.operations[i];
       if (mutation instanceof Put) {
         updateKVTimestamps(familyMaps[i].values(), byteNow);
         noOfPuts++;
@@ -2162,8 +2143,8 @@ public class HRegion implements HeapSize { // , Writable{
 
     // calling the pre CP hook for batch mutation
     if (!isInReplay && coprocessorHost != null) {
-      MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
-        new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
+      MiniBatchOperationInProgress<Mutation> miniBatchOp =
+        new MiniBatchOperationInProgress<Mutation>(batchOp.operations,
         batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
       if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
     }
@@ -2198,7 +2179,7 @@ public class HRegion implements HeapSize { // , Writable{
       }
       batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
 
-      Mutation m = batchOp.operations[i].getFirst();
+      Mutation m = batchOp.operations[i];
       Durability tmpDur = getEffectiveDurability(m.getDurability());
       if (tmpDur.ordinal() > durability.ordinal()) {
         durability = tmpDur;
@@ -2221,10 +2202,10 @@ public class HRegion implements HeapSize { // , Writable{
     // -------------------------
     // STEP 5. Append the edit to WAL. Do not sync wal.
     // -------------------------
-    Mutation first = batchOp.operations[firstIndex].getFirst();
+    Mutation mutation = batchOp.operations[firstIndex];
     if (walEdit.size() > 0) {
       txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
-        walEdit, first.getClusterId(), now, this.htableDescriptor);
+        walEdit, mutation.getClusterId(), now, this.htableDescriptor);
     }
 
     // -------------------------------
@@ -2234,12 +2215,8 @@ public class HRegion implements HeapSize { // , Writable{
       this.updatesLock.readLock().unlock();
       locked = false;
     }
-    if (acquiredLocks != null) {
-      for (Integer toRelease : acquiredLocks) {
-        releaseRowLock(toRelease);
-      }
-      acquiredLocks = null;
-    }
+    releaseRowLocks(acquiredRowLocks);
+
     // -------------------------
     // STEP 7. Sync wal.
     // -------------------------
@@ -2249,8 +2226,8 @@ public class HRegion implements HeapSize { // , Writable{
     walSyncSuccessful = true;
     // calling the post CP hook for batch mutation
     if (!isInReplay && coprocessorHost != null) {
-      MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
-        new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
+      MiniBatchOperationInProgress<Mutation> miniBatchOp =
+        new MiniBatchOperationInProgress<Mutation>(batchOp.operations,
         batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
       coprocessorHost.postBatchMutate(miniBatchOp);
     }
@@ -2274,7 +2251,7 @@ public class HRegion implements HeapSize { // , Writable{
             != OperationStatusCode.SUCCESS) {
           continue;
         }
-        Mutation m = batchOp.operations[i].getFirst();
+        Mutation m = batchOp.operations[i];
         if (m instanceof Put) {
           coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
         } else {
@@ -2296,12 +2273,7 @@ public class HRegion implements HeapSize { // , Writable{
       if (locked) {
         this.updatesLock.readLock().unlock();
       }
-
-      if (acquiredLocks != null) {
-        for (Integer toRelease : acquiredLocks) {
-          releaseRowLock(toRelease);
-        }
-      }
+      releaseRowLocks(acquiredRowLocks);
 
       // See if the column families were consistent through the whole thing.
       // if they were then keep them. If they were not then pass a null.
@@ -2378,8 +2350,8 @@ public class HRegion implements HeapSize { // , Writable{
       checkFamily(family);
       get.addColumn(family, qualifier);
 
-      // Lock row
-      Integer lid = getLock(null, get.getRow(), true);
+      // Lock row - note that doBatchMutate will relock this row if called
+      RowLock rowLock = getRowLock(get.getRow());
       // wait for all previous transactions to complete (with lock held)
       mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
       List<KeyValue> result = null;
@@ -2425,27 +2397,23 @@ public class HRegion implements HeapSize { // , Writable{
         if (matches) {
           // All edits for the given row (across all column families) must
           // happen atomically.
-          doBatchMutate((Mutation)w, lid);
+          doBatchMutate((Mutation)w);
           this.checkAndMutateChecksPassed.increment();
           return true;
         }
         this.checkAndMutateChecksFailed.increment();
         return false;
       } finally {
-        releaseRowLock(lid);
+        rowLock.release();
       }
     } finally {
       closeRegionOperation();
     }
   }
 
-  @SuppressWarnings("unchecked")
-  private void doBatchMutate(Mutation mutation, Integer lid) throws IOException,
+  private void doBatchMutate(Mutation mutation) throws IOException,
       org.apache.hadoop.hbase.exceptions.DoNotRetryIOException {
-    Pair<Mutation, Integer>[] mutateWithLocks = new Pair[] {
-      new Pair<Mutation, Integer>(mutation, lid)
-    };
-    OperationStatus[] batchMutate = this.batchMutate(mutateWithLocks);
+    OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation });
     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
@@ -2621,7 +2589,7 @@ public class HRegion implements HeapSize { // , Writable{
     Put p = new Put(row);
     p.setFamilyMap(familyMap);
     p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
-    doBatchMutate(p, null);
+    doBatchMutate(p);
   }
 
   /**
@@ -2672,7 +2640,7 @@ public class HRegion implements HeapSize { // , Writable{
   * called when a Put/Delete has updated memstore but subequently fails to update
   * the wal. This method is then invoked to rollback the memstore.
   */
-  private void rollbackMemstore(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
+  private void rollbackMemstore(BatchOperationInProgress<Mutation> batchOp,
                                 Map<byte[], List<? extends Cell>>[] familyMaps,
                                 int start, int end) {
     int kvsRolledback = 0;
@@ -3182,65 +3150,42 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
   /**
-   * Obtain a lock on the given row. Blocks until success.
-   *
-   * I know it's strange to have two mappings:
-   * <pre>
-   *   ROWS ==> LOCKS
-   * </pre>
-   * as well as
-   * <pre>
-   *   LOCKS ==> ROWS
-   * </pre>
-   *
-   * <p>It would be more memory-efficient to just have one mapping;
-   * maybe we'll do that in the future.
-   *
-   * @param row Name of row to lock.
-   * @throws IOException
-   * @return The id of the held lock.
-   */
-  public Integer obtainRowLock(final byte [] row) throws IOException {
-    startRegionOperation();
-    this.writeRequestsCount.increment();
-    try {
-      return internalObtainRowLock(row, true);
-    } finally {
-      closeRegionOperation();
-    }
-  }
-
-  /**
-   * Obtains or tries to obtain the given row lock.
+   * Tries to acquire a lock on the given row.
    * @param waitForLock if true, will block until the lock is available.
    *        Otherwise, just tries to obtain the lock and returns
-   *        null if unavailable.
+   *        false if unavailable.
+   * @return the row lock if acquired,
+   *   null if waitForLock was false and the lock was not acquired
+   * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
    */
-  private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
-      throws IOException {
+  public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
     checkRow(row, "row lock");
+    startRegionOperation();
+    try {
       HashedBytes rowKey = new HashedBytes(row);
-      CountDownLatch rowLatch = new CountDownLatch(1);
+      RowLockContext rowLockContext = new RowLockContext(rowKey);
 
       // loop until we acquire the row lock (unless !waitForLock)
       while (true) {
-        CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
-        if (existingLatch == null) {
+        RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
+        if (existingContext == null) {
+          // Row is not already locked by any thread, use newly created context.
          break;
+        } else if (existingContext.ownedByCurrentThread()) {
+          // Row is already locked by current thread, reuse existing context instead.
+          rowLockContext = existingContext;
+          break;
        } else {
-          // row already locked
+          // Row is already locked by some other thread, give up or wait for it
          if (!waitForLock) {
            return null;
          }
          try {
-            if (!existingLatch.await(this.rowLockWaitDuration,
-                TimeUnit.MILLISECONDS)) {
-              throw new IOException("Timed out on getting lock for row="
-                  + Bytes.toStringBinary(row));
+            if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
+              throw new IOException("Timed out waiting for lock for row: " + rowKey);
            }
          } catch (InterruptedException ie) {
-            LOG.warn("internalObtainRowLock interrupted for row=" + Bytes.toStringBinary(row));
+            LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
            InterruptedIOException iie = new InterruptedIOException();
            iie.initCause(ie);
            throw iie;
@@ -3248,72 +3193,33 @@ public class HRegion implements HeapSize { // , Writable{
          }
        }
 
-      // loop until we generate an unused lock id
-      while (true) {
-        Integer lockId = lockIdGenerator.incrementAndGet();
-        HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
-        if (existingRowKey == null) {
-          return lockId;
-        } else {
-          // lockId already in use, jump generator to a new spot
-          lockIdGenerator.set(rand.nextInt());
-        }
-      }
+      // allocate new lock for this thread
+      return rowLockContext.newLock();
     } finally {
       closeRegionOperation();
     }
   }
 
   /**
-   * Release the row lock!
-   * @param lockId The lock ID to release.
+   * Acqures a lock on the given row.
+   * The same thread may acquire multiple locks on the same row.
+   * @return the acquired row lock
+   * @throws IOException if the lock could not be acquired after waiting
    */
-  public void releaseRowLock(final Integer lockId) {
-    if (lockId == null) return; // null lock id, do nothing
-    HashedBytes rowKey = lockIds.remove(lockId);
-    if (rowKey == null) {
-      LOG.warn("Release unknown lockId: " + lockId);
-      return;
-    }
-    CountDownLatch rowLatch = lockedRows.remove(rowKey);
-    if (rowLatch == null) {
-      LOG.error("Releases row not locked, lockId: " + lockId + " row: "
-          + rowKey);
-      return;
-    }
-    rowLatch.countDown();
+  public RowLock getRowLock(byte[] row) throws IOException {
+    return getRowLock(row, true);
   }
 
   /**
-   * See if row is currently locked.
-   * @param lockId
-   * @return boolean
+   * If the given list of row locks is not null, releases all locks.
    */
-  boolean isRowLocked(final Integer lockId) {
-    return lockIds.containsKey(lockId);
-  }
-
-  /**
-   * Returns existing row lock if found, otherwise
-   * obtains a new row lock and returns it.
-   * @param lockid requested by the user, or null if the user didn't already hold lock
-   * @param row the row to lock
-   * @param waitForLock if true, will block until the lock is available, otherwise will
-   *        simply return null if it could not acquire the lock.
-   * @return lockid or null if waitForLock is false and the lock was unavailable.
-   */
-  public Integer getLock(Integer lockid, byte [] row, boolean waitForLock)
-  throws IOException {
-    Integer lid = null;
-    if (lockid == null) {
-      lid = internalObtainRowLock(row, waitForLock);
-    } else {
-      if (!isRowLocked(lockid)) {
-        throw new IOException("Invalid row lock");
-      }
-      lid = lockid;
-    }
-    return lid;
-  }
+  public void releaseRowLocks(List<RowLock> rowLocks) {
+    if (rowLocks != null) {
+      for (RowLock rowLock : rowLocks) {
+        rowLock.release();
+      }
+      rowLocks.clear();
+    }
+  }
 
   /**
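To make the surface above concrete, here is a small hedged sketch of multi-row locking from a caller's point of view, mirroring what processRowsWithLocks does in the following hunk. The wrapper class and method names are invented for illustration; only getRowLock(byte[]) and releaseRowLocks(List) from the patch are used.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;

public final class RowLockingSketch {
  // Locks every row, runs the work, and always releases, even if one getRowLock() times out
  // part-way through (releaseRowLocks copes with a partially filled list).
  static void withRowsLocked(HRegion region, List<byte[]> rows, Runnable work) throws IOException {
    List<RowLock> locks = new ArrayList<RowLock>(rows.size());
    try {
      for (byte[] row : rows) {
        locks.add(region.getRowLock(row)); // blocks; throws IOException on timeout
      }
      work.run();
    } finally {
      region.releaseRowLocks(locks); // releases each lock and clears the list
    }
  }
}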
@@ -4583,24 +4489,19 @@ public class HRegion implements HeapSize { // , Writable{
     MultiVersionConsistencyControl.WriteEntry writeEntry = null;
     boolean locked = false;
     boolean walSyncSuccessful = false;
-    List<Integer> acquiredLocks = null;
+    List<RowLock> acquiredRowLocks = null;
     long addedSize = 0;
     List<KeyValue> mutations = new ArrayList<KeyValue>();
     Collection<byte[]> rowsToLock = processor.getRowsToLock();
     try {
       // 2. Acquire the row lock(s)
-      acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
+      acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
       for (byte[] row : rowsToLock) {
-        // Attempt to lock all involved rows, fail if one lock times out
-        Integer lid = getLock(null, row, true);
-        if (lid == null) {
-          throw new IOException("Failed to acquire lock on "
-              + Bytes.toStringBinary(row));
-        }
-        acquiredLocks.add(lid);
+        // Attempt to lock all involved rows, throw if any lock times out
+        acquiredRowLocks.add(getRowLock(row));
       }
       // 3. Region lock
-      lock(this.updatesLock.readLock(), acquiredLocks.size());
+      lock(this.updatesLock.readLock(), acquiredRowLocks.size());
       locked = true;
 
       long now = EnvironmentEdgeManager.currentTimeMillis();
@@ -4635,12 +4536,8 @@ public class HRegion implements HeapSize { // , Writable{
       }
 
       // 9. Release row lock(s)
-      if (acquiredLocks != null) {
-        for (Integer lid : acquiredLocks) {
-          releaseRowLock(lid);
-        }
-        acquiredLocks = null;
-      }
+      releaseRowLocks(acquiredRowLocks);
+
       // 10. Sync edit log
       if (txid != 0) {
         syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
@@ -4665,12 +4562,8 @@ public class HRegion implements HeapSize { // , Writable{
         this.updatesLock.readLock().unlock();
         locked = false;
       }
-      if (acquiredLocks != null) {
-        for (Integer lid : acquiredLocks) {
-          releaseRowLock(lid);
-        }
-      }
-
+      // release locks if some were acquired but another timed out
+      releaseRowLocks(acquiredRowLocks);
     }
 
     // 12. Run post-process hook
@@ -4765,8 +4658,10 @@ public class HRegion implements HeapSize { // , Writable{
     startRegionOperation(Operation.APPEND);
     this.writeRequestsCount.increment();
     WriteEntry w = null;
+    RowLock rowLock = null;
     try {
-      Integer lid = getLock(null, row, true);
+      rowLock = getRowLock(row);
+      try {
       lock(this.updatesLock.readLock());
       // wait for all prior MVCC transactions to finish - while we hold the row lock
       // (so that we are guaranteed to see the latest state)
@@ -4883,7 +4778,9 @@ public class HRegion implements HeapSize { // , Writable{
         flush = isFlushSize(size);
       } finally {
         this.updatesLock.readLock().unlock();
-        releaseRowLock(lid);
       }
+      } finally {
+        rowLock.release();
+      }
       if (writeToWAL) {
         // sync the transaction log outside the rowlock
@@ -4936,7 +4833,8 @@ public class HRegion implements HeapSize { // , Writable{
     this.writeRequestsCount.increment();
     WriteEntry w = null;
     try {
-      Integer lid = getLock(null, row, true);
+      RowLock rowLock = getRowLock(row);
+      try {
       lock(this.updatesLock.readLock());
       // wait for all prior MVCC transactions to finish - while we hold the row lock
       // (so that we are guaranteed to see the latest state)
@@ -5029,7 +4927,9 @@ public class HRegion implements HeapSize { // , Writable{
         flush = isFlushSize(size);
       } finally {
         this.updatesLock.readLock().unlock();
-        releaseRowLock(lid);
       }
+      } finally {
+        rowLock.release();
+      }
       if (writeToWAL) {
         // sync the transaction log outside the rowlock
@@ -5069,22 +4969,32 @@ public class HRegion implements HeapSize { // , Writable{
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
-      (12 * Bytes.SIZEOF_LONG) +
-      2 * Bytes.SIZEOF_BOOLEAN);
+      38 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+      (11 * Bytes.SIZEOF_LONG) +
+      4 * Bytes.SIZEOF_BOOLEAN);
 
+  // woefully out of date - currently missing:
+  // 1 x HashMap - coprocessorServiceHandlers
+  // 6 org.cliffc.high_scale_lib.Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
+  // checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,
+  // writeRequestsCount, updatesBlockedMs
+  // 1 x HRegion$WriteState - writestate
+  // 1 x RegionCoprocessorHost - coprocessorHost
+  // 1 x RegionSplitPolicy - splitPolicy
+  // 1 x MetricsRegion - metricsRegion
+  // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper
   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
       ClassSize.OBJECT + // closeLock
       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
       (3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL
-      ClassSize.ATOMIC_INTEGER + // lockIdGenerator
-      (3 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds, scannerReadPoints
+      (2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, scannerReadPoints
       WriteState.HEAP_SIZE + // writestate
       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
       ClassSize.ARRAYLIST + // recentFlushes
       MultiVersionConsistencyControl.FIXED_SIZE // mvcc
       + ClassSize.TREEMAP // maxSeqIdInStores
       + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
       ;
 
   @Override
@@ -5093,7 +5003,7 @@ public class HRegion implements HeapSize { // , Writable{
     for (Store store : this.stores.values()) {
       heapSize += store.heapSize();
     }
-    // this does not take into account row locks, recent flushes, mvcc entries
+    // this does not take into account row locks, recent flushes, mvcc entries, and more
     return heapSize;
   }
 
@@ -5657,4 +5567,68 @@ public class HRegion implements HeapSize { // , Writable{
    */
    void failedBulkLoad(byte[] family, String srcPath) throws IOException;
   }
 
+  @VisibleForTesting class RowLockContext {
+    private final HashedBytes row;
+    private final CountDownLatch latch = new CountDownLatch(1);
+    private final Thread thread;
+    private int lockCount = 0;
+
+    RowLockContext(HashedBytes row) {
+      this.row = row;
+      this.thread = Thread.currentThread();
+    }
+
+    boolean ownedByCurrentThread() {
+      return thread == Thread.currentThread();
+    }
+
+    RowLock newLock() {
+      lockCount++;
+      return new RowLock(this);
+    }
+
+    void releaseLock() {
+      if (!ownedByCurrentThread()) {
+        throw new IllegalArgumentException("Lock held by thread: " + thread
+          + " cannot be released by different thread: " + Thread.currentThread());
+      }
+      lockCount--;
+      if (lockCount == 0) {
+        // no remaining locks by the thread, unlock and allow other threads to access
+        RowLockContext existingContext = lockedRows.remove(row);
+        if (existingContext != this) {
+          throw new RuntimeException(
+              "Internal row lock state inconsistent, should not happen, row: " + row);
+        }
+        latch.countDown();
+      }
+    }
+  }
+
+  /**
+   * Row lock held by a given thread.
+   * One thread may acquire multiple locks on the same row simultaneously.
+   * The locks must be released by calling release() from the same thread.
+   */
+  public class RowLock {
+    @VisibleForTesting final RowLockContext context;
+    private boolean released = false;
+
+    @VisibleForTesting RowLock(RowLockContext context) {
+      this.context = context;
+    }
+
+    /**
+     * Release the given lock. If there are no remaining locks held by the current thread
+     * then unlock the row and allow other threads to acquire the lock.
+     * @throws IllegalArgumentException if called by a different thread than the lock owning thread
+     */
+    public void release() {
+      if (!released) {
+        context.releaseLock();
+        released = true;
+      }
+    }
+  }
 }
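A short illustrative sketch of the reentrancy the new RowLockContext/RowLock pair above provides (assuming an HRegion handle and a row key are available): the same thread can take the lock again, which is what happens when checkAndMutate already holds the row lock and doBatchMutate re-locks the same row, and the row only becomes visible to other waiters once every handle has been released.

import java.io.IOException;

import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;

public final class ReentrantRowLockSketch {
  static void reenter(HRegion region, byte[] row) throws IOException {
    RowLock outer = region.getRowLock(row);     // new RowLockContext, lockCount = 1
    try {
      RowLock inner = region.getRowLock(row);   // same thread: context reused, lockCount = 2
      try {
        // ... nested work against the locked row ...
      } finally {
        inner.release();                        // lockCount back to 1; row still held
      }
    } finally {
      outer.release();                          // lockCount 0; latch counts down, waiters proceed
    }
  }
}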
@@ -3956,8 +3956,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterface
    */
   protected void doBatchOp(final MultiResponse.Builder builder, final HRegion region,
       final List<MutationProto> mutations, final CellScanner cells, boolean isReplay) {
-    @SuppressWarnings("unchecked")
-    Pair<Mutation, Integer>[] mutationsWithLocks = new Pair[mutations.size()];
+    Mutation[] mArray = new Mutation[mutations.size()];
     long before = EnvironmentEdgeManager.currentTimeMillis();
     boolean batchContainsPuts = false, batchContainsDelete = false;
     try {
@@ -3974,7 +3973,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterface
           mutation = ProtobufUtil.toDelete(m, cells);
           batchContainsDelete = true;
         }
-        mutationsWithLocks[i++] = new Pair<Mutation, Integer>(mutation, null);
+        mArray[i++] = mutation;
         builder.addResult(result);
       }
 
@@ -3983,7 +3982,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterface
         cacheFlusher.reclaimMemStoreMemory();
       }
 
-      OperationStatus codes[] = region.batchMutate(mutationsWithLocks, isReplay);
+      OperationStatus codes[] = region.batchMutate(mArray);
       for (i = 0; i < codes.length; i++) {
         switch (codes[i].getOperationStatusCode()) {
           case BAD_FAMILY:
@@ -993,7 +993,7 @@ public class RegionCoprocessorHost
   * @throws IOException
   */
  public boolean preBatchMutate(
-      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
    boolean bypass = false;
    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
    for (RegionEnvironment env : coprocessors) {
@@ -1018,7 +1018,7 @@ public class RegionCoprocessorHost
   * @throws IOException
   */
  public void postBatchMutate(
-      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
    for (RegionEnvironment env : coprocessors) {
      if (env.getInstance() instanceof RegionObserver) {
@@ -1061,7 +1061,7 @@ public class HBaseFsck extends Configured implements Tool {
           "You may need to restore the previously sidelined .META.");
       return false;
     }
-    meta.put(puts.toArray(new Put[0]));
+    meta.batchMutate(puts.toArray(new Put[0]));
     HRegion.closeHRegion(meta);
     LOG.info("Success! .META. table rebuilt.");
     LOG.info("Old .META. is moved into " + backupDir);
@@ -407,7 +407,7 @@ public class SimpleRegionObserver extends BaseRegionObserver {
 
   @Override
   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
-      MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
     RegionCoprocessorEnvironment e = c.getEnvironment();
     assertNotNull(e);
     assertNotNull(e.getRegion());
@@ -417,7 +417,7 @@ public class SimpleRegionObserver extends BaseRegionObserver {
 
   @Override
   public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
     RegionCoprocessorEnvironment e = c.getEnvironment();
     assertNotNull(e);
     assertNotNull(e.getRegion());
@@ -59,11 +59,8 @@ import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
-import org.apache.hadoop.hbase.util.Pair;
 import org.junit.experimental.categories.Category;
 
-import com.google.common.collect.Lists;
-
 
 /**
  * Testing of HRegion.incrementColumnValue, HRegion.increment,
@@ -528,16 +525,12 @@ public class TestAtomicOperation extends HBaseTestCase {
     final MockHRegion region = (MockHRegion) TestHRegion.initHRegion(
       Bytes.toBytes(tableName), tableName, conf, Bytes.toBytes(family));
 
-    List<Pair<Mutation, Integer>> putsAndLocks = Lists.newArrayList();
     Put[] puts = new Put[1];
     Put put = new Put(Bytes.toBytes("r1"));
     put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
     puts[0] = put;
-    Pair<Mutation, Integer> pair = new Pair<Mutation, Integer>(puts[0], null);
-
-    putsAndLocks.add(pair);
 
-    region.batchMutate(putsAndLocks.toArray(new Pair[0]));
+    region.batchMutate(puts);
     MultithreadedTestUtil.TestContext ctx =
       new MultithreadedTestUtil.TestContext(conf);
     ctx.addThread(new PutThread(ctx, region));
@@ -565,15 +558,12 @@ public class TestAtomicOperation extends HBaseTestCase {
     }
 
     public void doWork() throws Exception {
-      List<Pair<Mutation, Integer>> putsAndLocks = Lists.newArrayList();
       Put[] puts = new Put[1];
       Put put = new Put(Bytes.toBytes("r1"));
       put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
       puts[0] = put;
-      Pair<Mutation, Integer> pair = new Pair<Mutation, Integer>(puts[0], null);
-      putsAndLocks.add(pair);
       testStep = TestStep.PUT_STARTED;
-      region.batchMutate(putsAndLocks.toArray(new Pair[0]));
+      region.batchMutate(puts);
     }
   }
 
@@ -607,16 +597,30 @@ public class TestAtomicOperation extends HBaseTestCase {
     }
 
     @Override
-    public void releaseRowLock(Integer lockId) {
+    public RowLock getRowLock(final byte[] row, boolean waitForLock) throws IOException {
+      if (testStep == TestStep.CHECKANDPUT_STARTED) {
+        latch.countDown();
+      }
+      return new WrappedRowLock(super.getRowLock(row, waitForLock));
+    }
+
+    public class WrappedRowLock extends RowLock {
+
+      private WrappedRowLock(RowLock rowLock) {
+        super(rowLock.context);
+      }
+
+      @Override
+      public void release() {
       if (testStep == TestStep.INIT) {
-        super.releaseRowLock(lockId);
+        super.release();
         return;
       }
 
       if (testStep == TestStep.PUT_STARTED) {
         try {
           testStep = TestStep.PUT_COMPLETED;
-          super.releaseRowLock(lockId);
+          super.release();
           // put has been written to the memstore and the row lock has been released, but the
           // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of
           // operations would cause the non-atomicity to show up:
@@ -634,16 +638,9 @@ public class TestAtomicOperation extends HBaseTestCase {
         }
       }
       else if (testStep == TestStep.CHECKANDPUT_STARTED) {
-        super.releaseRowLock(lockId);
+        super.release();
       }
     }
 
-    @Override
-    public Integer getLock(Integer lockid, byte[] row, boolean waitForLock) throws IOException {
-      if (testStep == TestStep.CHECKANDPUT_STARTED) {
-        latch.countDown();
-      }
-      return super.getLock(lockid, row, waitForLock);
-    }
   }
 }
@@ -72,7 +72,6 @@ import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -96,6 +95,7 @@ import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
+import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@@ -107,7 +107,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Assert;
@@ -764,7 +763,6 @@ public class TestHRegion extends HBaseTestCase {
     }
   }
 
-  @SuppressWarnings("unchecked")
   public void testBatchPut() throws Exception {
     byte[] b = Bytes.toBytes(getName());
     byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
@@ -783,7 +781,7 @@ public class TestHRegion extends HBaseTestCase {
       puts[i].add(cf, qual, val);
     }
 
-    OperationStatus[] codes = this.region.put(puts);
+    OperationStatus[] codes = this.region.batchMutate(puts);
     assertEquals(10, codes.length);
     for (int i = 0; i < 10; i++) {
       assertEquals(OperationStatusCode.SUCCESS, codes[i]
@@ -794,7 +792,7 @@ public class TestHRegion extends HBaseTestCase {
 
     LOG.info("Next a batch put with one invalid family");
     puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
-    codes = this.region.put(puts);
+    codes = this.region.batchMutate(puts);
     assertEquals(10, codes.length);
     for (int i = 0; i < 10; i++) {
       assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
@@ -804,7 +802,7 @@ public class TestHRegion extends HBaseTestCase {
     metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
 
     LOG.info("Next a batch put that has to break into two batches to avoid a lock");
-    Integer lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
+    RowLock rowLock = region.getRowLock(Bytes.toBytes("row_2"));
 
     MultithreadedTestUtil.TestContext ctx =
       new MultithreadedTestUtil.TestContext(conf);
@@ -813,7 +811,7 @@ public class TestHRegion extends HBaseTestCase {
     TestThread putter = new TestThread(ctx) {
       @Override
       public void doWork() throws IOException {
-        retFromThread.set(region.put(puts));
+        retFromThread.set(region.batchMutate(puts));
       }
     };
     LOG.info("...starting put thread while holding lock");
@@ -829,7 +827,7 @@ public class TestHRegion extends HBaseTestCase {
       }
     }
     LOG.info("...releasing row lock, which should let put thread continue");
-    region.releaseRowLock(lockedRow);
+    rowLock.release();
     LOG.info("...joining on thread");
     ctx.stop();
     LOG.info("...checking that next batch was synced");
@@ -840,29 +838,6 @@ public class TestHRegion extends HBaseTestCase {
         OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
     }
 
-    LOG.info("Nexta, a batch put which uses an already-held lock");
-    lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
-    LOG.info("...obtained row lock");
-    List<Pair<Mutation, Integer>> putsAndLocks = Lists.newArrayList();
-    for (int i = 0; i < 10; i++) {
-      Pair<Mutation, Integer> pair = new Pair<Mutation, Integer>(puts[i], null);
-      if (i == 2) pair.setSecond(lockedRow);
-      putsAndLocks.add(pair);
-    }
-
-    codes = region.batchMutate(putsAndLocks.toArray(new Pair[0]));
-    LOG.info("...performed put");
-    for (int i = 0; i < 10; i++) {
-      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
-          OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
-    }
-    // Make sure we didn't do an extra batch
-    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 5, source);
-
-    // Make sure we still hold lock
-    assertTrue(region.isRowLocked(lockedRow));
-    LOG.info("...releasing lock");
-    region.releaseRowLock(lockedRow);
     } finally {
       HRegion.closeHRegion(this.region);
       this.region = null;
@@ -891,7 +866,7 @@ public class TestHRegion extends HBaseTestCase {
       puts[i].add(cf, qual, val);
     }
 
-    OperationStatus[] codes = this.region.put(puts);
+    OperationStatus[] codes = this.region.batchMutate(puts);
     assertEquals(10, codes.length);
     for (int i = 0; i < 10; i++) {
       assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i]
@@ -233,7 +233,7 @@ public class TestParallelPut extends HBaseTestCase {
     put.add(fam1, qual1, value);
     in[0] = put;
     try {
-      OperationStatus[] ret = region.put(in);
+      OperationStatus[] ret = region.batchMutate(in);
       assertEquals(1, ret.length);
      assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode());
      assertGet(rowkey, fam1, qual1, value);