From ab4351a15528511f2a65f6f32f39a23a19a447fe Mon Sep 17 00:00:00 2001 From: chenglei Date: Wed, 23 Mar 2022 17:14:07 +0800 Subject: [PATCH] =?UTF-8?q?HBASE-26869=20RSRpcServices.scan=20should=20dee?= =?UTF-8?q?p=20clone=20cells=20when=20RpcCallCo=E2=80=A6=20(#4249)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/apache/hadoop/hbase/CellUtil.java | 15 ++ .../hadoop/hbase/regionserver/HRegion.java | 6 +- .../hbase/regionserver/RSRpcServices.java | 24 +- .../regionserver/TestRegionServerScan.java | 214 ++++++++++++++++++ 4 files changed, 252 insertions(+), 7 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerScan.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index 7533073edbd..1719c8aa784 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.KeyValue.getDelimiter; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -849,4 +850,18 @@ public final class CellUtil { if (diff != 0) return diff; return compareQualifiers(left, right, rqoffset, rqlength); } + + public static void cloneIfNecessary(ArrayList cells) { + if (cells == null || cells.isEmpty()) { + return; + } + for (int i = 0; i < cells.size(); i++) { + Cell cell = cells.get(i); + cells.set(i, cloneIfNecessary(cell)); + } + } + + public static Cell cloneIfNecessary(Cell cell) { + return (cell instanceof ByteBufferExtendedCell ? KeyValueUtil.copyToNewKeyValue(cell) : cell); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 8906871ed88..730cabc0bb3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -78,7 +78,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.ByteBufferExtendedCell; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.CellComparator; @@ -94,7 +93,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.MetaCellComparator; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.NotServingRegionException; @@ -7871,8 +7869,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap buffers. // See more details in HBASE-26036. for (Cell cell : tmp) { - results.add(cell instanceof ByteBufferExtendedCell ? - KeyValueUtil.copyToNewKeyValue(cell) : cell); + results.add( + CellUtil.cloneIfNecessary(cell)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index a23b22fffeb..e3dabfb947a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -3307,7 +3307,7 @@ public class RSRpcServices extends HBaseRpcServicesBase // This is cells inside a row. Default size is 10 so if many versions or many cfs, // then we'll resize. Resizings show in profiler. Set it higher than 10. For now // arbitrary 32. TODO: keep record of general size of results being returned. - List values = new ArrayList<>(32); + ArrayList values = new ArrayList<>(32); region.startRegionOperation(Operation.SCAN); long before = EnvironmentEdgeManager.currentTime(); // Used to check if we've matched the row limit set on the Scan @@ -3368,9 +3368,16 @@ public class RSRpcServices extends HBaseRpcServicesBase // reset the batch progress between nextRaw invocations since we don't want the // batch progress from previous calls to affect future calls scannerContext.setBatchProgress(0); + assert values.isEmpty(); // Collect values to be returned here moreRows = scanner.nextRaw(values, scannerContext); + if (context == null) { + // When there is no RpcCallContext,copy EC to heap, then the scanner would close, + // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap + // buffers.See more details in HBASE-26036. + CellUtil.cloneIfNecessary(values); + } numOfNextRawCalls++; if (!values.isEmpty()) { @@ -3727,14 +3734,25 @@ public class RSRpcServices extends HBaseRpcServicesBase if (context != null) { context.setCallBack(rsh.shippedCallback); } else { - // When context != null, adding back the lease will be done in callback set above. - addScannerLeaseBack(lease); + // If context is null,here we call rsh.shippedCallback directly to reuse the logic in + // rsh.shippedCallback to release the internal resources in rsh,and lease is also added + // back to regionserver's LeaseManager in rsh.shippedCallback. + runShippedCallback(rsh); } } quota.close(); } } + private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException { + assert rsh.shippedCallback != null; + try { + rsh.shippedCallback.run(); + } catch (IOException ioe) { + throw new ServiceException(ioe); + } + } + private void closeScanner(HRegion region, RegionScanner scanner, String scannerName, RpcCallContext context) throws IOException { if (region.getCoprocessorHost() != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerScan.java new file mode 100644 index 00000000000..43576696a62 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerScan.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.io.DeallocateRewriteByteBuffAllocator; +import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; + +@Category({ RegionServerTests.class, LargeTests.class }) +public class TestRegionServerScan { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRegionServerScan.class); + + @Rule + public TestName name = new TestName(); + + private static final byte[] CF = Bytes.toBytes("CF"); + private static final byte[] CQ = Bytes.toBytes("CQ"); + private static final byte[] VALUE = new byte[1200]; + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static final Configuration conf = TEST_UTIL.getConfiguration(); + private static Admin admin = null; + static final TableName tableName = TableName.valueOf("TestRegionServerScan"); + static final byte[] r0 = Bytes.toBytes("row-0"); + static final byte[] r1 = Bytes.toBytes("row-1"); + static final byte[] r2 = Bytes.toBytes("row-2"); + + @BeforeClass + public static void setupBeforeClass() throws Exception { + /** + * Use {@link DeallocateRewriteByteBuffAllocator} to rewrite the bytebuffers right after + * released. + */ + conf.set(ByteBuffAllocator.BYTEBUFF_ALLOCATOR_CLASS, + DeallocateRewriteByteBuffAllocator.class.getName()); + conf.setBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true); + conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0); + conf.setInt(BlockCacheFactory.BUCKET_CACHE_WRITER_THREADS_KEY, 20); + conf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 2048); + conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); + conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 64); + conf.setStrings(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName()); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 60 * 1000); + conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30 * 60 * 1000); + + conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60 * 60 * 1000); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10000); + conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 1024 * 1024 * 1024); + TEST_UTIL.startMiniCluster(1); + admin = TEST_UTIL.getAdmin(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testScannWhenRpcCallContextNull() throws Exception { + ResultScanner resultScanner = null; + Table table = null; + try { + table = + TEST_UTIL.createTable(tableName, new byte[][] { CF }, 1, 1024, null); + putToTable(table, r0); + putToTable(table, r1); + putToTable(table, r2); + + admin.flush(table.getName()); + + Scan scan = new Scan(); + scan.setCaching(2); + scan.withStartRow(r0, true).withStopRow(r2, true); + + MyRSRpcServices.inTest = true; + resultScanner = table.getScanner(scan); + Result result = resultScanner.next(); + byte[] rowKey = result.getRow(); + assertTrue(Bytes.equals(r0, rowKey)); + + result = resultScanner.next(); + rowKey = result.getRow(); + assertTrue(Bytes.equals(r1, rowKey)); + + result = resultScanner.next(); + rowKey = result.getRow(); + assertTrue(Bytes.equals(r2, rowKey)); + assertNull(resultScanner.next()); + assertTrue(MyRSRpcServices.exceptionRef.get() == null); + } finally { + MyRSRpcServices.inTest = false; + if (resultScanner != null) { + resultScanner.close(); + } + if (table != null) { + table.close(); + } + } + } + + private static void putToTable(Table table, byte[] rowkey) throws IOException { + Put put = new Put(rowkey); + put.addColumn(CF, CQ, VALUE); + table.put(put); + } + + private static class MyRegionServer extends MiniHBaseClusterRegionServer { + public MyRegionServer(Configuration conf) throws IOException, InterruptedException { + super(conf); + } + + @Override + protected RSRpcServices createRpcServices() throws IOException { + return new MyRSRpcServices(this); + } + } + + private static class MyRSRpcServices extends RSRpcServices { + private static AtomicReference exceptionRef = new AtomicReference(null); + private static volatile boolean inTest = false; + + public MyRSRpcServices(HRegionServer rs) throws IOException { + super(rs); + } + + @Override + public ScanResponse scan(RpcController controller, ScanRequest request) + throws ServiceException { + try { + if (!inTest) { + return super.scan(controller, request); + } + + HRegion region = null; + if (request.hasRegion()) { + region = this.getRegion(request.getRegion()); + } + + if (region != null + && !tableName.equals(region.getTableDescriptor().getTableName())) { + return super.scan(controller, request); + } + + ScanResponse result = null; + //Simulate RpcCallContext is null for test. + Optional rpcCall = RpcServer.unsetCurrentCall(); + try { + result = super.scan(controller, request); + } finally { + rpcCall.ifPresent(RpcServer::setCurrentCall); + } + return result; + } catch (Throwable e) { + exceptionRef.set(e); + throw new ServiceException(e); + } + } + } + +}