HBASE-17782 Extend IdReadWriteLock to support using both weak and soft reference

Yu Li 2017-03-15 11:07:42 +08:00
parent 14fb57cab2
commit aace02a230
4 changed files with 86 additions and 21 deletions
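The heart of the change: IdReadWriteLock keeps its no-arg constructor (now defaulting to weak references) and gains a ReferenceType parameter, so a caller with a small, hot key set (like BucketCache's block offsets) can opt into soft references. Below is a minimal usage sketch, assuming the getLock(long) accessor documented in the diff; the surrounding class and method names are illustrative, not from this commit:

import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;

public class OffsetLockUsage {
  // SOFT: pooled locks survive GC while memory is plentiful, so a small,
  // frequently re-locked key set avoids constantly re-creating lock objects.
  private final IdReadWriteLock offsetLock = new IdReadWriteLock(ReferenceType.SOFT);

  void freeBlock(long offset) {
    // getLock returns the ReentrantReadWriteLock pooled under this id,
    // creating it through the pool's object factory on first use.
    ReentrantReadWriteLock lock = offsetLock.getLock(offset);
    lock.writeLock().lock();
    try {
      // ... safe to free the block: no reader holds the read lock ...
    } finally {
      lock.writeLock().unlock();
    }
  }
}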

BucketCache.java

@@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
+import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
 import org.apache.hadoop.util.StringUtils;

 import com.google.common.annotations.VisibleForTesting;
@@ -185,9 +186,11 @@ public class BucketCache implements BlockCache, HeapSize {
   /**
    * A ReentrantReadWriteLock to lock on a particular block identified by offset.
    * The purpose of this is to avoid freeing the block which is being read.
+   * <p>
+   * Key set of offsets in BucketCache is limited so soft reference is the best choice here.
    */
   @VisibleForTesting
-  final IdReadWriteLock offsetLock = new IdReadWriteLock();
+  final IdReadWriteLock offsetLock = new IdReadWriteLock(ReferenceType.SOFT);

   private final NavigableSet<BlockCacheKey> blocksByHFile =
       new ConcurrentSkipListSet<>(new Comparator<BlockCacheKey>() {
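The new Javadoc line explains the SOFT choice; the difference it relies on is standard JDK reference semantics. A self-contained sketch (plain java.lang.ref, independent of this patch) of the behavior the parameterized test at the bottom of this commit asserts:

import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;

public class ReferenceDemo {
  public static void main(String[] args) {
    WeakReference<Object> weak = new WeakReference<>(new Object());
    SoftReference<Object> soft = new SoftReference<>(new Object());
    System.gc(); // only a hint, but HotSpot typically clears weak refs here
    // Weakly-reachable objects go away on any collection...
    System.out.println("weak cleared: " + (weak.get() == null)); // usually true
    // ...while softly-reachable ones are kept until the heap is under pressure.
    System.out.println("soft cleared: " + (soft.get() == null)); // usually false
  }
}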

IdReadWriteLock.java

@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.util;

+import java.lang.ref.Reference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -44,16 +45,48 @@ import com.google.common.annotations.VisibleForTesting;
 public class IdReadWriteLock {
   // The number of lock we want to easily support. It's not a maximum.
   private static final int NB_CONCURRENT_LOCKS = 1000;
-  // The pool to get entry from, entries are mapped by soft reference and will be
-  // automatically garbage-collected when JVM memory pressure is high
-  private final ObjectPool<Long, ReentrantReadWriteLock> lockPool =
-      new SoftObjectPool<>(
-          new ObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
-            @Override
-            public ReentrantReadWriteLock createObject(Long id) {
-              return new ReentrantReadWriteLock();
-            }
-          }, NB_CONCURRENT_LOCKS);
+  /**
+   * The pool to get entry from, entries are mapped by {@link Reference} and will be automatically
+   * garbage-collected by JVM
+   */
+  private final ObjectPool<Long, ReentrantReadWriteLock> lockPool;
+  private final ReferenceType refType;
+
+  public IdReadWriteLock() {
+    this(ReferenceType.WEAK);
+  }
+
+  /**
+   * Constructor of IdReadWriteLock
+   * @param referenceType type of the reference used in lock pool, {@link ReferenceType#WEAK} by
+   *          default. Use {@link ReferenceType#SOFT} if the key set is limited and the locks will
+   *          be reused with a high frequency
+   */
+  public IdReadWriteLock(ReferenceType referenceType) {
+    this.refType = referenceType;
+    switch (referenceType) {
+    case SOFT:
+      lockPool = new SoftObjectPool<>(new ObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
+        @Override
+        public ReentrantReadWriteLock createObject(Long id) {
+          return new ReentrantReadWriteLock();
+        }
+      }, NB_CONCURRENT_LOCKS);
+      break;
+    case WEAK:
+    default:
+      lockPool = new WeakObjectPool<>(new ObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
+        @Override
+        public ReentrantReadWriteLock createObject(Long id) {
+          return new ReentrantReadWriteLock();
+        }
+      }, NB_CONCURRENT_LOCKS);
+    }
+  }
+
+  public static enum ReferenceType {
+    WEAK, SOFT
+  }

   /**
    * Get the ReentrantReadWriteLock corresponding to the given id
@@ -93,4 +126,9 @@ public class IdReadWriteLock {
       Thread.sleep(50);
     }
   }
+
+  @VisibleForTesting
+  public ReferenceType getReferenceType() {
+    return this.refType;
+  }
 }
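SoftObjectPool and WeakObjectPool themselves are not part of this commit. As a rough sketch of the mechanism (hypothetical, simplified; HBase's real ObjectPool differs in detail), a reference-valued concurrent map is enough to get the share-on-demand and purge behavior the class above depends on:

import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical simplified pool; entries vanish once no caller strongly holds them.
class WeakPoolSketch<K, V> {
  private final ConcurrentMap<K, WeakReference<V>> map = new ConcurrentHashMap<>();
  private final Function<K, V> factory;

  WeakPoolSketch(Function<K, V> factory) {
    this.factory = factory;
  }

  V get(K key) {
    while (true) {
      WeakReference<V> ref = map.get(key);
      V value = (ref == null) ? null : ref.get();
      if (value != null) {
        return value; // live entry: every caller with this key shares one object
      }
      V created = factory.apply(key);
      WeakReference<V> newRef = new WeakReference<>(created);
      boolean installed = (ref == null)
          ? map.putIfAbsent(key, newRef) == null
          : map.replace(key, ref, newRef); // swap out the cleared reference
      if (installed) {
        return created;
      }
      // Lost a race with another thread; retry and use the winner's entry.
    }
  }

  // Drop entries whose referents have been garbage-collected; return live count.
  int purgeAndSize() {
    map.values().removeIf(r -> r.get() == null);
    return map.size();
  }
}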

RegionGroupingProvider.java

@@ -27,7 +27,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.Lock;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,7 +35,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 // imports for classes still in regionserver.wal
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.IdReadWriteLock;
+import org.apache.hadoop.hbase.util.IdLock;

 /**
  * A WAL Provider that returns a WAL per group of regions.
@@ -132,7 +131,7 @@ public class RegionGroupingProvider implements WALProvider {
   /** A group-provider mapping, make sure one-one rather than many-one mapping */
   private final ConcurrentMap<String, WALProvider> cached = new ConcurrentHashMap<>();
-  private final IdReadWriteLock createLock = new IdReadWriteLock();
+  private final IdLock createLock = new IdLock();

   private RegionGroupingStrategy strategy = null;
   private WALFactory factory = null;
@@ -181,16 +180,18 @@ public class RegionGroupingProvider implements WALProvider {
   private WAL getWAL(final String group) throws IOException {
     WALProvider provider = cached.get(group);
     if (provider == null) {
-      Lock lock = createLock.getLock(group.hashCode()).writeLock();
-      lock.lock();
+      IdLock.Entry lockEntry = null;
       try {
+        lockEntry = createLock.getLockEntry(group.hashCode());
         provider = cached.get(group);
         if (provider == null) {
           provider = createProvider(group);
           cached.put(group, provider);
         }
       } finally {
-        lock.unlock();
+        if (lockEntry != null) {
+          createLock.releaseLockEntry(lockEntry);
+        }
       }
     }
     return provider.getWAL(null, null);
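Design note on this file: the provider-creation lock never benefited from reader/writer separation (every caller took the write lock), so plain IdLock suffices here, and its entry-handle idiom stays safe even if an exception is thrown before the lock is acquired. The shape, condensed from the diff above:

// Entry-handle idiom used by IdLock in getWAL() above: getLockEntry blocks
// until the id is free and returns a handle that must be handed back.
IdLock.Entry lockEntry = null;
try {
  lockEntry = createLock.getLockEntry(group.hashCode());
  // ... double-checked creation of the cached WALProvider ...
} finally {
  if (lockEntry != null) { // stays null if getLockEntry itself threw
    createLock.releaseLockEntry(lockEntry);
  }
}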

TestIdReadWriteLock.java

@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.util;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;

+import java.util.Arrays;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -38,9 +39,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;

+@RunWith(Parameterized.class)
 @Category({MiscTests.class, MediumTests.class})
 // Medium as it creates 100 threads; seems better to run it isolated
 public class TestIdReadWriteLock {
@@ -51,7 +56,14 @@ public class TestIdReadWriteLock {
   private static final int NUM_THREADS = 128;
   private static final int NUM_SECONDS = 15;

-  private IdReadWriteLock idLock = new IdReadWriteLock();
+  @Parameterized.Parameter
+  public IdReadWriteLock idLock;
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[][] { { new IdReadWriteLock(ReferenceType.WEAK) },
+        { new IdReadWriteLock(ReferenceType.SOFT) } });
+  }

   private Map<Long, String> idOwner = new ConcurrentHashMap<>();
@@ -111,11 +123,22 @@ public class TestIdReadWriteLock {
         Future<Boolean> result = ecs.take();
         assertTrue(result.get());
       }
-      // make sure the entry pool won't be cleared when JVM memory is enough
-      // even after GC and purge call
       int entryPoolSize = idLock.purgeAndGetEntryPoolSize();
       LOG.debug("Size of entry pool after gc and purge: " + entryPoolSize);
-      assertEquals(NUM_IDS, entryPoolSize);
+      ReferenceType refType = idLock.getReferenceType();
+      switch (refType) {
+      case WEAK:
+        // make sure the entry pool will be cleared after GC and purge call
+        assertEquals(0, entryPoolSize);
+        break;
+      case SOFT:
+        // make sure the entry pool won't be cleared when JVM memory is enough
+        // even after GC and purge call
+        assertEquals(NUM_IDS, entryPoolSize);
+        break;
+      default:
+        break;
+      }
     } finally {
       exec.shutdown();
       exec.awaitTermination(5000, TimeUnit.MILLISECONDS);
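purgeAndGetEntryPoolSize() is a pre-existing test hook and does not appear in this diff; a hedged sketch of what such a helper plausibly does (assumed implementation, shown only to make the assertions above readable):

// Assumed sketch; the real hook lives in IdReadWriteLock.
@VisibleForTesting
int purgeAndGetEntryPoolSize() throws InterruptedException {
  System.gc();       // request collection of weakly/softly reachable locks
  Thread.sleep(200); // give the collector time to clear the references
  lockPool.purge();  // evict pool entries whose references were cleared
  return lockPool.size();
}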