From 79a7bf770a69eb68b12201aaf81e788d845c7586 Mon Sep 17 00:00:00 2001 From: Ruanhui <32773751+frostruan@users.noreply.github.com> Date: Thu, 2 Mar 2023 11:24:20 +0800 Subject: [PATCH] HBASE-27458 Use ReadWriteLock for region scanner readpoint map (#5069) Signed-off-by: Duo Zhang --- .../hadoop/hbase/regionserver/HRegion.java | 17 ++-- .../ReadPointCalculationLock.java | 90 +++++++++++++++++++ .../hbase/regionserver/RegionScannerImpl.java | 5 +- 3 files changed, 103 insertions(+), 9 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReadPointCalculationLock.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 614f60b0135..2d0ed75f664 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -397,6 +397,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool(); final ConcurrentHashMap scannerReadPoints; + final ReadPointCalculationLock smallestReadPointCalcLock; /** * The sequence ID that was enLongAddered when this region was opened. @@ -435,19 +436,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * this readPoint, are included in every read operation. */ public long getSmallestReadPoint() { - long minimumReadPoint; // We need to ensure that while we are calculating the smallestReadPoint // no new RegionScanners can grab a readPoint that we are unaware of. - // We achieve this by synchronizing on the scannerReadPoints object. 
- synchronized (scannerReadPoints) { - minimumReadPoint = mvcc.getReadPoint(); + smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.CALCULATION_LOCK); + try { + long minimumReadPoint = mvcc.getReadPoint(); for (Long readPoint : this.scannerReadPoints.values()) { - if (readPoint < minimumReadPoint) { - minimumReadPoint = readPoint; - } + minimumReadPoint = Math.min(minimumReadPoint, readPoint); } + return minimumReadPoint; + } finally { + smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.CALCULATION_LOCK); } - return minimumReadPoint; } /* @@ -812,6 +812,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi setHTableSpecificConf(); this.scannerReadPoints = new ConcurrentHashMap<>(); + this.smallestReadPointCalcLock = new ReadPointCalculationLock(conf); this.busyWaitDuration = conf.getLong("hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION); this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReadPointCalculationLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReadPointCalculationLock.java new file mode 100644 index 00000000000..380b82de93a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReadPointCalculationLock.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.hadoop.conf.Configuration; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Lock to manage concurrency between {@link RegionScanner} and + * {@link HRegion#getSmallestReadPoint()}. We need to ensure that while we are calculating the + * smallest read point, no new scanners can modify the scannerReadPoints Map. We used to achieve + * this by synchronizing on the scannerReadPoints object, but that may block read threads and + * reduce read performance. Because the scannerReadPoints object is a + * thread-safe {@link java.util.concurrent.ConcurrentHashMap}, each + * {@link RegionScanner} can record its read point concurrently; it only needs to acquire a + * shared lock. When we calculate the smallest read point, we need to acquire an + * exclusive lock. This improves read performance in most scenarios, except when there are many + * delta operations, like {@link org.apache.hadoop.hbase.client.Append} or + * {@link org.apache.hadoop.hbase.client.Increment}. So we introduce a flag to enable/disable this + * feature. 
+ */ +@InterfaceAudience.Private +public class ReadPointCalculationLock { + + public enum LockType { + CALCULATION_LOCK, + RECORDING_LOCK + } + + private final boolean useReadWriteLockForReadPoints; + private Lock lock; + private ReadWriteLock readWriteLock; + + ReadPointCalculationLock(Configuration conf) { + this.useReadWriteLockForReadPoints = + conf.getBoolean("hbase.region.readpoints.read.write.lock.enable", false); + if (useReadWriteLockForReadPoints) { + readWriteLock = new ReentrantReadWriteLock(); + } else { + lock = new ReentrantLock(); + } + } + + void lock(LockType lockType) { + if (useReadWriteLockForReadPoints) { + assert lock == null; + if (lockType == LockType.CALCULATION_LOCK) { + readWriteLock.writeLock().lock(); + } else { + readWriteLock.readLock().lock(); + } + } else { + assert readWriteLock == null; + lock.lock(); + } + } + + void unlock(LockType lockType) { + if (useReadWriteLockForReadPoints) { + assert lock == null; + if (lockType == LockType.CALCULATION_LOCK) { + readWriteLock.writeLock().unlock(); + } else { + readWriteLock.readLock().unlock(); + } + } else { + assert readWriteLock == null; + lock.unlock(); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java index d010d71f6cf..11d4c20f581 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java @@ -130,7 +130,8 @@ class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback { long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan); this.scannerReadPoints = region.scannerReadPoints; this.rsServices = region.getRegionServerServices(); - synchronized (scannerReadPoints) { + region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK); + try { if (mvccReadPoint 
> 0) { this.readPt = mvccReadPoint; } else if (hasNonce(region, nonce)) { @@ -139,6 +140,8 @@ class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback { this.readPt = region.getReadPoint(isolationLevel); } scannerReadPoints.put(this, this.readPt); + } finally { + region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK); } initializeScanners(scan, additionalScanners); }