commit 85a3cf14f9
parent 06b8461cc5
Author: Michael Stack
Date:   2011-05-25 04:18:12 +00:00

    HBASE-3894  Thread contention over row locks set monitor

    git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1127374 13f79535-47bb-0310-9956-ffa450edef68

4 changed files with 151 additions and 72 deletions

CHANGES.txt

@@ -112,6 +112,7 @@ Release 0.91.0 - Unreleased
    HBASE-3912  [Stargate] Columns not handle by Scan
    HBASE-3903  A successful write to client write-buffer may be lost or not
                visible (Doug Meil)
+   HBASE-3894  Thread contention over row locks set monitor (Dave Latham)
 
  IMPROVEMENTS
    HBASE-3290  Max Compaction Size (Nicolas Spiegelberg via Stack)

src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2010 The Apache Software Foundation * Copyright 2011 The Apache Software Foundation
* *
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
@@ -38,11 +38,12 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.Random;
-import java.util.Set;
 import java.util.TreeMap;
-import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

@@ -89,6 +90,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.HashedBytes;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.ClassSize;

@@ -159,11 +161,11 @@ public class HRegion implements HeapSize { // , Writable{
   // Members
   //////////////////////////////////////////////////////////////////////////////
 
-  private final Set<byte[]> lockedRows =
-    new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-  private final Map<Integer, byte []> lockIds =
-    new HashMap<Integer, byte []>();
-  private int lockIdGenerator = 1;
+  private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows =
+    new ConcurrentHashMap<HashedBytes, CountDownLatch>();
+  private final ConcurrentHashMap<Integer, HashedBytes> lockIds =
+    new ConcurrentHashMap<Integer, HashedBytes>();
+  private final AtomicInteger lockIdGenerator = new AtomicInteger(1);
 
   static private Random rand = new Random();
 
   protected final Map<byte [], Store> stores =

@@ -197,8 +199,6 @@ public class HRegion implements HeapSize { // , Writable{
   final Path regiondir;
   KeyValue.KVComparator comparator;
 
-  private Pair<Long,Long> lastCompactInfo = null;
-
   /*
    * Data structure of write state flags used coordinating flushes,
    * compactions and closes.

@@ -232,6 +232,9 @@ public class HRegion implements HeapSize { // , Writable{
     boolean isFlushRequested() {
       return this.flushRequested;
     }
+
+    static final long HEAP_SIZE = ClassSize.align(
+        ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
   }
 
   final WriteState writestate = new WriteState();

@@ -2369,37 +2372,37 @@ public class HRegion implements HeapSize { // , Writable{
     checkRow(row);
     startRegionOperation();
     try {
-      synchronized (lockedRows) {
-        while (lockedRows.contains(row)) {
+      HashedBytes rowKey = new HashedBytes(row);
+      CountDownLatch rowLatch = new CountDownLatch(1);
+
+      // loop until we acquire the row lock (unless !waitForLock)
+      while (true) {
+        CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
+        if (existingLatch == null) {
+          break;
+        } else {
+          // row already locked
           if (!waitForLock) {
             return null;
           }
           try {
-            lockedRows.wait();
+            existingLatch.await();
           } catch (InterruptedException ie) {
             // Empty
           }
         }
-        // generate a new lockid. Attempt to insert the new [lockid, row].
-        // if this lockid already exists in the map then revert and retry
-        // We could have first done a lockIds.get, and if it does not exist only
-        // then do a lockIds.put, but the hope is that the lockIds.put will
-        // mostly return null the first time itself because there won't be
-        // too many lockId collisions.
-        byte [] prev = null;
-        Integer lockId = null;
-        do {
-          lockId = new Integer(lockIdGenerator++);
-          prev = lockIds.put(lockId, row);
-          if (prev != null) {
-            lockIds.put(lockId, prev);    // revert old value
-            lockIdGenerator = rand.nextInt(); // generate new start point
-          }
-        } while (prev != null);
-        lockedRows.add(row);
-        lockedRows.notifyAll();
-        return lockId;
       }
+
+      // loop until we generate an unused lock id
+      while (true) {
+        Integer lockId = lockIdGenerator.incrementAndGet();
+        HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
+        if (existingRowKey == null) {
+          return lockId;
+        } else {
+          // lockId already in use, jump generator to a new spot
+          lockIdGenerator.set(rand.nextInt());
+        }
+      }
     } finally {
       closeRegionOperation();
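
The hunk above is the heart of the change. The old code funneled every lock, unlock, and wake-up through a single monitor on lockedRows, and each notifyAll() woke every waiting thread regardless of which row it wanted. The new code races on ConcurrentHashMap.putIfAbsent and parks losers on a per-row CountDownLatch, so threads only contend when they actually want the same row. A minimal self-contained sketch of the same pattern; KeyedLock and its names are illustrative, not HBase classes:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;

    // Hypothetical KeyedLock: the same putIfAbsent-plus-latch pattern,
    // outside HBase, for any key type with value-based hashCode/equals.
    public class KeyedLock<K> {
      private final ConcurrentHashMap<K, CountDownLatch> held =
          new ConcurrentHashMap<K, CountDownLatch>();

      /** Blocks until this thread owns the key. */
      public void lock(K key) throws InterruptedException {
        CountDownLatch myLatch = new CountDownLatch(1);
        while (true) {
          CountDownLatch existing = held.putIfAbsent(key, myLatch);
          if (existing == null) {
            return;            // slot was free: we are now the owner
          }
          existing.await();    // only waiters for THIS key block here
        }
      }

      public void unlock(K key) {
        // Remove before counting down: a woken waiter retries putIfAbsent
        // and must find the slot empty, not a latch that is already spent.
        CountDownLatch latch = held.remove(key);
        if (latch != null) {
          latch.countDown();
        }
      }
    }

The same remove-then-countDown ordering shows up in releaseRowLock in the next hunk: the latch leaves the map before waiters are released, so each woken thread's retry sees either an empty slot or a fresh latch from a competing winner, never a stale one.
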
@@ -2412,21 +2415,27 @@ public class HRegion implements HeapSize { // , Writable{
    * @return Row that goes with <code>lockid</code>
    */
   byte[] getRowFromLock(final Integer lockid) {
-    synchronized (lockedRows) {
-      return lockIds.get(lockid);
-    }
+    HashedBytes rowKey = lockIds.get(lockid);
+    return rowKey == null ? null : rowKey.getBytes();
   }
 
   /**
    * Release the row lock!
    * @param lockid  The lock ID to release.
    */
-  public void releaseRowLock(final Integer lockid) {
-    synchronized (lockedRows) {
-      byte[] row = lockIds.remove(lockid);
-      lockedRows.remove(row);
-      lockedRows.notifyAll();
-    }
+  public void releaseRowLock(final Integer lockId) {
+    HashedBytes rowKey = lockIds.remove(lockId);
+    if (rowKey == null) {
+      LOG.warn("Release unknown lockId: " + lockId);
+      return;
+    }
+    CountDownLatch rowLatch = lockedRows.remove(rowKey);
+    if (rowLatch == null) {
+      LOG.error("Releases row not locked, lockId: " + lockId + " row: "
+          + rowKey);
+      return;
+    }
+    rowLatch.countDown();
   }
 
   /**

@@ -2434,13 +2443,8 @@ public class HRegion implements HeapSize { // , Writable{
    * @param lockid
    * @return boolean
    */
-  boolean isRowLocked(final Integer lockid) {
-    synchronized (lockedRows) {
-      if (lockIds.get(lockid) != null) {
-        return true;
-      }
-      return false;
-    }
+  boolean isRowLocked(final Integer lockId) {
+    return lockIds.containsKey(lockId);
   }
 
   /**

@@ -2567,6 +2571,7 @@ public class HRegion implements HeapSize { // , Writable{
       }
     }
 
+    @Override
     public synchronized boolean next(List<KeyValue> outResults, int limit)
         throws IOException {
       if (this.filterClosed) {

@@ -2596,6 +2601,7 @@ public class HRegion implements HeapSize { // , Writable{
       }
     }
 
+    @Override
     public synchronized boolean next(List<KeyValue> outResults)
         throws IOException {
       // apply the batching limit by default

@@ -2692,6 +2698,7 @@ public class HRegion implements HeapSize { // , Writable{
           currentRow, 0, currentRow.length) <= isScan);
     }
 
+    @Override
     public synchronized void close() {
       if (storeHeap != null) {
         storeHeap.close();

@@ -3493,30 +3500,32 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
   public static final long FIXED_OVERHEAD = ClassSize.align(
-      (4 * Bytes.SIZEOF_LONG) + ClassSize.ARRAY +
-      ClassSize.align(26 * ClassSize.REFERENCE) + ClassSize.OBJECT +
-      ClassSize.align(Bytes.SIZEOF_INT));
+      ClassSize.OBJECT + // this
+      (4 * Bytes.SIZEOF_LONG) + // memstoreFlushSize, lastFlushTime, blockingMemStoreSize, threadWakeFrequency
+      Bytes.SIZEOF_BOOLEAN + // splitRequest
+      ClassSize.ARRAY + // splitPoint
+      (26 * ClassSize.REFERENCE));
 
-  public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
-      (ClassSize.OBJECT * 2) + (2 * ClassSize.ATOMIC_BOOLEAN) +
-      ClassSize.ATOMIC_LONG + ClassSize.ATOMIC_INTEGER +
-
-      // Using TreeMap for TreeSet
-      ClassSize.TREEMAP +
-
-      // Using TreeMap for HashMap
-      ClassSize.TREEMAP +
-
-      ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY +
-      ClassSize.align(ClassSize.OBJECT +
-        (5 * Bytes.SIZEOF_BOOLEAN)) +
-      (3 * ClassSize.REENTRANT_LOCK));
+  public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
+      ClassSize.OBJECT + // closeLock
+      (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
+      ClassSize.ATOMIC_LONG + // memStoreSize
+      ClassSize.ATOMIC_INTEGER + // lockIdGenerator
+      (2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds
+      WriteState.HEAP_SIZE + // writestate
+      ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
+      (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
+      ClassSize.ARRAYLIST + // recentFlushes
+      ReadWriteConsistencyControl.FIXED_SIZE // rwcc
+      ;
 
+  @Override
   public long heapSize() {
     long heapSize = DEEP_OVERHEAD;
     for(Store store : this.stores.values()) {
       heapSize += store.heapSize();
     }
+    // this does not take into account row locks, recent flushes, rwcc entries
     return heapSize;
   }
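
A note on the heap accounting for readers who have not met ClassSize: its constants estimate JVM object layout, and ClassSize.align rounds a byte count up to the 8-byte boundary JVMs allocate on. The sketch below mirrors that arithmetic; the OBJECT and REFERENCE values are assumptions for a plain 64-bit JVM, not values read from ClassSize:

    // Illustrative heap-size arithmetic in the style of ClassSize.
    public class HeapSizeSketch {
      static final int REFERENCE = 8;       // assumed pointer size
      static final int OBJECT = 16;         // assumed object header size
      static final int SIZEOF_BOOLEAN = 1;

      // Round up to the next multiple of 8 bytes, like ClassSize.align.
      static long align(long num) {
        return ((num + 7) >> 3) << 3;
      }

      public static void main(String[] args) {
        // WriteState.HEAP_SIZE above is the header plus five boolean flags:
        // align(16 + 5 * 1) = align(21) = 24 bytes under these assumptions.
        System.out.println(align(OBJECT + 5 * SIZEOF_BOOLEAN));
      }
    }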

src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java

@@ -20,7 +20,9 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.util.LinkedList;
-import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 
 /**
  * Manages the read/write consistency within memstore. This provides

@@ -158,4 +160,10 @@ public class ReadWriteConsistencyControl {
       return this.writeNumber;
     }
   }
+
+  public static final long FIXED_SIZE = ClassSize.align(
+      ClassSize.OBJECT +
+      2 * Bytes.SIZEOF_LONG +
+      2 * ClassSize.REFERENCE);
 }
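
Under the same illustrative sizes as the sketch above (16-byte header, 8-byte longs and references), FIXED_SIZE works out to align(16 + 2 * 8 + 2 * 8) = 48 bytes, covering the object header, the two long fields, and the two reference fields that the constant itemizes.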

src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java (new file)

@@ -0,0 +1,61 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.Arrays;
+
+/**
+ * This class encapsulates a byte array and overrides hashCode and equals so
+ * that its identity is based on the data rather than the array instance.
+ */
+public class HashedBytes {
+
+  private final byte[] bytes;
+  private final int hashCode;
+
+  public HashedBytes(byte[] bytes) {
+    this.bytes = bytes;
+    hashCode = Bytes.hashCode(bytes);
+  }
+
+  public byte[] getBytes() {
+    return bytes;
+  }
+
+  @Override
+  public int hashCode() {
+    return hashCode;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null || getClass() != obj.getClass())
+      return false;
+    HashedBytes other = (HashedBytes) obj;
+    return Arrays.equals(bytes, other.bytes);
+  }
+
+  @Override
+  public String toString() {
+    return Bytes.toStringBinary(bytes);
+  }
+}
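
Why the wrapper is needed at all: Java arrays inherit identity-based hashCode and equals from Object, so two byte[] instances with equal contents are different hash-map keys, which would defeat the row-lock table. A quick standalone demonstration, assuming the HBase Bytes and HashedBytes classes above are on the classpath:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.HashedBytes;

    public class ArrayKeyDemo {
      public static void main(String[] args) {
        // Raw byte[] keys fall back to Object identity:
        Map<byte[], String> raw = new HashMap<byte[], String>();
        raw.put(Bytes.toBytes("row1"), "v");
        // Misses: an equal-content but distinct array instance.
        System.out.println(raw.get(Bytes.toBytes("row1")));  // null

        // HashedBytes hashes and compares the contents instead:
        Map<HashedBytes, String> wrapped = new HashMap<HashedBytes, String>();
        wrapped.put(new HashedBytes(Bytes.toBytes("row1")), "v");
        System.out.println(
            wrapped.get(new HashedBytes(Bytes.toBytes("row1"))));  // v
      }
    }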