HBASE-23223 Support the offsetLock of bucketCache to use strong ref (#764)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
bsglz authored 2019-11-21 16:39:58 +08:00; committed by Wellington Ramos Chevreuil
parent 77b4e8c972
commit 4ea792246f
7 changed files with 237 additions and 89 deletions

BucketCache.java

@@ -72,7 +72,9 @@ import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
-import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
+import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef;
+import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool;
+import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -113,6 +115,10 @@ public class BucketCache implements BlockCache, HeapSize {
   static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
   static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";
 
+  /** Use strong reference for offsetLock or not */
+  private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref";
+  private static final boolean STRONG_REF_DEFAULT = false;
+
   /** Priority buckets */
   @VisibleForTesting
   static final float DEFAULT_SINGLE_FACTOR = 0.25f;
@@ -199,10 +205,9 @@ public class BucketCache implements BlockCache, HeapSize {
    * A ReentrantReadWriteLock to lock on a particular block identified by offset.
    * The purpose of this is to avoid freeing the block which is being read.
    * <p>
-   * Key set of offsets in BucketCache is limited so soft reference is the best choice here.
    */
   @VisibleForTesting
-  transient final IdReadWriteLock<Long> offsetLock = new IdReadWriteLock<>(ReferenceType.SOFT);
+  transient final IdReadWriteLock<Long> offsetLock;
 
   private final NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>((a, b) -> {
     int nameComparison = a.getHfileName().compareTo(b.getHfileName());
@@ -257,6 +262,12 @@ public class BucketCache implements BlockCache, HeapSize {
   public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
       int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
       Configuration conf) throws IOException {
+    boolean useStrongRef = conf.getBoolean(STRONG_REF_KEY, STRONG_REF_DEFAULT);
+    if (useStrongRef) {
+      this.offsetLock = new IdReadWriteLockStrongRef<>();
+    } else {
+      this.offsetLock = new IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT);
+    }
     this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
     this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
     this.writerThreads = new WriterThread[writerThreadNum];
@@ -277,7 +288,7 @@ public class BucketCache implements BlockCache, HeapSize {
     LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor + ", minFactor: " + minFactor +
       ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " + singleFactor + ", multiFactor: " + multiFactor +
-      ", memoryFactor: " + memoryFactor);
+      ", memoryFactor: " + memoryFactor + ", useStrongRef: " + useStrongRef);
     this.cacheCapacity = capacity;
     this.persistencePath = persistencePath;
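
Since the lock implementation is chosen once in this constructor, the switch must be set before the cache is built. A minimal sketch of enabling it follows; the config key and its false default come from this commit, while the helper class around them is illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Hypothetical helper, not part of this commit.
public class StrongRefOffsetLockConfig {
  public static Configuration create() {
    Configuration conf = HBaseConfiguration.create();
    // false (the default) keeps the pooled soft-reference locks;
    // true makes BucketCache use IdReadWriteLockStrongRef for its offsetLock.
    conf.setBoolean("hbase.bucketcache.offsetlock.usestrongref", true);
    return conf;
  }
}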

TableStateManager.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
+import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool;
 import org.apache.hadoop.hbase.util.ZKDataMigrator;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
@@ -62,7 +63,7 @@ public class TableStateManager {
   private static final String MIGRATE_TABLE_STATE_FROM_ZK_KEY =
       "hbase.migrate.table.state.from.zookeeper";
 
-  private final IdReadWriteLock<TableName> tnLock = new IdReadWriteLock<>();
+  private final IdReadWriteLock<TableName> tnLock = new IdReadWriteLockWithObjectPool<>();
 
   protected final MasterServices master;
 
   private final ConcurrentMap<TableName, TableState.State> tableName2State =

IdReadWriteLock.java

@@ -18,11 +18,9 @@
  */
 package org.apache.hadoop.hbase.util;
 
-import java.lang.ref.Reference;
-
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -42,80 +40,13 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
  * For write lock, use lock.writeLock()
  */
 @InterfaceAudience.Private
-public class IdReadWriteLock<T> {
-  // The number of lock we want to easily support. It's not a maximum.
-  private static final int NB_CONCURRENT_LOCKS = 1000;
-
-  /**
-   * The pool to get entry from, entries are mapped by {@link Reference} and will be automatically
-   * garbage-collected by JVM
-   */
-  private final ObjectPool<T, ReentrantReadWriteLock> lockPool;
-  private final ReferenceType refType;
-
-  public IdReadWriteLock() {
-    this(ReferenceType.WEAK);
-  }
-
-  /**
-   * Constructor of IdReadWriteLock
-   * @param referenceType type of the reference used in lock pool, {@link ReferenceType#WEAK} by
-   *          default. Use {@link ReferenceType#SOFT} if the key set is limited and the locks will
-   *          be reused with a high frequency
-   */
-  public IdReadWriteLock(ReferenceType referenceType) {
-    this.refType = referenceType;
-    switch (referenceType) {
-      case SOFT:
-        lockPool = new SoftObjectPool<>(new ObjectPool.ObjectFactory<T, ReentrantReadWriteLock>() {
-          @Override
-          public ReentrantReadWriteLock createObject(T id) {
-            return new ReentrantReadWriteLock();
-          }
-        }, NB_CONCURRENT_LOCKS);
-        break;
-      case WEAK:
-      default:
-        lockPool = new WeakObjectPool<>(new ObjectPool.ObjectFactory<T, ReentrantReadWriteLock>() {
-          @Override
-          public ReentrantReadWriteLock createObject(T id) {
-            return new ReentrantReadWriteLock();
-          }
-        }, NB_CONCURRENT_LOCKS);
-    }
-  }
-
-  public static enum ReferenceType {
-    WEAK, SOFT
-  }
-
-  /**
-   * Get the ReentrantReadWriteLock corresponding to the given id
-   * @param id an arbitrary number to identify the lock
-   */
-  public ReentrantReadWriteLock getLock(T id) {
-    lockPool.purge();
-    ReentrantReadWriteLock readWriteLock = lockPool.get(id);
-    return readWriteLock;
-  }
-
-  /** For testing */
-  @VisibleForTesting
-  int purgeAndGetEntryPoolSize() {
-    gc();
-    Threads.sleep(200);
-    lockPool.purge();
-    return lockPool.size();
-  }
-
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DM_GC", justification="Intentional")
-  private void gc() {
-    System.gc();
-  }
+public abstract class IdReadWriteLock<T> {
+
+  public abstract ReentrantReadWriteLock getLock(T id);
 
   @VisibleForTesting
   public void waitForWaiters(T id, int numWaiters) throws InterruptedException {
     for (ReentrantReadWriteLock readWriteLock;;) {
-      readWriteLock = lockPool.get(id);
+      readWriteLock = getLock(id);
       if (readWriteLock != null) {
         synchronized (readWriteLock) {
           if (readWriteLock.getQueueLength() >= numWaiters) {
@@ -126,9 +57,4 @@ public class IdReadWriteLock<T> {
       Thread.sleep(50);
     }
   }
-
-  @VisibleForTesting
-  public ReferenceType getReferenceType() {
-    return this.refType;
-  }
 }

IdReadWriteLockStrongRef.java (new file)

@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+@InterfaceAudience.Private
+public class IdReadWriteLockStrongRef<T> extends IdReadWriteLock<T> {
+
+  final ConcurrentHashMap<T, ReentrantReadWriteLock> map = new ConcurrentHashMap<>();
+
+  @VisibleForTesting
+  @Override
+  public ReentrantReadWriteLock getLock(T id) {
+    ReentrantReadWriteLock existing = map.get(id);
+    if (existing != null) {
+      return existing;
+    }
+    ReentrantReadWriteLock newLock = new ReentrantReadWriteLock();
+    existing = map.putIfAbsent(id, newLock);
+    if (existing == null) {
+      return newLock;
+    } else {
+      return existing;
+    }
+  }
+}
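
getLock resolves races with putIfAbsent: when two threads miss on the same id, only one entry wins and both callers receive it, and entries are never removed, so a contended lock cannot be garbage-collected out from under a reader; the trade-off is that the map grows with the key set. A usage sketch; the caller class is hypothetical, only getLock comes from this file:

import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef;

// Hypothetical caller, for illustration only.
public class StrongRefLockUsage {
  public static void main(String[] args) {
    IdReadWriteLockStrongRef<Long> locks = new IdReadWriteLockStrongRef<>();
    ReentrantReadWriteLock lock = locks.getLock(42L);
    lock.readLock().lock();
    try {
      // Every caller passing 42L gets the same instance back.
      assert locks.getLock(42L) == lock;
    } finally {
      lock.readLock().unlock();
    }
  }
}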

IdReadWriteLockWithObjectPool.java (new file)

@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.lang.ref.Reference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+@InterfaceAudience.Private
+public class IdReadWriteLockWithObjectPool<T> extends IdReadWriteLock<T> {
+  // The number of lock we want to easily support. It's not a maximum.
+  private static final int NB_CONCURRENT_LOCKS = 1000;
+
+  /**
+   * The pool to get entry from, entries are mapped by {@link Reference} and will be automatically
+   * garbage-collected by JVM
+   */
+  private final ObjectPool<T, ReentrantReadWriteLock> lockPool;
+  private final ReferenceType refType;
+
+  public IdReadWriteLockWithObjectPool() {
+    this(ReferenceType.WEAK);
+  }
+
+  /**
+   * Constructor of IdReadWriteLockWithObjectPool
+   * @param referenceType type of the reference used in lock pool, {@link ReferenceType#WEAK} by
+   *          default. Use {@link ReferenceType#SOFT} if the key set is limited and the locks will
+   *          be reused with a high frequency
+   */
+  public IdReadWriteLockWithObjectPool(ReferenceType referenceType) {
+    this.refType = referenceType;
+    switch (referenceType) {
+      case SOFT:
+        lockPool = new SoftObjectPool<>(new ObjectPool.ObjectFactory<T, ReentrantReadWriteLock>() {
+          @Override
+          public ReentrantReadWriteLock createObject(T id) {
+            return new ReentrantReadWriteLock();
+          }
+        }, NB_CONCURRENT_LOCKS);
+        break;
+      case WEAK:
+      default:
+        lockPool = new WeakObjectPool<>(new ObjectPool.ObjectFactory<T, ReentrantReadWriteLock>() {
+          @Override
+          public ReentrantReadWriteLock createObject(T id) {
+            return new ReentrantReadWriteLock();
+          }
+        }, NB_CONCURRENT_LOCKS);
+    }
+  }
+
+  public static enum ReferenceType {
+    WEAK, SOFT
+  }
+
+  /**
+   * Get the ReentrantReadWriteLock corresponding to the given id
+   * @param id an arbitrary number to identify the lock
+   */
+  @Override
+  public ReentrantReadWriteLock getLock(T id) {
+    lockPool.purge();
+    ReentrantReadWriteLock readWriteLock = lockPool.get(id);
+    return readWriteLock;
+  }
+
+  /** For testing */
+  @VisibleForTesting
+  int purgeAndGetEntryPoolSize() {
+    gc();
+    Threads.sleep(200);
+    lockPool.purge();
+    return lockPool.size();
+  }
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DM_GC", justification="Intentional")
+  private void gc() {
+    System.gc();
+  }
+
+  @VisibleForTesting
+  public ReferenceType getReferenceType() {
+    return this.refType;
+  }
+}
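
This pooled variant preserves the old semantics: locks live in a SoftObjectPool or WeakObjectPool keyed by id, so idle locks can be reclaimed by the JVM and recreated on demand. A sketch of choosing a reference type; the wrapper class is hypothetical, the constructors are from this file:

import org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool;
import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType;

// Hypothetical wrapper, for illustration only.
public class PooledLockChoices {
  // WEAK (the no-arg default): idle locks may be reclaimed on any GC cycle.
  static final IdReadWriteLock<String> weakLocks =
      new IdReadWriteLockWithObjectPool<>();

  // SOFT: locks linger until the JVM is under memory pressure, which suits a
  // limited, frequently reused key set such as BucketCache block offsets.
  static final IdReadWriteLock<String> softLocks =
      new IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT);
}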

TestIdReadWriteLockStrongRef.java (new file)

@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ SmallTests.class })
+public class TestIdReadWriteLockStrongRef {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestIdReadWriteLockStrongRef.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestIdReadWriteLockStrongRef.class);
+
+  private IdReadWriteLockStrongRef<Long> idLock = new IdReadWriteLockStrongRef<>();
+
+  @Test
+  public void testGetLock() throws Exception {
+    Long offset_1 = 1L;
+    Long offset_2 = 2L;
+    ReentrantReadWriteLock offsetLock_1 = idLock.getLock(offset_1);
+    ReentrantReadWriteLock offsetLock_2 = idLock.getLock(offset_1);
+    Assert.assertEquals(offsetLock_1, offsetLock_2);
+    ReentrantReadWriteLock offsetLock_3 = idLock.getLock(offset_2);
+    Assert.assertNotEquals(offsetLock_1, offsetLock_3);
+  }
+}

TestIdReadWriteLockWithObjectPool.java (renamed from TestIdReadWriteLock.java)

@@ -35,7 +35,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
+import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -47,25 +47,27 @@ import org.slf4j.LoggerFactory;
 @RunWith(Parameterized.class)
 @Category({MiscTests.class, MediumTests.class})
 // Medium as it creates 100 threads; seems better to run it isolated
-public class TestIdReadWriteLock {
+public class TestIdReadWriteLockWithObjectPool {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestIdReadWriteLock.class);
+      HBaseClassTestRule.forClass(TestIdReadWriteLockWithObjectPool.class);
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestIdReadWriteLock.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestIdReadWriteLockWithObjectPool.class);
 
   private static final int NUM_IDS = 16;
   private static final int NUM_THREADS = 128;
   private static final int NUM_SECONDS = 15;
 
   @Parameterized.Parameter
-  public IdReadWriteLock<Long> idLock;
+  public IdReadWriteLockWithObjectPool<Long> idLock;
 
   @Parameterized.Parameters
   public static Iterable<Object[]> data() {
-    return Arrays.asList(new Object[][] { { new IdReadWriteLock<Long>(ReferenceType.WEAK) },
-        { new IdReadWriteLock<Long>(ReferenceType.SOFT) } });
+    return Arrays.asList(new Object[][] {
+        { new IdReadWriteLockWithObjectPool<Long>(ReferenceType.WEAK) },
+        { new IdReadWriteLockWithObjectPool<Long>(ReferenceType.SOFT) } });
   }
 
   private Map<Long, String> idOwner = new ConcurrentHashMap<>();