diff --git a/CHANGES.txt b/CHANGES.txt index 5233cca1b8b..439570841a2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -112,6 +112,7 @@ Release 0.91.0 - Unreleased HBASE-3912 [Stargate] Columns not handle by Scan HBASE-3903 A successful write to client write-buffer may be lost or not visible (Doug Meil) + HBASE-3894 Thread contention over row locks set monitor (Dave Latham) IMPROVEMENTS HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack) diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c2e51b89977..3e7d1da7e05 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1,5 +1,5 @@ /* - * Copyright 2010 The Apache Software Foundation + * Copyright 2011 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -38,11 +38,12 @@ import java.util.Map; import java.util.NavigableMap; import java.util.NavigableSet; import java.util.Random; -import java.util.Set; import java.util.TreeMap; -import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -89,6 +90,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CancelableProgressable; import 
org.apache.hadoop.hbase.util.ClassSize; @@ -159,11 +161,11 @@ public class HRegion implements HeapSize { // , Writable{ // Members ////////////////////////////////////////////////////////////////////////////// - private final Set lockedRows = - new TreeSet(Bytes.BYTES_COMPARATOR); - private final Map lockIds = - new HashMap(); - private int lockIdGenerator = 1; + private final ConcurrentHashMap lockedRows = + new ConcurrentHashMap(); + private final ConcurrentHashMap lockIds = + new ConcurrentHashMap(); + private final AtomicInteger lockIdGenerator = new AtomicInteger(1); static private Random rand = new Random(); protected final Map stores = @@ -197,8 +199,6 @@ public class HRegion implements HeapSize { // , Writable{ final Path regiondir; KeyValue.KVComparator comparator; - private Pair lastCompactInfo = null; - /* * Data structure of write state flags used coordinating flushes, * compactions and closes. @@ -232,6 +232,9 @@ public class HRegion implements HeapSize { // , Writable{ boolean isFlushRequested() { return this.flushRequested; } + + static final long HEAP_SIZE = ClassSize.align( + ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN); } final WriteState writestate = new WriteState(); @@ -2365,41 +2368,41 @@ public class HRegion implements HeapSize { // , Writable{ * null if unavailable. 
*/ private Integer internalObtainRowLock(final byte[] row, boolean waitForLock) - throws IOException { + throws IOException { checkRow(row); startRegionOperation(); try { - synchronized (lockedRows) { - while (lockedRows.contains(row)) { + HashedBytes rowKey = new HashedBytes(row); + CountDownLatch rowLatch = new CountDownLatch(1); + + // loop until we acquire the row lock (unless !waitForLock) + while (true) { + CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch); + if (existingLatch == null) { + break; + } else { + // row already locked if (!waitForLock) { return null; } try { - lockedRows.wait(); + existingLatch.await(); } catch (InterruptedException ie) { // Empty } } - // generate a new lockid. Attempt to insert the new [lockid, row]. - // if this lockid already exists in the map then revert and retry - // We could have first done a lockIds.get, and if it does not exist only - // then do a lockIds.put, but the hope is that the lockIds.put will - // mostly return null the first time itself because there won't be - // too many lockId collisions. 
- byte [] prev = null; - Integer lockId = null; - do { - lockId = new Integer(lockIdGenerator++); - prev = lockIds.put(lockId, row); - if (prev != null) { - lockIds.put(lockId, prev); // revert old value - lockIdGenerator = rand.nextInt(); // generate new start point - } - } while (prev != null); - - lockedRows.add(row); - lockedRows.notifyAll(); - return lockId; + } + + // loop until we generate an unused lock id + while (true) { + Integer lockId = lockIdGenerator.incrementAndGet(); + HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey); + if (existingRowKey == null) { + return lockId; + } else { + // lockId already in use, jump generator to a new spot + lockIdGenerator.set(rand.nextInt()); + } } } finally { closeRegionOperation(); @@ -2411,22 +2414,28 @@ public class HRegion implements HeapSize { // , Writable{ * @param lockid * @return Row that goes with lockid */ - byte [] getRowFromLock(final Integer lockid) { - synchronized (lockedRows) { - return lockIds.get(lockid); - } + byte[] getRowFromLock(final Integer lockid) { + HashedBytes rowKey = lockIds.get(lockid); + return rowKey == null ? null : rowKey.getBytes(); } - + /** * Release the row lock! * @param lockid The lock ID to release. 
*/ - public void releaseRowLock(final Integer lockid) { - synchronized (lockedRows) { - byte[] row = lockIds.remove(lockid); - lockedRows.remove(row); - lockedRows.notifyAll(); + public void releaseRowLock(final Integer lockId) { + HashedBytes rowKey = lockIds.remove(lockId); + if (rowKey == null) { + LOG.warn("Release unknown lockId: " + lockId); + return; } + CountDownLatch rowLatch = lockedRows.remove(rowKey); + if (rowLatch == null) { + LOG.error("Releases row not locked, lockId: " + lockId + " row: " + + rowKey); + return; + } + rowLatch.countDown(); } /** @@ -2434,13 +2443,8 @@ public class HRegion implements HeapSize { // , Writable{ * @param lockid * @return boolean */ - boolean isRowLocked(final Integer lockid) { - synchronized (lockedRows) { - if (lockIds.get(lockid) != null) { - return true; - } - return false; - } + boolean isRowLocked(final Integer lockId) { + return lockIds.containsKey(lockId); } /** @@ -2567,7 +2571,8 @@ public class HRegion implements HeapSize { // , Writable{ } } - public synchronized boolean next(List outResults, int limit) + @Override + public synchronized boolean next(List outResults, int limit) throws IOException { if (this.filterClosed) { throw new UnknownScannerException("Scanner was closed (timed out?) 
" + @@ -2596,7 +2601,8 @@ public class HRegion implements HeapSize { // , Writable{ } } - public synchronized boolean next(List outResults) + @Override + public synchronized boolean next(List outResults) throws IOException { // apply the batching limit by default return next(outResults, batch); @@ -2692,7 +2698,8 @@ public class HRegion implements HeapSize { // , Writable{ currentRow, 0, currentRow.length) <= isScan); } - public synchronized void close() { + @Override + public synchronized void close() { if (storeHeap != null) { storeHeap.close(); storeHeap = null; @@ -3493,30 +3500,32 @@ public class HRegion implements HeapSize { // , Writable{ } public static final long FIXED_OVERHEAD = ClassSize.align( - (4 * Bytes.SIZEOF_LONG) + ClassSize.ARRAY + - ClassSize.align(26 * ClassSize.REFERENCE) + ClassSize.OBJECT + - ClassSize.align(Bytes.SIZEOF_INT)); + ClassSize.OBJECT + // this + (4 * Bytes.SIZEOF_LONG) + // memstoreFlushSize, lastFlushTime, blockingMemStoreSize, threadWakeFrequency + Bytes.SIZEOF_BOOLEAN + // splitRequest + ClassSize.ARRAY + // splitPoint + (26 * ClassSize.REFERENCE)); - public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + - (ClassSize.OBJECT * 2) + (2 * ClassSize.ATOMIC_BOOLEAN) + - ClassSize.ATOMIC_LONG + ClassSize.ATOMIC_INTEGER + - - // Using TreeMap for TreeSet - ClassSize.TREEMAP + - - // Using TreeMap for HashMap - ClassSize.TREEMAP + - - ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + - ClassSize.align(ClassSize.OBJECT + - (5 * Bytes.SIZEOF_BOOLEAN)) + - (3 * ClassSize.REENTRANT_LOCK)); + public static final long DEEP_OVERHEAD = FIXED_OVERHEAD + + ClassSize.OBJECT + // closeLock + (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing + ClassSize.ATOMIC_LONG + // memStoreSize + ClassSize.ATOMIC_INTEGER + // lockIdGenerator + (2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds + WriteState.HEAP_SIZE + // writestate + ClassSize.CONCURRENT_SKIPLISTMAP + 
ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores + (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock + ClassSize.ARRAYLIST + // recentFlushes + ReadWriteConsistencyControl.FIXED_SIZE // rwcc + ; + @Override public long heapSize() { long heapSize = DEEP_OVERHEAD; for(Store store : this.stores.values()) { heapSize += store.heapSize(); } + // this does not take into account row locks, recent flushes, rwcc entries return heapSize; } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java index f4d7be574ee..8ec53d31d2a 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java @@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.regionserver; import java.util.LinkedList; -import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; /** * Manages the read/write consistency within memstore. This provides @@ -158,4 +160,10 @@ public class ReadWriteConsistencyControl { return this.writeNumber; } } + + public static final long FIXED_SIZE = ClassSize.align( + ClassSize.OBJECT + + 2 * Bytes.SIZEOF_LONG + + 2 * ClassSize.REFERENCE); + } diff --git a/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java b/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java new file mode 100644 index 00000000000..e6471cbbfd1 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java @@ -0,0 +1,61 @@ +/* + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.Arrays; + +/** + * This class encapsulates a byte array and overrides hashCode and equals so + * that its identity is based on the data rather than the array instance. + */ +public class HashedBytes { + + private final byte[] bytes; + private final int hashCode; + + public HashedBytes(byte[] bytes) { + this.bytes = bytes; + hashCode = Bytes.hashCode(bytes); + } + + public byte[] getBytes() { + return bytes; + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null || getClass() != obj.getClass()) + return false; + HashedBytes other = (HashedBytes) obj; + return Arrays.equals(bytes, other.bytes); + } + + @Override + public String toString() { + return Bytes.toStringBinary(bytes); + } +} \ No newline at end of file